Merge branch 'bcache-for-upstream' of http://evilpiepirate.org/git/linux-bcache into for-3.10/drivers
diff --git a/drivers/block/drbd/drbd_actlog.c b/drivers/block/drbd/drbd_actlog.c
index 92510f8..6afe173 100644
--- a/drivers/block/drbd/drbd_actlog.c
+++ b/drivers/block/drbd/drbd_actlog.c
@@ -104,7 +104,6 @@
 	int err;
 };
 
-static int al_write_transaction(struct drbd_conf *mdev);
 
 void *drbd_md_get_buffer(struct drbd_conf *mdev)
 {
@@ -168,7 +167,11 @@
 	bio->bi_end_io = drbd_md_io_complete;
 	bio->bi_rw = rw;
 
-	if (!get_ldev_if_state(mdev, D_ATTACHING)) {  /* Corresponding put_ldev in drbd_md_io_complete() */
+	if (!(rw & WRITE) && mdev->state.disk == D_DISKLESS && mdev->ldev == NULL)
+		/* special case, drbd_md_read() during drbd_adm_attach(): no get_ldev */
+		;
+	else if (!get_ldev_if_state(mdev, D_ATTACHING)) {
+		/* Corresponding put_ldev in drbd_md_io_complete() */
 		dev_err(DEV, "ASSERT FAILED: get_ldev_if_state() == 1 in _drbd_md_sync_page_io()\n");
 		err = -ENODEV;
 		goto out;
@@ -199,9 +202,10 @@
 
 	BUG_ON(!bdev->md_bdev);
 
-	dev_dbg(DEV, "meta_data io: %s [%d]:%s(,%llus,%s)\n",
+	dev_dbg(DEV, "meta_data io: %s [%d]:%s(,%llus,%s) %pS\n",
 	     current->comm, current->pid, __func__,
-	     (unsigned long long)sector, (rw & WRITE) ? "WRITE" : "READ");
+	     (unsigned long long)sector, (rw & WRITE) ? "WRITE" : "READ",
+	     (void*)_RET_IP_ );
 
 	if (sector < drbd_md_first_sector(bdev) ||
 	    sector + 7 > drbd_md_last_sector(bdev))
@@ -209,7 +213,8 @@
 		     current->comm, current->pid, __func__,
 		     (unsigned long long)sector, (rw & WRITE) ? "WRITE" : "READ");
 
-	err = _drbd_md_sync_page_io(mdev, bdev, iop, sector, rw, MD_BLOCK_SIZE);
+	/* we do all our meta data IO in aligned 4k blocks. */
+	err = _drbd_md_sync_page_io(mdev, bdev, iop, sector, rw, 4096);
 	if (err) {
 		dev_err(DEV, "drbd_md_sync_page_io(,%llus,%s) failed with error %d\n",
 		    (unsigned long long)sector, (rw & WRITE) ? "WRITE" : "READ", err);
@@ -217,44 +222,99 @@
 	return err;
 }
 
-static struct lc_element *_al_get(struct drbd_conf *mdev, unsigned int enr)
+static struct bm_extent *find_active_resync_extent(struct drbd_conf *mdev, unsigned int enr)
 {
-	struct lc_element *al_ext;
 	struct lc_element *tmp;
-	int wake;
-
-	spin_lock_irq(&mdev->al_lock);
 	tmp = lc_find(mdev->resync, enr/AL_EXT_PER_BM_SECT);
 	if (unlikely(tmp != NULL)) {
 		struct bm_extent  *bm_ext = lc_entry(tmp, struct bm_extent, lce);
-		if (test_bit(BME_NO_WRITES, &bm_ext->flags)) {
-			wake = !test_and_set_bit(BME_PRIORITY, &bm_ext->flags);
-			spin_unlock_irq(&mdev->al_lock);
-			if (wake)
-				wake_up(&mdev->al_wait);
-			return NULL;
-		}
+		if (test_bit(BME_NO_WRITES, &bm_ext->flags))
+			return bm_ext;
 	}
-	al_ext = lc_get(mdev->act_log, enr);
+	return NULL;
+}
+
+static struct lc_element *_al_get(struct drbd_conf *mdev, unsigned int enr, bool nonblock)
+{
+	struct lc_element *al_ext;
+	struct bm_extent *bm_ext;
+	int wake;
+
+	spin_lock_irq(&mdev->al_lock);
+	bm_ext = find_active_resync_extent(mdev, enr);
+	if (bm_ext) {
+		wake = !test_and_set_bit(BME_PRIORITY, &bm_ext->flags);
+		spin_unlock_irq(&mdev->al_lock);
+		if (wake)
+			wake_up(&mdev->al_wait);
+		return NULL;
+	}
+	if (nonblock)
+		al_ext = lc_try_get(mdev->act_log, enr);
+	else
+		al_ext = lc_get(mdev->act_log, enr);
 	spin_unlock_irq(&mdev->al_lock);
 	return al_ext;
 }
 
-void drbd_al_begin_io(struct drbd_conf *mdev, struct drbd_interval *i)
+bool drbd_al_begin_io_fastpath(struct drbd_conf *mdev, struct drbd_interval *i)
+{
+	/* for bios crossing activity log extent boundaries,
+	 * we may need to activate two extents in one go */
+	unsigned first = i->sector >> (AL_EXTENT_SHIFT-9);
+	unsigned last = i->size == 0 ? first : (i->sector + (i->size >> 9) - 1) >> (AL_EXTENT_SHIFT-9);
+
+	D_ASSERT((unsigned)(last - first) <= 1);
+	D_ASSERT(atomic_read(&mdev->local_cnt) > 0);
+
+	/* FIXME figure out a fast path for bios crossing AL extent boundaries */
+	if (first != last)
+		return false;
+
+	return _al_get(mdev, first, true);
+}
+
+bool drbd_al_begin_io_prepare(struct drbd_conf *mdev, struct drbd_interval *i)
 {
 	/* for bios crossing activity log extent boundaries,
 	 * we may need to activate two extents in one go */
 	unsigned first = i->sector >> (AL_EXTENT_SHIFT-9);
 	unsigned last = i->size == 0 ? first : (i->sector + (i->size >> 9) - 1) >> (AL_EXTENT_SHIFT-9);
 	unsigned enr;
-	bool locked = false;
-
+	bool need_transaction = false;
 
 	D_ASSERT(first <= last);
 	D_ASSERT(atomic_read(&mdev->local_cnt) > 0);
 
-	for (enr = first; enr <= last; enr++)
-		wait_event(mdev->al_wait, _al_get(mdev, enr) != NULL);
+	for (enr = first; enr <= last; enr++) {
+		struct lc_element *al_ext;
+		wait_event(mdev->al_wait,
+				(al_ext = _al_get(mdev, enr, false)) != NULL);
+		if (al_ext->lc_number != enr)
+			need_transaction = true;
+	}
+	return need_transaction;
+}
+
+static int al_write_transaction(struct drbd_conf *mdev, bool delegate);
+
+/* When called through generic_make_request(), we must delegate
+ * activity log I/O to the worker thread: a further request
+ * submitted via generic_make_request() within the same task
+ * would be queued on current->bio_list, and would only start
+ * after this function returns (see generic_make_request()).
+ *
+ * However, if we *are* the worker, we must not delegate to ourselves.
+ */
+
+/*
+ * @delegate:   delegate activity log I/O to the worker thread
+ */
+void drbd_al_begin_io_commit(struct drbd_conf *mdev, bool delegate)
+{
+	bool locked = false;
+
+	BUG_ON(delegate && current == mdev->tconn->worker.task);
 
 	/* Serialize multiple transactions.
 	 * This uses test_and_set_bit, memory barrier is implicit.
@@ -264,13 +324,6 @@
 			(locked = lc_try_lock_for_transaction(mdev->act_log)));
 
 	if (locked) {
-		/* drbd_al_write_transaction(mdev,al_ext,enr);
-		 * recurses into generic_make_request(), which
-		 * disallows recursion, bios being serialized on the
-		 * current->bio_tail list now.
-		 * we have to delegate updates to the activity log
-		 * to the worker thread. */
-
 		/* Double check: it may have been committed by someone else,
 		 * while we have been waiting for the lock. */
 		if (mdev->act_log->pending_changes) {
@@ -280,11 +333,8 @@
 			write_al_updates = rcu_dereference(mdev->ldev->disk_conf)->al_updates;
 			rcu_read_unlock();
 
-			if (write_al_updates) {
-				al_write_transaction(mdev);
-				mdev->al_writ_cnt++;
-			}
-
+			if (write_al_updates)
+				al_write_transaction(mdev, delegate);
 			spin_lock_irq(&mdev->al_lock);
 			/* FIXME
 			if (err)
@@ -298,6 +348,66 @@
 	}
 }
 
+/*
+ * @delegate:   delegate activity log I/O to the worker thread
+ */
+void drbd_al_begin_io(struct drbd_conf *mdev, struct drbd_interval *i, bool delegate)
+{
+	BUG_ON(delegate && current == mdev->tconn->worker.task);
+
+	if (drbd_al_begin_io_prepare(mdev, i))
+		drbd_al_begin_io_commit(mdev, delegate);
+}
+
+/*
+ * Non-blocking attempt to reference all activity log extents touched by @i.
+ * Returns 0 on success; -EBUSY if we newly set BME_PRIORITY on a conflicting
+ * resync extent; -EWOULDBLOCK if it was already set or AL slots are short.
+ */
+int drbd_al_begin_io_nonblock(struct drbd_conf *mdev, struct drbd_interval *i)
+{
+	struct lru_cache *al = mdev->act_log;
+	/* a bio crossing an AL extent boundary needs two extents in one go */
+	unsigned first = i->sector >> (AL_EXTENT_SHIFT-9);
+	unsigned last = i->size == 0 ? first : (i->sector + (i->size >> 9) - 1) >> (AL_EXTENT_SHIFT-9);
+	unsigned nr_al_extents;
+	unsigned available_update_slots;
+	unsigned enr;
+
+	D_ASSERT(first <= last);
+
+	nr_al_extents = 1 + last - first; /* worst case: all touched extents are cold. */
+	available_update_slots = min(al->nr_elements - al->used,
+				al->max_pending_changes - al->pending_changes);
+
+	/* We want all necessary updates for a request within one transaction.
+	 * We could count the updates *actually* needed instead of worst case. */
+	if (available_update_slots < nr_al_extents)
+		return -EWOULDBLOCK;
+
+	/* Is resync active in this area? */
+	for (enr = first; enr <= last; enr++) {
+		struct lc_element *tmp = lc_find(mdev->resync, enr/AL_EXT_PER_BM_SECT);
+		if (unlikely(tmp != NULL)) {
+			struct bm_extent  *bm_ext = lc_entry(tmp, struct bm_extent, lce);
+			if (test_bit(BME_NO_WRITES, &bm_ext->flags)) {
+				if (!test_and_set_bit(BME_PRIORITY, &bm_ext->flags))
+					return -EBUSY;
+				return -EWOULDBLOCK;
+			}
+		}
+	}
+
+	/* Checkout the refcounts.  Given that we checked for available
+	 * elements and update slots above, this has to be successful. */
+	for (enr = first; enr <= last; enr++) {
+		struct lc_element *al_ext = lc_get_cumulative(mdev->act_log, enr);
+		if (!al_ext)
+			dev_info(DEV, "LOGIC BUG for enr=%u\n", enr);
+	}
+	return 0;
+}
+
 void drbd_al_complete_io(struct drbd_conf *mdev, struct drbd_interval *i)
 {
 	/* for bios crossing activity log extent boundaries,
@@ -350,6 +460,24 @@
 		 (BM_EXT_SHIFT - BM_BLOCK_SHIFT));
 }
 
+/* Translate mdev->al_tr_number into the on-disk sector (512 byte units) of
+ * the 4k transaction block it maps to: consecutive transaction numbers go to
+ * consecutive stripes, wrapping around after al_size_4k transactions. */
+static sector_t al_tr_number_to_on_disk_sector(struct drbd_conf *mdev)
+{
+	const unsigned int stripes = mdev->ldev->md.al_stripes;
+	const unsigned int stripe_size_4kB = mdev->ldev->md.al_stripe_size_4k;
+
+	/* transaction number, modulo on-disk ring buffer wrap around */
+	unsigned int t = mdev->al_tr_number % (mdev->ldev->md.al_size_4k);
+	/* ... to aligned 4k on disk block */
+	t = ((t % stripes) * stripe_size_4kB) + t/stripes;
+	/* ... to 512 byte sector in activity log */
+	t *= 8;
+	/* ... plus offset to the on disk position */
+	return mdev->ldev->md.md_offset + mdev->ldev->md.al_offset + t;
+}
+
 static int
 _al_write_transaction(struct drbd_conf *mdev)
 {
@@ -432,23 +560,27 @@
 	if (mdev->al_tr_cycle >= mdev->act_log->nr_elements)
 		mdev->al_tr_cycle = 0;
 
-	sector =  mdev->ldev->md.md_offset
-		+ mdev->ldev->md.al_offset
-		+ mdev->al_tr_pos * (MD_BLOCK_SIZE>>9);
+	sector = al_tr_number_to_on_disk_sector(mdev);
 
 	crc = crc32c(0, buffer, 4096);
 	buffer->crc32c = cpu_to_be32(crc);
 
 	if (drbd_bm_write_hinted(mdev))
 		err = -EIO;
-		/* drbd_chk_io_error done already */
-	else if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
-		err = -EIO;
-		drbd_chk_io_error(mdev, 1, DRBD_META_IO_ERROR);
-	} else {
-		/* advance ringbuffer position and transaction counter */
-		mdev->al_tr_pos = (mdev->al_tr_pos + 1) % (MD_AL_SECTORS*512/MD_BLOCK_SIZE);
-		mdev->al_tr_number++;
+	else {
+		bool write_al_updates;
+		rcu_read_lock();
+		write_al_updates = rcu_dereference(mdev->ldev->disk_conf)->al_updates;
+		rcu_read_unlock();
+		if (write_al_updates) {
+			if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
+				err = -EIO;
+				drbd_chk_io_error(mdev, 1, DRBD_META_IO_ERROR);
+			} else {
+				mdev->al_tr_number++;
+				mdev->al_writ_cnt++;
+			}
+		}
 	}
 
 	drbd_md_put_buffer(mdev);
@@ -474,20 +606,18 @@
 /* Calls from worker context (see w_restart_disk_io()) need to write the
    transaction directly. Others came through generic_make_request(),
    those need to delegate it to the worker. */
-static int al_write_transaction(struct drbd_conf *mdev)
+static int al_write_transaction(struct drbd_conf *mdev, bool delegate)
 {
-	struct update_al_work al_work;
-
-	if (current == mdev->tconn->worker.task)
+	if (delegate) {
+		struct update_al_work al_work;
+		init_completion(&al_work.event);
+		al_work.w.cb = w_al_write_transaction;
+		al_work.w.mdev = mdev;
+		drbd_queue_work_front(&mdev->tconn->sender_work, &al_work.w);
+		wait_for_completion(&al_work.event);
+		return al_work.err;
+	} else
 		return _al_write_transaction(mdev);
-
-	init_completion(&al_work.event);
-	al_work.w.cb = w_al_write_transaction;
-	al_work.w.mdev = mdev;
-	drbd_queue_work_front(&mdev->tconn->sender_work, &al_work.w);
-	wait_for_completion(&al_work.event);
-
-	return al_work.err;
 }
 
 static int _try_lc_del(struct drbd_conf *mdev, struct lc_element *al_ext)
diff --git a/drivers/block/drbd/drbd_bitmap.c b/drivers/block/drbd/drbd_bitmap.c
index 8dc2950..64fbb83 100644
--- a/drivers/block/drbd/drbd_bitmap.c
+++ b/drivers/block/drbd/drbd_bitmap.c
@@ -612,6 +612,17 @@
 	}
 }
 
+/* On-disk bitmap capacity in bits; see comment above drbd_md_set_sector_offsets(). */
+static u64 drbd_md_on_disk_bits(struct drbd_backing_dev *ldev)
+{
+	u64 bitmap_sectors;
+	if (ldev->md.al_offset == 8)	/* bitmap runs from bm_offset to md_size_sect */
+		bitmap_sectors = ldev->md.md_size_sect - ldev->md.bm_offset;
+	else				/* bitmap runs from bm_offset up to al_offset */
+		bitmap_sectors = ldev->md.al_offset - ldev->md.bm_offset;
+	return bitmap_sectors << (9 + 3);	/* 512 byte sectors -> bits */
+}
+
 /*
  * make sure the bitmap has enough room for the attached storage,
  * if necessary, resize.
@@ -668,7 +679,7 @@
 	words = ALIGN(bits, 64) >> LN2_BPL;
 
 	if (get_ldev(mdev)) {
-		u64 bits_on_disk = ((u64)mdev->ldev->md.md_size_sect-MD_BM_OFFSET) << 12;
+		u64 bits_on_disk = drbd_md_on_disk_bits(mdev->ldev);
 		put_ldev(mdev);
 		if (bits > bits_on_disk) {
 			dev_info(DEV, "bits = %lu\n", bits);
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index 6b51afa..f943aac 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -753,13 +753,16 @@
 	u32 flags;
 	u32 md_size_sect;
 
-	s32 al_offset;	/* signed relative sector offset to al area */
+	s32 al_offset;	/* signed relative sector offset to activity log */
 	s32 bm_offset;	/* signed relative sector offset to bitmap */
 
-	/* u32 al_nr_extents;	   important for restoring the AL
-	 * is stored into  ldev->dc.al_extents, which in turn
-	 * gets applied to act_log->nr_elements
-	 */
+	/* cached value of bdev->disk_conf->meta_dev_idx (see below) */
+	s32 meta_dev_idx;
+
+	/* see al_tr_number_to_on_disk_sector() */
+	u32 al_stripes;
+	u32 al_stripe_size_4k;
+	u32 al_size_4k; /* cached product of the above */
 };
 
 struct drbd_backing_dev {
@@ -891,6 +894,14 @@
 	} send;
 };
 
+struct submit_worker {
+	struct workqueue_struct *wq;	/* allocated in init_submitter() */
+	struct work_struct worker;	/* work item running do_submit() */
+
+	spinlock_t lock;		/* presumably protects @writes - confirm */
+	struct list_head writes;	/* presumably deferred writes - confirm */
+};
+
 struct drbd_conf {
 	struct drbd_tconn *tconn;
 	int vnr;			/* volume number within the connection */
@@ -1009,7 +1020,6 @@
 	struct lru_cache *act_log;	/* activity log */
 	unsigned int al_tr_number;
 	int al_tr_cycle;
-	int al_tr_pos;   /* position of the next transaction in the journal */
 	wait_queue_head_t seq_wait;
 	atomic_t packet_seq;
 	unsigned int peer_seq;
@@ -1032,6 +1042,10 @@
 	atomic_t ap_in_flight; /* App sectors in flight (waiting for ack) */
 	unsigned int peer_max_bio_size;
 	unsigned int local_max_bio_size;
+
+	/* any requests that would block in drbd_make_request()
+	 * are deferred to this single-threaded work queue */
+	struct submit_worker submit;
 };
 
 static inline struct drbd_conf *minor_to_mdev(unsigned int minor)
@@ -1148,25 +1162,44 @@
 		char *why, enum bm_flag flags);
 extern int drbd_bmio_set_n_write(struct drbd_conf *mdev);
 extern int drbd_bmio_clear_n_write(struct drbd_conf *mdev);
-extern void drbd_go_diskless(struct drbd_conf *mdev);
 extern void drbd_ldev_destroy(struct drbd_conf *mdev);
 
 /* Meta data layout
-   We reserve a 128MB Block (4k aligned)
-   * either at the end of the backing device
-   * or on a separate meta data device. */
+ *
+ * We currently have two possible layouts.
+ * Offsets in (512 byte) sectors.
+ * external:
+ *   |----------- md_size_sect ------------------|
+ *   [ 4k superblock ][ activity log ][  Bitmap  ]
+ *   | al_offset == 8 |
+ *   | bm_offset = al_offset + X      |
+ *  ==> bitmap sectors = md_size_sect - bm_offset
+ *
+ *  Variants:
+ *     old, indexed fixed size meta data:
+ *
+ * internal:
+ *            |----------- md_size_sect ------------------|
+ * [data.....][  Bitmap  ][ activity log ][ 4k superblock ][padding*]
+ *                        | al_offset < 0 |
+ *            | bm_offset = al_offset - Y |
+ *  ==> bitmap sectors = Y = al_offset - bm_offset
+ *
+ *  [padding*] are zero or up to 7 unused 512 Byte sectors to the
+ *  end of the device, so that the [4k superblock] will be 4k aligned.
+ *
+ *  The activity log consists of 4k transaction blocks,
+ *  which are written in a ring-buffer, or striped ring-buffer like fashion.
+ *  The size of the activity log used to be fixed 32kB,
+ *  but is about to become configurable.
+ */
 
-/* The following numbers are sectors */
-/* Allows up to about 3.8TB, so if you want more,
+/* Our old fixed size meta data layout
+ * allows up to about 3.8TB, so if you want more,
  * you need to use the "flexible" meta data format. */
-#define MD_RESERVED_SECT (128LU << 11)  /* 128 MB, unit sectors */
-#define MD_AL_OFFSET	8    /* 8 Sectors after start of meta area */
-#define MD_AL_SECTORS	64   /* = 32 kB on disk activity log ring buffer */
-#define MD_BM_OFFSET (MD_AL_OFFSET + MD_AL_SECTORS)
-
-/* we do all meta data IO in 4k blocks */
-#define MD_BLOCK_SHIFT	12
-#define MD_BLOCK_SIZE	(1<<MD_BLOCK_SHIFT)
+#define MD_128MB_SECT (128LLU << 11)  /* 128 MB, unit sectors */
+#define MD_4kB_SECT	 8
+#define MD_32kB_SECT	64
 
 /* One activity log extent represents 4M of storage */
 #define AL_EXTENT_SHIFT 22
@@ -1256,7 +1289,6 @@
 
 /* in one sector of the bitmap, we have this many activity_log extents. */
 #define AL_EXT_PER_BM_SECT  (1 << (BM_EXT_SHIFT - AL_EXTENT_SHIFT))
-#define BM_WORDS_PER_AL_EXT (1 << (AL_EXTENT_SHIFT-BM_BLOCK_SHIFT-LN2_BPL))
 
 #define BM_BLOCKS_PER_BM_EXT_B (BM_EXT_SHIFT - BM_BLOCK_SHIFT)
 #define BM_BLOCKS_PER_BM_EXT_MASK  ((1<<BM_BLOCKS_PER_BM_EXT_B) - 1)
@@ -1276,16 +1308,18 @@
  */
 
 #define DRBD_MAX_SECTORS_32 (0xffffffffLU)
-#define DRBD_MAX_SECTORS_BM \
-	  ((MD_RESERVED_SECT - MD_BM_OFFSET) * (1LL<<(BM_EXT_SHIFT-9)))
-#if DRBD_MAX_SECTORS_BM < DRBD_MAX_SECTORS_32
-#define DRBD_MAX_SECTORS      DRBD_MAX_SECTORS_BM
-#define DRBD_MAX_SECTORS_FLEX DRBD_MAX_SECTORS_BM
-#elif !defined(CONFIG_LBDAF) && BITS_PER_LONG == 32
+/* we have a certain meta data variant that has a fixed on-disk size of 128
+ * MiB, of which 4k are our "superblock", and 32k are the fixed size activity
+ * log, leaving this many sectors for the bitmap.
+ */
+
+#define DRBD_MAX_SECTORS_FIXED_BM \
+	  ((MD_128MB_SECT - MD_32kB_SECT - MD_4kB_SECT) * (1LL<<(BM_EXT_SHIFT-9)))
+#if !defined(CONFIG_LBDAF) && BITS_PER_LONG == 32
 #define DRBD_MAX_SECTORS      DRBD_MAX_SECTORS_32
 #define DRBD_MAX_SECTORS_FLEX DRBD_MAX_SECTORS_32
 #else
-#define DRBD_MAX_SECTORS      DRBD_MAX_SECTORS_BM
+#define DRBD_MAX_SECTORS      DRBD_MAX_SECTORS_FIXED_BM
 /* 16 TB in units of sectors */
 #if BITS_PER_LONG == 32
 /* adjust by one page worth of bitmap,
@@ -1418,6 +1452,7 @@
 extern int proc_details;
 
 /* drbd_req */
+extern void do_submit(struct work_struct *ws);
 extern void __drbd_make_request(struct drbd_conf *, struct bio *, unsigned long);
 extern void drbd_make_request(struct request_queue *q, struct bio *bio);
 extern int drbd_read_remote(struct drbd_conf *mdev, struct drbd_request *req);
@@ -1576,7 +1611,10 @@
 extern const char *drbd_role_str(enum drbd_role s);
 
 /* drbd_actlog.c */
-extern void drbd_al_begin_io(struct drbd_conf *mdev, struct drbd_interval *i);
+extern int drbd_al_begin_io_nonblock(struct drbd_conf *mdev, struct drbd_interval *i);
+extern void drbd_al_begin_io_commit(struct drbd_conf *mdev, bool delegate);
+extern bool drbd_al_begin_io_fastpath(struct drbd_conf *mdev, struct drbd_interval *i);
+extern void drbd_al_begin_io(struct drbd_conf *mdev, struct drbd_interval *i, bool delegate);
 extern void drbd_al_complete_io(struct drbd_conf *mdev, struct drbd_interval *i);
 extern void drbd_rs_complete_io(struct drbd_conf *mdev, sector_t sector);
 extern int drbd_rs_begin_io(struct drbd_conf *mdev, sector_t sector);
@@ -1755,9 +1793,9 @@
  * BTW, for internal meta data, this happens to be the maximum capacity
  * we could agree upon with our peer node.
  */
-static inline sector_t _drbd_md_first_sector(int meta_dev_idx, struct drbd_backing_dev *bdev)
+static inline sector_t drbd_md_first_sector(struct drbd_backing_dev *bdev)
 {
-	switch (meta_dev_idx) {
+	switch (bdev->md.meta_dev_idx) {
 	case DRBD_MD_INDEX_INTERNAL:
 	case DRBD_MD_INDEX_FLEX_INT:
 		return bdev->md.md_offset + bdev->md.bm_offset;
@@ -1767,36 +1805,19 @@
 	}
 }
 
-static inline sector_t drbd_md_first_sector(struct drbd_backing_dev *bdev)
-{
-	int meta_dev_idx;
-
-	rcu_read_lock();
-	meta_dev_idx = rcu_dereference(bdev->disk_conf)->meta_dev_idx;
-	rcu_read_unlock();
-
-	return _drbd_md_first_sector(meta_dev_idx, bdev);
-}
-
 /**
  * drbd_md_last_sector() - Return the last sector number of the meta data area
  * @bdev:	Meta data block device.
  */
 static inline sector_t drbd_md_last_sector(struct drbd_backing_dev *bdev)
 {
-	int meta_dev_idx;
-
-	rcu_read_lock();
-	meta_dev_idx = rcu_dereference(bdev->disk_conf)->meta_dev_idx;
-	rcu_read_unlock();
-
-	switch (meta_dev_idx) {
+	switch (bdev->md.meta_dev_idx) {
 	case DRBD_MD_INDEX_INTERNAL:
 	case DRBD_MD_INDEX_FLEX_INT:
-		return bdev->md.md_offset + MD_AL_OFFSET - 1;
+		return bdev->md.md_offset + MD_4kB_SECT -1;
 	case DRBD_MD_INDEX_FLEX_EXT:
 	default:
-		return bdev->md.md_offset + bdev->md.md_size_sect;
+		return bdev->md.md_offset + bdev->md.md_size_sect -1;
 	}
 }
 
@@ -1818,18 +1839,13 @@
 static inline sector_t drbd_get_max_capacity(struct drbd_backing_dev *bdev)
 {
 	sector_t s;
-	int meta_dev_idx;
 
-	rcu_read_lock();
-	meta_dev_idx = rcu_dereference(bdev->disk_conf)->meta_dev_idx;
-	rcu_read_unlock();
-
-	switch (meta_dev_idx) {
+	switch (bdev->md.meta_dev_idx) {
 	case DRBD_MD_INDEX_INTERNAL:
 	case DRBD_MD_INDEX_FLEX_INT:
 		s = drbd_get_capacity(bdev->backing_bdev)
 			? min_t(sector_t, DRBD_MAX_SECTORS_FLEX,
-				_drbd_md_first_sector(meta_dev_idx, bdev))
+				drbd_md_first_sector(bdev))
 			: 0;
 		break;
 	case DRBD_MD_INDEX_FLEX_EXT:
@@ -1848,39 +1864,24 @@
 }
 
 /**
- * drbd_md_ss__() - Return the sector number of our meta data super block
- * @mdev:	DRBD device.
+ * drbd_md_ss() - Return the sector number of our meta data super block
  * @bdev:	Meta data block device.
  */
-static inline sector_t drbd_md_ss__(struct drbd_conf *mdev,
-				    struct drbd_backing_dev *bdev)
+static inline sector_t drbd_md_ss(struct drbd_backing_dev *bdev)
 {
-	int meta_dev_idx;
+	const int meta_dev_idx = bdev->md.meta_dev_idx;
 
-	rcu_read_lock();
-	meta_dev_idx = rcu_dereference(bdev->disk_conf)->meta_dev_idx;
-	rcu_read_unlock();
-
-	switch (meta_dev_idx) {
-	default: /* external, some index */
-		return MD_RESERVED_SECT * meta_dev_idx;
-	case DRBD_MD_INDEX_INTERNAL:
-		/* with drbd08, internal meta data is always "flexible" */
-	case DRBD_MD_INDEX_FLEX_INT:
-		/* sizeof(struct md_on_disk_07) == 4k
-		 * position: last 4k aligned block of 4k size */
-		if (!bdev->backing_bdev) {
-			if (__ratelimit(&drbd_ratelimit_state)) {
-				dev_err(DEV, "bdev->backing_bdev==NULL\n");
-				dump_stack();
-			}
-			return 0;
-		}
-		return (drbd_get_capacity(bdev->backing_bdev) & ~7ULL)
-			- MD_AL_OFFSET;
-	case DRBD_MD_INDEX_FLEX_EXT:
+	if (meta_dev_idx == DRBD_MD_INDEX_FLEX_EXT)
 		return 0;
-	}
+
+	/* Since drbd08, internal meta data is always "flexible".
+	 * position: last 4k aligned block of 4k size */
+	if (meta_dev_idx == DRBD_MD_INDEX_INTERNAL ||
+	    meta_dev_idx == DRBD_MD_INDEX_FLEX_INT)
+		return (drbd_get_capacity(bdev->backing_bdev) & ~7ULL) - 8;
+
+	/* external, some index; this is the old fixed size layout */
+	return MD_128MB_SECT * bdev->md.meta_dev_idx;
 }
 
 static inline void
@@ -2053,9 +2054,11 @@
 		if (mdev->state.disk == D_DISKLESS)
 			/* even internal references gone, safe to destroy */
 			drbd_ldev_destroy(mdev);
-		if (mdev->state.disk == D_FAILED)
+		if (mdev->state.disk == D_FAILED) {
 			/* all application IO references gone. */
-			drbd_go_diskless(mdev);
+			if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
+				drbd_queue_work(&mdev->tconn->sender_work, &mdev->go_diskless);
+		}
 		wake_up(&mdev->misc_wait);
 	}
 }
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index e98da67..a150b59 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -45,7 +45,7 @@
 #include <linux/reboot.h>
 #include <linux/notifier.h>
 #include <linux/kthread.h>
-
+#include <linux/workqueue.h>
 #define __KERNEL_SYSCALLS__
 #include <linux/unistd.h>
 #include <linux/vmalloc.h>
@@ -2300,6 +2300,7 @@
 	idr_for_each_entry(&minors, mdev, i) {
 		idr_remove(&minors, mdev_to_minor(mdev));
 		idr_remove(&mdev->tconn->volumes, mdev->vnr);
+		destroy_workqueue(mdev->submit.wq);
 		del_gendisk(mdev->vdisk);
 		/* synchronize_rcu(); No other threads running at this point */
 		kref_put(&mdev->kref, &drbd_minor_destroy);
@@ -2589,6 +2590,21 @@
 	kfree(tconn);
 }
 
+int init_submitter(struct drbd_conf *mdev)
+{
+	/* opencoded create_singlethread_workqueue(),
+	 * to be able to say "drbd%d", ..., minor */
+	mdev->submit.wq = alloc_workqueue("drbd%u_submit",
+			WQ_UNBOUND | WQ_MEM_RECLAIM, 1, mdev->minor);
+	if (!mdev->submit.wq)
+		return -ENOMEM;
+
+	INIT_WORK(&mdev->submit.worker, do_submit);
+	spin_lock_init(&mdev->submit.lock);
+	INIT_LIST_HEAD(&mdev->submit.writes);
+	return 0;
+}
+
 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
 {
 	struct drbd_conf *mdev;
@@ -2678,6 +2694,12 @@
 		goto out_idr_remove_minor;
 	}
 
+	if (init_submitter(mdev)) {
+		err = ERR_NOMEM;
+		drbd_msg_put_info("unable to create submit workqueue");
+		goto out_idr_remove_vol;
+	}
+
 	add_disk(disk);
 	kref_init(&mdev->kref); /* one ref for both idrs and the the add_disk */
 
@@ -2688,6 +2710,8 @@
 
 	return NO_ERROR;
 
+out_idr_remove_vol:
+	idr_remove(&tconn->volumes, vnr_got);
 out_idr_remove_minor:
 	idr_remove(&minors, minor_got);
 	synchronize_rcu();
@@ -2834,8 +2858,9 @@
 	rcu_read_unlock();
 }
 
+/* aligned 4kByte */
 struct meta_data_on_disk {
-	u64 la_size;           /* last agreed size. */
+	u64 la_size_sect;      /* last agreed size. */
 	u64 uuid[UI_SIZE];   /* UUIDs. */
 	u64 device_uuid;
 	u64 reserved_u64_1;
@@ -2843,13 +2868,17 @@
 	u32 magic;
 	u32 md_size_sect;
 	u32 al_offset;         /* offset to this block */
-	u32 al_nr_extents;     /* important for restoring the AL */
+	u32 al_nr_extents;     /* important for restoring the AL (userspace) */
 	      /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
 	u32 bm_offset;         /* offset to the bitmap, from here */
 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
 	u32 la_peer_max_bio_size;   /* last peer max_bio_size */
-	u32 reserved_u32[3];
 
+	/* see al_tr_number_to_on_disk_sector() */
+	u32 al_stripes;
+	u32 al_stripe_size_4k;
+
+	u8 reserved_u8[4096 - (7*8 + 10*4)];
 } __packed;
 
 /**
@@ -2862,6 +2891,10 @@
 	sector_t sector;
 	int i;
 
+	/* Don't accidentally change the DRBD meta data layout. */
+	BUILD_BUG_ON(UI_SIZE != 4);
+	BUILD_BUG_ON(sizeof(struct meta_data_on_disk) != 4096);
+
 	del_timer(&mdev->md_sync_timer);
 	/* timer may be rearmed by drbd_md_mark_dirty() now. */
 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
@@ -2876,9 +2909,9 @@
 	if (!buffer)
 		goto out;
 
-	memset(buffer, 0, 512);
+	memset(buffer, 0, sizeof(*buffer));
 
-	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
+	buffer->la_size_sect = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
 	for (i = UI_CURRENT; i < UI_SIZE; i++)
 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
@@ -2893,7 +2926,10 @@
 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
 	buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
 
-	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
+	buffer->al_stripes = cpu_to_be32(mdev->ldev->md.al_stripes);
+	buffer->al_stripe_size_4k = cpu_to_be32(mdev->ldev->md.al_stripe_size_4k);
+
+	D_ASSERT(drbd_md_ss(mdev->ldev) == mdev->ldev->md.md_offset);
 	sector = mdev->ldev->md.md_offset;
 
 	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
@@ -2911,13 +2947,141 @@
 	put_ldev(mdev);
 }
 
+static int check_activity_log_stripe_size(struct drbd_conf *mdev,
+		struct meta_data_on_disk *on_disk,
+		struct drbd_md *in_core)
+{
+	u32 al_stripes = be32_to_cpu(on_disk->al_stripes);
+	u32 al_stripe_size_4k = be32_to_cpu(on_disk->al_stripe_size_4k);
+	u64 al_size_4k;
+
+	/* both not set: default to old fixed size activity log */
+	if (al_stripes == 0 && al_stripe_size_4k == 0) {
+		al_stripes = 1;
+		al_stripe_size_4k = MD_32kB_SECT/8;
+	}
+
+	/* some paranoia plausibility checks */
+
+	/* we need both values to be set */
+	if (al_stripes == 0 || al_stripe_size_4k == 0)
+		goto err;
+
+	al_size_4k = (u64)al_stripes * al_stripe_size_4k;
+
+	/* Upper limit of activity log area, to avoid potential overflow
+	 * problems in al_tr_number_to_on_disk_sector(). As right now, more
+	 * than 72 * 4k blocks total only increases the amount of history,
+	 * limiting this arbitrarily to 16 GB is not a real limitation ;-)  */
+	if (al_size_4k > (16 * 1024 * 1024/4))
+		goto err;
+
+	/* Lower limit: we need at least 8 transaction slots (32kB)
+	 * to not break existing setups */
+	if (al_size_4k < MD_32kB_SECT/8)
+		goto err;
+
+	in_core->al_stripe_size_4k = al_stripe_size_4k;
+	in_core->al_stripes = al_stripes;
+	in_core->al_size_4k = al_size_4k;
+
+	return 0;
+err:
+	dev_err(DEV, "invalid activity log striping: al_stripes=%u, al_stripe_size_4k=%u\n",
+			al_stripes, al_stripe_size_4k);
+	return -EINVAL;
+}
+
+static int check_offsets_and_sizes(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
+{
+	sector_t capacity = drbd_get_capacity(bdev->md_bdev);
+	struct drbd_md *in_core = &bdev->md;
+	s32 on_disk_al_sect;
+	s32 on_disk_bm_sect;
+
+	/* The on-disk size of the activity log, calculated from offsets, and
+	 * the size of the activity log calculated from the stripe settings,
+	 * should match.
+	 * Though we could relax this a bit: it is ok, if the striped activity log
+	 * fits in the available on-disk activity log size.
+	 * Right now, that would break how resize is implemented.
+	 * TODO: make drbd_determine_dev_size() (and the drbdmeta tool) aware
+	 * of possible unused padding space in the on disk layout. */
+	if (in_core->al_offset < 0) {
+		if (in_core->bm_offset > in_core->al_offset)
+			goto err;
+		on_disk_al_sect = -in_core->al_offset;
+		on_disk_bm_sect = in_core->al_offset - in_core->bm_offset;
+	} else {
+		if (in_core->al_offset != MD_4kB_SECT)
+			goto err;
+		if (in_core->bm_offset < in_core->al_offset + in_core->al_size_4k * MD_4kB_SECT)
+			goto err;
+
+		on_disk_al_sect = in_core->bm_offset - MD_4kB_SECT;
+		on_disk_bm_sect = in_core->md_size_sect - in_core->bm_offset;
+	}
+
+	/* old fixed size meta data is exactly that: fixed. */
+	if (in_core->meta_dev_idx >= 0) {
+		if (in_core->md_size_sect != MD_128MB_SECT
+		||  in_core->al_offset != MD_4kB_SECT
+		||  in_core->bm_offset != MD_4kB_SECT + MD_32kB_SECT
+		||  in_core->al_stripes != 1
+		||  in_core->al_stripe_size_4k != MD_32kB_SECT/8)
+			goto err;
+	}
+
+	if (capacity < in_core->md_size_sect)
+		goto err;
+	if (capacity - in_core->md_size_sect < drbd_md_first_sector(bdev))
+		goto err;
+
+	/* should be aligned, and at least 32k */
+	if ((on_disk_al_sect & 7) || (on_disk_al_sect < MD_32kB_SECT))
+		goto err;
+
+	/* should fit (for now: exactly) into the available on-disk space;
+	 * overflow prevention is in check_activity_log_stripe_size() above. */
+	if (on_disk_al_sect != in_core->al_size_4k * MD_4kB_SECT)
+		goto err;
+
+	/* again, should be aligned */
+	if (in_core->bm_offset & 7)
+		goto err;
+
+	/* FIXME check for device grow with flex external meta data? */
+
+	/* can the available bitmap space cover the last agreed device size? */
+	if (on_disk_bm_sect < (in_core->la_size_sect+7)/MD_4kB_SECT/8/512)
+		goto err;
+
+	return 0;
+
+err:
+	dev_err(DEV, "meta data offsets don't make sense: idx=%d "
+			"al_s=%u, al_sz4k=%u, al_offset=%d, bm_offset=%d, "
+			"md_size_sect=%u, la_size=%llu, md_capacity=%llu\n",
+			in_core->meta_dev_idx,
+			in_core->al_stripes, in_core->al_stripe_size_4k,
+			in_core->al_offset, in_core->bm_offset, in_core->md_size_sect,
+			(unsigned long long)in_core->la_size_sect,
+			(unsigned long long)capacity);
+
+	return -EINVAL;
+}
+
+
 /**
  * drbd_md_read() - Reads in the meta data super block
  * @mdev:	DRBD device.
  * @bdev:	Device from which the meta data should be read in.
  *
- * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
+ * Return NO_ERROR on success, and an enum drbd_ret_code in case
  * something goes wrong.
+ *
+ * Called exactly once during drbd_adm_attach(), while still being D_DISKLESS,
+ * even before @bdev is assigned to @mdev->ldev.
  */
 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
 {
@@ -2925,12 +3089,17 @@
 	u32 magic, flags;
 	int i, rv = NO_ERROR;
 
-	if (!get_ldev_if_state(mdev, D_ATTACHING))
-		return ERR_IO_MD_DISK;
+	if (mdev->state.disk != D_DISKLESS)
+		return ERR_DISK_CONFIGURED;
 
 	buffer = drbd_md_get_buffer(mdev);
 	if (!buffer)
-		goto out;
+		return ERR_NOMEM;
+
+	/* First, figure out where our meta data superblock is located,
+	 * and read it. */
+	bdev->md.meta_dev_idx = bdev->disk_conf->meta_dev_idx;
+	bdev->md.md_offset = drbd_md_ss(bdev);
 
 	if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
 		/* NOTE: can't do normal error processing here as this is
@@ -2949,46 +3118,52 @@
 		rv = ERR_MD_UNCLEAN;
 		goto err;
 	}
+
+	rv = ERR_MD_INVALID;
 	if (magic != DRBD_MD_MAGIC_08) {
 		if (magic == DRBD_MD_MAGIC_07)
 			dev_err(DEV, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
 		else
 			dev_err(DEV, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
-		rv = ERR_MD_INVALID;
-		goto err;
-	}
-	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
-		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
-		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
-		rv = ERR_MD_INVALID;
-		goto err;
-	}
-	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
-		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
-		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
-		rv = ERR_MD_INVALID;
-		goto err;
-	}
-	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
-		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
-		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
-		rv = ERR_MD_INVALID;
 		goto err;
 	}
 
 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
-		rv = ERR_MD_INVALID;
 		goto err;
 	}
 
-	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
+
+	/* convert to in_core endian */
+	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size_sect);
 	for (i = UI_CURRENT; i < UI_SIZE; i++)
 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
 	bdev->md.flags = be32_to_cpu(buffer->flags);
 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
 
+	bdev->md.md_size_sect = be32_to_cpu(buffer->md_size_sect);
+	bdev->md.al_offset = be32_to_cpu(buffer->al_offset);
+	bdev->md.bm_offset = be32_to_cpu(buffer->bm_offset);
+
+	if (check_activity_log_stripe_size(mdev, buffer, &bdev->md))
+		goto err;
+	if (check_offsets_and_sizes(mdev, bdev))
+		goto err;
+
+	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
+		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
+		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
+		goto err;
+	}
+	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
+		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
+		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
+		goto err;
+	}
+
+	rv = NO_ERROR;
+
 	spin_lock_irq(&mdev->tconn->req_lock);
 	if (mdev->state.conn < C_CONNECTED) {
 		unsigned int peer;
@@ -3000,8 +3175,6 @@
 
  err:
 	drbd_md_put_buffer(mdev);
- out:
-	put_ldev(mdev);
 
 	return rv;
 }
@@ -3252,13 +3425,6 @@
 	return 0;
 }
 
-void drbd_go_diskless(struct drbd_conf *mdev)
-{
-	D_ASSERT(mdev->state.disk == D_FAILED);
-	if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
-		drbd_queue_work(&mdev->tconn->sender_work, &mdev->go_diskless);
-}
-
 /**
  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
  * @mdev:	DRBD device.
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 2af26fc..42fda4a 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -696,37 +696,52 @@
 	return 0;
 }
 
-/* initializes the md.*_offset members, so we are able to find
- * the on disk meta data */
+/* Initializes the md.*_offset members, so we are able to find
+ * the on disk meta data.
+ *
+ * We currently have two possible layouts:
+ * external:
+ *   |----------- md_size_sect ------------------|
+ *   [ 4k superblock ][ activity log ][  Bitmap  ]
+ *   | al_offset == 8 |
+ *   | bm_offset = al_offset + X      |
+ *  ==> bitmap sectors = md_size_sect - bm_offset
+ *
+ * internal:
+ *            |----------- md_size_sect ------------------|
+ * [data.....][  Bitmap  ][ activity log ][ 4k superblock ]
+ *                        | al_offset < 0 |
+ *            | bm_offset = al_offset - Y |
+ *  ==> bitmap sectors = Y = al_offset - bm_offset
+ *
+ *  Activity log size used to be fixed 32kB,
+ *  but is about to become configurable.
+ */
 static void drbd_md_set_sector_offsets(struct drbd_conf *mdev,
 				       struct drbd_backing_dev *bdev)
 {
 	sector_t md_size_sect = 0;
-	int meta_dev_idx;
+	unsigned int al_size_sect = bdev->md.al_size_4k * 8;
 
-	rcu_read_lock();
-	meta_dev_idx = rcu_dereference(bdev->disk_conf)->meta_dev_idx;
+	bdev->md.md_offset = drbd_md_ss(bdev);
 
-	switch (meta_dev_idx) {
+	switch (bdev->md.meta_dev_idx) {
 	default:
 		/* v07 style fixed size indexed meta data */
-		bdev->md.md_size_sect = MD_RESERVED_SECT;
-		bdev->md.md_offset = drbd_md_ss__(mdev, bdev);
-		bdev->md.al_offset = MD_AL_OFFSET;
-		bdev->md.bm_offset = MD_BM_OFFSET;
+		bdev->md.md_size_sect = MD_128MB_SECT;
+		bdev->md.al_offset = MD_4kB_SECT;
+		bdev->md.bm_offset = MD_4kB_SECT + al_size_sect;
 		break;
 	case DRBD_MD_INDEX_FLEX_EXT:
 		/* just occupy the full device; unit: sectors */
 		bdev->md.md_size_sect = drbd_get_capacity(bdev->md_bdev);
-		bdev->md.md_offset = 0;
-		bdev->md.al_offset = MD_AL_OFFSET;
-		bdev->md.bm_offset = MD_BM_OFFSET;
+		bdev->md.al_offset = MD_4kB_SECT;
+		bdev->md.bm_offset = MD_4kB_SECT + al_size_sect;
 		break;
 	case DRBD_MD_INDEX_INTERNAL:
 	case DRBD_MD_INDEX_FLEX_INT:
-		bdev->md.md_offset = drbd_md_ss__(mdev, bdev);
 		/* al size is still fixed */
-		bdev->md.al_offset = -MD_AL_SECTORS;
+		bdev->md.al_offset = -al_size_sect;
 		/* we need (slightly less than) ~ this much bitmap sectors: */
 		md_size_sect = drbd_get_capacity(bdev->backing_bdev);
 		md_size_sect = ALIGN(md_size_sect, BM_SECT_PER_EXT);
@@ -735,14 +750,13 @@
 
 		/* plus the "drbd meta data super block",
 		 * and the activity log; */
-		md_size_sect += MD_BM_OFFSET;
+		md_size_sect += MD_4kB_SECT + al_size_sect;
 
 		bdev->md.md_size_sect = md_size_sect;
 		/* bitmap offset is adjusted by 'super' block size */
-		bdev->md.bm_offset   = -md_size_sect + MD_AL_OFFSET;
+		bdev->md.bm_offset   = -md_size_sect + MD_4kB_SECT;
 		break;
 	}
-	rcu_read_unlock();
 }
 
 /* input size is expected to be in KB */
@@ -805,7 +819,7 @@
 enum determine_dev_size drbd_determine_dev_size(struct drbd_conf *mdev, enum dds_flags flags) __must_hold(local)
 {
 	sector_t prev_first_sect, prev_size; /* previous meta location */
-	sector_t la_size, u_size;
+	sector_t la_size_sect, u_size;
 	sector_t size;
 	char ppb[10];
 
@@ -828,7 +842,7 @@
 
 	prev_first_sect = drbd_md_first_sector(mdev->ldev);
 	prev_size = mdev->ldev->md.md_size_sect;
-	la_size = mdev->ldev->md.la_size_sect;
+	la_size_sect = mdev->ldev->md.la_size_sect;
 
 	/* TODO: should only be some assert here, not (re)init... */
 	drbd_md_set_sector_offsets(mdev, mdev->ldev);
@@ -864,7 +878,7 @@
 	if (rv == dev_size_error)
 		goto out;
 
-	la_size_changed = (la_size != mdev->ldev->md.la_size_sect);
+	la_size_changed = (la_size_sect != mdev->ldev->md.la_size_sect);
 
 	md_moved = prev_first_sect != drbd_md_first_sector(mdev->ldev)
 		|| prev_size	   != mdev->ldev->md.md_size_sect;
@@ -886,9 +900,9 @@
 		drbd_md_mark_dirty(mdev);
 	}
 
-	if (size > la_size)
+	if (size > la_size_sect)
 		rv = grew;
-	if (size < la_size)
+	if (size < la_size_sect)
 		rv = shrunk;
 out:
 	lc_unlock(mdev->act_log);
@@ -903,7 +917,7 @@
 		  sector_t u_size, int assume_peer_has_space)
 {
 	sector_t p_size = mdev->p_size;   /* partner's disk size. */
-	sector_t la_size = bdev->md.la_size_sect; /* last agreed size. */
+	sector_t la_size_sect = bdev->md.la_size_sect; /* last agreed size. */
 	sector_t m_size; /* my size */
 	sector_t size = 0;
 
@@ -917,8 +931,8 @@
 	if (p_size && m_size) {
 		size = min_t(sector_t, p_size, m_size);
 	} else {
-		if (la_size) {
-			size = la_size;
+		if (la_size_sect) {
+			size = la_size_sect;
 			if (m_size && m_size < size)
 				size = m_size;
 			if (p_size && p_size < size)
@@ -1127,15 +1141,32 @@
 	return 0 != (flags & DRBD_GENL_F_SET_DEFAULTS);
 }
 
-static void enforce_disk_conf_limits(struct disk_conf *dc)
+static unsigned int drbd_al_extents_max(struct drbd_backing_dev *bdev)
 {
-	if (dc->al_extents < DRBD_AL_EXTENTS_MIN)
-		dc->al_extents = DRBD_AL_EXTENTS_MIN;
-	if (dc->al_extents > DRBD_AL_EXTENTS_MAX)
-		dc->al_extents = DRBD_AL_EXTENTS_MAX;
+	/* This is limited by 16 bit "slot" numbers,
+	 * and by available on-disk context storage.
+	 *
+	 * Also (u16)~0 is special (denotes a "free" extent).
+	 *
+	 * One transaction occupies one 4kB on-disk block,
+	 * we have n such blocks in the on disk ring buffer,
+	 * the "current" transaction may fail (n-1),
+	 * and each transaction holds context info for 919 slot numbers.
+	 *
+	 * 72 transaction blocks amount to more than 2**16 context slots,
+	 * so cap there first.
+	 */
+	const unsigned int max_al_nr = DRBD_AL_EXTENTS_MAX;
+	const unsigned int sufficient_on_disk =
+		(max_al_nr + AL_CONTEXT_PER_TRANSACTION -1)
+		/AL_CONTEXT_PER_TRANSACTION;
 
-	if (dc->c_plan_ahead > DRBD_C_PLAN_AHEAD_MAX)
-		dc->c_plan_ahead = DRBD_C_PLAN_AHEAD_MAX;
+	unsigned int al_size_4k = bdev->md.al_size_4k;
+
+	if (al_size_4k > sufficient_on_disk)
+		return max_al_nr;
+
+	return (al_size_4k - 1) * AL_CONTEXT_PER_TRANSACTION;
 }
 
 int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info)
@@ -1182,7 +1213,13 @@
 	if (!expect(new_disk_conf->resync_rate >= 1))
 		new_disk_conf->resync_rate = 1;
 
-	enforce_disk_conf_limits(new_disk_conf);
+	if (new_disk_conf->al_extents < DRBD_AL_EXTENTS_MIN)
+		new_disk_conf->al_extents = DRBD_AL_EXTENTS_MIN;
+	if (new_disk_conf->al_extents > drbd_al_extents_max(mdev->ldev))
+		new_disk_conf->al_extents = drbd_al_extents_max(mdev->ldev);
+
+	if (new_disk_conf->c_plan_ahead > DRBD_C_PLAN_AHEAD_MAX)
+		new_disk_conf->c_plan_ahead = DRBD_C_PLAN_AHEAD_MAX;
 
 	fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
 	if (fifo_size != mdev->rs_plan_s->size) {
@@ -1330,7 +1367,8 @@
 		goto fail;
 	}
 
-	enforce_disk_conf_limits(new_disk_conf);
+	if (new_disk_conf->c_plan_ahead > DRBD_C_PLAN_AHEAD_MAX)
+		new_disk_conf->c_plan_ahead = DRBD_C_PLAN_AHEAD_MAX;
 
 	new_plan = fifo_alloc((new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ);
 	if (!new_plan) {
@@ -1399,8 +1437,16 @@
 		goto fail;
 	}
 
-	/* RT - for drbd_get_max_capacity() DRBD_MD_INDEX_FLEX_INT */
-	drbd_md_set_sector_offsets(mdev, nbc);
+	/* Read our meta data super block early.
+	 * This also sets other on-disk offsets. */
+	retcode = drbd_md_read(mdev, nbc);
+	if (retcode != NO_ERROR)
+		goto fail;
+
+	if (new_disk_conf->al_extents < DRBD_AL_EXTENTS_MIN)
+		new_disk_conf->al_extents = DRBD_AL_EXTENTS_MIN;
+	if (new_disk_conf->al_extents > drbd_al_extents_max(nbc))
+		new_disk_conf->al_extents = drbd_al_extents_max(nbc);
 
 	if (drbd_get_max_capacity(nbc) < new_disk_conf->disk_size) {
 		dev_err(DEV, "max capacity %llu smaller than disk size %llu\n",
@@ -1416,7 +1462,7 @@
 		min_md_device_sectors = (2<<10);
 	} else {
 		max_possible_sectors = DRBD_MAX_SECTORS;
-		min_md_device_sectors = MD_RESERVED_SECT * (new_disk_conf->meta_dev_idx + 1);
+		min_md_device_sectors = MD_128MB_SECT * (new_disk_conf->meta_dev_idx + 1);
 	}
 
 	if (drbd_get_capacity(nbc->md_bdev) < min_md_device_sectors) {
@@ -1467,8 +1513,6 @@
 	if (!get_ldev_if_state(mdev, D_ATTACHING))
 		goto force_diskless;
 
-	drbd_md_set_sector_offsets(mdev, nbc);
-
 	if (!mdev->bitmap) {
 		if (drbd_bm_init(mdev)) {
 			retcode = ERR_NOMEM;
@@ -1476,10 +1520,6 @@
 		}
 	}
 
-	retcode = drbd_md_read(mdev, nbc);
-	if (retcode != NO_ERROR)
-		goto force_diskless_dec;
-
 	if (mdev->state.conn < C_CONNECTED &&
 	    mdev->state.role == R_PRIMARY &&
 	    (mdev->ed_uuid & ~((u64)1)) != (nbc->md.uuid[UI_CURRENT] & ~((u64)1))) {
@@ -3162,6 +3202,7 @@
 				    CS_VERBOSE + CS_WAIT_COMPLETE);
 		idr_remove(&mdev->tconn->volumes, mdev->vnr);
 		idr_remove(&minors, mdev_to_minor(mdev));
+		destroy_workqueue(mdev->submit.wq);
 		del_gendisk(mdev->vdisk);
 		synchronize_rcu();
 		kref_put(&mdev->kref, &drbd_minor_destroy);
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index a9eccfc..1921871 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -2265,7 +2265,7 @@
 		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
-		drbd_al_begin_io(mdev, &peer_req->i);
+		drbd_al_begin_io(mdev, &peer_req->i, true);
 	}
 
 	err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
@@ -3992,7 +3992,7 @@
 
 	clear_bit(DISCARD_MY_DATA, &mdev->flags);
 
-	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
+	drbd_md_sync(mdev); /* update connected indicator, la_size_sect, ... */
 
 	return 0;
 }
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 2b8303a..9f7ff1c 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -34,14 +34,14 @@
 static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size);
 
 /* Update disk stats at start of I/O request */
-static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio)
+static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
 {
-	const int rw = bio_data_dir(bio);
+	const int rw = bio_data_dir(req->master_bio);
 	int cpu;
 	cpu = part_stat_lock();
 	part_round_stats(cpu, &mdev->vdisk->part0);
 	part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]);
-	part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio));
+	part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], req->i.size >> 9);
 	(void) cpu; /* The macro invocations above want the cpu argument, I do not like
 		       the compiler warning about cpu only assigned but never used... */
 	part_inc_in_flight(&mdev->vdisk->part0, rw);
@@ -1020,12 +1020,24 @@
 		bio_endio(bio, -EIO);
 }
 
-void __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
+static void drbd_queue_write(struct drbd_conf *mdev, struct drbd_request *req)
 {
-	const int rw = bio_rw(bio);
-	struct bio_and_error m = { NULL, };
+	spin_lock(&mdev->submit.lock);
+	list_add_tail(&req->tl_requests, &mdev->submit.writes);
+	spin_unlock(&mdev->submit.lock);
+	queue_work(mdev->submit.wq, &mdev->submit.worker);
+}
+
+/* returns the new drbd_request pointer, if the caller is expected to
+ * drbd_send_and_submit() it (to save latency), or NULL if we queued the
+ * request on the submitter thread.
+ * Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request.
+ */
+struct drbd_request *
+drbd_request_prepare(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
+{
+	const int rw = bio_data_dir(bio);
 	struct drbd_request *req;
-	bool no_remote = false;
 
 	/* allocate outside of all locks; */
 	req = drbd_req_new(mdev, bio);
@@ -1035,7 +1047,7 @@
 		 * if user cannot handle io errors, that's not our business. */
 		dev_err(DEV, "could not kmalloc() req\n");
 		bio_endio(bio, -ENOMEM);
-		return;
+		return ERR_PTR(-ENOMEM);
 	}
 	req->start_time = start_time;
 
@@ -1044,19 +1056,27 @@
 		req->private_bio = NULL;
 	}
 
-	/* For WRITES going to the local disk, grab a reference on the target
-	 * extent.  This waits for any resync activity in the corresponding
-	 * resync extent to finish, and, if necessary, pulls in the target
-	 * extent into the activity log, which involves further disk io because
-	 * of transactional on-disk meta data updates.
-	 * Empty flushes don't need to go into the activity log, they can only
-	 * flush data for pending writes which are already in there. */
+	/* Update disk stats */
+	_drbd_start_io_acct(mdev, req);
+
 	if (rw == WRITE && req->private_bio && req->i.size
 	&& !test_bit(AL_SUSPENDED, &mdev->flags)) {
+		if (!drbd_al_begin_io_fastpath(mdev, &req->i)) {
+			drbd_queue_write(mdev, req);
+			return NULL;
+		}
 		req->rq_state |= RQ_IN_ACT_LOG;
-		drbd_al_begin_io(mdev, &req->i);
 	}
 
+	return req;
+}
+
+static void drbd_send_and_submit(struct drbd_conf *mdev, struct drbd_request *req)
+{
+	const int rw = bio_rw(req->master_bio);
+	struct bio_and_error m = { NULL, };
+	bool no_remote = false;
+
 	spin_lock_irq(&mdev->tconn->req_lock);
 	if (rw == WRITE) {
 		/* This may temporarily give up the req_lock,
@@ -1078,9 +1098,6 @@
 		goto out;
 	}
 
-	/* Update disk stats */
-	_drbd_start_io_acct(mdev, req, bio);
-
 	/* We fail READ/READA early, if we can not serve it.
 	 * We must do this before req is registered on any lists.
 	 * Otherwise, drbd_req_complete() will queue failed READ for retry. */
@@ -1137,7 +1154,116 @@
 
 	if (m.bio)
 		complete_master_bio(mdev, &m);
-	return;
+}
+
+void __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
+{
+	struct drbd_request *req = drbd_request_prepare(mdev, bio, start_time);
+	if (IS_ERR_OR_NULL(req))
+		return;
+	drbd_send_and_submit(mdev, req);
+}
+
+static void submit_fast_path(struct drbd_conf *mdev, struct list_head *incoming)
+{
+	struct drbd_request *req, *tmp;
+	list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
+		const int rw = bio_data_dir(req->master_bio);
+
+		if (rw == WRITE /* rw != WRITE should not even end up here! */
+		&& req->private_bio && req->i.size
+		&& !test_bit(AL_SUSPENDED, &mdev->flags)) {
+			if (!drbd_al_begin_io_fastpath(mdev, &req->i))
+				continue;
+
+			req->rq_state |= RQ_IN_ACT_LOG;
+		}
+
+		list_del_init(&req->tl_requests);
+		drbd_send_and_submit(mdev, req);
+	}
+}
+
+static bool prepare_al_transaction_nonblock(struct drbd_conf *mdev,
+					    struct list_head *incoming,
+					    struct list_head *pending)
+{
+	struct drbd_request *req, *tmp;
+	int wake = 0;
+	int err;
+
+	spin_lock_irq(&mdev->al_lock);
+	list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
+		err = drbd_al_begin_io_nonblock(mdev, &req->i);
+		if (err == -EBUSY)
+			wake = 1;
+		if (err)
+			continue;
+		req->rq_state |= RQ_IN_ACT_LOG;
+		list_move_tail(&req->tl_requests, pending);
+	}
+	spin_unlock_irq(&mdev->al_lock);
+	if (wake)
+		wake_up(&mdev->al_wait);
+
+	return !list_empty(pending);
+}
+
+void do_submit(struct work_struct *ws)
+{
+	struct drbd_conf *mdev = container_of(ws, struct drbd_conf, submit.worker);
+	LIST_HEAD(incoming);
+	LIST_HEAD(pending);
+	struct drbd_request *req, *tmp;
+
+	for (;;) {
+		spin_lock(&mdev->submit.lock);
+		list_splice_tail_init(&mdev->submit.writes, &incoming);
+		spin_unlock(&mdev->submit.lock);
+
+		submit_fast_path(mdev, &incoming);
+		if (list_empty(&incoming))
+			break;
+
+		wait_event(mdev->al_wait, prepare_al_transaction_nonblock(mdev, &incoming, &pending));
+		/* Maybe more was queued, while we prepared the transaction?
+		 * Try to stuff them into this transaction as well.
+		 * Be strictly non-blocking here, no wait_event, we already
+		 * have something to commit.
+		 * Stop if we don't make any more progress.
+		 */
+		for (;;) {
+			LIST_HEAD(more_pending);
+			LIST_HEAD(more_incoming);
+			bool made_progress;
+
+			/* It is ok to look outside the lock,
+			 * it's only an optimization anyway */
+			if (list_empty(&mdev->submit.writes))
+				break;
+
+			spin_lock(&mdev->submit.lock);
+			list_splice_tail_init(&mdev->submit.writes, &more_incoming);
+			spin_unlock(&mdev->submit.lock);
+
+			if (list_empty(&more_incoming))
+				break;
+
+			made_progress = prepare_al_transaction_nonblock(mdev, &more_incoming, &more_pending);
+
+			list_splice_tail_init(&more_pending, &pending);
+			list_splice_tail_init(&more_incoming, &incoming);
+
+			if (!made_progress)
+				break;
+		}
+		drbd_al_begin_io_commit(mdev, false);
+
+		list_for_each_entry_safe(req, tmp, &pending, tl_requests) {
+			list_del_init(&req->tl_requests);
+			drbd_send_and_submit(mdev, req);
+		}
+	}
 }
 
 void drbd_make_request(struct request_queue *q, struct bio *bio)
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 424dc7b..f41e224 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -89,7 +89,8 @@
 	md_io->done = 1;
 	wake_up(&mdev->misc_wait);
 	bio_put(bio);
-	put_ldev(mdev);
+	if (mdev->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
+		put_ldev(mdev);
 }
 
 /* reads on behalf of the partner,
@@ -1410,7 +1411,7 @@
 	struct drbd_conf *mdev = w->mdev;
 
 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
-		drbd_al_begin_io(mdev, &req->i);
+		drbd_al_begin_io(mdev, &req->i, false);
 
 	drbd_req_make_private_bio(req, req->master_bio);
 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
diff --git a/include/linux/drbd_limits.h b/include/linux/drbd_limits.h
index 1fa19c5..1fedf2b 100644
--- a/include/linux/drbd_limits.h
+++ b/include/linux/drbd_limits.h
@@ -126,13 +126,12 @@
 #define DRBD_RESYNC_RATE_DEF 250
 #define DRBD_RESYNC_RATE_SCALE 'k'  /* kilobytes */
 
-  /* less than 7 would hit performance unnecessarily.
-   * 919 slots context information per transaction,
-   * 32k activity log, 4k transaction size,
-   * one transaction in flight:
-   * 919 * 7 = 6433 */
+  /* less than 7 would hit performance unnecessarily. */
 #define DRBD_AL_EXTENTS_MIN  7
-#define DRBD_AL_EXTENTS_MAX  6433
+  /* we use u16 as "slot number", (u16)~0 is "FREE".
+   * If you use >= 292 kB on-disk ring buffer,
+   * this is the maximum you can use: */
+#define DRBD_AL_EXTENTS_MAX  0xfffe
 #define DRBD_AL_EXTENTS_DEF  1237
 #define DRBD_AL_EXTENTS_SCALE '1'
 
diff --git a/include/linux/lru_cache.h b/include/linux/lru_cache.h
index 4019013..4626228 100644
--- a/include/linux/lru_cache.h
+++ b/include/linux/lru_cache.h
@@ -256,6 +256,7 @@
 extern void lc_set(struct lru_cache *lc, unsigned int enr, int index);
 extern void lc_del(struct lru_cache *lc, struct lc_element *element);
 
+extern struct lc_element *lc_get_cumulative(struct lru_cache *lc, unsigned int enr);
 extern struct lc_element *lc_try_get(struct lru_cache *lc, unsigned int enr);
 extern struct lc_element *lc_find(struct lru_cache *lc, unsigned int enr);
 extern struct lc_element *lc_get(struct lru_cache *lc, unsigned int enr);
diff --git a/lib/lru_cache.c b/lib/lru_cache.c
index 8335d39..4a83ecd 100644
--- a/lib/lru_cache.c
+++ b/lib/lru_cache.c
@@ -365,7 +365,13 @@
 	return 0;
 }
 
-static struct lc_element *__lc_get(struct lru_cache *lc, unsigned int enr, bool may_change)
+/* used as internal flags to __lc_get */
+enum {
+	LC_GET_MAY_CHANGE = 1,
+	LC_GET_MAY_USE_UNCOMMITTED = 2,
+};
+
+static struct lc_element *__lc_get(struct lru_cache *lc, unsigned int enr, unsigned int flags)
 {
 	struct lc_element *e;
 
@@ -380,22 +386,31 @@
 	 * this enr is currently being pulled in already,
 	 * and will be available once the pending transaction
 	 * has been committed. */
-	if (e && e->lc_new_number == e->lc_number) {
+	if (e) {
+		if (e->lc_new_number != e->lc_number) {
+			/* It has been found above, but on the "to_be_changed"
+			 * list, not yet committed.  Don't pull it in twice,
+			 * wait for the transaction, then try again...
+			 */
+			if (!(flags & LC_GET_MAY_USE_UNCOMMITTED))
+				RETURN(NULL);
+			/* ... unless the caller is aware of the implications,
+			 * probably preparing a cumulative transaction. */
+			++e->refcnt;
+			++lc->hits;
+			RETURN(e);
+		}
+		/* else: lc_new_number == lc_number; a real hit. */
 		++lc->hits;
 		if (e->refcnt++ == 0)
 			lc->used++;
 		list_move(&e->list, &lc->in_use); /* Not evictable... */
 		RETURN(e);
 	}
+	/* e == NULL */
 
 	++lc->misses;
-	if (!may_change)
-		RETURN(NULL);
-
-	/* It has been found above, but on the "to_be_changed" list, not yet
-	 * committed.  Don't pull it in twice, wait for the transaction, then
-	 * try again */
-	if (e)
+	if (!(flags & LC_GET_MAY_CHANGE))
 		RETURN(NULL);
 
 	/* To avoid races with lc_try_lock(), first, mark us dirty
@@ -477,7 +492,27 @@
  */
 struct lc_element *lc_get(struct lru_cache *lc, unsigned int enr)
 {
-	return __lc_get(lc, enr, 1);
+	return __lc_get(lc, enr, LC_GET_MAY_CHANGE);
+}
+
+/**
+ * lc_get_cumulative - like lc_get; also finds to-be-changed elements
+ * @lc: the lru cache to operate on
+ * @enr: the label to look up
+ *
+ * Unlike lc_get(), this also returns the element for @enr if it belongs to
+ * a pending transaction, so the return values are the same as for lc_get(),
+ * plus:
+ *
+ * pointer to an element already on the "to_be_changed" list.
+ * 	In this case, the cache was already marked %LC_DIRTY.
+ *
+ * Caller needs to make sure that the pending transaction is completed,
+ * before proceeding to actually use this element.
+ */
+struct lc_element *lc_get_cumulative(struct lru_cache *lc, unsigned int enr)
+{
+	return __lc_get(lc, enr, LC_GET_MAY_CHANGE|LC_GET_MAY_USE_UNCOMMITTED);
 }
 
 /**
@@ -648,3 +683,4 @@
 EXPORT_SYMBOL(lc_seq_dump_details);
 EXPORT_SYMBOL(lc_try_lock);
 EXPORT_SYMBOL(lc_is_used);
+EXPORT_SYMBOL(lc_get_cumulative);