首页 > 代码库 > linux进程间通信--本地socket(tcp部分)

linux进程间通信--本地socket(tcp部分)

[内核版本]

linux-2.6.31


[尚存缺憾]

1、getsockopt和setsockopt的某些特定参数的调用(net/unix/af_unix.c中定义的*sockop函数均保留接口,返回EOPNOTSUPP错误);

2、ss命令查看到的本地socket状态需要进一步确认;


[注意事项]

1、使用本地socket进行通信时,其通信过程并不通过报文交互进行状态机切换:

    a)server端在执行listen函数之后,该socket即处于监听状态;

    b)client端在执行connect函数时,正常情况下,内核将sock状态设置为SS_CONNECTED,将sk状态设置为TCP_ESTABLISHED,然后通知server端有client请求(向server的socket发送SIGIO信号);

    c)一旦出现某一情况导致server端的最大链接请求(应该是由sk_max_ack_backlog成员定义,在listen函数中指定)使用完毕,如果不设置连接超时时间,client端应该在2147483647秒的时间超时

// include/linux/kernel.h

#define LONG_MAX((long)(~0UL>>1))

// include/linux/sched.h

#defineMAX_SCHEDULE_TIMEOUTLONG_MAX

// net/core/sock.c

void sock_init_data(struct socket *sock, struct sock *sk)

{

        ...

sk->sk_sndtimeo=MAX_SCHEDULE_TIMEOUT;

        ...

}

2、本地socket和网络socket的通信机制有些不同(比如,tcp状态机不通过报文交互实现,getsockopt不能取到tcp的连接状态,内核在处理本地socket时,可能不会有一些协议栈的过程,即可能忽略了报文的完整性检查),很多网络socket上使用的函数在迁移至本地socket时需要酌情考虑。


[server端代码]

int ct_ipc_sock_create(struct ct_fd * sockfd, int server_flag)
{
	log_debug("server_flag: %d", server_flag);
	int ret = CT_RET_SUCCESS;
	int result = 0;
	struct sockaddr_un srv_addr;

	memset(&srv_addr, 0, sizeof(struct sockaddr_un));

	if (SERVER_SOCKET == server_flag)
	{
		sockfd->fd = socket(PF_UNIX, SOCK_DGRAM, 0);
		if (0 > sockfd->fd)
		{
			log_err("create Unix Socket error");
			ret = CT_RET_SYS_SOCK_CREATE_ERR;
			goto ct_ipc_sock_create_err;
		}

		srv_addr.sun_family=AF_UNIX; 
		strncpy(srv_addr.sun_path, UNIX_DOMAIN, sizeof(srv_addr.sun_path)-1);    
		unlink(UNIX_DOMAIN);
		
		ret=bind(sockfd->fd,(struct sockaddr*)&srv_addr,sizeof(srv_addr));    
		if(ret==-1)
		{
			log_err("cannot bind server socket (%d)", sockfd->fd);        
			ret = CT_RET_SYS_SOCK_BIND_ERR;
			goto ct_ipc_sock_bind_listen_err;
		}
		
	}
	else if (CLIENT_SOCKET == server_flag)
	{
		sockfd->fd = socket(PF_UNIX, SOCK_STREAM, 0);
		if (0 > sockfd->fd)
		{
			log_err("create Unix Socket error");
			ret = CT_RET_SYS_SOCK_CREATE_ERR;
			goto ct_ipc_sock_create_err;
		}
		log_debug("sockfd: %d", sockfd->fd);
	
		setsockopt(sockfd->fd, SOL_SOCKET, TCP_NODELAY, &result, sizeof(int));

		srv_addr.sun_family=AF_UNIX; 
		strncpy(srv_addr.sun_path, UNIX_DOMAIN_CLOUD, sizeof(srv_addr.sun_path)-1);    
		unlink(UNIX_DOMAIN_CLOUD);
		ret=bind(sockfd->fd,(struct sockaddr*)&srv_addr,sizeof(srv_addr));
		
		if(ret==-1)
		{
			log_err("cannot bind server socket (%d)", sockfd->fd);        
			ret = CT_RET_SYS_SOCK_BIND_ERR;
			goto ct_ipc_sock_bind_listen_err;
		}
		log_debug("%s", strerror(errno));

		ret=listen(sockfd->fd, MAX_LISTEN_BACKLOG);   
		if(ret==-1)
		{       
			log_err("cannot listen the client connect request (%d)", sockfd);
			ret = CT_RET_SYS_SOCK_LISTEN_ERR; 
			goto ct_ipc_sock_bind_listen_err;
		}
	}
	else
	{
	}
	
	log_debug("%s", strerror(errno));
	
	return ret;
	
ct_ipc_sock_bind_listen_err:
	close(sockfd->fd);
ct_ipc_sock_create_err:
	sockfd->fd = -1;
	return ret;
}
int ct_select_run()
{
	int ret = CT_RET_SUCCESS;
	fd_set  rset, allset;

	int nfds = 0;
	int maxfd = 0;
	int cloudfd = 0;
	int read_num = 0;
	int write_num = 0;
	socklen_t addrLen;
	struct sockaddr_in addr;
	char rsp[] = "ACK";
	char data_buf[MAX_BUF] = {0};

	log_debug("sockfd: %d, nlfd.fd: %d, cloud_sockfd.fd: %d", ipc_sockfd.fd, nlfd.fd, cloud_sockfd.fd);

	group_filter_init();
	
	maxfd = ipc_sockfd.fd ;
	
	FD_ZERO(&allset);
	FD_SET(cloud_sockfd.fd, &allset);
	log_debug("sockfd: %d, maxfd: %d, cloud_sockfd.fd: %d", ipc_sockfd.fd, maxfd, cloud_sockfd.fd);

	while (1/*nfds == 0*/)
	{
		rset = allset;
		log_debug("sockfd: %d", ipc_sockfd.fd);
		nfds = select(maxfd + 1, &rset, NULL, NULL, NULL);
		if (0 > nfds)
		{
			log_err("select error: %s", strerror(errno));
			ret = CT_RET_SYS_SOCK_SELECT_ERR;
			return ret;
		}

		if (FD_ISSET(cloud_sockfd.fd, &rset)) {
			log_debug("cloud_sockfd: %d", cloud_sockfd.fd);
			cloudfd = accept(cloud_sockfd.fd, (struct sockaddr *)&addr, &addrLen);

			if (cloudfd < 0) {
				
                log_err("accept (%d) client error: %d", cloud_sockfd.fd, ret);
                continue;
			}
			
			read_num = read(cloudfd, data_buf, MAX_BUF);
			log_debug("client socket is: %d, read_num: %d", cloudfd, read_num);
			
            if (read_num > 0) {
				write_num = write(cloudfd, sta_info, (sizeof(sta_list_attr)*num + sizeof(int)));
				log_debug("client socket is: %d, write_num: %d", cloudfd, write_num);;
				}
            }

			close(cloudfd);
		}
	}

	return ret;
}


[client端代码]

int do_filter_notify(cloud_cmd_attr * attr, char * buf, int buf_len)
{
	int ret = 0;
	int client_fd = -1;
	char recv_buf[128] = {0};
	struct sockaddr_un srv_addr;
	int maxfd = 0;
	int nfds = 0;
	fd_set  rset, allset;
	struct timeval tv;
	struct timeval t;
	/* Wait up to five seconds. */
	tv.tv_sec = CLI_TIME_OUT;
	tv.tv_usec = 0;
	
	memset(&srv_addr, 0, sizeof(struct sockaddr_un));

	client_fd = socket(AF_UNIX, SOCK_STREAM, 0);
	log_debug("client_fd: %d\n", client_fd);
	printf("client_fd: %d\n", client_fd);
	if (-1 != client_fd) 
	{
		t.tv_sec = UNIX_SOCK_CONNECT_TIMEO;
		t.tv_usec = 0; 
		ret = setsockopt(client_fd, SOL_SOCKET, SO_SNDTIMEO, &t, sizeof(t));
		srv_addr.sun_family = AF_UNIX;
		memcpy(srv_addr.sun_path, UNIX_DOMAIN_CLOUD, strlen(UNIX_DOMAIN_CLOUD));
		log_debug("srv_addr.sun_path: %s\n", srv_addr.sun_path);
		printf("srv_addr.sun_path: %s\n", srv_addr.sun_path);
		ret = connect(client_fd, (struct sockaddr *)&srv_addr, sizeof(srv_addr));
		if (-1 == ret)
		{
			log_err("error: %d, %s\n", errno, strerror(errno));
			ret = CT_RET_SYS_SOCK_CONNECT_ERR;
			goto close_fd;
		}
		else 
		{
			int write_num = 0;
			write_num = write(client_fd, attr, sizeof(cloud_cmd_attr));
			if (sizeof(cloud_cmd_attr) == write_num) 
			{
				FD_ZERO(&allset);	
				FD_SET(client_fd, &allset);
				maxfd = client_fd;
				rset = allset;
				nfds = select(maxfd + 1, &rset, NULL, NULL, &tv);
				log_debug("nfds: %d\n", nfds);
				printf("nfds: %d\n", nfds);
				if (FD_ISSET(client_fd, &rset))
				{
					int num = 0;
					if (NULL == buf)
					{
						num = read(client_fd, recv_buf, sizeof(recv_buf));
					}
					else
					{
						num = read(client_fd, buf, buf_len);
					}
					if (num <= 0) 
					{
						log_err("read %d error", client_fd);
						ret = CT_RET_FD_READ_ERR;
					}
					else
					{
						log_debug("client_fd: %d, read length: %d", client_fd, num);
						printf("client_fd: %d, read length: %d", client_fd, num);
						if (NULL == buf)
						{
							log_debug("recv_buf: %s\n", recv_buf);
							if (!strcasecmp(recv_buf, "ack"))
							{
								ret = CT_RET_SUCCESS;
							}
						}
						else
						{
							// do nothing
						}
					}
					goto close_fd;
				}
				else
				{
					log_err("no response from: %d", client_fd);
					printf("no response from: %d", client_fd);
					ret = CT_RET_SOCK_SELECT_TIMEOUT;
					goto close_fd;
				}
			}
			else
			{
				log_err("(%d) write to peer %s error (num: %d)", client_fd, UNIX_DOMAIN_CLOUD, ret);
				printf("(%d) write to peer %s error (num: %d)", client_fd, UNIX_DOMAIN_CLOUD, ret);
				ret = CT_RET_FD_WRITE_ERR;
				goto close_fd;
			}	
		}
	} 
	else
	{
		log_err("create Unix socket error");
		printf("create Unix socket error");
		ret = CT_RET_SYS_SOCK_CREATE_ERR;
	}

	return ret;
close_fd:
	close(client_fd);
	client_fd = -1;
	return ret;
}


linux进程间通信--本地socket(tcp部分)