首页 > 代码库 > Netlink 内核实现分析(二):通信

Netlink 内核实现分析(二):通信

在前一篇博文《Netlink 内核实现分析(一):创建》中已经较为具体的分析了Linux内核netlink子系统的初始化流程、内核netlink套接字的创建、应用层netlink套接字的创建和绑定流程,本文来具体的分析一下内核是怎样实现netlink消息在内核和应用进程之间全双工异步通信的。

一、netlink通信数据结构

1、netlink消息报头:struct nlmsghdr

struct nlmsghdr {
	__u32		nlmsg_len;	/* Length of message including header */
	__u16		nlmsg_type;	/* Message content */
	__u16		nlmsg_flags;	/* Additional flags */
	__u32		nlmsg_seq;	/* Sequence number */
	__u32		nlmsg_pid;	/* Sending process port ID */
};
netlink消息同TCP/UDP消息一样,也须要遵循协议要求的格式,每一个netlink消息的开头是固定长度的netlink报头。报头后才是实际的载荷。netlink报头一共占16个字节,详细内容即同struct nlmsghdr中定义的一样。
(1)nlmsg_len:整个netlink消息的长度(包括消息头);

(2)nlmsg_type:消息状态。内核在include/uapi/linux/netlink.h中定义了下面4种通用的消息类型,它们各自是:

NLMSG_NOOP:不运行不论什么动作,必须将该消息丢弃;

NLMSG_ERROR:消息错误发生;

NLMSG_DONE:标识分组消息的末尾;

NLMSG_OVERRUN:缓冲区溢出。表示某些消息已经丢失。

除了这4种类型的消息以外,不同的netlink协议也能够自行加入自己所特有的消息类型。可是内核定义了类型保留宏(#define NLMSG_MIN_TYPE 0x10)。即小于该值的消息类型值由内核保留,不可用。

(3)nlmsg_flags:消息标记,它们用以表示消息的类型,相同定义在include/uapi/linux/netlink.h中;

#define NLM_F_REQUEST		1	/* It is request message. 	*/
#define NLM_F_MULTI		2	/* Multipart message, terminated by NLMSG_DONE */
#define NLM_F_ACK		4	/* Reply with ack, with zero or error code */
#define NLM_F_ECHO		8	/* Echo this request 		*/
#define NLM_F_DUMP_INTR		16	/* Dump was inconsistent due to sequence change */

/* Modifiers to GET request */
#define NLM_F_ROOT	0x100	/* specify tree	root	*/
#define NLM_F_MATCH	0x200	/* return all matching	*/
#define NLM_F_ATOMIC	0x400	/* atomic GET		*/
#define NLM_F_DUMP	(NLM_F_ROOT|NLM_F_MATCH)

/* Modifiers to NEW request */
#define NLM_F_REPLACE	0x100	/* Override existing		*/
#define NLM_F_EXCL	0x200	/* Do not touch, if it exists	*/
#define NLM_F_CREATE	0x400	/* Create, if it does not exist	*/
#define NLM_F_APPEND	0x800	/* Add to end of list		*/
(4)nlmsg_seq:消息序列号。用以将消息排队。有些类似TCP协议中的序号(不全然一样)。可是netlink的这个字段是可选的,不强制使用;
(5)nlmsg_pid:发送port的ID号,对于内核来说该值就是0,对于用户进程来说就是其socket所绑定的ID号。

2、socket消息数据包结构:struct msghdr

struct user_msghdr {
	void		__user *msg_name;	/* ptr to socket address structure */
	int		msg_namelen;		/* size of socket address structure */
	struct iovec	__user *msg_iov;	/* scatter/gather array */
	__kernel_size_t	msg_iovlen;		/* # elements in msg_iov */
	void		__user *msg_control;	/* ancillary data */
	__kernel_size_t	msg_controllen;		/* ancillary data buffer length */
	unsigned int	msg_flags;		/* flags on received message */
};
应用层向内核传递消息能够使用sendto()或sendmsg()函数。当中sendmsg函数须要应用程序手动封装msghdr消息结构,而sendto()函数则会由内核代为分配。当中
(1)msg_name:指向数据包的目的地址;
(2)msg_namelen:目的地址数据结构的长度;
(3)msg_iov:消息包的实际数据块,定义例如以下:
struct iovec
{
	void *iov_base;	/* BSD uses caddr_t (1003.1g requires void *) */
	__kernel_size_t iov_len; /* Must be size_t (1003.1g) */
};
iov_base:消息包实际载荷的首地址;
iov_len:消息实际载荷的长度。
(4)msg_control:消息的辅助数据;
(5)msg_controllen:消息辅助数据的大小;
(6)msg_flags:接收消息的标识。


对于该结构,我们更须要关注的是前三个变量參数。对于netlink数据包来说当中msg_name指向的就是目的sockaddr_nl地址结构实例的首地址,iov_base指向的就是消息实体中的nlmsghdr消息头的地址,而iov_len赋值为nlmsghdr中的nlmsg_len就可以(消息头+实际数据)。

3、netlink消息处理宏

#define NLMSG_ALIGNTO	4U
#define NLMSG_ALIGN(len) ( ((len)+NLMSG_ALIGNTO-1) & ~(NLMSG_ALIGNTO-1) )			/* 对len运行4字节对齐 */
#define NLMSG_HDRLEN	 ((int) NLMSG_ALIGN(sizeof(struct nlmsghdr)))				/* netlink消息头长度 */
#define NLMSG_LENGTH(len) ((len) + NLMSG_HDRLEN)									/* netlink消息载荷len加上消息头 */
#define NLMSG_SPACE(len) NLMSG_ALIGN(NLMSG_LENGTH(len))								/* 对netlink消息全长运行字节对齐 */
#define NLMSG_DATA(nlh)  ((void*)(((char*)nlh) + NLMSG_LENGTH(0)))					/* 获取netlink消息实际载荷位置 */
#define NLMSG_NEXT(nlh,len)	 ((len) -= NLMSG_ALIGN((nlh)->nlmsg_len), 				  (struct nlmsghdr*)(((char*)(nlh)) + NLMSG_ALIGN((nlh)->nlmsg_len)))/* 取得下一个消息的首地址。同一时候len也降低为剩余消息的总长度 */
#define NLMSG_OK(nlh,len) ((len) >= (int)sizeof(struct nlmsghdr) && 			   (nlh)->nlmsg_len >= sizeof(struct nlmsghdr) && 			   (nlh)->nlmsg_len <= (len))											/* 验证消息的长度 */
#define NLMSG_PAYLOAD(nlh,len) ((nlh)->nlmsg_len - NLMSG_SPACE((len)))				/* 返回PAYLOAD的长度 */
Linux为了处理netlink消息方便,在 include/uapi/linux/netlink.h中定义了以上消息处理宏,用于各种场合。

对于Netlink消息来说,处理例如以下格式(见netlink.h):

/* ========================================================================
 *         Netlink Messages and Attributes Interface (As Seen On TV)
 * ------------------------------------------------------------------------
 *                          Messages Interface
 * ------------------------------------------------------------------------
 *
 * Message Format:
 *    <--- nlmsg_total_size(payload)  --->
 *    <-- nlmsg_msg_size(payload) ->
 *   +----------+- - -+-------------+- - -+-------- - -
 *   | nlmsghdr | Pad |   Payload   | Pad | nlmsghdr
 *   +----------+- - -+-------------+- - -+-------- - -
 *   nlmsg_data(nlh)---^                   ^
 *   nlmsg_next(nlh)-----------------------+
 *
 * Payload Format:
 *    <---------------------- nlmsg_len(nlh) --------------------->
 *    <------ hdrlen ------>       <- nlmsg_attrlen(nlh, hdrlen) ->
 *   +----------------------+- - -+--------------------------------+
 *   |     Family Header    | Pad |           Attributes           |
 *   +----------------------+- - -+--------------------------------+
 *   nlmsg_attrdata(nlh, hdrlen)---^
 *
 * ------------------------------------------------------------------------
 *                          Attributes Interface
 * ------------------------------------------------------------------------
 *
 * Attribute Format:
 *    <------- nla_total_size(payload) ------->
 *    <---- nla_attr_size(payload) ----->
 *   +----------+- - -+- - - - - - - - - +- - -+-------- - -
 *   |  Header  | Pad |     Payload      | Pad |  Header
 *   +----------+- - -+- - - - - - - - - +- - -+-------- - -
 *                     <- nla_len(nla) ->      ^
 *   nla_data(nla)----^                        |
 *   nla_next(nla)-----------------------------'
 *
 *=========================================================================
 */

二、应用层向内核发送netlink消息

使用例如以下演示样例程序可向内核netlink套接字发送消息:
#define TEST_DATA_LEN	16
#DEFINE TEST_DATA		"netlink send test"		/* 仅作为演示样例,内核NETLINK_ROUTE套接字无法解析 */

struct sockaddr_nl nladdr;
struct msghdr msg;
struct nlmsghdr *nlhdr;
struct iovec iov;

/* 填充目的地址结构 */
memset(&nladdr, 0, sizeof(nladdr));
nladdr.nl_family = AF_NETLINK;
nladdr.nl_pid = 0;						/* 地址为内核 */
nladdr.nl_groups = 0;						/* 单播 */

/* 填充netlink消息头 */
nlhdr = (struct nlmsghdr *)malloc(NLMSG_SPACE(TEST_DATA_LEN));

nlhdr->nlmsg_len = NLMSG_LENGTH(TEST_DATA_LEN);
nlhdr->nlmsg_flags = NLM_F_REQUEST;
nlhdr->nlmsg_pid = get_pid();					/* 当前套接字所绑定的ID号(此处为本进程的PID) */
nlhdr->nlmsg_seq = 0;

/* 填充netlink消息实际载荷 */
strcpy(NLMSG_DATA(nlhdr), TEST_DATA);
iov.iov_base = (void *)nlhdr;
iov.iov_len = nlhdr->nlmsg_len;
	
/* 填充数据消息结构 */
memset(&msg, 0, sizeof(msg));
msg.msg_name = (void *)&(nladdr);
msg.msg_namelen = sizeof(nladdr);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;

/* 发送netlink消息 */
sendmsg (sock, &msg, 0); /* sock描写叙述符见《Netlink 内核实现分析(一):创建》。为NETLINK_ROUTE类型套接字 */
这里列出了一个调用sendmsg向内核发送消息的演示样例代码片段(仅作为演示样例,发送的消息内核netlink套接字可能无法解析)。首先初始化目的地址数据结构,设置nl_pid和nl_groups为0指定消息的目的地址为内核;然后初始化netlink消息头指明消息的长度为TEST_DATA_LEN + NLMSG_ALIGN(sizeof(struct nlmsghdr))(包括消息头),发送端的ID号为发送socket消息所绑定的ID号(这样内核才知道消息是谁发送的);然后设置消息的实际载荷,将数据复制到紧接消息头后的实际载荷部分;最后组装成msg消息就能够调用sendmsg向内核发送了。


以下尾随内核的sendmsg系统调用的整个流程来分析消息是怎样被送到内核的(须要注意的是,在不使用NETLINK_MMAP技术的情况下。整个发送的过程中存在1~2次数据的内存拷贝动作。后面会逐一点出。):

技术分享
图1 用户态netlink数据发送流程
SYSCALL_DEFINE3(sendmsg, int, fd, struct user_msghdr __user *, msg, unsigned int, flags)
{
	if (flags & MSG_CMSG_COMPAT)
		return -EINVAL;
	return __sys_sendmsg(fd, msg, flags);
}
long __sys_sendmsg(int fd, struct user_msghdr __user *msg, unsigned flags)
{
	int fput_needed, err;
	struct msghdr msg_sys;
	struct socket *sock;

	sock = sockfd_lookup_light(fd, &err, &fput_needed);
	if (!sock)
		goto out;

	err = ___sys_sendmsg(sock, msg, &msg_sys, flags, NULL);

	fput_light(sock->file, fput_needed);
out:
	return err;
}
sendmsg系统调用调用__sys_sendmsg来进行实际的操作。这里首先通过fd描写叙述符找到相应的socket套接字结构实例,然后调用___sys_sendmsg()函数,传入的參数中第三个和最后一个须要关注一下,当中第三个它是一个内核版的socket消息数据包结构,同应用层的略有不同,定义例如以下:
struct msghdr {
	void		*msg_name;	/* ptr to socket address structure */
	int		msg_namelen;	/* size of socket address structure */
	struct iov_iter	msg_iter;	/* data */
	void		*msg_control;	/* ancillary data */
	__kernel_size_t	msg_controllen;	/* ancillary data buffer length */
	unsigned int	msg_flags;	/* flags on received message */
	struct kiocb	*msg_iocb;	/* ptr to iocb for async requests */
};
当中msg_name、msg_namelen、msg_control、msg_controllen和msg_flags字段同应用层的含义是一样的。msg_iter为msg_iov和msg_iovlen的合体,最后msg_iocb用于异步请求。最后一个參数是一个struct used_address结构体指针,这个结构体定义例如以下:
struct used_address {
	struct sockaddr_storage name;
	unsigned int name_len;
};
这里的name字段用来存储消息的地址,name_len字段是消息地址的长度,它们同struct msghdr结构体的前两个字段一致。该结构体主要用与sendmmsg系统调用(用于同事时向一个socket地址发送多个数据包,能够避免反复的网络security检查。从而提高发送效率)保存多个数据包的目的地址。

如今这里设置为NULL,表示不使用。继续往下分析,进入___sys_sendmsg()函数内部,这个函数比較长,分段来分析:

static int ___sys_sendmsg(struct socket *sock, struct user_msghdr __user *msg,
			 struct msghdr *msg_sys, unsigned int flags,
			 struct used_address *used_address)
{
	struct compat_msghdr __user *msg_compat =
	    (struct compat_msghdr __user *)msg;
	struct sockaddr_storage address;
	struct iovec iovstack[UIO_FASTIOV], *iov = iovstack;
	unsigned char ctl[sizeof(struct cmsghdr) + 20]
	    __attribute__ ((aligned(sizeof(__kernel_size_t))));
	/* 20 is size of ipv6_pktinfo */
	unsigned char *ctl_buf = ctl;
	int ctl_len;
	ssize_t err;

	msg_sys->msg_name = &address;

	if (MSG_CMSG_COMPAT & flags)
		err = get_compat_msghdr(msg_sys, msg_compat, NULL, &iov);
	else
		err = copy_msghdr_from_user(msg_sys, msg, NULL, &iov);
	if (err < 0)
		return err;
这里的iovstack数组是用来加速用户数据拷贝的(这里假定用户数据的iovec个数通常不会超过UIO_FASTIOV个。假设超过会通过kmalloc分配内存)。首先这里推断flag中是否设置了32bit修正标识。从前文中系统调用的入口处已经能够看出了。这里显然不会设置该标识位,所以这里调用copy_msghdr_from_user函数将用户空间传入的消息(struct user_msghdr __user *msg)安全的复制到内核空间中(struct msghdr *msg_sys),来简单的看一下这个函数:
static int copy_msghdr_from_user(struct msghdr *kmsg,
				 struct user_msghdr __user *umsg,
				 struct sockaddr __user **save_addr,
				 struct iovec **iov)
{
	struct sockaddr __user *uaddr;
	struct iovec __user *uiov;
	size_t nr_segs;
	ssize_t err;

	if (!access_ok(VERIFY_READ, umsg, sizeof(*umsg)) ||
	    __get_user(uaddr, &umsg->msg_name) ||
	    __get_user(kmsg->msg_namelen, &umsg->msg_namelen) ||
	    __get_user(uiov, &umsg->msg_iov) ||
	    __get_user(nr_segs, &umsg->msg_iovlen) ||
	    __get_user(kmsg->msg_control, &umsg->msg_control) ||
	    __get_user(kmsg->msg_controllen, &umsg->msg_controllen) ||
	    __get_user(kmsg->msg_flags, &umsg->msg_flags))
		return -EFAULT;

	if (!uaddr)
		kmsg->msg_namelen = 0;

	if (kmsg->msg_namelen < 0)
		return -EINVAL;

	if (kmsg->msg_namelen > sizeof(struct sockaddr_storage))
		kmsg->msg_namelen = sizeof(struct sockaddr_storage);

	if (save_addr)
		*save_addr = uaddr;

	if (uaddr && kmsg->msg_namelen) {
		if (!save_addr) {
			err = move_addr_to_kernel(uaddr, kmsg->msg_namelen,
						  kmsg->msg_name);
			if (err < 0)
				return err;
		}
	} else {
		kmsg->msg_name = NULL;
		kmsg->msg_namelen = 0;
	}

	if (nr_segs > UIO_MAXIOV)
		return -EMSGSIZE;

	kmsg->msg_iocb = NULL;

	return import_iovec(save_addr ?

READ : WRITE, uiov, nr_segs, UIO_FASTIOV, iov, &kmsg->msg_iter); }

函数首先调用access_ok检查用户数据的有效性,然后调用__get_user函数运行单数据的复制操作(并没有复制数据包内容),接着做一些简单的入參推断。

然后假设用户消息中存在目的地址且入參save_addr为空(当前情景中正好就是这类情况)。就调用move_addr_to_kernel()函数将消息地址复制到内核kmsg的结构中。否则将kmsg中的目的地址和长度字段置位空。接下来推断消息实际载荷iovec结构的个数,这里UIO_MAXIOV值定义为1024,也就是说消息数据iovec结构的最大个数不能超过这个值,这点很重要。
最后调用import_iovec()函数開始运行实际数据从用户态向内核态的拷贝动作(注意这里并没有拷贝用户空间实际消息载荷数据,只检查了用户地址有效性并拷贝了长度等字段)。在拷贝完毕后,&kmsg->msg_iter中的数据初始化情况例如以下:

int type:WRITE;
size_t iov_offset:初始化为0;
size_t count:全部iovec结构数据的总长度(即iov->iov_len的总和);
const struct iovec *iov:首个iov结构指针;
unsigned long nr_segs:iovec结构的个数。

数据拷贝完毕后,再回到___sys_sendmsg函数中继续分析:
	err = -ENOBUFS;

	if (msg_sys->msg_controllen > INT_MAX)
		goto out_freeiov;
	ctl_len = msg_sys->msg_controllen;
	if ((MSG_CMSG_COMPAT & flags) && ctl_len) {
		err =
		    cmsghdr_from_user_compat_to_kern(msg_sys, sock->sk, ctl,
						     sizeof(ctl));
		if (err)
			goto out_freeiov;
		ctl_buf = msg_sys->msg_control;
		ctl_len = msg_sys->msg_controllen;
	} else if (ctl_len) {
		if (ctl_len > sizeof(ctl)) {
			ctl_buf = sock_kmalloc(sock->sk, ctl_len, GFP_KERNEL);
			if (ctl_buf == NULL)
				goto out_freeiov;
		}
		err = -EFAULT;
		/*
		 * Careful! Before this, msg_sys->msg_control contains a user pointer.
		 * Afterwards, it will be a kernel pointer. Thus the compiler-assisted
		 * checking falls down on this.
		 */
		if (copy_from_user(ctl_buf,
				   (void __user __force *)msg_sys->msg_control,
				   ctl_len))
			goto out_freectl;
		msg_sys->msg_control = ctl_buf;
	}
这一段程序是用来拷贝消息辅助数据的,比較直观,我们前文中的演示样例程序并没有传递辅助数据。所以这里不具体分析。继续往下看:
	msg_sys->msg_flags = flags;

	if (sock->file->f_flags & O_NONBLOCK)
		msg_sys->msg_flags |= MSG_DONTWAIT;
	/*
	 * If this is sendmmsg() and current destination address is same as
	 * previously succeeded address, omit asking LSM's decision.
	 * used_address->name_len is initialized to UINT_MAX so that the first
	 * destination address never matches.
	 */
	if (used_address && msg_sys->msg_name &&
	    used_address->name_len == msg_sys->msg_namelen &&
	    !memcmp(&used_address->name, msg_sys->msg_name,
		    used_address->name_len)) {
		err = sock_sendmsg_nosec(sock, msg_sys);
		goto out_freectl;
	}
	err = sock_sendmsg(sock, msg_sys);
	/*
	 * If this is sendmmsg() and sending to current destination address was
	 * successful, remember it.
	 */
	if (used_address && err >= 0) {
		used_address->name_len = msg_sys->msg_namelen;
		if (msg_sys->msg_name)
			memcpy(&used_address->name, msg_sys->msg_name,
			       used_address->name_len);
	}
这里首先保存用户传递的flag标识。然后推断假设当前的socket已经被配置为非堵塞模式则置位MSG_DONTWAIT标识(定义在include/linux/socket.h中)。接下来通过传入的used_address指针推断当前发送消息的目的地址是否同它记录的一致,假设一致则调用sock_sendmsg_nosec()函数发送数据,否则调用sock_sendmsg()函数发送数据。sock_sendmsg()事实上终于也是通过调用sock_sendmsg_nosec()来发送数据的,它们的差别就在于是否调用安全检查函数,例如以下:
int sock_sendmsg(struct socket *sock, struct msghdr *msg)
{
	int err = security_socket_sendmsg(sock, msg,
					  msg_data_left(msg));

	return err ?

: sock_sendmsg_nosec(sock, msg); } EXPORT_SYMBOL(sock_sendmsg);

在sendmmsg系统调用每一次发送多个消息时,因为发送的目的地一般都是一致的,所以仅仅须要在发送第一个消息爆时运行检查就能够了,通过这样的策略就能够加速数据的发送。最后,在发送完数据后,假设传入的used_address指针非空,就会将本次成功发送数据的目的地址记录下来。供下次发送数据比較。

接下来进入sock_sendmsg_nosec(sock, msg_sys)内部继续分析消息的发送流程:
static inline int sock_sendmsg_nosec(struct socket *sock, struct msghdr *msg)
{
	int ret = sock->ops->sendmsg(sock, msg, msg_data_left(msg));
	BUG_ON(ret == -EIOCBQUEUED);
	return ret;
}
这里调用了socket所绑定协议特有的数据发送钩子函数,当中最后一个參数为msg->msg_iter->count。即消息实际载荷的总长度。在前一篇文章中已经看到了对于netlink类型的套接字来说该函数被注冊为netlink_sendmsg(),以下来分析这个函数,这个函数较长,分段分析:
static int netlink_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
{
	struct sock *sk = sock->sk;
	struct netlink_sock *nlk = nlk_sk(sk);
	DECLARE_SOCKADDR(struct sockaddr_nl *, addr, msg->msg_name);
	u32 dst_portid;
	u32 dst_group;
	struct sk_buff *skb;
	int err;
	struct scm_cookie scm;
	u32 netlink_skb_flags = 0;

	if (msg->msg_flags&MSG_OOB)
		return -EOPNOTSUPP;

	err = scm_send(sock, msg, &scm, true);
	if (err < 0)
		return err;
首先,这里定义了一个struct sockaddr_nl *addr指针,它指向了msg->msg_name表示消息的目的地址(会做地址长度检查)。然后调用scm_send()发送消息辅助数据(不分析)。
	if (msg->msg_namelen) {
		err = -EINVAL;
		if (addr->nl_family != AF_NETLINK)
			goto out;
		dst_portid = addr->nl_pid;
		dst_group = ffs(addr->nl_groups);
		err =  -EPERM;
		if ((dst_group || dst_portid) &&
		    !netlink_allowed(sock, NL_CFG_F_NONROOT_SEND))
			goto out;
		netlink_skb_flags |= NETLINK_SKB_DST;
	} else {
		dst_portid = nlk->dst_portid;
		dst_group = nlk->dst_group;
	}
这里假设用户指定了netlink消息的目的地址。则对其进行校验,然后推断当前netlink协议的NL_CFG_F_NONROOT_SEND标识是否设置。假设设置了改标识则同意非root用户发送组播,对于NETLINK_ROUTE类型的netlink套接字,并没有设置该标识。表明非root用户不能发送组播消息;然后设置NETLINK_SKB_DST标识。假设用户没有指定netlink消息的目的地址,则使用netlink套接字默认的(该值默觉得0,会在调用connect系统调用时在netlink_connect()中被赋值为用户设置的值)。注意这里dst_group经过ffs的处理后转化为组播地址位数(找到最低有效位)。
	if (!nlk->bound) {
		err = netlink_autobind(sock);
		if (err)
			goto out;
	} else {
		/* Ensure nlk is hashed and visible. */
		smp_rmb();
	}
接下来推断当前的netlink套接字是否被绑定过,假设没有绑定过这里调用netlink_autobind()进行动态绑定。该函数在前一篇文章中已经分析,继续往下分析
	/* It's a really convoluted way for userland to ask for mmaped
	 * sendmsg(), but that's what we've got...
	 */
	if (netlink_tx_is_mmaped(sk) &&
	    msg->msg_iter.type == ITER_IOVEC &&
	    msg->msg_iter.nr_segs == 1 &&
	    msg->msg_iter.iov->iov_base == NULL) {
		err = netlink_mmap_sendmsg(sk, msg, dst_portid, dst_group,
					   &scm);
		goto out;
	}
假设内核配置了CONFIG_NETLINK_MMAP内核选项。则表示内核空间和应用层的消息发送队列支持内存映射,然后通过调用netlink_mmap_sendmsg来发送netlink消息,该种方式将降低数据的内存数据的拷贝动作,降低发送时间和资源占用。

现我的环境中并不支持,继续往下分析:

	err = -EMSGSIZE;
	if (len > sk->sk_sndbuf - 32)
		goto out;
	err = -ENOBUFS;
	skb = netlink_alloc_large_skb(len, dst_group);
	if (skb == NULL)
		goto out;

接下来推断须要发送的数据是否过长(长于发送缓存大小),然后通过netlink_alloc_large_skb分配skb结构(传入的參数为消息载荷的长度以及组播地址)。
	NETLINK_CB(skb).portid	= nlk->portid;
	NETLINK_CB(skb).dst_group = dst_group;
	NETLINK_CB(skb).creds	= scm.creds;
	NETLINK_CB(skb).flags	= netlink_skb_flags;

	err = -EFAULT;
	if (memcpy_from_msg(skb_put(skb, len), msg, len)) {
		kfree_skb(skb);
		goto out;
	}

	err = security_netlink_send(sk, skb);
	if (err) {
		kfree_skb(skb);
		goto out;
	}

	if (dst_group) {
		atomic_inc(&skb->users);
		netlink_broadcast(sk, skb, dst_portid, dst_group, GFP_KERNEL);
	}
	err = netlink_unicast(sk, skb, dst_portid, msg->msg_flags&MSG_DONTWAIT);
在成功创建skb结构之后,这里就開始初始化它。这里使用到了skb中的扩展cb字段(char cb[48] __aligned(8),一共48个字节用于存放netlink的地址和标识相关的附加信息足够了),同一时候使用宏NETLINK_CB来操作这些字段。netlink将skb的cb字段强制定义为struct netlink_skb_parms结构:
struct netlink_skb_parms {
	struct scm_creds	creds;		/* Skb credentials	*/
	__u32			portid;
	__u32			dst_group;
	__u32			flags;
	struct sock		*sk;
};
当中portid表示原端套接字所绑定的id,dst_group表示消息目的组播地址。flag为标识,sk指向原端套接字的sock结构。


这里首先将套接字绑定的portid赋值到skb得cb字段中、同一时候设置组播地址的数量以及netlink_skb标识(这里是已经置位NETLINK_SKB_DST)。接下来调用最关键的调用memcpy_from_msg拷贝数据。它首先调用skb_put调整skb->tail指针。然后运行copy_from_iter(data, len, &msg->msg_iter)将数据从msg->msg_iter中传输到skb->data中(这是第一次内存拷贝动作。将用户空间数据直接复制到内核skb中)。

接下来调用security_netlink_send()运行security检查,最后假设是组播发送则调用netlink_broadcast()发送消息,否则调用netlink_unicast()发送单播消息。我们先尾随当前的情景来分析单播发送函数netlink_unicast():
int netlink_unicast(struct sock *ssk, struct sk_buff *skb,
		    u32 portid, int nonblock)
{
	struct sock *sk;
	int err;
	long timeo;

	skb = netlink_trim(skb, gfp_any());

	timeo = sock_sndtimeo(ssk, nonblock);
retry:
	sk = netlink_getsockbyportid(ssk, portid);
	if (IS_ERR(sk)) {
		kfree_skb(skb);
		return PTR_ERR(sk);
	}
	if (netlink_is_kernel(sk))
		return netlink_unicast_kernel(sk, skb, ssk);

	if (sk_filter(sk, skb)) {
		err = skb->len;
		kfree_skb(skb);
		sock_put(sk);
		return err;
	}

	err = netlink_attachskb(sk, skb, &timeo, ssk);
	if (err == 1)
		goto retry;
	if (err)
		return err;

	return netlink_sendskb(sk, skb);
}
这里首先调用netlink_trim()又一次裁剪skb的数据区的大小,这可能会clone出一个新的skb结构同一时候又一次分配skb->data的内存空间(这就出现了第三次的内存拷贝动作!)。当然假设原本skb中多余的内存数据区很小或者该内存空间是在vmalloc空间中的就不会运行上述操作。我们如今尾随的情景上下文中就是后一种情况。并不会又一次分配空间。
接下来记下发送超时等待时间。假设已经设置了MSG_DONTWAIT标识,则等待时间为0。否则返回sk->sk_sndtimeo(该值在sock初始化时由sock_init_data()函数赋值为MAX_SCHEDULE_TIMEOUT)。
接下来依据目的portid号和原端sock结构查找目的端的sock结构:
static struct sock *netlink_getsockbyportid(struct sock *ssk, u32 portid)
{
	struct sock *sock;
	struct netlink_sock *nlk;

	sock = netlink_lookup(sock_net(ssk), ssk->sk_protocol, portid);
	if (!sock)
		return ERR_PTR(-ECONNREFUSED);

	/* Don't bother queuing skb if kernel socket has no input function */
	nlk = nlk_sk(sock);
	if (sock->sk_state == NETLINK_CONNECTED &&
	    nlk->dst_portid != nlk_sk(ssk)->portid) {
		sock_put(sock);
		return ERR_PTR(-ECONNREFUSED);
	}
	return sock;
}
这里首先调用netlink_lookup运行查找工作,查找的命名空间和协议号同原端sock,它会从nl_table[protocol]的哈希表中找到已经注冊的目的端sock套接字。找到以后运行校验。如若找到的socket已经connect了,则它的目的portid必须是原端的portid。
接下来推断目的的netlink socket是否是内核的netlink socket:
static inline int netlink_is_kernel(struct sock *sk)
{
	return nlk_sk(sk)->flags & NETLINK_KERNEL_SOCKET;
}
假设目的地址是内核空间,则调用netlink_unicast_kernel向内核进行单播,入參是目的sock、原端sock和数据skb。

否则继续向下运行。如今的情景中,我们尾随用户空间中发送的数据。进入netlink_unicast_kernel()中:

static int netlink_unicast_kernel(struct sock *sk, struct sk_buff *skb,
				  struct sock *ssk)
{
	int ret;
	struct netlink_sock *nlk = nlk_sk(sk);

	ret = -ECONNREFUSED;
	if (nlk->netlink_rcv != NULL) {
		ret = skb->len;
		netlink_skb_set_owner_r(skb, sk);
		NETLINK_CB(skb).sk = ssk;
		netlink_deliver_tap_kernel(sk, ssk, skb);
		nlk->netlink_rcv(skb);
		consume_skb(skb);
	} else {
		kfree_skb(skb);
	}
	sock_put(sk);
	return ret;
}
检查目标netlink套接字是否注冊了netlink_rcv()接收函数。假设没有则直接丢弃该数据包,否则继续发送流程,这里首先设置一些标识:
skb->sk = sk;     /* 将目的sock赋值给skb->sk指针 */
skb->destructor = netlink_skb_destructor;   /* 注冊destructor钩子函数 */
NETLINK_CB(skb).sk = ssk;   /* 将原端的sock保存早skb的cb扩展字段中 */
最后就调用了nlk->netlink_rcv(skb)函数将消息送到内核中的目的netlink套接字中了。

在前一篇文章中已经看到在内核注冊netlink套接字的时候已经将其接收函数注冊到了netlink_rcv中:

struct sock *
__netlink_kernel_create(struct net *net, int unit, struct module *module,
			struct netlink_kernel_cfg *cfg)
{
	......
	if (cfg && cfg->input)
		nlk_sk(sk)->netlink_rcv = cfg->input;
对于NETLINK_ROUTE类型的套接字来说就是rtnetlink_rcv了,netlink_rcv()钩子函数会接收并解析用户传下来的数据。不同类型的netlink协议各不同样,这里就不进行分析了。

至此应用层下发单播的netlink数据就下发完毕了。

如今我们回到netlink_sendmsg()函数中简单分析一下发送组播数据的流程是如何的:
static int netlink_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
{
	......

	if (dst_group) {
		atomic_inc(&skb->users);
		netlink_broadcast(sk, skb, dst_portid, dst_group, GFP_KERNEL);
	}
int netlink_broadcast(struct sock *ssk, struct sk_buff *skb, u32 portid,
		      u32 group, gfp_t allocation)
{
	return netlink_broadcast_filtered(ssk, skb, portid, group, allocation,
		NULL, NULL);
}
EXPORT_SYMBOL(netlink_broadcast);
这里间接调用netlink_broadcast_filtered()函数:
int netlink_broadcast_filtered(struct sock *ssk, struct sk_buff *skb, u32 portid,
	u32 group, gfp_t allocation,
	int (*filter)(struct sock *dsk, struct sk_buff *skb, void *data),
	void *filter_data)
{
	struct net *net = sock_net(ssk);
	struct netlink_broadcast_data info;
	struct sock *sk;

	skb = netlink_trim(skb, allocation);

	info.exclude_sk = ssk;
	info.net = net;
	info.portid = portid;
	info.group = group;
	info.failure = 0;
	info.delivery_failure = 0;
	info.congested = 0;
	info.delivered = 0;
	info.allocation = allocation;
	info.skb = skb;
	info.skb2 = NULL;
	info.tx_filter = filter;
	info.tx_data = http://www.mamicode.com/filter_data;>这里首先初始化netlink组播数据结构netlink_broadcast_data,当中info.group中保存了目的组播地址。然后从nl_table[ssk->sk_protocol].mc_list里边查找增加组播组的socket,并调用do_one_broadcast()函数依次发送组播数据:
static void do_one_broadcast(struct sock *sk,
				    struct netlink_broadcast_data *p)
{
	struct netlink_sock *nlk = nlk_sk(sk);
	int val;

	if (p->exclude_sk == sk)
		return;

	if (nlk->portid == p->portid || p->group - 1 >= nlk->ngroups ||
	    !test_bit(p->group - 1, nlk->groups))
		return;

	if (!net_eq(sock_net(sk), p->net))
		return;

	if (p->failure) {
		netlink_overrun(sk);
		return;
	}

	......
	} else if ((val = netlink_broadcast_deliver(sk, p->skb2)) < 0) {
		netlink_overrun(sk);
		if (nlk->flags & NETLINK_BROADCAST_SEND_ERROR)
			p->delivery_failure = 1;
	......
}
当然,在发送之前会做一些必要的检查,比如这里会确保原端sock和目的端sock不是同一个。它们属于同一个网络命名空间,目的的组播地址为发送的目的组播地址等等,然后会对skb和组播数据结构netlink_broadcast_data进行一些处理,最后调用
netlink_broadcast_deliver()函数对目的sock发送数据skb:
static int netlink_broadcast_deliver(struct sock *sk, struct sk_buff *skb)
{
	......
		__netlink_sendskb(sk, skb);
	......
}

static int __netlink_sendskb(struct sock *sk, struct sk_buff *skb)
{
	int len = skb->len;

	......
	skb_queue_tail(&sk->sk_receive_queue, skb);
	sk->sk_data_ready(sk);
	return len;
}
能够看到,这里将要发送的skb加入到目的sock的接收队列末尾,然后调用sk_data_ready()通知钩子函数,告知目的sock有数据到达,运行处理流程。

对于内核的netlink来说内核netlink的创建函数中已经将其注冊为

struct sock *
__netlink_kernel_create(struct net *net, int unit, struct module *module,
			struct netlink_kernel_cfg *cfg)
{
	......
	sk->sk_data_ready = netlink_data_ready;
	......
}
static void netlink_data_ready(struct sock *sk)
{
	BUG();
}
很明显了。内核netlink套接字是不管怎样也不应该接收到组播消息的。可是对于应用层netlink套接字,该sk_data_ready()钩子函数在初始化netlink函数sock_init_data()中被注冊为sock_def_readable(),这个函数后面再分析。
至此应用层下发的netlink数据流程就分析结束了。

三、内核向应用层发送netlink消息

技术分享
图2 内核发送netlink单播消息

内核能够通过nlmsg_unicast()函数向应用层发送单播消息,由各个netlink协议负责调用,也有的协议是直接调用netlink_unicast()函数,事实上nlmsg_unicast()也仅是netlink_unicast()的一个封装而已:
/**
 * nlmsg_unicast - unicast a netlink message
 * @sk: netlink socket to spread message to
 * @skb: netlink message as socket buffer
 * @portid: netlink portid of the destination socket
 */
static inline int nlmsg_unicast(struct sock *sk, struct sk_buff *skb, u32 portid)
{
	int err;

	err = netlink_unicast(sk, skb, portid, MSG_DONTWAIT);
	if (err > 0)
		err = 0;

	return err;
}
这里以非堵塞(MSG_DONTWAIT)的形式向应用层发送消息,这时的portid为应用层套接字所绑定的id号。我们再次进入到netlink_unicast()内部,这次因为目的sock不再是内核,所以要走不同的的分支了
int netlink_unicast(struct sock *ssk, struct sk_buff *skb,
		    u32 portid, int nonblock)
{
	struct sock *sk;
	int err;
	long timeo;

	skb = netlink_trim(skb, gfp_any());

	timeo = sock_sndtimeo(ssk, nonblock);
retry:
	sk = netlink_getsockbyportid(ssk, portid);
	if (IS_ERR(sk)) {
		kfree_skb(skb);
		return PTR_ERR(sk);
	}
	if (netlink_is_kernel(sk))
		return netlink_unicast_kernel(sk, skb, ssk);

	if (sk_filter(sk, skb)) {
		err = skb->len;
		kfree_skb(skb);
		sock_put(sk);
		return err;
	}

	err = netlink_attachskb(sk, skb, &timeo, ssk);
	if (err == 1)
		goto retry;
	if (err)
		return err;

	return netlink_sendskb(sk, skb);
}
EXPORT_SYMBOL(netlink_unicast);
这里首先sk_filter运行防火墙的过滤,确保能够发送以后调用netlink_attachskb将要发送的skb绑定到netlink sock上。
int netlink_attachskb(struct sock *sk, struct sk_buff *skb,
		      long *timeo, struct sock *ssk)
{
	struct netlink_sock *nlk;

	nlk = nlk_sk(sk);

	if ((atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf ||
	     test_bit(NETLINK_CONGESTED, &nlk->state)) &&
	    !netlink_skb_is_mmaped(skb)) {
		DECLARE_WAITQUEUE(wait, current);
		if (!*timeo) {
			if (!ssk || netlink_is_kernel(ssk))
				netlink_overrun(sk);
			sock_put(sk);
			kfree_skb(skb);
			return -EAGAIN;
		}

		__set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue(&nlk->wait, &wait);

		if ((atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf ||
		     test_bit(NETLINK_CONGESTED, &nlk->state)) &&
		    !sock_flag(sk, SOCK_DEAD))
			*timeo = schedule_timeout(*timeo);

		__set_current_state(TASK_RUNNING);
		remove_wait_queue(&nlk->wait, &wait);
		sock_put(sk);

		if (signal_pending(current)) {
			kfree_skb(skb);
			return sock_intr_errno(*timeo);
		}
		return 1;
	}
	netlink_skb_set_owner_r(skb, sk);
	return 0;
}
假设目的sock的接收缓冲区剩余的的缓存大小小于已经提交的数据量。或者标志位已经置位了堵塞标识NETLINK_CONGESTED,这表明数据不能够马上的送到目的端的接收缓存中。因此,在原端不是内核socket且没有设置非堵塞标识的情况下会定义一个等待队列并等待指定的时间并返回1,否则直接丢弃该skb数据包并返回失败。
若目的端的接收缓存区空间足够。就会调用netlink_skb_set_owner_r进行绑定。

回到netlink_unicast()函数中,能够看到若运行netlink_attachskb()的返回值为1。就会再次尝试发送操作。

最后调用netlink_sendskb()运行发送操作:

int netlink_sendskb(struct sock *sk, struct sk_buff *skb)
{
	int len = __netlink_sendskb(sk, skb);

	sock_put(sk);
	return len;
}

这里重新回到了__netlink_sendskb函数运行发送流程:
static int __netlink_sendskb(struct sock *sk, struct sk_buff *skb)
{
	int len = skb->len;

	netlink_deliver_tap(skb);

#ifdef CONFIG_NETLINK_MMAP
	if (netlink_skb_is_mmaped(skb))
		netlink_queue_mmaped_skb(sk, skb);
	else if (netlink_rx_is_mmaped(sk))
		netlink_ring_set_copied(sk, skb);
	else
#endif /* CONFIG_NETLINK_MMAP */
		skb_queue_tail(&sk->sk_receive_queue, skb);
	sk->sk_data_ready(sk);
	return len;
}
这里的sk_data_ready()钩子函数在初始化netlink函数sock_init_data()中被注冊为sock_def_readable()。进入分析一下:
static void sock_def_readable(struct sock *sk)
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	if (wq_has_sleeper(wq))
		wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
						POLLRDNORM | POLLRDBAND);
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
	rcu_read_unlock();
}

这里唤醒目的接收端socket的等待队列。这样应用层套接字就能够接收并处理消息了。
以上分析了内核发送单播消息,以下来看一下内核发送多播消息流程。该流程很easy,它的入口是nlmsg_multicast():
static inline int nlmsg_multicast(struct sock *sk, struct sk_buff *skb,
				  u32 portid, unsigned int group, gfp_t flags)
{
	int err;

	NETLINK_CB(skb).dst_group = group;

	err = netlink_broadcast(sk, skb, portid, group, flags);
	if (err > 0)
		err = 0;

	return err;
}
nlmsg_multicast及兴许的流程前文中都已分析过了。此处不再赘述。至此内核发送netlink消息已经完毕。下满来看一下应用层是怎样接收该消息的。

四、应用层接收内核netlink消息

使用例如以下演示样例程序能够以堵塞的方式接收内核发送的netlink消息:
#define TEST_DATA_LEN	16

struct sockaddr_nl nladdr;
struct msghdr msg;
struct nlmsghdr *nlhdr;
struct iovec iov;

/* 清空源地址结构 */
memset(&nladdr, 0, sizeof(nladdr));

/* 清空netlink消息头 */
nlhdr = (struct nlmsghdr *)malloc(NLMSG_SPACE(TEST_DATA_LEN));
memset(nlhdr, 0, NLMSG_SPACE(TEST_DATA_LEN));

/* 封装netlink消息 */
iov.iov_base = (void *)nlhdr;					/* 接收缓存地址 */
iov.iov_len = NLMSG_LENGTH(TEST_DATA_LEN);;		/* 接收缓存大小 */
	
/* 填充数据消息结构 */
memset(&msg, 0, sizeof(msg));
msg.msg_name = (void *)&(nladdr);
msg.msg_namelen = sizeof(nladdr);				/* 地址长度由内核赋值 */
msg.msg_iov = &iov;
msg.msg_iovlen = 1;

/* 接收netlink消息 */
recvmsg(sock_fd, &msg, 0);
本演示样例程序同前文中的发送程序类似,须要有接收端组装接收msg消息。

同发送流程的不同之处在于:

(1)msg.msg_name地址结构中存放的是消息源的地址信息,由内核负责填充。
(2)iov.iov_base为接收缓存的地址空间。其须要在接收前清空。
(3)iov.iov_len为单个iov接收缓存的长度,须要指明。
(4)msg.msg_namelen:为地址占用长度。有内核负责填充。
(5)msg.msg_iovlen:为接收iov空间的个数,须要指明。
这里用到了recvmsg系统调用,现进入该系统调用分析消息的整个接收的过程(须要注意的是,在不使用NETLINK_MMAP技术的情况下,整个接收的过程中存在1次数据的内存拷贝动作!

):

技术分享
图3 用户态netlink接收流程
SYSCALL_DEFINE3(recvmsg, int, fd, struct user_msghdr __user *, msg,
		unsigned int, flags)
{
	if (flags & MSG_CMSG_COMPAT)
		return -EINVAL;
	return __sys_recvmsg(fd, msg, flags);
}
long __sys_recvmsg(int fd, struct user_msghdr __user *msg, unsigned flags)
{
	int fput_needed, err;
	struct msghdr msg_sys;
	struct socket *sock;

	sock = sockfd_lookup_light(fd, &err, &fput_needed);
	if (!sock)
		goto out;

	err = ___sys_recvmsg(sock, msg, &msg_sys, flags, 0);

	fput_light(sock->file, fput_needed);
out:
	return err;
}
同sendmsg系统调用类似,这里也相同首先通过fd描写叙述符查找相应的套接字socket结构,然后调用___sys_recvmsg()运行实际的工作,这个函数比較长,分段分析:
static int ___sys_recvmsg(struct socket *sock, struct user_msghdr __user *msg,
			 struct msghdr *msg_sys, unsigned int flags, int nosec)
{
	struct compat_msghdr __user *msg_compat =
	    (struct compat_msghdr __user *)msg;
	struct iovec iovstack[UIO_FASTIOV];
	struct iovec *iov = iovstack;
	unsigned long cmsg_ptr;
	int total_len, len;
	ssize_t err;

	/* kernel mode address */
	struct sockaddr_storage addr;

	/* user mode address pointers */
	struct sockaddr __user *uaddr;
	int __user *uaddr_len = COMPAT_NAMELEN(msg);

	msg_sys->msg_name = &addr;
同sendmsg类似,这里相同定义了一个大小为8的iovstack数组缓存。用来加速消息处理;随后获取用户空间的地址长度字段的地址。
	if (MSG_CMSG_COMPAT & flags)
		err = get_compat_msghdr(msg_sys, msg_compat, &uaddr, &iov);
	else
		err = copy_msghdr_from_user(msg_sys, msg, &uaddr, &iov);
	if (err < 0)
		return err;
	total_len = iov_iter_count(&msg_sys->msg_iter);

	cmsg_ptr = (unsigned long)msg_sys->msg_control;
	msg_sys->msg_flags = flags & (MSG_CMSG_CLOEXEC|MSG_CMSG_COMPAT);
这里接着调用copy_msghdr_from_user拷贝用户态msg中的数据到内核态msg_sys中。

当然这里主要是为了接收内核的消息,用户空间并没有什么实际的数据,这里最基本的作用就是确定用户须要接收多少数据量。

注意第三个參数已经不再是NULL了。而是指向了uaddr指针的地址。再次进入该函数具体分析一下:

static int copy_msghdr_from_user(struct msghdr *kmsg,
				 struct user_msghdr __user *umsg,
				 struct sockaddr __user **save_addr,
				 struct iovec **iov)
{
	struct sockaddr __user *uaddr;
	struct iovec __user *uiov;
	size_t nr_segs;
	ssize_t err;

	if (!access_ok(VERIFY_READ, umsg, sizeof(*umsg)) ||
<span style="color:#ff0000;">	    __get_user(uaddr, &umsg->msg_name) ||</span>
	    __get_user(kmsg->msg_namelen, &umsg->msg_namelen) ||
	    __get_user(uiov, &umsg->msg_iov) ||
	    __get_user(nr_segs, &umsg->msg_iovlen) ||
	    __get_user(kmsg->msg_control, &umsg->msg_control) ||
	    __get_user(kmsg->msg_controllen, &umsg->msg_controllen) ||
	    __get_user(kmsg->msg_flags, &umsg->msg_flags))
		return -EFAULT;

	if (!uaddr)
		kmsg->msg_namelen = 0;

	if (kmsg->msg_namelen < 0)
		return -EINVAL;

	if (kmsg->msg_namelen > sizeof(struct sockaddr_storage))
		kmsg->msg_namelen = sizeof(struct sockaddr_storage);

<span style="color:#ff0000;">	if (save_addr)
		*save_addr = uaddr;</span>

	if (uaddr && kmsg->msg_namelen) {
<span style="color:#ff0000;">		if (!save_addr) {
			err = move_addr_to_kernel(uaddr, kmsg->msg_namelen,
						  kmsg->msg_name);</span>
			if (err < 0)
				return err;
		}
	} else {
		kmsg->msg_name = NULL;
		kmsg->msg_namelen = 0;
	}

	if (nr_segs > UIO_MAXIOV)
		return -EMSGSIZE;

	kmsg->msg_iocb = NULL;

	return import_iovec(<span style="color:#ff0000;">save_addr ? READ : WRITE</span>, uiov, nr_segs,
			    UIO_FASTIOV, iov, &kmsg->msg_iter);
}
注意到当中加红的这几行。当中传入的uaddr指针被指向了用户空间msg->msg_name地址处,然后内核也不再会调用move_addr_to_kernel将用户空间的消息地址字段复制到内核空间了(由于根本不是必需了),然后以READ的方式调用import_iovec()函数。它会检查用户空间的消息数据地址能否够写入,然后依据用户须要接收的msg_iovlen长度封装kmsg->msg_iter结构。再回到___sys_recvmsg()函数中保存接收缓存的总长度到total_len中,然后设置flag标识。
	/* We assume all kernel code knows the size of sockaddr_storage */
	msg_sys->msg_namelen = 0;

	if (sock->file->f_flags & O_NONBLOCK)
		flags |= MSG_DONTWAIT;
	err = (nosec ? sock_recvmsg_nosec : sock_recvmsg)(sock, msg_sys,
							  total_len, flags);
这里将地址的长度字段清零,不使用用户空间传入的(这里假定内核知道地址的长度),然后调用依据nosec的值是否为0而调用sock_recvmsg_nosec()或sock_recvmsg()函数接收数据。nosec在recvmsg系统调用传入的为0,在recvmmsg系统可以调用接收多个消息时传入已经接受的消息个数。
同发送的sendmsg()和sendmmsg()两个系统调用一样,这样设计也是为了加速消息接收。recvmmsg()就是sock_recvmsg_nosec()的一个封装而已,仅仅只是会添加security检查:
int sock_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
		 int flags)
{
	int err = security_socket_recvmsg(sock, msg, size, flags);

	return err ?: sock_recvmsg_nosec(sock, msg, size, flags);
}
EXPORT_SYMBOL(sock_recvmsg);
static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg,
				     size_t size, int flags)
{
	return sock->ops->recvmsg(sock, msg, size, flags);
}
这里调用了接收套接字所在协议的recvmsg接收钩子函数,对于netlink就是netlink_recvmsg()函数:
static int netlink_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
			   int flags)
{
	struct scm_cookie scm;
	struct sock *sk = sock->sk;
	struct netlink_sock *nlk = nlk_sk(sk);
	int noblock = flags&MSG_DONTWAIT;
	size_t copied;
	struct sk_buff *skb, *data_skb;
	int err, ret;

	if (flags&MSG_OOB)
		return -EOPNOTSUPP;

	copied = 0;

	skb = skb_recv_datagram(sk, flags, noblock, &err);
	if (skb == NULL)
		goto out;

	data_skb = skb;
这里首先调用skb_recv_datagram()从接收socket的缓存中接收消息并通过skb返回。假设设置了MSG_DONTWAIT则在接收队列中没有消息时马上返回。否则会堵塞等待。进入该函数具体分析:
struct sk_buff *skb_recv_datagram(struct sock *sk, unsigned int flags,
				  int noblock, int *err)
{
	int peeked, off = 0;

	return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
				   &peeked, &off, err);
}
EXPORT_SYMBOL(skb_recv_datagram);
struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned int flags,
				    int *peeked, int *off, int *err)
{
	struct sk_buff_head *queue = &sk->sk_receive_queue;
	struct sk_buff *skb, *last;
	unsigned long cpu_flags;
	long timeo;
	/*
	 * Caller is allowed not to check sk->sk_err before skb_recv_datagram()
	 */
	int error = sock_error(sk);

	if (error)
		goto no_packet;

	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

	do {
		/* Again only user level code calls this function, so nothing
		 * interrupt level will suddenly eat the receive_queue.
		 *
		 * Look at current nfs client by the way...
		 * However, this function was correct in any case. 8)
		 */
		int _off = *off;

		last = (struct sk_buff *)queue;
		spin_lock_irqsave(&queue->lock, cpu_flags);
		skb_queue_walk(queue, skb) {
			last = skb;
			*peeked = skb->peeked;
			if (flags & MSG_PEEK) {
				if (_off >= skb->len && (skb->len || _off ||
							 skb->peeked)) {
					_off -= skb->len;
					continue;
				}

				skb = skb_set_peeked(skb);
				error = PTR_ERR(skb);
				if (IS_ERR(skb))
					goto unlock_err;

				atomic_inc(&skb->users);
			} else
				__skb_unlink(skb, queue);

			spin_unlock_irqrestore(&queue->lock, cpu_flags);
			*off = _off;
			return skb;
		}
		spin_unlock_irqrestore(&queue->lock, cpu_flags);

		if (sk_can_busy_loop(sk) &&
		    sk_busy_loop(sk, flags & MSG_DONTWAIT))
			continue;

		/* User doesn't want to wait */
		error = -EAGAIN;
		if (!timeo)
			goto no_packet;

	} while (!wait_for_more_packets(sk, err, &timeo, last));

	return NULL;

unlock_err:
	spin_unlock_irqrestore(&queue->lock, cpu_flags);
no_packet:
	*err = error;
	return NULL;
}
首先获取socket的接收队列指针到保存到queue变量中,然后获取等待时长sk->sk_rcvtimeo(它在socket初始化时被设置为MAX_SCHEDULE_TIMEOUT,也可通过set_socketopt改动)。接下来进入一个do while()循环等待从接收缓存中获取数据。
首先假定当前接收队列中已经有数据了。这时将队列上锁后从队列中取出一个skb包。然后推断是否设置了MSG_PEEK标识符(假设已经设置了表明仅获取该skb包可是不从接收队列中删除)。若设置了则调用skb_set_peeked()函数skb_clone出一个skb消息包返回,否则直接调用__skb_unlink将本次取出的skb包从列表中删除然后返回。
然后再假定当前接收队列中没有消息存在,这里会依据是否设置了CONFIG_NET_RX_BUSY_POLL内核选项运行两种等待方案。若设置了改选项。则会尝试使用一种忙等待的方案(类似于设备驱动的napi),不用来让当前运行流睡眠而切出,能够加速数据的接收速度,它会调用sk_can_busy_loop()和sk_busy_loop()推断能否够使用该方案。对于没有设置该选项。则使用传统的方案,调用wait_for_more_packets()运行等待操作,它增加等待队列会让进程睡眠:
static int wait_for_more_packets(struct sock *sk, int *err, long *timeo_p,
				 const struct sk_buff *skb)
{
	int error;
	DEFINE_WAIT_FUNC(wait, receiver_wake_function);

	prepare_to_wait_exclusive(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);

	......
	
	/* handle signals */
	if (signal_pending(current))
		goto interrupted;
		
	error = 0;
	*timeo_p = schedule_timeout(*timeo_p);
out:
	finish_wait(sk_sleep(sk), &wait);
	return error;
interrupted:
	error = sock_intr_errno(*timeo_p);	
	......
}
该函数会做一些必要的推断。可是最核心的就是以上这几行代码。首先将进程设置为INTERRUPTIBLE状态,若没有消息到达或者接收到信号它会一直睡眠(超时时间timeo可设,一般为默认值MAX_SCHEDULE_TIMEOU的情况下会转而调用schedule())在schedule_timeout()函数上。在前文中已经看到,在内核netlink发送流程的最后将数据送到接收队列后会唤醒该等待队列,进程唤醒后返回到__skb_recv_datagram的大循环中去接收消息,若是被信号打断了,那大循环中无法获取消息。又会回到这里运行信号退出动作。
在从接收队列中获取了一个skb以后,我们回到netlink_recvmsg()函数中继续往下分析:
	/* Record the max length of recvmsg() calls for future allocations */
	nlk->max_recvmsg_len = max(nlk->max_recvmsg_len, len);
	nlk->max_recvmsg_len = min_t(size_t, nlk->max_recvmsg_len,
				     16384);

	copied = data_skb->len;
	if (len < copied) {
		msg->msg_flags |= MSG_TRUNC;
		copied = len;
	}

	skb_reset_transport_header(data_skb);
	err = skb_copy_datagram_msg(data_skb, 0, msg, copied);
这里更新了最长的的接收数据长度。然后推断假设获取到的skb数据长度大于大于本次接收缓存的最大长度。则设置MSG_TRUNC标识,并将本次须要接收数据量设置为接收缓存的长度。


接着调用skb_copy_datagram_msg()函数将skb中的实际数据复制到msg消息中(这里进行了一次数据拷贝动作,将skb中的数据直接复制到msg指向的用户空间地址处)。

	if (msg->msg_name) {
		DECLARE_SOCKADDR(struct sockaddr_nl *, addr, msg->msg_name);
		addr->nl_family = AF_NETLINK;
		addr->nl_pad    = 0;
		addr->nl_pid	= NETLINK_CB(skb).portid;
		addr->nl_groups	= netlink_group_mask(NETLINK_CB(skb).dst_group);
		msg->msg_namelen = sizeof(*addr);
	}
在拷贝完毕后这里開始初始化地址结构,这里将family这是为AF_NETLINK地址族,然后设置portid号为保存在原端skb扩展cb字段中的portid,对于这里接收内核发送的skb消息来说本字段为0,然后设置组播地址,该值在前文中内核调用nlmsg_multicast()发送组播消息时设置(对于单播来说就为0),netlink_group_mask()函数将组播地址的位号转换为实际的组播地址(mask),然后这是msg的地址长度为nl_addr的长度。
	if (nlk->flags & NETLINK_RECV_PKTINFO)
		netlink_cmsg_recv_pktinfo(msg, skb);

	memset(&scm, 0, sizeof(scm));
	scm.creds = *NETLINK_CREDS(skb);
	if (flags & MSG_TRUNC)
		copied = data_skb->len;

	skb_free_datagram(sk, skb);

	if (nlk->cb_running &&
	    atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf / 2) {
		ret = netlink_dump(sk);
		if (ret) {
			sk->sk_err = -ret;
			sk->sk_error_report(sk);
		}
	}

	scm_recv(sock, msg, &scm, flags);
out:
	netlink_rcv_wake(sk);
	return err ? : copied;
}
这里假设设置了NETLINK_RECV_PKTINFO标识则将辅助消息头复制到用户空间。接着推断是否设置了MSG_TRUNC标识。假设设置了就又一次设置copied为本次取出的skb中获取数据的长度(特别注意!

)。然后调用skb_free_datagram()释放skb消息包。最后在返回接收数据长度。

再回到___sys_recvmsg()函数中继续往下分析:

	err = (nosec ? sock_recvmsg_nosec : sock_recvmsg)(sock, msg_sys,
							  total_len, flags);
	if (err < 0)
		goto out_freeiov;
	len = err;

	if (uaddr != NULL) {
		err = move_addr_to_user(&addr,
					msg_sys->msg_namelen, uaddr,
					uaddr_len);
		if (err < 0)
			goto out_freeiov;
	}
这里len保存了接收到数据的长度。然后将消息地址信息从内核空间复制到用户空间。
	err = __put_user((msg_sys->msg_flags & ~MSG_CMSG_COMPAT),
			 COMPAT_FLAGS(msg));
	if (err)
		goto out_freeiov;
	if (MSG_CMSG_COMPAT & flags)
		err = __put_user((unsigned long)msg_sys->msg_control - cmsg_ptr,
				 &msg_compat->msg_controllen);
	else
		err = __put_user((unsigned long)msg_sys->msg_control - cmsg_ptr,
				 &msg->msg_controllen);
	if (err)
		goto out_freeiov;
	err = len;

out_freeiov:
	kfree(iov);
	return err;
}
最后将flag、消息辅助数据等拷贝到用户空间,至此recvmsg系统调用就向上返回了,应用层也能够使用获取到的数据了。

应用层接收netlink消息流程结束。


五、总结

本文和前一篇《Netlink 内核实现分析(一):创建》分析了内核与用户态netlink的创建、绑定及通信流程。netlink机制可以有效的利用了socket通信机制,实现了内核态与用户态之间的全双工通信,此处尽管分析的是netlink的实现方式,可是当中都是使用的Linux标准套接字系统调用接口,其运行的流程对于其它协议类型的套接字也是一样的。最后。本文仅对netlink的通用收发流程进行了分析,对于不同协议类型的netlink拥有自己特别的消息格式与处理方式。本文没有详细分析。

后面会对netlink协议中通用的Genetlink通信协议及通信流程做详细分析并给出通信演示样例程序。



















Netlink 内核实现分析(二):通信