首页 > 代码库 > 消息队列非阻塞

消息队列非阻塞

消息队列(也叫做报文队列)能够克服早期unix通信机制的一些缺点。作为早期unix通信机制之一的信号能够传送的信息量有限,后来虽然POSIX 1003.1b在信号的实时性方面作了拓广,使得信号在传递信息量方面有了相当程度的改进,但是信号这种通信方式更像"即时"的通信方式,它要求接受信号的进程在某个时间范围内对信号做出反应,因此该信号最多在接受信号进程的生命周期内才有意义,信号所传递的信息是接近于随进程持续的概念(process-persistent),管道及有名管道及有名管道则是典型的随进程持续IPC,并且,只能传送无格式的字节流无疑会给应用程序开发带来不便,另外,它的缓冲区大小也受到限制。

 

详见: http://www.ibm.com/developerworks/cn/linux/l-ipc/part3/

 

[cpp] view plaincopy
  1. /* 
  2.  * 
  3.  *       Filename:  producer.c 
  4.  * 
  5.  *    Description:  生产者进程 
  6.  * 
  7.  *        Version:  1.0 
  8.  *        Created:  09/30/2011 04:52:23 PM 
  9.  *       Revision:  none 
  10.  *       Compiler:  gcc(g++) 
  11.  * 
  12.  *         Author:  |Zhenghe Zhang|, |zhenghe.zhang@gmail.com| 
  13.  *        Company:  |Shenzhen XXX Technology Co., Ltd.| 
  14.  * 
  15.  */  
  16.   
  17. #include <stdio.h>  
  18. #include <stdlib.h>  
  19. #include <string.h>  
  20. #include <unistd.h>  
  21. #include <sys/ipc.h>  
  22. #include <sys/msg.h>  
  23. #include <error.h>  
  24.   
  25. #define BUFFER      10 //定义buf大小  
  26.   
  27. struct msgtype {  
  28.     long mtype;  
  29.     char buf1[BUFFER + 1];  
  30.     char buf2[BUFFER + 1];  
  31.     long size;  
  32. };  
  33.   
  34. int main()  
  35. {  
  36.     key_t msgkey;  
  37.   
  38.     /*消息队列*/  
  39.     int msgid;  
  40.     struct msgtype msg;  
  41.   
  42.     msgkey = ftok("/home/zhang/shmipcx", 10001);  
  43.     if(msgkey == -1)  
  44.     {  
  45.         perror("ftok");  
  46.         exit(1);  
  47.     }  
  48.   
  49.     /*得到消息队列标识符或创建一个消息队列对象并返回消息队列标识符*/  
  50.     msgid = msgget(msgkey, IPC_CREAT | 0666);  
  51.     if(msgid == -1)  
  52.     {  
  53.         perror("msgget");  
  54.         exit(1);  
  55.     }  
  56.   
  57.     int i = 0;  
  58.     while(i < 10)  
  59.     {  
  60.         memset(msg.buf1, 0, BUFFER + 1);  
  61.         memset(msg.buf2, 0, BUFFER + 1);  
  62.   
  63.         sprintf(msg.buf1, "buf1_0x%x", i);         
  64.         sprintf(msg.buf2, "buf2_0x%x", i + ‘a‘);  
  65.          
  66.         msg.mtype = 1001;  
  67.         msg.size  = i;  
  68.   
  69.         printf("msg.mtype = %ld, msg.size = %ld\t", msg.mtype, msg.size);  
  70.         printf("msg.buf1 = %s, msg.buf2 = %s\n", msg.buf1, msg.buf2);  
  71.   
  72.         /*将msgp消息写入到标识符为msqid的消息队列*/  
  73.         /*msgsz, 要发送消息的大小,不含消息类型占用的4个字节,即mtext的长度*/  
  74.         /*msgflg 0:当消息队列满时,msgsnd将会阻塞,直到消息能写进消息队列*/  
  75.         /*msgflg IPC_NOWAIT:当消息队列已满的时候,msgsnd函数不等待立即返回*/  
  76.         /*msgflg IPC_NOERROR:若发送的消息大于size字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程。*/          
  77.         if(msgsnd(msgid, &msg, sizeof(struct msgtype) - sizeof(long), 0) == -1)  
  78.         {  
  79.             perror("msgsnd");  
  80.             exit(1);  
  81.         }  
  82.   
  83.         i++;  
  84.         sleep(1);     
  85.     }  
  86.   
  87.     sleep(30); //等待消费者进程退出  
  88.   
  89.     /*获取和设置消息队列的属性*/  
  90.     /*cmd IPC_STAT:获得msgid的消息队列头数据到buf中*/  
  91.     /*cmd IPC_RMID:删除消息队列*/  
  92.     if(msgctl(msgid, IPC_RMID, 0) == -1)  
  93.     {  
  94.         perror("msgctl");  
  95.         exit(1);  
  96.     }  
  97.   
  98.     return 0;  
  99. }  
  100.   
  101. /* 
  102.  * 
  103.  *       Filename:  consumer.c 
  104.  * 
  105.  *    Description:  消费者进程 
  106.  * 
  107.  *        Version:  1.0 
  108.  *        Created:  09/30/2011 04:52:23 PM 
  109.  *       Revision:  none 
  110.  *       Compiler:  gcc(g++) 
  111.  * 
  112.  *         Author:  |Zhenghe Zhang|, |zhenghe.zhang@gmail.com| 
  113.  *        Company:  |Shenzhen XXX Technology Co., Ltd.| 
  114.  * 
  115.  */  
  116.   
  117. #include <stdio.h>  
  118. #include <stdlib.h>  
  119. #include <string.h>  
  120. #include <unistd.h>  
  121. #include <sys/ipc.h>  
  122. #include <sys/msg.h>  
  123. #include <error.h>  
  124.   
  125. #define BUFFER      10 //定义buf大小  
  126.   
  127. struct msgtype {  
  128.     long mtype;  
  129.     char buf1[BUFFER + 1];  
  130.     char buf2[BUFFER + 1];  
  131.     long size;  
  132. };  
  133.   
  134. int main()  
  135. {  
  136.     key_t msgkey;    
  137.   
  138.     /*消息队列*/  
  139.     int msgid;  
  140.     struct msgtype msg;  
  141.   
  142.   
  143.     msgkey = ftok("/home/zhang/shmipcx", 10001);  
  144.     if(msgkey == -1)  
  145.     {  
  146.         perror("ftok");  
  147.         exit(1);  
  148.     }  
  149.   
  150.     msgid = msgget(msgkey, IPC_EXCL | 0666);  
  151.     if(msgid == -1)  
  152.     {  
  153.         perror("msgget");  
  154.         exit(1);  
  155.     }  
  156.    
  157.     int i = 0;  
  158.   
  159.     while(i < 10) //运行一个consumenr,为 10 ,同时运行两个consumer进程,为 5    
  160.     {  
  161.         memset(msg.buf1, 0, BUFFER + 1);  
  162.         memset(msg.buf2, 0, BUFFER + 1);  
  163.         msg.mtype = 1001;  
  164.         msg.size  = -1;  
  165.    
  166.         /*从标识符为msqid的消息队列读取消息并存于msgp中,读取后把此消息从消息队列中删除*/  
  167.         /*msgp 存放消息的结构体,结构体类型要与msgsnd函数发送的类型相同*/  
  168.         /*msgsz 要接收消息的大小,不含消息类型占用的4个字节*/  
  169.         /*msgtyp 0 接收第一个消息*/  
  170.         /*msgtyp <0 接收类型等于或者小于msgtyp绝对值的第一个消息*/  
  171.         /*msgtyp >0 接收类型等于msgtyp的第一个消息*/  
  172.         /*msgflg 0 阻塞式接收消息,没有该类型的消息msgrcv函数一直阻塞等待*/  
  173.         /*msgflg IPC_NOWAIT 如果没有返回条件的消息调用立即返回,此时错误码为ENOMSG*/  
  174.         /*msgflg IPC_EXCEPT 与msgtype配合使用返回队列中第一个类型不为msgtype的消息*/  
  175.         /*msgflg IPC_NOERROR 如果队列中满足条件的消息内容大于所请求的size字节,则把该消息截断,截断部分将被丢弃*/  
  176.         if(msgrcv(msgid, &msg, sizeof(struct msgtype) - sizeof(long), msg.mtype, 0) == -1)  
  177.         {  
  178.             perror("msgrcv");  
  179.             exit(1);  
  180.         }  
  181.   
  182.         printf("msg.mtype = %ld, msg.size = %ld\t", msg.mtype, msg.size);  
  183.         printf("msg.buf1 = %s, msg.buf2 = %s\n", msg.buf1, msg.buf2);  
  184.   
  185.         i++;  
  186.         sleep(2);  
  187.     }  
  188.   
  189.     return 0;  
  190. }