[PATCH] RPC: transport switch function naming

 Introduce block header comments and a function naming convention to the
 socket transport implementation.  Provide a debug setting for transports
 that is separate from RPCDBG_XPRT.  Eliminate xprt_default_timeout().

 Provide block comments for exposed interfaces in xprt.c, and remove
 comments that merely restate the obvious.

 Convert printk() calls to dprintk() calls.

 Test-plan:
 Compile kernel with CONFIG_NFS enabled.

 Version: Thu, 11 Aug 2005 16:04:04 -0400

 Signed-off-by: Chuck Lever <cel@netapp.com>
 Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index fa1180a..80222de 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -33,23 +33,21 @@
 #include <net/udp.h>
 #include <net/tcp.h>
 
+/*
+ * Maximum port number to use when requesting a reserved port.
+ */
+#define XS_MAX_RESVPORT		(800U)
+
 #ifdef RPC_DEBUG
 # undef  RPC_DEBUG_DATA
-# define RPCDBG_FACILITY	RPCDBG_XPRT
+# define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-#define XPRT_MAX_RESVPORT	(800)
-
 #ifdef RPC_DEBUG_DATA
-/*
- * Print the buffer contents (first 128 bytes only--just enough for
- * diropres return).
- */
-static void
-xprt_pktdump(char *msg, u32 *packet, unsigned int count)
+static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 {
-	u8	*buf = (u8 *) packet;
-	int	j;
+	u8 *buf = (u8 *) packet;
+	int j;
 
 	dprintk("RPC:      %s\n", msg);
 	for (j = 0; j < count && j < 128; j += 4) {
@@ -64,25 +62,22 @@
 	dprintk("\n");
 }
 #else
-static inline void
-xprt_pktdump(char *msg, u32 *packet, unsigned int count)
+static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 {
 	/* NOP */
 }
 #endif
 
-/*
- * Look up RPC transport given an INET socket
+/**
+ * xs_sendpages - write pages directly to a socket
+ * @sock: socket to send on
+ * @addr: UDP only -- address of destination
+ * @addrlen: UDP only -- length of destination address
+ * @xdr: buffer containing this request
+ * @base: starting position in the buffer
+ *
  */
-static inline struct rpc_xprt *
-xprt_from_sock(struct sock *sk)
-{
-	return (struct rpc_xprt *) sk->sk_user_data;
-}
-
-static int
-xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
-		struct xdr_buf *xdr, unsigned int base, int msgflags)
+static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, int msgflags)
 {
 	struct page **ppage = xdr->pages;
 	unsigned int len, pglen = xdr->page_len;
@@ -125,7 +120,7 @@
 	}
 	if (base || xdr->page_base) {
 		pglen -= base;
-		base  += xdr->page_base;
+		base += xdr->page_base;
 		ppage += base >> PAGE_CACHE_SHIFT;
 		base &= ~PAGE_CACHE_MASK;
 	}
@@ -176,23 +171,25 @@
 	return ret;
 }
 
-/*
- * Write data to socket.
+/**
+ * xs_sendmsg - write an RPC request to a socket
+ * @xprt: generic transport
+ * @req: the RPC request to write
+ *
  */
-static inline int
-xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
+static int xs_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
-	struct socket	*sock = xprt->sock;
-	struct xdr_buf	*xdr = &req->rq_snd_buf;
+	struct socket *sock = xprt->sock;
+	struct xdr_buf *xdr = &req->rq_snd_buf;
 	struct sockaddr *addr = NULL;
 	int addrlen = 0;
-	unsigned int	skip;
-	int		result;
+	unsigned int skip;
+	int result;
 
 	if (!sock)
 		return -ENOTCONN;
 
-	xprt_pktdump("packet data:",
+	xs_pktdump("packet data:",
 				req->rq_svec->iov_base,
 				req->rq_svec->iov_len);
 
@@ -201,13 +198,13 @@
 		addr = (struct sockaddr *) &xprt->addr;
 		addrlen = sizeof(xprt->addr);
 	}
-	/* Dont repeat bytes */
+	/* Don't repeat bytes */
 	skip = req->rq_bytes_sent;
 
 	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
-	result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);
+	result = xs_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);
 
-	dprintk("RPC:      xprt_sendmsg(%d) = %d\n", xdr->len - skip, result);
+	dprintk("RPC:      xs_sendmsg(%d) = %d\n", xdr->len - skip, result);
 
 	if (result >= 0)
 		return result;
@@ -215,8 +212,7 @@
 	switch (result) {
 	case -ECONNREFUSED:
 		/* When the server has died, an ICMP port unreachable message
-		 * prompts ECONNREFUSED.
-		 */
+		 * prompts ECONNREFUSED. */
 	case -EAGAIN:
 		break;
 	case -ECONNRESET:
@@ -227,13 +223,25 @@
 			result = -ENOTCONN;
 		break;
 	default:
-		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
+		break;
 	}
 	return result;
 }
 
-static int
-xprt_send_request(struct rpc_task *task)
+/**
+ * xs_send_request - write an RPC request to a socket
+ * @task: address of RPC task that manages the state of an RPC request
+ *
+ * Return values:
+ *      0:  The request has been sent
+ * EAGAIN:  The socket was blocked, please call again later to
+ *          complete the request
+ *  other:  Some other error occurred, the request was not sent
+ *
+ * XXX: In the case of soft timeouts, should we eventually give up
+ *      if the socket is not able to make progress?
+ */
+static int xs_send_request(struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
@@ -242,18 +250,18 @@
 	/* set up everything as needed. */
 	/* Write the record marker */
 	if (xprt->stream) {
-		u32	*marker = req->rq_svec[0].iov_base;
+		u32 *marker = req->rq_svec[0].iov_base;
 
 		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
 	}
 
 	/* Continue transmitting the packet/record. We must be careful
 	 * to cope with writespace callbacks arriving _after_ we have
-	 * called xprt_sendmsg().
+	 * called sendmsg().
 	 */
 	while (1) {
 		req->rq_xtime = jiffies;
-		status = xprt_sendmsg(xprt, req);
+		status = xs_sendmsg(xprt, req);
 
 		if (status < 0)
 			break;
@@ -285,7 +293,7 @@
 
 	if (status == -EAGAIN) {
 		if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
-			/* Protect against races with xprt_write_space */
+			/* Protect against races with xs_write_space */
 			spin_lock_bh(&xprt->sock_lock);
 			/* Don't race with disconnect */
 			if (!xprt_connected(xprt))
@@ -303,65 +311,77 @@
 	return status;
 }
 
-/*
- * Close down a transport socket
+/**
+ * xs_close - close a socket
+ * @xprt: transport
+ *
  */
-static void
-xprt_close(struct rpc_xprt *xprt)
+static void xs_close(struct rpc_xprt *xprt)
 {
-	struct socket	*sock = xprt->sock;
-	struct sock	*sk = xprt->inet;
+	struct socket *sock = xprt->sock;
+	struct sock *sk = xprt->inet;
 
 	if (!sk)
 		return;
 
+	dprintk("RPC:      xs_close xprt %p\n", xprt);
+
 	write_lock_bh(&sk->sk_callback_lock);
 	xprt->inet = NULL;
 	xprt->sock = NULL;
 
-	sk->sk_user_data    = NULL;
-	sk->sk_data_ready   = xprt->old_data_ready;
+	sk->sk_user_data = NULL;
+	sk->sk_data_ready = xprt->old_data_ready;
 	sk->sk_state_change = xprt->old_state_change;
-	sk->sk_write_space  = xprt->old_write_space;
+	sk->sk_write_space = xprt->old_write_space;
 	write_unlock_bh(&sk->sk_callback_lock);
 
-	sk->sk_no_check	 = 0;
+	sk->sk_no_check = 0;
 
 	sock_release(sock);
 }
 
-static void xprt_socket_destroy(struct rpc_xprt *xprt)
+/**
+ * xs_destroy - prepare to shutdown a transport
+ * @xprt: doomed transport
+ *
+ */
+static void xs_destroy(struct rpc_xprt *xprt)
 {
+	dprintk("RPC:      xs_destroy xprt %p\n", xprt);
+
 	cancel_delayed_work(&xprt->sock_connect);
 	flush_scheduled_work();
 
 	xprt_disconnect(xprt);
-	xprt_close(xprt);
+	xs_close(xprt);
 	kfree(xprt->slot);
 }
 
-/*
- * Input handler for RPC replies. Called from a bottom half and hence
- * atomic.
- */
-static void
-udp_data_ready(struct sock *sk, int len)
+static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
 {
-	struct rpc_task	*task;
-	struct rpc_xprt	*xprt;
+	return (struct rpc_xprt *) sk->sk_user_data;
+}
+
+/**
+ * xs_udp_data_ready - "data ready" callback for UDP sockets
+ * @sk: socket with data to read
+ * @len: how much data to read
+ *
+ */
+static void xs_udp_data_ready(struct sock *sk, int len)
+{
+	struct rpc_task *task;
+	struct rpc_xprt *xprt;
 	struct rpc_rqst *rovr;
-	struct sk_buff	*skb;
+	struct sk_buff *skb;
 	int err, repsize, copied;
 	u32 _xid, *xp;
 
 	read_lock(&sk->sk_callback_lock);
-	dprintk("RPC:      udp_data_ready...\n");
-	if (!(xprt = xprt_from_sock(sk))) {
-		printk("RPC:      udp_data_ready request not found!\n");
+	dprintk("RPC:      xs_udp_data_ready...\n");
+	if (!(xprt = xprt_from_sock(sk)))
 		goto out;
-	}
-
-	dprintk("RPC:      udp_data_ready client %p\n", xprt);
 
 	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
 		goto out;
@@ -371,7 +391,7 @@
 
 	repsize = skb->len - sizeof(struct udphdr);
 	if (repsize < 4) {
-		printk("RPC: impossible RPC reply size %d!\n", repsize);
+		dprintk("RPC:      impossible RPC reply size %d!\n", repsize);
 		goto dropit;
 	}
 
@@ -410,11 +430,7 @@
 	read_unlock(&sk->sk_callback_lock);
 }
 
-/*
- * Copy from an skb into memory and shrink the skb.
- */
-static inline size_t
-tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
+static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
 {
 	if (len > desc->count)
 		len = desc->count;
@@ -430,18 +446,14 @@
 	return len;
 }
 
-/*
- * TCP read fragment marker
- */
-static inline void
-tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
 	size_t len, used;
 	char *p;
 
 	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
 	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
-	used = tcp_copy_data(desc, p, len);
+	used = xs_tcp_copy_data(desc, p, len);
 	xprt->tcp_offset += used;
 	if (used != len)
 		return;
@@ -455,15 +467,15 @@
 	xprt->tcp_offset = 0;
 	/* Sanity check of the record length */
 	if (xprt->tcp_reclen < 4) {
-		printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
+		dprintk("RPC:      invalid TCP record fragment length\n");
 		xprt_disconnect(xprt);
+		return;
 	}
 	dprintk("RPC:      reading TCP record fragment of length %d\n",
 			xprt->tcp_reclen);
 }
 
-static void
-tcp_check_recm(struct rpc_xprt *xprt)
+static void xs_tcp_check_recm(struct rpc_xprt *xprt)
 {
 	dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
 			xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
@@ -478,11 +490,7 @@
 	}
 }
 
-/*
- * TCP read xid
- */
-static inline void
-tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
 	size_t len, used;
 	char *p;
@@ -490,7 +498,7 @@
 	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
 	dprintk("RPC:      reading XID (%Zu bytes)\n", len);
 	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
-	used = tcp_copy_data(desc, p, len);
+	used = xs_tcp_copy_data(desc, p, len);
 	xprt->tcp_offset += used;
 	if (used != len)
 		return;
@@ -499,14 +507,10 @@
 	xprt->tcp_copied = 4;
 	dprintk("RPC:      reading reply for XID %08x\n",
 						ntohl(xprt->tcp_xid));
-	tcp_check_recm(xprt);
+	xs_tcp_check_recm(xprt);
 }
 
-/*
- * TCP read and complete request
- */
-static inline void
-tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
 	struct rpc_rqst *req;
 	struct xdr_buf *rcvbuf;
@@ -533,12 +537,12 @@
 		memcpy(&my_desc, desc, sizeof(my_desc));
 		my_desc.count = len;
 		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
-					  &my_desc, tcp_copy_data);
+					  &my_desc, xs_tcp_copy_data);
 		desc->count -= r;
 		desc->offset += r;
 	} else
 		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
-					  desc, tcp_copy_data);
+					  desc, xs_tcp_copy_data);
 
 	if (r > 0) {
 		xprt->tcp_copied += r;
@@ -581,14 +585,10 @@
 		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
 	}
 	spin_unlock(&xprt->sock_lock);
-	tcp_check_recm(xprt);
+	xs_tcp_check_recm(xprt);
 }
 
-/*
- * TCP discard extra bytes from a short read
- */
-static inline void
-tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
 	size_t len;
 
@@ -599,16 +599,10 @@
 	desc->offset += len;
 	xprt->tcp_offset += len;
 	dprintk("RPC:      discarded %Zu bytes\n", len);
-	tcp_check_recm(xprt);
+	xs_tcp_check_recm(xprt);
 }
 
-/*
- * TCP record receive routine
- * We first have to grab the record marker, then the XID, then the data.
- */
-static int
-tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
-		unsigned int offset, size_t len)
+static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
 {
 	struct rpc_xprt *xprt = rd_desc->arg.data;
 	skb_reader_t desc = {
@@ -616,64 +610,72 @@
 		.offset	= offset,
 		.count	= len,
 		.csum	= 0
-       	};
+	};
 
-	dprintk("RPC:      tcp_data_recv\n");
+	dprintk("RPC:      xs_tcp_data_recv started\n");
 	do {
 		/* Read in a new fragment marker if necessary */
 		/* Can we ever really expect to get completely empty fragments? */
 		if (xprt->tcp_flags & XPRT_COPY_RECM) {
-			tcp_read_fraghdr(xprt, &desc);
+			xs_tcp_read_fraghdr(xprt, &desc);
 			continue;
 		}
 		/* Read in the xid if necessary */
 		if (xprt->tcp_flags & XPRT_COPY_XID) {
-			tcp_read_xid(xprt, &desc);
+			xs_tcp_read_xid(xprt, &desc);
 			continue;
 		}
 		/* Read in the request data */
 		if (xprt->tcp_flags & XPRT_COPY_DATA) {
-			tcp_read_request(xprt, &desc);
+			xs_tcp_read_request(xprt, &desc);
 			continue;
 		}
 		/* Skip over any trailing bytes on short reads */
-		tcp_read_discard(xprt, &desc);
+		xs_tcp_read_discard(xprt, &desc);
 	} while (desc.count);
-	dprintk("RPC:      tcp_data_recv done\n");
+	dprintk("RPC:      xs_tcp_data_recv done\n");
 	return len - desc.count;
 }
 
-static void tcp_data_ready(struct sock *sk, int bytes)
+/**
+ * xs_tcp_data_ready - "data ready" callback for TCP sockets
+ * @sk: socket with data to read
+ * @bytes: how much data to read
+ *
+ */
+static void xs_tcp_data_ready(struct sock *sk, int bytes)
 {
 	struct rpc_xprt *xprt;
 	read_descriptor_t rd_desc;
 
 	read_lock(&sk->sk_callback_lock);
-	dprintk("RPC:      tcp_data_ready...\n");
-	if (!(xprt = xprt_from_sock(sk))) {
-		printk("RPC:      tcp_data_ready socket info not found!\n");
+	dprintk("RPC:      xs_tcp_data_ready...\n");
+	if (!(xprt = xprt_from_sock(sk)))
 		goto out;
-	}
 	if (xprt->shutdown)
 		goto out;
 
-	/* We use rd_desc to pass struct xprt to tcp_data_recv */
+	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
 	rd_desc.arg.data = xprt;
 	rd_desc.count = 65536;
-	tcp_read_sock(sk, &rd_desc, tcp_data_recv);
+	tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
 out:
 	read_unlock(&sk->sk_callback_lock);
 }
 
-static void
-tcp_state_change(struct sock *sk)
+/**
+ * xs_tcp_state_change - callback to handle TCP socket state changes
+ * @sk: socket whose state has changed
+ *
+ */
+static void xs_tcp_state_change(struct sock *sk)
 {
-	struct rpc_xprt	*xprt;
+	struct rpc_xprt *xprt;
 
 	read_lock(&sk->sk_callback_lock);
 	if (!(xprt = xprt_from_sock(sk)))
 		goto out;
-	dprintk("RPC:      tcp_state_change client %p...\n", xprt);
+	dprintk("RPC:      xs_tcp_state_change client %p...\n", xprt);
 	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
 				sk->sk_state, xprt_connected(xprt),
 				sock_flag(sk, SOCK_DEAD),
@@ -703,17 +705,20 @@
 	read_unlock(&sk->sk_callback_lock);
 }
 
-/*
+/**
+ * xs_write_space - callback invoked when socket buffer space becomes
+ *                         available
+ * @sk: socket whose output buffer space has become available
+ *
  * Called when more output buffer space is available for this socket.
  * We try not to wake our writers until they can make "significant"
  * progress, otherwise we'll waste resources thrashing sock_sendmsg
  * with a bunch of small requests.
  */
-static void
-xprt_write_space(struct sock *sk)
+static void xs_write_space(struct sock *sk)
 {
-	struct rpc_xprt	*xprt;
-	struct socket	*sock;
+	struct rpc_xprt *xprt;
+	struct socket *sock;
 
 	read_lock(&sk->sk_callback_lock);
 	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
@@ -743,11 +748,15 @@
 	read_unlock(&sk->sk_callback_lock);
 }
 
-/*
- * Set socket buffer length
+/**
+ * xs_set_buffer_size - set send and receive limits
+ * @xprt: generic transport
+ *
+ * Set socket send and receive limits based on the
+ * sndsize and rcvsize fields in the generic transport
+ * structure. This applies only to UDP sockets.
  */
-static void
-xprt_sock_setbufsize(struct rpc_xprt *xprt)
+static void xs_set_buffer_size(struct rpc_xprt *xprt)
 {
 	struct sock *sk = xprt->inet;
 
@@ -764,15 +773,12 @@
 	}
 }
 
-/*
- * Bind to a reserved port
- */
-static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
+static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
 {
 	struct sockaddr_in myaddr = {
 		.sin_family = AF_INET,
 	};
-	int		err, port;
+	int err, port;
 
 	/* Were we already bound to a given port? Try to reuse it */
 	port = xprt->port;
@@ -782,20 +788,47 @@
 						sizeof(myaddr));
 		if (err == 0) {
 			xprt->port = port;
+			dprintk("RPC:      xs_bindresvport bound to port %u\n",
+					port);
 			return 0;
 		}
 		if (--port == 0)
-			port = XPRT_MAX_RESVPORT;
+			port = XS_MAX_RESVPORT;
 	} while (err == -EADDRINUSE && port != xprt->port);
 
-	printk("RPC: Can't bind to reserved port (%d).\n", -err);
+	dprintk("RPC:      can't bind to reserved port (%d).\n", -err);
 	return err;
 }
 
-static void
-xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
+static struct socket *xs_create(struct rpc_xprt *xprt, int proto, int resvport)
 {
-	struct sock	*sk = sock->sk;
+	struct socket *sock;
+	int type, err;
+
+	dprintk("RPC:      xs_create(%s %d)\n",
+			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
+
+	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
+
+	if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
+		dprintk("RPC:      can't create socket (%d).\n", -err);
+		return NULL;
+	}
+
+	/* If the caller has the capability, bind to a reserved port */
+	if (resvport && xs_bindresvport(xprt, sock) < 0)
+		goto failed;
+
+	return sock;
+
+failed:
+	sock_release(sock);
+	return NULL;
+}
+
+static void xs_bind(struct rpc_xprt *xprt, struct socket *sock)
+{
+	struct sock *sk = sock->sk;
 
 	if (xprt->inet)
 		return;
@@ -806,16 +839,16 @@
 	xprt->old_state_change = sk->sk_state_change;
 	xprt->old_write_space = sk->sk_write_space;
 	if (xprt->prot == IPPROTO_UDP) {
-		sk->sk_data_ready = udp_data_ready;
+		sk->sk_data_ready = xs_udp_data_ready;
 		sk->sk_no_check = UDP_CSUM_NORCV;
 		xprt_set_connected(xprt);
 	} else {
 		tcp_sk(sk)->nonagle = 1;	/* disable Nagle's algorithm */
-		sk->sk_data_ready = tcp_data_ready;
-		sk->sk_state_change = tcp_state_change;
+		sk->sk_data_ready = xs_tcp_data_ready;
+		sk->sk_state_change = xs_tcp_state_change;
 		xprt_clear_connected(xprt);
 	}
-	sk->sk_write_space = xprt_write_space;
+	sk->sk_write_space = xs_write_space;
 
 	/* Reset to new socket */
 	xprt->sock = sock;
@@ -825,39 +858,13 @@
 	return;
 }
 
-/*
- * Datastream sockets are created here, but xprt_connect will create
- * and connect stream sockets.
+/**
+ * xs_connect_worker - try to connect a socket to a remote endpoint
+ * @args: RPC transport to connect
+ *
+ * Invoked by a work queue tasklet.
  */
-static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport)
-{
-	struct socket	*sock;
-	int		type, err;
-
-	dprintk("RPC:      xprt_create_socket(%s %d)\n",
-			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
-
-	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
-
-	if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
-		printk("RPC: can't create socket (%d).\n", -err);
-		return NULL;
-	}
-
-	/* If the caller has the capability, bind to a reserved port */
-	if (resvport && xprt_bindresvport(xprt, sock) < 0) {
-		printk("RPC: can't bind to reserved port.\n");
-		goto failed;
-	}
-
-	return sock;
-
-failed:
-	sock_release(sock);
-	return NULL;
-}
-
-static void xprt_socket_connect(void *args)
+static void xs_connect_worker(void *args)
 {
 	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
 	struct socket *sock = xprt->sock;
@@ -866,18 +873,20 @@
 	if (xprt->shutdown || xprt->addr.sin_port == 0)
 		goto out;
 
+	dprintk("RPC:      xs_connect_worker xprt %p\n", xprt);
+
 	/*
 	 * Start by resetting any existing state
 	 */
-	xprt_close(xprt);
-	sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport);
+	xs_close(xprt);
+	sock = xs_create(xprt, xprt->prot, xprt->resvport);
 	if (sock == NULL) {
 		/* couldn't create socket or bind to reserved port;
 		 * this is likely a permanent error, so cause an abort */
 		goto out;
 	}
-	xprt_bind_socket(xprt, sock);
-	xprt_sock_setbufsize(xprt);
+	xs_bind(xprt, sock);
+	xs_set_buffer_size(xprt);
 
 	status = 0;
 	if (!xprt->stream)
@@ -908,20 +917,23 @@
 	smp_mb__after_clear_bit();
 }
 
-static void
-xprt_connect_sock(struct rpc_task *task)
+/**
+ * xs_connect - connect a socket to a remote endpoint
+ * @task: address of RPC task that manages state of connect request
+ *
+ * TCP: If the remote end dropped the connection, delay reconnecting.
+ */
+static void xs_connect(struct rpc_task *task)
 {
 	struct rpc_xprt *xprt = task->tk_xprt;
 
 	if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
-		/* Note: if we are here due to a dropped connection
-		 * 	 we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
-		 * 	 seconds
-		 */
-		if (xprt->sock != NULL)
+		if (xprt->sock != NULL) {
+			dprintk("RPC:      xs_connect delayed xprt %p\n", xprt);
 			schedule_delayed_work(&xprt->sock_connect,
 					RPC_REESTABLISH_TIMEOUT);
-		else {
+		} else {
+			dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
 			schedule_work(&xprt->sock_connect);
 			/* flush_scheduled_work can sleep... */
 			if (!RPC_IS_ASYNC(task))
@@ -930,29 +942,23 @@
 	}
 }
 
-/*
- * Set default timeout parameters
- */
-static void
-xprt_default_timeout(struct rpc_timeout *to, int proto)
-{
-	if (proto == IPPROTO_UDP)
-		xprt_set_timeout(to, 5,  5 * HZ);
-	else
-		xprt_set_timeout(to, 2, 60 * HZ);
-}
-
-static struct rpc_xprt_ops xprt_socket_ops = {
-	.set_buffer_size	= xprt_sock_setbufsize,
-	.connect		= xprt_connect_sock,
-	.send_request		= xprt_send_request,
-	.close			= xprt_close,
-	.destroy		= xprt_socket_destroy,
+static struct rpc_xprt_ops xs_ops = {
+	.set_buffer_size	= xs_set_buffer_size,
+	.connect		= xs_connect,
+	.send_request		= xs_send_request,
+	.close			= xs_close,
+	.destroy		= xs_destroy,
 };
 
 extern unsigned int xprt_udp_slot_table_entries;
 extern unsigned int xprt_tcp_slot_table_entries;
 
+/**
+ * xs_setup_udp - Set up transport to use a UDP socket
+ * @xprt: transport to set up
+ * @to:   timeout parameters
+ *
+ */
 int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 {
 	size_t slot_table_size;
@@ -967,7 +973,7 @@
 	memset(xprt->slot, 0, slot_table_size);
 
 	xprt->prot = IPPROTO_UDP;
-	xprt->port = XPRT_MAX_RESVPORT;
+	xprt->port = XS_MAX_RESVPORT;
 	xprt->stream = 0;
 	xprt->nocong = 0;
 	xprt->cwnd = RPC_INITCWND;
@@ -975,18 +981,24 @@
 	/* XXX: header size can vary due to auth type, IPv6, etc. */
 	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
 
-	INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
+	INIT_WORK(&xprt->sock_connect, xs_connect_worker, xprt);
 
-	xprt->ops = &xprt_socket_ops;
+	xprt->ops = &xs_ops;
 
 	if (to)
 		xprt->timeout = *to;
 	else
-		xprt_default_timeout(to, xprt->prot);
+		xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
 
 	return 0;
 }
 
+/**
+ * xs_setup_tcp - Set up transport to use a TCP socket
+ * @xprt: transport to set up
+ * @to: timeout parameters
+ *
+ */
 int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 {
 	size_t slot_table_size;
@@ -1001,21 +1013,21 @@
 	memset(xprt->slot, 0, slot_table_size);
 
 	xprt->prot = IPPROTO_TCP;
-	xprt->port = XPRT_MAX_RESVPORT;
+	xprt->port = XS_MAX_RESVPORT;
 	xprt->stream = 1;
 	xprt->nocong = 1;
 	xprt->cwnd = RPC_MAXCWND(xprt);
 	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
 	xprt->max_payload = (1U << 31) - 1;
 
-	INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
+	INIT_WORK(&xprt->sock_connect, xs_connect_worker, xprt);
 
-	xprt->ops = &xprt_socket_ops;
+	xprt->ops = &xs_ops;
 
 	if (to)
 		xprt->timeout = *to;
 	else
-		xprt_default_timeout(to, xprt->prot);
+		xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
 
 	return 0;
 }