drbd: fix potential deadlock during "restart" of conflicting writes

w_restart_write(), run from worker context, calls __drbd_make_request()
and, further down the call chain, drbd_al_begin_io(..., delegate=true),
which then potentially deadlocks.  The previous patch moved a BUG_ON to
expose such call paths, which would now be triggered.

Also, if we call __drbd_make_request() from the resource worker context,
as w_restart_write() did, and that call blocks for whatever reason
(!drbd_state_is_stable(), resource suspended, ...),
we potentially deadlock the whole resource, because the worker
is needed for state changes and other things.

Create a dedicated retry workqueue for this instead.

Also make sure that inc_ap_bio()/dec_ap_bio() stay properly paired,
even if do_retry() needs to retry itself
because __drbd_make_request() returned non-zero.
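
To illustrate the intended pairing, here is a minimal userspace sketch.
It is not the kernel code: ap_bio_cnt, the failure injection and main()
are made up for illustration, only the function names mirror this patch,
and it assumes that __drbd_make_request() gives back its reference
whenever it asks the caller to retry.

/*
 * Toy model of the ap_bio reference pairing across a postponed write.
 */
#include <stdio.h>

static int ap_bio_cnt;			/* models mdev->ap_bio_cnt */

static void inc_ap_bio(void) { ap_bio_cnt++; }
static void dec_ap_bio(void) { ap_bio_cnt--; }

/* returns non-zero if the caller has to retry (assumption, see above) */
static int __drbd_make_request(int fail)
{
	if (fail) {
		dec_ap_bio();	/* request not accepted, give back the ref */
		return 1;
	}
	return 0;		/* accepted; ref is dropped on completion */
}

int main(void)
{
	int failures = 2;	/* inject two failed resubmissions */

	inc_ap_bio();		/* original submission */
	dec_ap_bio();		/* drbd_restart_write(): drop the ref that
				 * complete_master_bio() would have dropped */

	do {			/* do_retry(): re-grab and resubmit */
		inc_ap_bio();
	} while (__drbd_make_request(failures-- > 0));

	dec_ap_bio();		/* eventual completion of the master bio */

	printf("ap_bio_cnt = %d\n", ap_bio_cnt);	/* expect 0 */
	return 0;
}

Compiled as a plain C program it prints ap_bio_cnt = 0, i.e. every
inc_ap_bio() is matched by exactly one dec_ap_bio(), even across a
postponed and retried write.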

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index a0045ac..5529d39 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -2383,6 +2383,73 @@
 	kref_put(&tconn->kref, &conn_destroy);
 }
 
+/* One global retry thread, if we need to push back some bio and have it
+ * reinserted through our make request function.
+ */
+static struct retry_worker {
+	struct workqueue_struct *wq;
+	struct work_struct worker;
+
+	spinlock_t lock;
+	struct list_head writes;
+} retry;
+
+static void do_retry(struct work_struct *ws)
+{
+	struct retry_worker *retry = container_of(ws, struct retry_worker, worker);
+	LIST_HEAD(writes);
+	struct drbd_request *req, *tmp;
+
+	spin_lock_irq(&retry->lock);
+	list_splice_init(&retry->writes, &writes);
+	spin_unlock_irq(&retry->lock);
+
+	list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
+		struct drbd_conf *mdev = req->w.mdev;
+		struct bio *bio = req->master_bio;
+		unsigned long start_time = req->start_time;
+
+		/* We have exclusive access to this request object.
+		 * If it had not been RQ_POSTPONED, the code path which queued
+		 * it here would have completed and freed it already.
+		 */
+		mempool_free(req, drbd_request_mempool);
+
+		/* A single suspended or otherwise blocking device may stall
+		 * all others as well.  Fortunately, this code path is to
+		 * recover from a situation that "should not happen":
+		 * concurrent writes in multi-primary setup.
+		 * In a "normal" lifecycle, this workqueue is supposed to be
+		 * destroyed without ever doing anything.
+		 * If it turns out to be an issue anyways, we can do per
+		 * resource (replication group) or per device (minor) retry
+		 * workqueues instead.
+		 */
+
+		/* We are not just doing generic_make_request(),
+		 * as we want to keep the start_time information. */
+		do {
+			inc_ap_bio(mdev);
+		} while (__drbd_make_request(mdev, bio, start_time));
+	}
+}
+
+void drbd_restart_write(struct drbd_request *req)
+{
+	unsigned long flags;
+	spin_lock_irqsave(&retry.lock, flags);
+	list_move_tail(&req->tl_requests, &retry.writes);
+	spin_unlock_irqrestore(&retry.lock, flags);
+
+	/* Drop the extra reference that would otherwise
+	 * have been dropped by complete_master_bio.
+	 * do_retry() needs to grab a new one. */
+	dec_ap_bio(req->w.mdev);
+
+	queue_work(retry.wq, &retry.worker);
+}
+
+
 static void drbd_cleanup(void)
 {
 	unsigned int i;
@@ -2402,6 +2469,9 @@
 	if (drbd_proc)
 		remove_proc_entry("drbd", NULL);
 
+	if (retry.wq)
+		destroy_workqueue(retry.wq);
+
 	drbd_genl_unregister();
 
 	idr_for_each_entry(&minors, mdev, i) {
@@ -2851,6 +2921,15 @@
 	rwlock_init(&global_state_lock);
 	INIT_LIST_HEAD(&drbd_tconns);
 
+	retry.wq = create_singlethread_workqueue("drbd-reissue");
+	if (!retry.wq) {
+		printk(KERN_ERR "drbd: unable to create retry workqueue\n");
+		goto fail;
+	}
+	INIT_WORK(&retry.worker, do_retry);
+	spin_lock_init(&retry.lock);
+	INIT_LIST_HEAD(&retry.writes);
+
 	printk(KERN_INFO "drbd: initialized. "
 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);