首页 > 代码库 > Linux Linux程序练习十九

Linux Linux程序练习十九

题目:编写一个同步服务器模型
要求:
1)客户端A主机给服务器B主机发送报文,
2)B服务器主机收到报文以后同时分发给C1主机、C2主机;
3)C1主机和C2主机打印出客户端A的报文
bug总结:本来这道题目并不困难,就是向客户端连接池中的其他客户端发送数据,但是我这里出现了一个失误,
我把接收到的数据直接发送了。 第一步:recv_packet(fd, &pack, &buflen, 0);
第二步:send_packet(cltpool[i], &pack, buflen, 0);
但是第二步中buflen的实际长度应该比buflen+4,导致发送的结构体不完整,接收失败,效果类似于没有接收到消息
核心代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <fcntl.h>
#include <termios.h>
#include <signal.h>
#include <pthread.h>
#include "commsock.h"

#define MAXBUFSIZE 1020

//报文结构
typedef struct _packet
{
    int len;
    char buf[MAXBUFSIZE];
} Packet;

/**
 * readn - 读取固定大小的字节
 * @fd:文件描述符
 * @buf:接收缓冲区
 * @count:指定读取字节数
 * 成功返回count,失败返回-1,对等方连接关闭返回<count
 * */
int readn(int fd, void *buf, int count)
{
    int nread = 0;
    int lread = count;
    char *pbuf = (char *) buf;
    while (lread > 0)
    {
        do
        {
            nread = read(fd, pbuf, lread);
        } while (nread == -1 && errno == EINTR);
        if (nread == -1)
            return -1;
        else if (nread == 0)
            return count - lread;
        lread -= nread;
        pbuf += nread;
    }
    return count;
}

/**
 * writen - 写固定大小字节数
 * @fd:文件描述符
 * @buf:写入缓冲区
 * @count:指定写入字节数
 * 成功返回count,失败返回-1
 * */
int writen(int fd, void *buf, int count)
{
    int lwrite = count;
    int nwrite = 0;
    char *pbuf = (char *) buf;
    while (lwrite > 0)
    {
        do
        {
            nwrite = write(fd, pbuf, lwrite);
        } while (nwrite == -1 && errno == EINTR);
        if (nwrite == -1)
            return -1;
        lwrite -= nwrite;
        pbuf += nwrite;
    }
    return count;
}

/**
 * read_timeout - 读超时检测函数,不含读操作
 * @fd:文件描述符
 * @wait_seconds:等待超时秒数,如果为0表示不检测超时
 * 成功返回0,失败返回-1,超时返回-1并且errno=ETIMEDOUT
 * */
int read_timeout(int fd, unsigned int wait_seconds)
{
    int ret = 0;
    if (wait_seconds > 0)
    {
        fd_set readfds;
        FD_ZERO(&readfds);
        FD_SET(fd, &readfds);
        struct timeval timeout;
        timeout.tv_sec = wait_seconds;
        timeout.tv_usec = 0;
        do
        {
            ret = select(fd + 1, &readfds, NULL, NULL, &timeout);
        } while (ret == -1 && errno == EINTR);
        //ret==-1
        if (ret == 0)
        {
            errno = ETIMEDOUT;
            ret = -1;
        } else if (ret == 1)
        {
            ret = 0;
        }
    }
    return ret;
}

/**
 * write_timeout - 写超时检测函数,不含写操作
 * @fd:文件描述符
 * @wait_seconds:等待超时秒数,如果为0表示不检测超时
 * 成功返回0,失败返回-1,超时返回-1并且errno=ETIMEDOUT
 * */
int write_timeout(int fd, unsigned int wait_seconds)
{
    int ret = 0;
    if (wait_seconds > 0)
    {
        fd_set writefds;
        FD_ZERO(&writefds);
        FD_SET(fd, &writefds);
        struct timeval timeout;
        timeout.tv_sec = wait_seconds;
        timeout.tv_usec = 0;
        do
        {
            ret = select(fd + 1, NULL, &writefds, NULL, &timeout);
        } while (ret == -1 && errno == EINTR);
        //ret==-1
        if (ret == 0)
        {
            errno = ETIMEDOUT;
            ret = -1;
        } else if (ret == 1)
        {
            ret = 0;
        }
    }
    return ret;
}

/**
 * activate_nonblock - 设置套接字非阻塞
 * @fd:文件描述符
 * 成功返回0,失败返回-1
 * */
int activate_nonblock(int fd)
{
    int ret = 0;
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1)
        return -1;
    flags = flags | O_NONBLOCK;
    ret = fcntl(fd, F_SETFL, flags);
    //ret==-1
    return ret;
}

/**
 * deactivate_nonblock - 设置套接字阻塞
 * @fd:文件描述符
 * 成功返回0,失败返回-1
 * */
int deactivate_nonblock(int fd)
{
    int ret = 0;
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1)
        return -1;
    flags = flags & (~O_NONBLOCK);
    ret = fcntl(fd, F_SETFL, flags);
    return ret;
}

/**
 * connect_timeout - 带超时的connect(函数内已执行connect)
 * @fd:文件描述符
 * @addr:服务器网络地址结构
 * @wait_seconds:等待超时秒数,如果为0表示不检测超时
 * 成功返回0,失败返回-1,超时返回-1并且errno=ETIMEDOUT
 * */
int connect_timeout(int fd, struct sockaddr_in *addr, unsigned int wait_seconds)
{
    int ret = 0;
    if (wait_seconds > 0)
    {
        if (activate_nonblock(fd) == -1)
            return -1;
    }
    ret = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr_in));
    if (ret == -1 && errno == EINPROGRESS)
    {
        fd_set writefds;
        FD_ZERO(&writefds);
        FD_SET(fd, &writefds);
        struct timeval timeout;
        timeout.tv_sec = wait_seconds;
        timeout.tv_usec = 0;
        int nwrite = select(fd + 1, NULL, &writefds, NULL, &timeout);
        //nwrite==-1 此时ret==-1
        if (nwrite == 0)
            errno = ETIMEDOUT;
        else if (nwrite == 1)
        {
            int err = 0;
            socklen_t len = sizeof(err);
            ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
            if (ret == 0)
            {
                if (err != 0)
                {
                    errno = err;
                    ret = -1;
                }
            }
        }
    }
    if (wait_seconds > 0)
    {
        if (deactivate_nonblock(fd) == -1)
            return -1;
    }
    return ret;
}

/**
 * sock_init - 初始化SOCKET环境
 * @connid:连接套接字
 * 成功返回0,失败返回错误码
 * */
int sock_init(int *connid)
{
    int ret = 0;
    if (connid == NULL)
    {
        ret = SckParamErr;
        printf("cltsock_init() params not correct !\n");
        return ret;
    }
    //init
    ret = socket(AF_INET, SOCK_STREAM, 0);
    if (ret == -1)
    {
        ret = SckBaseErr;
        perror("socket() err");
        return ret;
    } else
    {
        *connid = ret;
        ret = 0;
    }
    return ret;
}

/**
 * connect_server - 连接服务器
 * @connid:连接套接字
 * @port:端口号
 * @ipaddr:IP地址
 * @wait_seconds:等待超时秒数,如果为0表示不检测超时
 * 成功返回0,失败返回错误码
 * */
int connect_server(int connid, int port, char *ipaddr,
        unsigned int wait_seconds)
{
    int ret = 0;
    if (connid < 0 || port < 0 || port > 65535 || ipaddr == NULL
            || wait_seconds < 0)
    {
        ret = SckParamErr;
        printf("cltsock_init() params not correct !\n");
        return ret;
    }
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8080);
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    ret = connect_timeout(connid, &addr, wait_seconds);
    if (ret == -1)
    {
        if (errno == ETIMEDOUT)
        {
            ret = SckTimeOut;
            printf("connect_timeout() time out !\n");
            return ret;
        }
        ret = SckBaseErr;
        perror("connect_timeout() err");
        return ret;
    }
    return ret;
}

/**
 * send_packet - 发送数据包
 * @fd:文件描述符
 * @pack:数据包
 * @buflen:数据包大小
 * @wait_seconds:等待超时秒数,如果为0表示不检测超时
 * 成功返回0,失败返回错误码
 * */
int send_packet(int fd, Packet *pack, int buflen, unsigned int wait_seconds)
{
    int ret = 0;
    //可写检测
    ret = write_timeout(fd, wait_seconds);
    if (ret == -1)
    {
        if (errno == ETIMEDOUT)
        {
            ret = SckTimeOut;
            printf("write_timeout() time out !\n");
            return ret;
        }
        ret = SckBaseErr;
        perror("write_timeout() err");
        return ret;
    }
    //发送数据
    ret = writen(fd, pack, buflen);
    if (ret != buflen)
    {
        ret = SckBaseErr;
        perror("writen() err");
        return ret;
    } else
    {
        ret = 0;
    }
    return ret;
}

/**
 * send_packet - 接收数据包
 * @fd:文件描述符
 * @pack:数据包
 * @buflen:数据包大小
 * @wait_seconds:等待超时秒数,如果为0表示不检测超时
 * 成功返回0,失败返回错误码
 * */
int recv_packet(int fd, Packet *pack, int *buflen, unsigned int wait_seconds)
{
    int ret = 0;
    //读超时检测
    ret = read_timeout(fd, wait_seconds);
    if (ret == -1)
    {
        if (errno == ETIMEDOUT)
        {
            ret = SckTimeOut;
            printf("read_timeout() time out !\n");
            return ret;
        }
        ret = SckBaseErr;
        perror("read_timeout() err");
        return ret;
    }
    //获取数据长度
    int len = 0;
    ret = readn(fd, &pack->len, 4);
    if (ret == -1)
    {
        ret = SckBaseErr;
        perror("readn() err");
        return ret;
    } else if (ret < 4)
    {
        ret = SckPeerClosed;
        printf("peer is closed !\n");
        return ret;
    }
    //网络字节序转化成本地字节序
    len = ntohl(pack->len);
    //获取包体
    ret = readn(fd, pack->buf, len);
    if (ret == -1)
    {
        ret = SckBaseErr;
        perror("readn() err");
        return ret;
    } else if (ret < len)
    {
        ret = SckPeerClosed;
        printf("peer is closed !\n");
        return ret;
    } else if (ret == len)
    {
        ret = 0;
    }
    *buflen = len;
    return ret;
}

/**
 * start_thread - 客户端线程回调函数
 * @arg:参数
 * */
void *start_thread(void *arg)
{
    if (arg == NULL)
    {
        printf("start_thread() params not correct !\n");
        return NULL;
    }
    int fd = (int) arg;
    int ret = 0;
    //接收信息并且打印
    Packet pack;
    int buflen = MAXBUFSIZE;
    while (1)
    {
        memset(&pack, 0, sizeof(pack));
        ret = recv_packet(fd, &pack, &buflen, 500);
        if (ret != 0)
        {
            printf("客户端线程退出了!\n");
            //退出当前进程
            pthread_exit(NULL);
        }
        //打印数据
        fputs(pack.buf, stdout);
        //fflush(stdout);
    }
    return NULL;
}

/**
 * run_clt - 运行客户端
 * @connid:连接套接字
 * @wait_seconds:等待超时秒数,如果为0表示不检测超时
 * 失败返回错误码
 * */
int run_clt(int connid, unsigned int wait_seconds)
{
    int ret = 0;
    //安装信号
    if (signal(SIGPIPE, handler) == SIG_ERR)
    {
        ret = SckBaseErr;
        printf("signal() failed !\n");
        return ret;
    }
    //开始多线程
    pthread_t thr1;
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    //设置进程为可分离状态
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (pthread_create(&thr1, &attr, start_thread, connid) != 0)
    {
        ret = SckBaseErr;
        printf("pthread_create() failed !\n");
        return ret;
    }
    Packet pack;
    memset(&pack, 0, sizeof(pack));
    int buflen = 0;
    while (fgets(pack.buf, MAXBUFSIZE, stdin) != NULL)
    {
        //去除\n
        buflen = strlen(pack.buf);
        pack.len = htonl(buflen);
        //发送数据
        ret = send_packet(connid, &pack, buflen + 4, wait_seconds);
        if (ret != 0)
        {
            return ret;
        }
    }
    return ret;
}

/**
 * close_socket - 关闭连接
 * @fd:文件描述符
 * 成功返回0
 * */
int close_socket(int fd)
{
    int ret = 0;
    close(fd);
    return ret;
}

/*
 * clear_back - 退格键不回显
 * 成功返回0,失败返回错误码
 * */
int clear_back()
{
    int ret = 0;
    struct termios term;
    memset(&term, 0, sizeof(term));
    //获取当前系统设置
    if (tcgetattr(STDIN_FILENO, &term) == -1)
    {
        ret = SckBaseErr;
        perror("tcgetattr() err");
        return ret;
    }
    //修改系统设置
    term.c_cc[VERASE] = \b;
    //立即生效
    if (tcsetattr(STDIN_FILENO, TCSANOW, &term) == -1)
    {
        ret = SckBaseErr;
        perror("tcsetattr() err");
        return ret;
    }
    return ret;
}

/**
 * listen_socket - 创建服务器监听套接字
 * @fd:套接字
 * @port:端口号
 * 成功返回0,失败返回错误码
 * */
int listen_socket(int fd, int port)
{
    int ret = 0;
    if (port < 0 || port < 0 || port > 65535)
    {
        ret = SckParamErr;
        printf("listen_socket() params not correct !\n");
        return ret;
    }
    //reuse addr
    int optval = 1;
    ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
    if (ret == -1)
    {
        ret = SckBaseErr;
        perror("setsockopt() err");
        return ret;
    }
    //bind
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    ret = bind(fd, (struct sockaddr *) &addr, sizeof(addr));
    if (ret == -1)
    {
        ret = SckBaseErr;
        perror("bind() err");
        return ret;
    }
    //listen
    ret = listen(fd, SOMAXCONN);
    if (ret == -1)
    {
        ret = SckBaseErr;
        perror("listen() err");
        return ret;
    }
    return ret;
}

/**
 * product_clt - 处理客户端信息
 * @fd:客户端A文件描述符
 * @cltpool:客户端套接字池
 * @maxindex:连接池最后一个元素的下标
 * 成功返回0,失败返回错误码
 * */
int product_clt(int fd, int *cltpool, int maxindex)
{
    int ret = 0;
    //接收客户端信息
    Packet pack;
    memset(&pack, 0, sizeof(pack));
    int buflen = 0;
    ret = recv_packet(fd, &pack, &buflen, 0);
    if (ret != 0)
        return ret;
    //转发给其他客户端
    int i = 0;
    for (i = 0; i <= maxindex; i++)
    {
        if (cltpool[i] != -1 && cltpool[i] != fd)
        {
            //发送信息
            ret = send_packet(cltpool[i], &pack, buflen+4, 0);
            if (ret != 0)
                return ret;
        }
    }
    return ret;
}

/**
 * handler - 信号捕捉函数
 * @sign:信号值
 * */
void handler(int sign)
{
    if (sign == SIGPIPE)
    {
        printf("accept SIGPIPE!\n");
    }
}

/**
 * select_socket - select机制管理客户端连接
 * @fd:文件描述符
 * 失败返回错误码
 * */
int select_socket(int fd)
{
    int ret = 0;
    //安装信号
    if (signal(SIGPIPE, handler) == SIG_ERR)
    {
        ret = SckBaseErr;
        printf("signal() failed !\n");
        return ret;
    }
    //定义客户端套接字临时变量
    int conn = 0;
    struct sockaddr_in peeraddr;
    socklen_t peerlen = 0;
    //已经处理的select事件
    int nread = 0;
    //创建客户端连接池
    int cltpool[FD_SETSIZE] = { 0 };
    //初始化连接池
    int i = 0;
    for (i = 0; i < FD_SETSIZE; i++)
    {
        cltpool[i] = -1;
    }
    //定义数组尾部元素下标
    int maxindex = 0;
    //定义最大的套接字(初始值是监听套接字)
    int maxfd = fd;
    //定义最新的套接字集合
    fd_set allsets;
    FD_ZERO(&allsets);
    //定义需要监听的套接字集合
    fd_set readfds;
    FD_ZERO(&readfds);
    //将监听套接字加入最新的套接字集合
    FD_SET(fd, &allsets);
    while (1)
    {
        //将最新的套接字集合赋值给需要监听的套接字集合
        readfds = allsets;
        do
        {
            nread = select(maxfd + 1, &readfds, NULL, NULL, NULL);
        } while (nread == -1 && errno == EINTR); //屏蔽信号
        if (nread == -1)
        {
            ret = SckBaseErr;
            perror("select() err");
            return ret;
        }
        //1.服务器监听套接字处理
        if (FD_ISSET(fd, &readfds))
        {
            //接收到客户端的连接
            memset(&peeraddr, 0, sizeof(peeraddr));
            peerlen = sizeof(peeraddr);
            conn = accept(fd, (struct sockaddr *) &peeraddr, &peerlen);
            if (conn == -1)
            {
                ret = SckBaseErr;
                perror("accept() err");
                return ret;
            }
            //将客户端连接添加到连接池
            for (i = 0; i < FD_SETSIZE; i++)
            {
                if (cltpool[i] == -1)
                {
                    if (i > maxindex)
                    {
                        maxindex = i;
                    }
                    cltpool[i] = conn;
                    break;
                }
            }
            if (i == FD_SETSIZE)
            {
                ret = SckBaseErr;
                close(conn);
                printf("客户端连接池已满!\n");
                return ret;
            }
            if (conn > maxfd)
                maxfd = conn;
            //将该客户端套接字加入到最新套接字集合
            FD_SET(conn, &allsets);
            printf("server accept from :%s\n", inet_ntoa(peeraddr.sin_addr));
            if (--nread <= 0)
                continue;
        }
        //处理客户端请求
        if (nread <= 0)
            continue;
        for (i = 0; i <= maxindex; i++)
        {
            if (cltpool[i] == -1)
                continue;
            if (FD_ISSET(cltpool[i], &readfds))
            {
                //处理客户端请求
                ret = product_clt(cltpool[i], cltpool, maxindex);
                if (ret != 0)
                {
                    //从最新的套接字集合中删除
                    FD_CLR(cltpool[i], &allsets);
                    //处理请求失败,关闭客户端连接
                    close(cltpool[i]);
                    //从客户端连接池中清除
                    cltpool[i] = -1;
                    break;
                }
                if (--nread <= 0)
                    break;
            }
        }
    }
    return ret;
}

 

Linux Linux程序练习十九