首页 > 代码库 > Linux基础——多线程实现任务

Linux基础——多线程实现任务

 

这里,我们首先要定义几种数据结构:任务(TASK)、保存每个客户端读写 fd 的链表节点(FD),以及存放任务的循环队列(QUEUE)。

声明代码如下:

 /*
  * head.h - shared declarations for the FIFO task server.
  *
  * Declares the list node that pairs a client's read/write descriptors (FD),
  * the unit of work handed to worker threads (TASK), the bounded circular
  * task queue (QUEUE), and the synchronisation objects guarding the queue.
  */
#ifndef HEAD_H
#define HEAD_H

#include <ctype.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#define MSG_LEN  1024   /* capacity of TASK.s_msg in bytes */
#define TASK_CNT 1024   /* maximum number of tasks held by a QUEUE */

/* Defined once in the server source; shared by the queue functions. */
extern pthread_mutex_t lock;    /* protects QUEUE.front / QUEUE.tail / QUEUE.arr */
extern pthread_cond_t cond1;    /* waited on by add_task while the queue is full */
extern pthread_cond_t cond2;    /* waited on by get_task while the queue is empty */

/* One connected client: descriptor read from and descriptor replied on. */
typedef struct tag_fd
{
    int s_rfd;
    int s_wfd;
    struct tag_fd *next;
} FD, *pFD;

/* One request: the message text and the descriptor the result is written to. */
typedef struct tag_task
{
    char s_msg[MSG_LEN];
    int s_fd;
} TASK, *pTASK;

/*
 * Circular buffer of tasks.  The array has one slot more than TASK_CNT so
 * that front == tail means "empty" and (tail + 1) % size == front means "full".
 */
typedef struct tag_que
{
    TASK arr[TASK_CNT + 1];
    int front;
    int tail;
} QUEUE, *pQUEUE;

/* Client descriptor list. */
void fd_insert(pFD *phead, int rfd, int wfd);
void fd_init(pFD *phead);
int  fd_find(pFD phead, int rfd);
void fd_del(pFD *phead, int rfd);

/* Task queue and task execution. */
void add_task(pQUEUE pq, pTASK pt);
void get_task(pQUEUE pq, pTASK pt);
void excute_task(pTASK pt);

#endif /* HEAD_H */
View Code

 

服务器需要记录每个客户端对应的一对管道描述符(读端和写端),所以我们把这些描述符对保存在一个单链表中,以便查找和删除。
实现代码如下:

 1 #include "head.h" 2 void fd_init(pFD *phead) 3 { 4     *phead= NULL; 5 } 6 void fd_insert(pFD *phead,int rfd,int wfd) 7 { 8     pFD pnew = (pFD )calloc(1,sizeof(FD)); 9     pnew->s_rfd=rfd;10     pnew->s_wfd=wfd;11     pnew->next = *phead;12     *phead = pnew;13 }14 int fd_find(pFD phead,int rfd)15 {16     while(phead)17     {18         if(phead->s_rfd==rfd)19             break;20         else21             phead = phead->next;22     }23     if(phead == NULL)24         return -1;25     else26         return phead->s_wfd;27 }28 29 void fd_del(pFD *phead,int rfd)30 {31     pFD pcur,ppre;32     pcur=*phead;33     ppre=NULL;34     while(pcur)35     {36         if(pcur->s_rfd == rfd)37             break;38         else39         {40             ppre=pcur;41             pcur = pcur ->next;42         }43     }44     if(ppre==NULL)45     {46         *phead=pcur->next;47         free(pcur);48         pcur=NULL;49     }50     else51     {52         ppre->next=pcur->next;53         free(pcur);54         pcur=NULL;55     }56 }
View Code

然后,我们还需要实现对任务的控制,例如任务的添加、获得、执行等。

实现代码如下:

 1 #include "head.h" 2 static int que_empty(pQUEUE pq) 3 { 4     return pq->front == pq->tail; 5 } 6 static int que_full(pQUEUE pq) 7 { 8     return (pq->tail+1)%(TASK_CNT+1)==pq->front; 9 }10 static int que_cnt(pQUEUE pq)11 {12     return (pq->tail - pq->front +TASK_CNT+1)%(TASK_CNT + 1);13 }14 void add_task(pQUEUE pq ,pTASK pt)15 {16     pthread_mutex_lock(&lock);17     while(que_full(pq))18         pthread_cond_wait(&cond1,&lock);19     pq->arr[pq->tail]=*pt;20     pq->tail = (pq->tail+1)%(TASK_CNT+1);21     if(que_cnt(pq)==1)22         pthread_cond_broadcast(&cond2);23     printf("添加了一个任务!!\n");24     pthread_mutex_unlock(&lock);25 }26 void get_task(pQUEUE pq ,pTASK pt)27 {28     pthread_mutex_lock(&lock);29     while(que_empty(pq))30         pthread_cond_wait(&cond2,&lock);31     *pt=pq->arr[pq->front];32     pq->front = (pq->front+1)%(TASK_CNT+1);33     if(que_cnt(pq)== TASK_CNT -1)34         pthread_cond_broadcast(&cond1);35     printf("获得了一个任务!!\n");36     pthread_mutex_unlock(&lock);37 }38 39 40 void excute_task(pTASK pt)41 {42     char buf[1024];43     memset(buf,0,1024);44     strcpy(buf,pt->s_msg);45     int index;46     for(index=0;index < strlen(buf);index++)47         buf[index]=toupper(buf[index]);48     buf[index]=\0;49     write(pt -> s_fd,buf,strlen(buf));50 }
View Code

最后,我们只需在服务器端应用select循环查询是否有任务,再执行相应的操作。

服务器实现代码如下:

  1 #include "head.h"  2 pthread_mutex_t lock;  3 pthread_cond_t cond1,cond2;  4 void* hand(void* arg)  5 {  6     pthread_detach(pthread_self());  7     TASK task;  8     pQUEUE pq = (pQUEUE)arg;  9     while(1) 10     { 11         get_task(pq,&task); 12         excute_task(&task); 13         sleep(1); 14     } 15 } 16 int main(int argc,char *argv[]) 17 { 18     if(argc != 3) 19     { 20         perror("参数错误!!\n"); 21         exit(1); 22     } 23     signal(SIGINT,SIG_IGN); 24     signal(SIGPIPE,SIG_IGN); 25     signal(SIGQUIT,SIG_IGN); 26     QUEUE que; 27     int fd; 28     fd_set read_set,revc; 29     pFD list; 30     memset(&que,0,sizeof(QUEUE)); 31     fd_init(&list); 32     int cnt = atoi(argv[2]); 33     pthread_t *arr=(pthread_t *)calloc(cnt,sizeof(pthread_t)); 34     pthread_mutex_init(&lock,NULL); 35     pthread_cond_init(&cond1,NULL); 36     pthread_cond_init(&cond2,NULL); 37     int index=0; 38     while(cnt > 0) 39     { 40         pthread_create(arr+index,NULL,hand,(void*)&que); 41         cnt--; 42         index++; 43     } 44     fd = open(argv[1],O_RDONLY); 45     if(fd == -1) 46     { 47         perror("管道打开失败!!\n"); 48         exit(1); 49     } 50     struct timeval tm; 51     int ret; 52     FD_ZERO(&read_set); 53     FD_ZERO(&revc); 54     FD_SET(fd,&read_set); 55     while(1) 56     { 57         tm.tv_sec=0; 58         tm.tv_usec=1000; 59         revc = read_set; 60         ret=select(1024,&revc,NULL,NULL,&tm); 61         if(ret == 0) 62             continue; 63         else if(ret > 0) 64         { 65             if(FD_ISSET(fd,&revc)) 66             { 67                 char buf[32]; 68                 memset(buf,0,32); 69                 if(read(fd,buf,32)==0) 70                     continue; 71                 else 72                 { 73                     char name[32]; 74                     int r_fd,w_fd; 75                     buf[strlen(buf)-1]=\0; 76                     memset(name,0,32); 77                     
sprintf(name,"r.%s",buf); 78                     w_fd=open(name,O_WRONLY); 79                     memset(name,0,32); 80                     sprintf(name,"w.%s",buf); 81                     r_fd=open(name,O_RDONLY); 82                     fd_insert(&list,r_fd,w_fd); 83                     FD_SET(r_fd,&read_set); 84                 } 85             } 86         } 87         pFD pcur=list; 88         while(pcur) 89         { 90             if(FD_ISSET(pcur->s_rfd,&revc)) 91             { 92                 char buf[1024]; 93                 memset(buf,0,1024); 94                 if(read(pcur->s_rfd,buf,1024)==0) 95                 { 96                     FD_CLR(pcur->s_rfd,&read_set); 97                     int i=pcur->s_rfd; 98                     pcur=pcur->next; 99                     fd_del(&list,i);100                 }101                 else102                 {103                     TASK tk;104                     memset(&tk,0,sizeof(TASK));105                     tk.s_fd=pcur->s_wfd;106                     strcpy(tk.s_msg,buf);107                     add_task(&que,&tk);108                     pcur=pcur->next;109                 }110             }111             else112                 pcur=pcur->next;113         }114     }115     pthread_mutex_destory(&lock);116     pthread_cond_destory(&cond1);117     pthread_cond_destory(&cond2);118     return 0;    119 }
View Code

客户端实现代码如下:

 /*
  * Task client.
  *
  * Usage: client <registration_fifo>
  *
  * Creates the FIFOs "r.<pid>" (replies) and "w.<pid>" (requests), announces
  * "<pid>\n" on the server's registration FIFO, then sends each line read
  * from standard input and prints the server's reply.
  */
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Write all len bytes of buf to fd, retrying on EINTR and short writes.
 * Returns 0 on success, -1 on error (errno set by write). */
static int write_all(int fd, const char *buf, size_t len)
{
    size_t done = 0;

    while (done < len) {
        ssize_t n = write(fd, buf + done, len - done);

        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        done += (size_t)n;
    }
    return 0;
}

int main(int argc, char *argv[])
{
    char rname[32];
    char wname[32];
    char msg[1024];
    int fd_server = -1;
    int to_server = -1;
    int from_server = -1;
    int have_rname = 0;
    int have_wname = 0;
    int status = EXIT_FAILURE;
    int len;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <registration_fifo>\n", argv[0]);
        return EXIT_FAILURE;
    }

    /* Turn a write to a vanished server into an error return so the
     * cleanup below still removes the FIFOs. */
    signal(SIGPIPE, SIG_IGN);

    snprintf(rname, sizeof rname, "r.%d", (int)getpid());
    snprintf(wname, sizeof wname, "w.%d", (int)getpid());

    if (mkfifo(rname, 0666) == -1) {
        perror(rname);
        goto out;
    }
    have_rname = 1;
    if (mkfifo(wname, 0666) == -1) {
        perror(wname);
        goto out;
    }
    have_wname = 1;

    fd_server = open(argv[1], O_WRONLY);
    if (fd_server == -1) {
        perror(argv[1]);
        goto out;
    }

    len = snprintf(msg, sizeof msg, "%d\n", (int)getpid());
    if (write_all(fd_server, msg, (size_t)len) == -1) {
        perror("write");
        goto out;
    }

    /* Same order in which the server opens the peer ends. */
    from_server = open(rname, O_RDONLY);
    if (from_server == -1) {
        perror(rname);
        goto out;
    }
    to_server = open(wname, O_WRONLY);
    if (to_server == -1) {
        perror(wname);
        goto out;
    }

    while (fgets(msg, sizeof msg, stdin) != NULL) {
        ssize_t n;

        if (write_all(to_server, msg, strlen(msg)) == -1) {
            perror("write");
            goto out;
        }

        n = read(from_server, msg, sizeof msg);
        if (n < 0) {
            perror("read");
            goto out;
        }
        if (n == 0) {
            fprintf(stderr, "server closed the connection\n");
            goto out;
        }
        /* Use the byte count: the reply is not guaranteed to be terminated. */
        if (write_all(STDOUT_FILENO, msg, (size_t)n) == -1) {
            perror("write");
            goto out;
        }
    }
    status = EXIT_SUCCESS;

out:
    if (fd_server != -1) {
        close(fd_server);
    }
    if (to_server != -1) {
        close(to_server);
    }
    if (from_server != -1) {
        close(from_server);
    }
    if (have_rname) {
        unlink(rname);
    }
    if (have_wname) {
        unlink(wname);
    }
    return status;
}
View Code

 

Linux基础——多线程实现任务