drbd: Take a more conservative approach when deciding max_bio_size

The old (optimistic) implementation could shrink the bio size
on an primary device.

Shrinking the bio size on a primary device is bad. Since there
we might get BIOs with the old (bigger) size shortly after
we published the new size.

The new implementation is more conservative, and eventually
increases the max_bio_size on a primary device (which is valid).
It does so, when it knows the local limit AND the remote limit.

 We cache the last seen max_bio_size of the peer in the meta
 data, and rely on that, to make the operation of single
 nodes more efficient.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index ce6a764..cfeb13b 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -2071,7 +2071,7 @@
 {
 	struct p_sizes p;
 	sector_t d_size, u_size;
-	int q_order_type;
+	int q_order_type, max_bio_size;
 	int ok;
 
 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
@@ -2079,17 +2079,20 @@
 		d_size = drbd_get_max_capacity(mdev->ldev);
 		u_size = mdev->ldev->dc.disk_size;
 		q_order_type = drbd_queue_order_type(mdev);
+		max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
+		max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
 		put_ldev(mdev);
 	} else {
 		d_size = 0;
 		u_size = 0;
 		q_order_type = QUEUE_ORDERED_NONE;
+		max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
 	}
 
 	p.d_size = cpu_to_be64(d_size);
 	p.u_size = cpu_to_be64(u_size);
 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
-	p.max_bio_size = cpu_to_be32(queue_max_hw_sectors(mdev->rq_queue) << 9);
+	p.max_bio_size = cpu_to_be32(max_bio_size);
 	p.queue_order_type = cpu_to_be16(q_order_type);
 	p.dds_flags = cpu_to_be16(flags);
 
@@ -3048,6 +3051,8 @@
 	mdev->agreed_pro_version = PRO_VERSION_MAX;
 	mdev->write_ordering = WO_bdev_flush;
 	mdev->resync_wenr = LC_FREE;
+	mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
+	mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
 }
 
 void drbd_mdev_cleanup(struct drbd_conf *mdev)
@@ -3422,7 +3427,9 @@
 	q->backing_dev_info.congested_data = mdev;
 
 	blk_queue_make_request(q, drbd_make_request);
-	blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE >> 9);
+	/* Setting the max_hw_sectors to an odd value of 8kibyte here
+	   This triggers a max_bio_size message upon first attach or connect */
+	blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
 	blk_queue_merge_bvec(q, drbd_merge_bvec);
 	q->queue_lock = &mdev->req_lock;
@@ -3634,7 +3641,8 @@
 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
 	u32 bm_offset;         /* offset to the bitmap, from here */
 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
-	u32 reserved_u32[4];
+	u32 la_peer_max_bio_size;   /* last peer max_bio_size */
+	u32 reserved_u32[3];
 
 } __packed;
 
@@ -3675,6 +3683,7 @@
 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
 
 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
+	buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
 
 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
 	sector = mdev->ldev->md.md_offset;
@@ -3758,6 +3767,15 @@
 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
 
+	spin_lock_irq(&mdev->req_lock);
+	if (mdev->state.conn < C_CONNECTED) {
+		int peer;
+		peer = be32_to_cpu(buffer->la_peer_max_bio_size);
+		peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
+		mdev->peer_max_bio_size = peer;
+	}
+	spin_unlock_irq(&mdev->req_lock);
+
 	if (mdev->sync_conf.al_extents < 7)
 		mdev->sync_conf.al_extents = 127;