workqueue: implement NUMA affinity for unbound workqueues

Currently, an unbound workqueue has a single current, or first, pwq
(pool_workqueue) to which all new work items are queued.  This often
isn't optimal on NUMA machines as workers may jump around across node
boundaries and work items get assigned to workers without any regard
to NUMA affinity.

This patch implements NUMA affinity for unbound workqueues.  Instead
of mapping all entries of numa_pwq_tbl[] to the same pwq,
apply_workqueue_attrs() now creates a separate pwq covering the
intersecting CPUs for each NUMA node which has online CPUs in
@attrs->cpumask.  Nodes which don't have intersecting possible CPUs
are mapped to pwqs covering whole @attrs->cpumask.

As CPUs come up and go down, the pool association is changed
accordingly.  Changing pool association may involve allocating new
pools which may fail.  To avoid failing CPU_DOWN, each workqueue
always keeps a default pwq which covers whole attrs->cpumask which is
used as fallback if pool creation fails during a CPU hotplug
operation.

This ensures that all work items issued on a NUMA node are executed on
the same node as long as the workqueue allows execution on the CPUs of
the node.

As this maps a workqueue to multiple pwqs and max_active is per-pwq,
this changes the behavior of max_active.  The limit is now per NUMA
node instead of global.  While this is an actual change, max_active is
already per-cpu for per-cpu workqueues and primarily used as safety
mechanism rather than for active concurrency control.  Concurrency is
usually limited from workqueue users by the number of concurrently
active work items and this change shouldn't matter much.

v2: Fixed pwq freeing in apply_workqueue_attrs() error path.  Spotted
    by Lai.

v3: The previous version incorrectly made a workqueue spanning
    multiple nodes spread work items over all online CPUs when some of
    its nodes don't have any desired cpus.  Reimplemented so that NUMA
    affinity is properly updated as CPUs go up and down.  This problem
    was spotted by Lai Jiangshan.

v4: destroy_workqueue() was putting wq->dfl_pwq and then clearing it;
    however, wq may be freed at any time after dfl_pwq is put making
    the clearing use-after-free.  Clear wq->dfl_pwq before putting it.

v5: apply_workqueue_attrs() was leaking @tmp_attrs, @new_attrs and
    @pwq_tbl after success.  Fixed.

    Retry loop in wq_update_unbound_numa_attrs() isn't necessary as
    application of new attrs is excluded via CPU hotplug.  Removed.

    Documentation on CPU affinity guarantee on CPU_DOWN added.

    All changes are suggested by Lai Jiangshan.

Signed-off-by: Tejun Heo <tj@kernel.org>
Reviewed-by: Lai Jiangshan <laijs@cn.fujitsu.com>
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index d9a4aeb..57cd77d 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -45,6 +45,7 @@
 #include <linux/hashtable.h>
 #include <linux/rculist.h>
 #include <linux/nodemask.h>
+#include <linux/moduleparam.h>
 
 #include "workqueue_internal.h"
 
@@ -245,6 +246,7 @@
 	int			saved_max_active; /* WQ: saved pwq max_active */
 
 	struct workqueue_attrs	*unbound_attrs;	/* WQ: only for unbound wqs */
+	struct pool_workqueue	*dfl_pwq;	/* WQ: only for unbound wqs */
 
 #ifdef CONFIG_SYSFS
 	struct wq_device	*wq_dev;	/* I: for sysfs interface */
@@ -268,6 +270,9 @@
 
 static bool wq_numa_enabled;		/* unbound NUMA affinity enabled */
 
+/* buf for wq_update_unbound_numa(), protected by CPU hotplug exclusion */
+static struct workqueue_attrs *wq_update_unbound_numa_attrs_buf;
+
 static DEFINE_MUTEX(wq_pool_mutex);	/* protects pools and workqueues list */
 static DEFINE_SPINLOCK(wq_mayday_lock);	/* protects wq->maydays list */
 
@@ -3710,6 +3715,61 @@
 	return pwq;
 }
 
+/* reverse of alloc_unbound_pwq(); only the error path calls this */
+static void free_unbound_pwq(struct pool_workqueue *pwq)
+{
+	lockdep_assert_held(&wq_pool_mutex);
+
+	if (!pwq)
+		return;
+	put_unbound_pool(pwq->pool);
+	kfree(pwq);
+}
+
+/**
+ * wq_calc_node_cpumask - calculate a wq_attrs' cpumask for the specified node
+ * @attrs: the wq_attrs of interest
+ * @node: the target NUMA node
+ * @cpu_going_down: if >= 0, the CPU to consider as offline
+ * @cpumask: outarg, the resulting cpumask
+ *
+ * Calculate the cpumask a workqueue with @attrs should use on @node.  If
+ * @cpu_going_down is >= 0, that cpu is considered offline during
+ * calculation.  The result is stored in @cpumask.  This function returns
+ * %true if the resulting @cpumask is different from @attrs->cpumask,
+ * %false if equal.
+ *
+ * If NUMA affinity is not enabled or @node has no online CPU requested by
+ * @attrs (after discounting @cpu_going_down), @attrs->cpumask is used
+ * as-is.  Otherwise, the returned cpumask is the intersection of the
+ * possible CPUs of @node and @attrs->cpumask.
+ *
+ * The caller is responsible for ensuring that the cpumask of @node stays
+ * stable.
+ */
+static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
+				 int cpu_going_down, cpumask_t *cpumask)
+{
+	if (!wq_numa_enabled)
+		goto use_dfl;
+
+	/* does @node have any online CPUs @attrs wants? */
+	cpumask_and(cpumask, cpumask_of_node(node), attrs->cpumask);
+	if (cpu_going_down >= 0)
+		cpumask_clear_cpu(cpu_going_down, cpumask);
+
+	if (cpumask_empty(cpumask))
+		goto use_dfl;
+
+	/* yeap, return possible CPUs in @node that @attrs wants */
+	cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]);
+	return !cpumask_equal(cpumask, attrs->cpumask);
+
+use_dfl:
+	cpumask_copy(cpumask, attrs->cpumask);
+	return false;
+}
+
 /* install @pwq into @wq's numa_pwq_tbl[] for @node and return the old pwq */
 static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
 						   int node,
@@ -3732,11 +3792,12 @@
  * @wq: the target workqueue
  * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
  *
- * Apply @attrs to an unbound workqueue @wq.  If @attrs doesn't match the
- * current attributes, a new pwq is created and made the first pwq which
- * will serve all new work items.  Older pwqs are released as in-flight
- * work items finish.  Note that a work item which repeatedly requeues
- * itself back-to-back will stay on its current pwq.
+ * Apply @attrs to an unbound workqueue @wq.  Unless disabled, on NUMA
+ * machines, this function maps a separate pwq to each NUMA node with
+ * possible CPUs in @attrs->cpumask so that work items are affine to the
+ * NUMA node they were issued on.  Older pwqs are released as in-flight
+ * work items finish.  Note that a work item which repeatedly requeues
+ * itself back-to-back will stay on its current pwq.
  *
  * Performs GFP_KERNEL allocations.  Returns 0 on success and -errno on
  * failure.
@@ -3744,8 +3805,8 @@
 int apply_workqueue_attrs(struct workqueue_struct *wq,
 			  const struct workqueue_attrs *attrs)
 {
-	struct workqueue_attrs *new_attrs;
-	struct pool_workqueue *pwq, *last_pwq = NULL;
+	struct workqueue_attrs *new_attrs, *tmp_attrs;
+	struct pool_workqueue **pwq_tbl, *dfl_pwq;
 	int node, ret;
 
 	/* only unbound workqueues can change attributes */
@@ -3756,40 +3817,191 @@
 	if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
 		return -EINVAL;
 
-	/* make a copy of @attrs and sanitize it */
+	pwq_tbl = kzalloc(wq_numa_tbl_len * sizeof(pwq_tbl[0]), GFP_KERNEL);
 	new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
-	if (!new_attrs)
+	tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
+	if (!pwq_tbl || !new_attrs || !tmp_attrs)
 		goto enomem;
 
+	/* make a copy of @attrs and sanitize it */
 	copy_workqueue_attrs(new_attrs, attrs);
 	cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
 
-	mutex_lock(&wq_pool_mutex);
-	pwq = alloc_unbound_pwq(wq, new_attrs);
-	mutex_unlock(&wq_pool_mutex);
-	if (!pwq)
-		goto enomem;
+	/*
+	 * We may create multiple pwqs with differing cpumasks.  Make a copy
+	 * of @new_attrs to modify and obtain pools with.  Per-node cpumasks
+	 * are derived from the sanitized @new_attrs, not the raw @attrs.
+	 */
+	copy_workqueue_attrs(tmp_attrs, new_attrs);
 
+	/*
+	 * CPUs should stay stable across pwq creations and installations.
+	 * Pin CPUs, determine the target cpumask for each node and create
+	 * pwqs accordingly.
+	 */
+	get_online_cpus();
+
+	mutex_lock(&wq_pool_mutex);
+
+	/*
+	 * If something goes wrong during CPU up/down, we'll fall back to
+	 * the default pwq covering whole @attrs->cpumask.  Always create
+	 * it even if we don't use it immediately.
+	 */
+	dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+	if (!dfl_pwq)
+		goto enomem_pwq;
+
+	for_each_node(node) {
+		if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
+			pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
+			if (!pwq_tbl[node])
+				goto enomem_pwq;
+		} else {
+			dfl_pwq->refcnt++;
+			pwq_tbl[node] = dfl_pwq;
+		}
+	}
+
+	mutex_unlock(&wq_pool_mutex);
+
+	/* all pwqs have been created successfully, let's install'em */
 	mutex_lock(&wq->mutex);
 
 	copy_workqueue_attrs(wq->unbound_attrs, new_attrs);
+
+	/* save the previous pwq and install the new one */
 	for_each_node(node)
-		last_pwq = numa_pwq_tbl_install(wq, node, pwq);
+		pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
+
+	/* @dfl_pwq might not have been used, ensure it's linked */
+	link_pwq(dfl_pwq);
+	swap(wq->dfl_pwq, dfl_pwq);
 
 	mutex_unlock(&wq->mutex);
 
-	put_pwq_unlocked(last_pwq);
+	/* put the old pwqs */
+	for_each_node(node)
+		put_pwq_unlocked(pwq_tbl[node]);
+	put_pwq_unlocked(dfl_pwq);
+
+	put_online_cpus();
 	ret = 0;
 	/* fall through */
 out_free:
+	free_workqueue_attrs(tmp_attrs);
 	free_workqueue_attrs(new_attrs);
+	kfree(pwq_tbl);
 	return ret;
 
+enomem_pwq:
+	free_unbound_pwq(dfl_pwq);
+	for_each_node(node)
+		if (pwq_tbl && pwq_tbl[node] != dfl_pwq)
+			free_unbound_pwq(pwq_tbl[node]);
+	mutex_unlock(&wq_pool_mutex);
+	put_online_cpus();
 enomem:
 	ret = -ENOMEM;
 	goto out_free;
 }
 
+/**
+ * wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug
+ * @wq: the target workqueue
+ * @cpu: the CPU coming up or going down
+ * @online: whether @cpu is coming up or going down
+ *
+ * This function is to be called from %CPU_DOWN_PREPARE, %CPU_ONLINE and
+ * %CPU_DOWN_FAILED.  @cpu is being hot[un]plugged, update NUMA affinity of
+ * @wq accordingly.
+ *
+ * If NUMA affinity can't be adjusted due to memory allocation failure, it
+ * falls back to @wq->dfl_pwq which may not be optimal but is always
+ * correct.
+ *
+ * Note that when the last allowed CPU of a NUMA node goes offline for a
+ * workqueue with a cpumask spanning multiple nodes, the workers which were
+ * already executing the work items for the workqueue will lose their CPU
+ * affinity and may execute on any CPU.  This is similar to how per-cpu
+ * workqueues behave on CPU_DOWN.  If a workqueue user wants strict
+ * affinity, it's the user's responsibility to flush the work item from
+ * CPU_DOWN_PREPARE.
+ */
+static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
+				   bool online)
+{
+	int node = cpu_to_node(cpu);
+	int cpu_off = online ? -1 : cpu;
+	struct pool_workqueue *old_pwq = NULL, *pwq;
+	struct workqueue_attrs *target_attrs;
+	cpumask_t *cpumask;
+
+	lockdep_assert_held(&wq_pool_mutex);
+
+	if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND))
+		return;
+
+	/*
+	 * We don't wanna alloc/free wq_attrs for each wq for each CPU.  Use
+	 * the preallocated buf which is protected by CPU hotplug exclusion.
+	 */
+	target_attrs = wq_update_unbound_numa_attrs_buf;
+	cpumask = target_attrs->cpumask;
+
+	mutex_lock(&wq->mutex);
+
+	copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
+	pwq = unbound_pwq_by_node(wq, node);
+
+	/*
+	 * Let's determine what needs to be done.  If the target cpumask is
+	 * different from wq's, we need to compare it to @pwq's and create
+	 * a new one if they don't match.  If the target cpumask equals
+	 * wq's, the default pwq should be used.  If @pwq is already the
+	 * default one, nothing to do; otherwise, install the default one.
+	 */
+	if (wq_calc_node_cpumask(wq->unbound_attrs, node, cpu_off, cpumask)) {
+		if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
+			goto out_unlock;
+	} else {
+		if (pwq == wq->dfl_pwq)
+			goto out_unlock;
+		else
+			goto use_dfl_pwq;
+	}
+
+	mutex_unlock(&wq->mutex);
+
+	/* create a new pwq */
+	pwq = alloc_unbound_pwq(wq, target_attrs);
+	if (!pwq) {
+		pr_warning("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n",
+			   wq->name);
+		/* wq->mutex was dropped above; retake it and use dfl_pwq */
+		mutex_lock(&wq->mutex);
+		goto use_dfl_pwq;
+	}
+
+	/*
+	 * Install the new pwq.  This runs only from CPU hotplug callbacks and
+	 * apply_workqueue_attrs() is wrapped in get/put_online_cpus(), so
+	 * @wq->unbound_attrs couldn't have changed in between.
+	 */
+	mutex_lock(&wq->mutex);
+	old_pwq = numa_pwq_tbl_install(wq, node, pwq);
+	goto out_unlock;
+
+use_dfl_pwq:
+	spin_lock_irq(&wq->dfl_pwq->pool->lock);
+	get_pwq(wq->dfl_pwq);
+	spin_unlock_irq(&wq->dfl_pwq->pool->lock);
+	old_pwq = numa_pwq_tbl_install(wq, node, wq->dfl_pwq);
+out_unlock:
+	mutex_unlock(&wq->mutex);
+	put_pwq_unlocked(old_pwq);
+}
+
 static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 {
 	bool highpri = wq->flags & WQ_HIGHPRI;
@@ -3942,6 +4154,7 @@
 void destroy_workqueue(struct workqueue_struct *wq)
 {
 	struct pool_workqueue *pwq;
+	int node;
 
 	/* drain it before proceeding with destruction */
 	drain_workqueue(wq);
@@ -3993,11 +4206,21 @@
 	} else {
 		/*
 		 * We're the sole accessor of @wq at this point.  Directly
-		 * access the first pwq and put the base ref.  @wq will be
-		 * freed when the last pwq is released.
+		 * access numa_pwq_tbl[] and dfl_pwq to put the base refs.
+		 * @wq will be freed when the last pwq is released.
 		 */
-		pwq = list_first_entry(&wq->pwqs, struct pool_workqueue,
-				       pwqs_node);
+		for_each_node(node) {
+			pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
+			RCU_INIT_POINTER(wq->numa_pwq_tbl[node], NULL);
+			put_pwq_unlocked(pwq);
+		}
+
+		/*
+		 * Put dfl_pwq.  @wq may be freed any time after dfl_pwq is
+		 * put.  Don't access it afterwards.
+		 */
+		pwq = wq->dfl_pwq;
+		wq->dfl_pwq = NULL;
 		put_pwq_unlocked(pwq);
 	}
 }
@@ -4285,6 +4508,7 @@
 {
 	int cpu = (unsigned long)hcpu;
 	struct worker_pool *pool;
+	struct workqueue_struct *wq;
 	int pi;
 
 	switch (action & ~CPU_TASKS_FROZEN) {
@@ -4317,6 +4541,10 @@
 			mutex_unlock(&pool->manager_mutex);
 		}
 
+		/* update NUMA affinity of unbound workqueues */
+		list_for_each_entry(wq, &workqueues, list)
+			wq_update_unbound_numa(wq, cpu, true);
+
 		mutex_unlock(&wq_pool_mutex);
 		break;
 	}
@@ -4333,12 +4561,21 @@
 {
 	int cpu = (unsigned long)hcpu;
 	struct work_struct unbind_work;
+	struct workqueue_struct *wq;
 
 	switch (action & ~CPU_TASKS_FROZEN) {
 	case CPU_DOWN_PREPARE:
-		/* unbinding should happen on the local CPU */
+		/* unbinding per-cpu workers should happen on the local CPU */
 		INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
 		queue_work_on(cpu, system_highpri_wq, &unbind_work);
+
+		/* update NUMA affinity of unbound workqueues */
+		mutex_lock(&wq_pool_mutex);
+		list_for_each_entry(wq, &workqueues, list)
+			wq_update_unbound_numa(wq, cpu, false);
+		mutex_unlock(&wq_pool_mutex);
+
+		/* wait for per-cpu unbinding to finish */
 		flush_work(&unbind_work);
 		break;
 	}
@@ -4526,6 +4763,9 @@
 	if (num_possible_nodes() <= 1)
 		return;
 
+	wq_update_unbound_numa_attrs_buf = alloc_workqueue_attrs(GFP_KERNEL);
+	BUG_ON(!wq_update_unbound_numa_attrs_buf);
+
 	/*
 	 * We want masks of possible CPUs of each node which isn't readily
 	 * available.  Build one from cpu_to_node() which should have been