首页 > 代码库 > Linux多线程之同步3
Linux多线程之同步3
需求
客户端将需要解决的task发送给服务器,服务器调用线程来解决客户端发送的task,解决完由线程负责将其发送回客户端。(用管道实现通信)
思路
1. server维护两个列表。一是客户端列表。二是任务列表。分别如下:
/* 客户端列表 */ typedef struct tag_fds{ int s_rfd ; int s_wfd ; struct tag_fds* s_next ;}FD_PAIR, *pFD_PAIR;/* 任务列表,相当于资源 */typedef struct tag_que{ TASK s_arr[TASK_CNT + 1] ; int s_front ; int s_tail ;}QUEUE, *pQUEUE ;
2. server端维护一个管道(为了叙述方便,暂时称为server_pipe),用于接收客户端的上线消息。client端维护两个管道,一个管道用于向server端发送所要处理的task,而另一个管道用于接收从server端返回的task result。
3. server端可以使用select函数对所有管道的读端进行轮询。所有的读端包括:用于接收客户端task管道的读端以及server_pipe的读端。
4. 当客户端上线时,它会将自己的进程ID(pid)通过server_pipe发送给服务器。服务器根据pid,可以构造出该客户端所创建的两个管道的名称,以此可以打开客户端的两个管道,同时将server端的读端加入监听集合,并且将该客户端加入客户端队列。
5. 当客户端向服务器发送task时,服务器端的select监听到之后,会遍历客户端列表(根据server端接收task管道的读描述符),找到具体的客户端。我们会将服务器对应于该客户端管道的写描述符连同task任务一起打包(其实就是结构体啦),加入任务列表。之所以要将写描述符打包进一个任务,是因为方便我们的线程处理完任务后,可以直接向客户端返回结果。任务结构体如下:
typedef struct tag_task{ char s_msg[1024]; /* 客户端向服务器端发送的任务用msg存储 */ int s_fd ; /* s_fd为写端,用于线程处理完任务后,发送消息给客户端 */}TASK, *pTASK;
6. 很显然,我们服务器端的主线程在此处就是一个生产者,负责将TASK添加到任务列表中。而主线程创造出的诸多线程则是消费者,从任务列表取出任务,处理完后发送结果至客户端。
7. 此处服务器处理逻辑比较简单,客户端发送什么请求,我们就返回什么请求,打印在屏幕上。
代码
头文件server.h
#ifndef __SERVER_H__#define __SERVER_H__#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <string.h>#include <sys/stat.h>#include <sys/types.h>#include <fcntl.h>#include <pthread.h>#include <sys/time.h>#include <sys/select.h>#include <signal.h>#define MSG_LEN 1024#define TASK_CNT 1024 extern pthread_mutex_t mutex ;extern pthread_cond_t cond_master ;extern pthread_cond_t cond_slave ;typedef struct tag_fds{ int s_rfd ; int s_wfd ; struct tag_fds* s_next ;}FD_PAIR, *pFD_PAIR;typedef struct tag_task{ char s_msg[1024]; int s_fd ;}TASK, *pTASK;typedef struct tag_que{ TASK s_arr[TASK_CNT + 1] ; int s_front ; int s_tail ;}QUEUE, *pQUEUE ;void fds_link_init(pFD_PAIR* phead);void fds_insert(pFD_PAIR* phead, int fd_r, int fd_w);int fds_find_wfd(pFD_PAIR phead, int fd_r);void fds_link_delete(pFD_PAIR* phead,int fd_r);void add_task(pQUEUE pq,pTASK pt );void get_task(pQUEUE pq, pTASK pt);void excute_task(pTASK pt);#endif
服务器主线程main.c
/************************************************************************* > File Name: ./src/main.c > Author: KrisChou > Mail:zhoujx0219@163.com > Created Time: Tue 26 Aug 2014 07:55:50 PM CST ************************************************************************/#include "server.h"pthread_mutex_t mutex ;pthread_cond_t cond_master ;pthread_cond_t cond_slave ;void* slave_handler(void* arg){ pthread_detach(pthread_self()); pQUEUE pq = (pQUEUE)arg ; TASK my_task ; while(1) { get_task(pq, &my_task); excute_task(&my_task); sleep(1); }}int main(int argc, char* argv[])// exe fifo_name thd_cnt{ if(argc != 3) { printf("USAGE: EXE FILENAME THD_CNT ! \n"); exit(1); } signal(SIGINT, SIG_IGN); signal(SIGPIPE,SIG_IGN); signal(SIGQUIT,SIG_IGN); int fd_server ; QUEUE my_que ; pFD_PAIR my_list ; fd_set read_set, ready_set ; struct timeval tm ; int select_ret ; memset(&my_que, 0, sizeof(QUEUE)); fds_link_init(&my_list); int slave_cnt = atoi(argv[2]); pthread_t * arr = (pthread_t*)calloc(slave_cnt, sizeof(pthread_t)); pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cond_master, NULL); pthread_cond_init(&cond_slave, NULL); int index = 0 ; while(slave_cnt > 0) { pthread_create(arr + index, NULL,slave_handler, (void*)&my_que ); slave_cnt -- ; index ++ ; } fd_server = open(argv[1], O_RDONLY); if(fd_server == -1) { perror("open"); exit(-1); } FD_ZERO(&read_set); FD_ZERO(&ready_set); FD_SET(fd_server, &read_set); while(1) { tm.tv_sec = 0 ; tm.tv_usec = 1000 ; ready_set = read_set ; select_ret = select(1024,&ready_set, NULL, NULL, &tm ); if(select_ret == 0) { continue ; }else if(select_ret > 0) { if(FD_ISSET(fd_server, &ready_set))// client on r.pid w.pid { char buf[32]; memset(buf, 0, 32); if(read(fd_server, buf, 32) == 0) { continue ; }else { printf("a client on ! \n"); char pipe_name[32]; memset(pipe_name, 0, 32); buf[strlen(buf) - 1] = ‘\0‘; sprintf(pipe_name,"r.%s",buf);//clinet read int wfd, rfd ; wfd = open(pipe_name, O_WRONLY); memset(pipe_name, 0, 32); sprintf(pipe_name,"w.%s",buf);//clinet write rfd = open(pipe_name, O_RDONLY); fds_insert(&my_list, rfd, wfd); FD_SET(rfd, &read_set); } } pFD_PAIR pCur = my_list ; while(pCur) { if(FD_ISSET(pCur ->s_rfd, &ready_set))// client request { char buf[1024] ; memset(buf, 0, 1024); if(read(pCur ->s_rfd, buf, 1024) == 0)//client quit { FD_CLR(pCur ->s_rfd, &read_set); int fd_r = pCur ->s_rfd ; pCur = pCur -> s_next ; fds_link_delete(&my_list, fd_r); }else { TASK tk ; memset(&tk, 0, sizeof(tk)); tk.s_fd = pCur -> s_wfd ; strcpy(tk.s_msg, buf); add_task(&my_que, &tk); pCur = pCur ->s_next ; } }else { pCur = pCur ->s_next ; } } } } pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond_master); pthread_cond_destroy(&cond_slave); return 0 ;}
fds_link.c
/************************************************************************* > File Name: ../src/fds_link.c > Author: KrisChou > Mail:zhoujx0219@163.com > Created Time: Tue 26 Aug 2014 05:40:08 PM CST ************************************************************************/#include "server.h"void fds_link_init(pFD_PAIR* phead){ *phead = NULL ;}void fds_insert(pFD_PAIR* phead, int fd_r, int fd_w){ pFD_PAIR pCur = (pFD_PAIR)calloc(1, sizeof(FD_PAIR)); pCur ->s_rfd = fd_r ; pCur ->s_wfd = fd_w ; pCur ->s_next = *phead ; *phead = pCur ;}/*int fds_find_wfd(pFD_PAIR phead, int fd_r){ while(phead) { if(phead ->s_rfd == fd_r) { break ; }else { phead = phead ->s_next ; } } if(phead == NULL) { return -1 ; }else { return phead ->s_wfd ; }} */void fds_link_delete(pFD_PAIR* phead,int fd_r){ pFD_PAIR pPre , pCur ; pPre = NULL ; pCur = *phead ; while(pCur) { if(pCur ->s_rfd == fd_r) { break ; }else { pPre = pCur ; pCur = pCur ->s_next ; } } if(pPre == NULL) { *phead = pCur ->s_next ; free(pCur); pCur = NULL ; }else { pPre ->s_next = pCur ->s_next ; free(pCur); pCur = NULL; }}
task.c
/************************************************************************* > File Name: ./src/task.c > Author: KrisChou > Mail:zhoujx0219@163.con > Created Time: Tue 26 Aug 2014 07:38:11 PM CST ************************************************************************/#include "server.h"static int que_empty(pQUEUE pq){ return pq -> s_front == pq -> s_tail ;}static int que_full(pQUEUE pq){ return (pq -> s_tail + 1)%(TASK_CNT + 1) == pq -> s_front ;}static int que_cnt(pQUEUE pq){ return (pq ->s_tail - pq ->s_front + TASK_CNT + 1)%(TASK_CNT + 1) ;}void add_task(pQUEUE pq,pTASK pt ){ pthread_mutex_lock(&mutex); while(que_full(pq)) { pthread_cond_wait(&cond_master, &mutex); } pq ->s_arr[pq ->s_tail] = *pt ; pq ->s_tail = (pq ->s_tail + 1)%(TASK_CNT + 1) ; //if(que_cnt(pq) == 1) { pthread_cond_broadcast(&cond_slave); } pthread_mutex_unlock(&mutex); sleep(1);}void get_task(pQUEUE pq, pTASK pt){ pthread_mutex_lock(&mutex); while(que_empty(pq)) { pthread_cond_wait(&cond_slave, &mutex); } *pt = (pq ->s_arr)[pq ->s_front] ; pq -> s_front = (pq -> s_front + 1)%(TASK_CNT + 1);// if(que_cnt(pq) == TASK_CNT - 1) { pthread_cond_broadcast(&cond_master); } pthread_mutex_unlock(&mutex); sleep(1); }void excute_task(pTASK pt){ write(pt ->s_fd, pt ->s_msg, strlen(pt ->s_msg));}
客户端测试client.c
/************************************************************************* > File Name: client.c > Author: KrisChou > Mail: zhoujx0210@163.com > Created Time: Tue 26 Aug 2014 08:56:18 PM CST ************************************************************************/#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <fcntl.h>#include <sys/stat.h>#include <sys/types.h>int main(int argc, char* argv[])//exe fifo{ int fd_server, fd_snd, fd_recv ; char rname[32], wname[32]; fd_server = open(argv[1], O_WRONLY); memset(rname, 0, 32); memset(wname, 0, 32); sprintf(rname,"r.%d", getpid()); sprintf(wname,"w.%d", getpid()); mkfifo(rname,0666); mkfifo(wname,0666); char msg[1024]=""; sprintf(msg,"%d\n", getpid()); write(fd_server, msg, strlen(msg)); fd_recv = open(rname,O_RDONLY); fd_snd = open(wname,O_WRONLY); while(memset(msg, 0, 1024), fgets(msg, 1024, stdin) != NULL) { write(fd_snd, msg, strlen(msg)); memset(msg, 0, 1024); read(fd_recv, msg, 1024); write(1, msg, strlen(msg)); } close(fd_server); close(fd_snd); close(fd_recv); return 0 ;}
Makefile
SRC_DIR := ./srcINC_DIR := ./include EXE_DIR := ./binCC := gcc CFLAGS := -g -oSRC_OBJECTS := $(wildcard $(SRC_DIR)/*.c)INC_OBJECTS := $(wildcard $(INC_DIR)/*.h)$(EXE_DIR)/main : $(SRC_OBJECTS) $(INC_OBJECTS) $(CC) $(CFLAGS) $@ $(SRC_OBJECTS) -I$(INC_DIR) -lpthread
Linux多线程之同步3