首页 > 代码库 > 第13章 TCP编程(4)_基于自定义协议的多线程模型

第13章 TCP编程(4)_基于自定义协议的多线程模型

7. 基于自定义协议的多线程模型

(1)服务端编程

  ①主线程负责调用accept与客户端连接

  ②当接受客户端连接后,创建子线程来服务客户端,以处理多客户端的并发访问。

  ③服务端接到的客户端信息后,回显给客户端

(2)客户端编程

  ①从键盘输入信息,并发送给服务端

  ②接收来自服务端的信息

//msg.h与前一节相同

技术分享
#ifndef __MSG_H__
#define __MSG_H__
#include <sys/types.h>

//求结构体中成员变量的偏移地址
#define   OFFSET(TYPE, MEMB)   ((size_t) &((TYPE *)0)->MEMB)

//自定义的协议(TLV:Type length Value)
typedef struct{
    //协议头部
    char head[10];//TLV中的T
    unsigned int checkNum; //校验码
    unsigned int cbSizeContent; //协议体的长度
    //协议体部
    char buff[512]; //数据
}MSG;

//发送一个基于自定义协议的message,发送的数据存放在buff中
extern int write_msg(int sockfd, char* buff, size_t len);

//读取一个基于自定义协议的message,读取的数据存放在buff中
extern int read_msg(int sockfd, char* buff, size_t len);

#endif
View Code

//msg.c与前一节相同

技术分享
#include "msg.h"
#include <unistd.h>
#include <string.h>
#include <memory.h>
#include <sys/types.h>

//计算校验码
static unsigned int msg_check(MSG* msg)
{
    unsigned int s = 0;
    int i = 0;
    for(; i<sizeof(msg->head); i++){
        s += msg->head[i];
    }

    for(i=0; i<msg->cbSizeContent; i++){
        s += msg->buff[i];
    }

    return s;
}

//接收规定字节的数据,如果缓冲区的数据不够则等待
//返回:0,连接中断或发生错误
//      非0:实际接收到的字节数
static int recvData(int sockfd, char* buff, int nBytes)
{
    size_t nRecv = 0;

    do
    {
        size_t nLen = 0;
        //接收数据,直到收到指定的字节数
        nLen =  read(sockfd, buff + nRecv, nBytes - nRecv);
        nRecv += nLen;

        if(nLen == 0){ //读完或对方的写端己关闭
            nRecv = -1;
            break;
        }

    }while(nRecv < nBytes);

    return nRecv;
}

//发送指定字节数(数据量较大)的数据(采取分批发送的方法)
//当要发送的字节数超过send缓冲区大小时,要分批发送
static int sendData(int sockfd, char* buff, int nBytes)
{
    size_t nSend = 0;

    do
    {
        size_t nLen = 0;
        //接收数据,直到收到指定的字节数
        nLen = write(sockfd, buff + nSend, nBytes - nSend);
        nSend += nLen;
        if(nLen < 0 || nLen == 0){ //当写完或对方读端己关闭
            if(nLen == 0) 
                nSend = -1;
            break;
        }

    }while(nSend < nBytes);
    
    return nSend;
}

//发送一个基于自定义协议的message,发送的数据存放在buff中
int write_msg(int sockfd, char* buff, size_t len)
{
    /*封装数据成自定义的格式*/
    MSG msg;
    memset(&msg, 0, sizeof(msg));
    strcpy(msg.head, "msghead");
    memcpy(msg.buff, buff, len);
    msg.cbSizeContent = len;
    msg.checkNum = msg_check(&msg);

    /*发送自定义消息*/
    int nRet = sendData(sockfd, (char*)&msg, sizeof(msg));

    return nRet;
}

//读取一个基于自定义协议的message,读取的数据存放在buff中
int read_msg(int sockfd, char* buff, size_t len)
{
    MSG  msg;
    memset(&msg, 0, sizeof(msg));
    size_t size = 0;
    
    //读取结构体
    size = recvData(sockfd, (char*)&msg, sizeof(msg));
    if(size == -1){ //另一方socket被关闭
        return 0;   
    }else if(size != sizeof(msg)){
        return -1;
    }

    //进行校验码验证
    unsigned int s = msg_check(&msg);
    if((s == (unsigned int)msg.checkNum) 
            && (!strcmp("msghead", msg.head))){
        memcpy(buff, msg.buff, len);
        return size;
    }

    return -1;
}
View Code

//echo_tcp_client.c客户端程序与前一节相同

技术分享
#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <time.h>
#include <signal.h>
#include <errno.h>
#include "msg.h"

/*基于自定义协议的多进程服务端和客户端通信*
测试:telnet 127.0.0.1 xxxx 
      http://xxx.xxx.xxx.xxx:端口号
注意:演示时可关闭服务器的防火墙,防火墙口被过滤
      #service iptables status     查看防火墙
      #service iptables stop       关闭防火墙
*/

int sockfd;
int bStop = 0;

void sig_handler(int signo)
{
    if(signo == SIGINT){
        bStop = 1;
        printf("server close\n");

        exit(1);
    }

    if(signo == SIGCHLD){
        printf("child process deaded...\n");
        wait(0);
    }
}

//显示客户端信息
void out_addr(struct sockaddr_in* addr)
{
    //将端口从网络字节序转换成主机字节序
    int port = ntohs(addr->sin_port);
    //获得IP地址
    char ip[16] ={0};
    //将ip地址从网络字节序转换成点分十分制
    inet_ntop(AF_INET, &addr->sin_addr.s_addr, ip, sizeof(ip));

    printf("Client: %s(%d) connected\n", ip, port);
}

//服务程序
void do_service(int fd)
{
    /*服务端和客户端进行读写操作(双向通信)*/
    char buff[512];
    while(1){
       memset(buff, 0, sizeof(buff));
       printf("start read and write...\n");
       size_t size;
       
       //读取客户端发送过来的消息
       if((size = read_msg(fd, buff, sizeof(buff))) < 0){
           printf("%s\n", buff); //测试
           perror("protocol error");
           break;
       }else if(size == 0){
           //当客户端断开连接时而服务器试图去读取其
           //数据,socket就类似管道,相当于客户端写端被关闭,这里read_msg
           //会返回0
           printf("%s\n", buff); //测试
           break;
       }else{
           printf("%s\n", buff);//显示客户端发送的消息
           //写回客户端(回显功能)
           if(write_msg(fd, buff, sizeof(buff)) < 0){
               perror("protocol error");
               if(errno == EPIPE){
                  //如果客户端己被关闭(相当于管道的读端关闭),会产生SIGPIPE信号
                  //将并errno设置为EPIPE
                   break;
               }
           }
       }
    }
}

int main(int argc, char* argv[])
{
    if(argc < 2){
        printf("usage: %s port\n", argv[0]);
    }

    //按ctrl-c时中止服务端程序
    if(signal(SIGINT, sig_handler) == SIG_ERR){
        perror("signal sigint error");
        exit(1);
    }
    //回收子进程
    if(signal(SIGCHLD, sig_handler) == SIG_ERR){
        perror("signal sigchld error");
        exit(1);
    }


    /*步骤1:创建socket(套接字)
     *注:socket创建在内核中,是一个结构体
     *AF_INET:IPv4
     *SOCK_STREAM:tcp协议
     */
    sockfd = socket(AF_INET, SOCK_STREAM, 0);

    /*步骤2:将sock和地址(包括ip、port)进行绑定*/
    struct sockaddr_in servAddr; //使用专用地址结构体
    memset(&servAddr, 0, sizeof(servAddr));
    //往地址中填入ip、port和Internet地址族类型
    servAddr.sin_family = AF_INET;//IPv4
    servAddr.sin_port = htons(atoi(argv[1])); //port
    servAddr.sin_addr.s_addr = INADDR_ANY; //任一可用的IP

    if(bind(sockfd, (struct sockaddr*)&servAddr, sizeof(servAddr)) <0 ){
        perror("bind error");
        exit(1);
    }

    /*步骤3:调用listen函数启动监听
     *       通知系统去接受来自客户端的连接请求
     */
    if(listen(sockfd, 10) < 0){  //队列中最多允许10个连接请求
        perror("listen error");
        exit(1);
    }

    /*步骤4:调用accept函数,从请求队列中获取一个连接
     *       并返回新的socket描述符
     * */
    struct sockaddr_in clientAddr;
    socklen_t clientAddr_len = sizeof(clientAddr);
    while(!bStop){
        //如果没有客户端连接,调用此函数后会阻塞,直至获得一个客户端连接
        int fd = accept(sockfd, (struct sockaddr*)&clientAddr, &clientAddr_len);
       
        if(fd < 0){
            perror("accept error");
            continue;
        }
        
        /*步骤5:启动子进程去和客户端进行双向通信*/
        pid_t pid = fork();
        if(pid < 0){
            continue;
        }else if(pid == 0){//child process
            //输出客户端信息
            out_addr(&clientAddr);
            //处理客户端请求
            do_service(fd);
            close(fd);
            break;
        }else{  //parent process
            /*步骤6: 关闭fd套接字*/
            close(fd);
        }
    }

    close(sockfd);

    return 0;
}
/*输出结果
 * [root@localhost 13.TCP]# bin/echo_tcp_server 8888 
 * Client: 127.0.0.1(40664) connected
 * start read and write...
 * abcdefg
 * start read and write...
 * 1234567890
 * start read and write...
 * ^Cserver close
 * child process deaded...
 * server close
 */
View Code

//echo_tcp_server_th.c 服务端程序

#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <time.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>
#include "msg.h"

/*基于自定义协议的多线程服务端和客户端通信*
测试:telnet 127.0.0.1 xxxx 
      http://xxx.xxx.xxx.xxx:端口号
注意:演示时可关闭服务器的防火墙,防火墙口被过滤
      #service iptables status     查看防火墙
      #service iptables stop       关闭防火墙
*/

int sockfd;
int bStop = 0;

void sig_handler(int signo)
{
    if(signo == SIGINT){
        bStop = 0;
        printf("server close\n");

        exit(1);
    }
}

//服务程序(与前一节例子几乎相同)
void do_service(int fd)
{
    /*服务端和客户端进行读写操作(双向通信)*/
    char buff[512];
    while(1){
       memset(buff, 0, sizeof(buff));
       size_t size;
       
       //读取客户端发送过来的消息
       if((size = read_msg(fd, buff, sizeof(buff))) < 0){
           perror("protocol error");
           break;
       }else if(size == 0){
           //当客户端断开连接时而服务器试图去读取其
           //数据,socket就类似管道,相当于客户端写端被关闭,这里read_msg
           //会返回0
           break;
       }else{
           printf("%s\n", buff);//显示客户端发送的消息
           //写回客户端(回显功能)
           if(write_msg(fd, buff, sizeof(buff)) < 0){
               perror("protocol error");
               if(errno == EPIPE){
                  //如果客户端己被关闭(相当于管道的读端关闭),会产生SIGPIPE信号
                  //将并errno设置为EPIPE
                   break;
               }
           }
       }
    }
}

/*显示客户端信息(注意:与前一节的例子不同)
 *注意:fd指向一个socket,本质上socket是一个结构体
 *其包含了通讯双方的IP、端口等信息
 */
void out_fd(int fd)
{
    struct sockaddr_in addr;
    socklen_t len = sizeof(addr);

    //从fd中获得客户端的相关信息并放置到sockaddr_in结构体中
    if(getpeername(fd, (struct sockaddr*)&addr, &len) < 0){
        perror("getpeername error");
        return;
    }

    char ip[16];
    memset(ip, 0, sizeof(ip));
    int port = ntohs(addr.sin_port);
    //将网络字节序的IP地址转为点分十进制的字符串
    inet_ntop(AF_INET, &addr.sin_addr.s_addr, ip, sizeof(ip));
    printf("%16s(%5d) closed!\n", ip, port);
}

//线程函数
void* th_fn(void* arg)
{
    int fd = (int)arg;

    do_service(fd);
    out_fd(fd);

    close(fd);

    return (void*)0;
}

int main(int argc, char* argv[])
{
    if(argc < 2){
        printf("usage: %s port\n", argv[0]);
    }

    //按ctrl-c时中止服务端程序
    if(signal(SIGINT, sig_handler) == SIG_ERR){
        perror("signal sigint error");
        exit(1);
    }

    /*步骤1:创建socket(套接字)
     *注:socket创建在内核中,是一个结构体
     *AF_INET:IPv4
     *SOCK_STREAM:tcp协议
     */
    sockfd = socket(AF_INET, SOCK_STREAM, 0);

    /*步骤2:将sock和地址(包括ip、port)进行绑定*/
    struct sockaddr_in servAddr; //使用专用地址结构体
    memset(&servAddr, 0, sizeof(servAddr));
    //往地址中填入ip、port和Internet地址族类型
    servAddr.sin_family = AF_INET;//IPv4
    servAddr.sin_port = htons(atoi(argv[1])); //port
    servAddr.sin_addr.s_addr = INADDR_ANY; //任一可用的IP

    if(bind(sockfd, (struct sockaddr*)&servAddr, sizeof(servAddr)) <0 ){
        perror("bind error");
        exit(1);
    }

    /*步骤3:调用listen函数启动监听
     *       通知系统去接受来自客户端的连接请求
     */
    if(listen(sockfd, 10) < 0){  //队列中最多允许10个连接请求
        perror("listen error");
        exit(1);
    }

    /*步骤4:调用accept函数,从请求队列中获取一个连接
     *       并返回新的socket描述符
     * */

    //设置线程的分离属性
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    while(!bStop){
        //主线程负责调用accept去获得客户端的连接
        //如果没有客户端连接,调用此函数后会阻塞,直至获得一个客户端连接
        int fd = accept(sockfd, NULL, NULL);
       
        if(fd < 0){
            perror("accept error");
            continue;
        }
        
        /*步骤5:以分离方式启动子线程去和客户端进行双向通信
        *注意:是以分离方式启动的,从而可以在子线程服务客户端结束时,由子
        *线程自己回收其所占的资源
        */
        pthread_t th;
        int err;
        //以分离状态启动子线程
        if((err = pthread_create(&th, &attr, th_fn, (void*)fd)) != 0){
            perror("pthread create error");
        }
    }

    pthread_attr_destroy(&attr);

    close(sockfd);

    return 0;
}
/*输出结果
 * [root@localhost 13.TCP]# bin/echo_tcp_server_th 8888
 * asdfasdf
 * asdfas
 * asdfasd
 * aaa
 *        127.0.0.1(40683) closed!
 * asdfasf
 * ads
 *        127.0.0.1(40682) closed!
 *^Cserver close
 */

第13章 TCP编程(4)_基于自定义协议的多线程模型