首页 > 代码库 > Linux基础——多线程实现任务
Linux基础——多线程实现任务
这里,我们首先要实现一种数据结构,将相应的任务,线程的fd,还有队列实现。
声明代码如下:
1 #ifndef _HEAD_H 2 #define _HEAD_H 3 #include <stdio.h> 4 #include <stdlib.h> 5 #include <unistd.h> 6 #include <string.h> 7 #include <sys/stat.h> 8 #include <sys/select.h> 9 #include <sys/types.h>10 #include <fcntl.h>11 #include <pthread.h>12 #include <sys/time.h>13 #include <signal.h>14 #define MSG_LEN 102415 #define TASK_CNT 102416 extern pthread_mutex_t lock;17 extern pthread_cond_t cond1;18 extern pthread_cond_t cond2;19 typedef struct tag_fd20 {21 int s_rfd;22 int s_wfd;23 struct tag_fd *next;24 }FD,*pFD;25 typedef struct tag_task26 {27 char s_msg[MSG_LEN];28 int s_fd;29 }TASK,*pTASK;30 typedef struct tag_que31 {32 TASK arr[TASK_CNT+1];33 int front;34 int tail;35 }QUEUE,*pQUEUE;36 void fd_insert(pFD *phead,int rfd,int wfd);37 void fd_init(pFD *phead);38 int fd_find(pFD phead,int rfd);39 void fd_del(pFD *phead,int rfd);40 void add_task(pQUEUE pq,pTASK pt);41 void get_task(pQUEUE pq,pTASK pt);42 void excute_task(pTASK pt);43 #endif
我们需要根据线程的占用情况,控制好,所以我们应把线程插入到一个链表中。
实现代码如下:
1 #include "head.h" 2 void fd_init(pFD *phead) 3 { 4 *phead= NULL; 5 } 6 void fd_insert(pFD *phead,int rfd,int wfd) 7 { 8 pFD pnew = (pFD )calloc(1,sizeof(FD)); 9 pnew->s_rfd=rfd;10 pnew->s_wfd=wfd;11 pnew->next = *phead;12 *phead = pnew;13 }14 int fd_find(pFD phead,int rfd)15 {16 while(phead)17 {18 if(phead->s_rfd==rfd)19 break;20 else21 phead = phead->next;22 }23 if(phead == NULL)24 return -1;25 else26 return phead->s_wfd;27 }28 29 void fd_del(pFD *phead,int rfd)30 {31 pFD pcur,ppre;32 pcur=*phead;33 ppre=NULL;34 while(pcur)35 {36 if(pcur->s_rfd == rfd)37 break;38 else39 {40 ppre=pcur;41 pcur = pcur ->next;42 }43 }44 if(ppre==NULL)45 {46 *phead=pcur->next;47 free(pcur);48 pcur=NULL;49 }50 else51 {52 ppre->next=pcur->next;53 free(pcur);54 pcur=NULL;55 }56 }
然后,我们还需要实现对任务的控制,例如任务的添加、获得、执行等。
实现代码如下:
1 #include "head.h" 2 static int que_empty(pQUEUE pq) 3 { 4 return pq->front == pq->tail; 5 } 6 static int que_full(pQUEUE pq) 7 { 8 return (pq->tail+1)%(TASK_CNT+1)==pq->front; 9 }10 static int que_cnt(pQUEUE pq)11 {12 return (pq->tail - pq->front +TASK_CNT+1)%(TASK_CNT + 1);13 }14 void add_task(pQUEUE pq ,pTASK pt)15 {16 pthread_mutex_lock(&lock);17 while(que_full(pq))18 pthread_cond_wait(&cond1,&lock);19 pq->arr[pq->tail]=*pt;20 pq->tail = (pq->tail+1)%(TASK_CNT+1);21 if(que_cnt(pq)==1)22 pthread_cond_broadcast(&cond2);23 printf("添加了一个任务!!\n");24 pthread_mutex_unlock(&lock);25 }26 void get_task(pQUEUE pq ,pTASK pt)27 {28 pthread_mutex_lock(&lock);29 while(que_empty(pq))30 pthread_cond_wait(&cond2,&lock);31 *pt=pq->arr[pq->front];32 pq->front = (pq->front+1)%(TASK_CNT+1);33 if(que_cnt(pq)== TASK_CNT -1)34 pthread_cond_broadcast(&cond1);35 printf("获得了一个任务!!\n");36 pthread_mutex_unlock(&lock);37 }38 39 40 void excute_task(pTASK pt)41 {42 char buf[1024];43 memset(buf,0,1024);44 strcpy(buf,pt->s_msg);45 int index;46 for(index=0;index < strlen(buf);index++)47 buf[index]=toupper(buf[index]);48 buf[index]=‘\0‘;49 write(pt -> s_fd,buf,strlen(buf));50 }
最后,我们只需在服务器端应用select循环查询是否有任务,再执行相应的操作。
服务器实现代码如下:
1 #include "head.h" 2 pthread_mutex_t lock; 3 pthread_cond_t cond1,cond2; 4 void* hand(void* arg) 5 { 6 pthread_detach(pthread_self()); 7 TASK task; 8 pQUEUE pq = (pQUEUE)arg; 9 while(1) 10 { 11 get_task(pq,&task); 12 excute_task(&task); 13 sleep(1); 14 } 15 } 16 int main(int argc,char *argv[]) 17 { 18 if(argc != 3) 19 { 20 perror("参数错误!!\n"); 21 exit(1); 22 } 23 signal(SIGINT,SIG_IGN); 24 signal(SIGPIPE,SIG_IGN); 25 signal(SIGQUIT,SIG_IGN); 26 QUEUE que; 27 int fd; 28 fd_set read_set,revc; 29 pFD list; 30 memset(&que,0,sizeof(QUEUE)); 31 fd_init(&list); 32 int cnt = atoi(argv[2]); 33 pthread_t *arr=(pthread_t *)calloc(cnt,sizeof(pthread_t)); 34 pthread_mutex_init(&lock,NULL); 35 pthread_cond_init(&cond1,NULL); 36 pthread_cond_init(&cond2,NULL); 37 int index=0; 38 while(cnt > 0) 39 { 40 pthread_create(arr+index,NULL,hand,(void*)&que); 41 cnt--; 42 index++; 43 } 44 fd = open(argv[1],O_RDONLY); 45 if(fd == -1) 46 { 47 perror("管道打开失败!!\n"); 48 exit(1); 49 } 50 struct timeval tm; 51 int ret; 52 FD_ZERO(&read_set); 53 FD_ZERO(&revc); 54 FD_SET(fd,&read_set); 55 while(1) 56 { 57 tm.tv_sec=0; 58 tm.tv_usec=1000; 59 revc = read_set; 60 ret=select(1024,&revc,NULL,NULL,&tm); 61 if(ret == 0) 62 continue; 63 else if(ret > 0) 64 { 65 if(FD_ISSET(fd,&revc)) 66 { 67 char buf[32]; 68 memset(buf,0,32); 69 if(read(fd,buf,32)==0) 70 continue; 71 else 72 { 73 char name[32]; 74 int r_fd,w_fd; 75 buf[strlen(buf)-1]=‘\0‘; 76 memset(name,0,32); 77 sprintf(name,"r.%s",buf); 78 w_fd=open(name,O_WRONLY); 79 memset(name,0,32); 80 sprintf(name,"w.%s",buf); 81 r_fd=open(name,O_RDONLY); 82 fd_insert(&list,r_fd,w_fd); 83 FD_SET(r_fd,&read_set); 84 } 85 } 86 } 87 pFD pcur=list; 88 while(pcur) 89 { 90 if(FD_ISSET(pcur->s_rfd,&revc)) 91 { 92 char buf[1024]; 93 memset(buf,0,1024); 94 if(read(pcur->s_rfd,buf,1024)==0) 95 { 96 FD_CLR(pcur->s_rfd,&read_set); 97 int i=pcur->s_rfd; 98 pcur=pcur->next; 99 fd_del(&list,i);100 }101 else102 {103 TASK tk;104 memset(&tk,0,sizeof(TASK));105 tk.s_fd=pcur->s_wfd;106 strcpy(tk.s_msg,buf);107 add_task(&que,&tk);108 pcur=pcur->next;109 }110 }111 else112 pcur=pcur->next;113 }114 }115 pthread_mutex_destory(&lock);116 pthread_cond_destory(&cond1);117 pthread_cond_destory(&cond2);118 return 0; 119 }
客户端实现代码如下:
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <fcntl.h> 4 #include <unistd.h> 5 #include <string.h> 6 #include <sys/stat.h> 7 #include <sys/types.h> 8 int main(int argc,char *argv[]) 9 {10 int fd_server,send,revc;11 char rname[32],wname[32];12 memset(rname,0,32);13 memset(wname,0,32);14 sprintf(rname,"r.%d",getpid());15 sprintf(wname,"w.%d",getpid());16 mkfifo(rname,0666);17 mkfifo(wname,0666);18 fd_server=open(argv[1],O_WRONLY);19 char msg[1024];20 memset(msg,0,1024);21 sprintf(msg,"%d\n",getpid());22 write(fd_server,msg,strlen(msg));23 revc=open(rname,O_RDONLY);24 send=open(wname,O_WRONLY);25 while(memset(msg,0,1024),fgets(msg,1024,stdin))26 {27 write(send,msg,strlen(msg));28 memset(msg,0,1024);29 read(revc,msg,1024);30 write(1,msg,strlen(msg));31 }32 close(fd_server);33 close(send);34 close(revc);35 unlink(rname);36 unlink(wname);37 return 0;38 }
Linux基础——多线程实现任务
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。