首页 > 代码库 > Linux环境编程之IPC进程间通信(五):Posix消息队列1
Linux环境编程之IPC进程间通信(五):Posix消息队列1
对于管道和FIFO来说,必须应该先有读取者存在,否则先有写入者是没有意义的。而消息队列则不同,它是一个消息链表,有足够写权限的线程可往别的队列中放置消息,有足够读权限的线程可从队列中取走消息。每个消息都是一个记录,它由发送者赋予一个优先级。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。消息队列是随内核的持续性,一个进程可以往某个队列写入一些消息,然后终止,再让另外一个进程在以后的某个时刻读出这些消息。这跟管道和FIFO不一样,当一个管道或FIFO的最后一次关闭时,仍在该管道或FIFO上的数据将丢弃。队列中的每个消息都具有如下属性:
1、一个无符号整数优先级(Posix)或一个长整数类型(System V)
2、消息的数据部分长度(可以为0)
3、数据本身(如果长度大于0)
链表的头文件中含有当前队列的两个属性:队列中允许的最大消息数以及每个消息的最大大小。
消息队列的打开、关闭、删除函数
#include <mqueue.h> mqd_t mq_open(const char *name, int oflag, ..../*mode_t mode, struct mq_attr *attr */) // 返回值:成功返回消息队列描述符,出错返回-1 int mq_close(mqd_t mqdes) // 返回值:成功返回0,失败返回-1 int mq_unlink(const char *name) // 返回值:成功为0,失败-1调用mq_close函数后,调用进程可以不再使用该描述符,但其消息队列并不从系统中删除。一个进程终止时,它的所有打开着的消息队列都关闭,就像调用了mq_close一样。要删除消息队列,必须调用mq_unlink函数。其实,每个消息队列都有一个记录打开着描述符数的引用计数器,因而:当一个消息队列的引用计数仍大于0时,其name就能删除,但该队列的析构要到最后一个mq_close发生时才进行。Posix消息队列具有随内核的持续性,即使当前没有进城打开着某个消息队列,该队列及其上的各个消息也将一直存在,直到调用mq_unlink并让它的引用计数达到0以删除该队列为止。
创建消息队列:
#include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <mqueue.h> #include <sys/stat.h> #define FILE_MODE (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH) int Getopt(int argc, char *const *argv, const char *str) { int opt; if((opt = getopt(argc, argv, str)) == '?') exit(1); return (opt); } int main(int argc, char **argv) { int c, flags; mqd_t mqd; flags = O_RDWR | O_CREAT; while((c = Getopt(argc, argv, "e")) != -1){ switch(c){ case 'e': flags |= O_EXCL; break; } } if(optind != argc -1){ printf("usage:mqcreate [-e] <name>.\n"); return -1; } mqd = mq_open(argv[optind], flags, FILE_MODE, NULL); mq_close(mqd); exit(0); }注意此处的编译所用命令要加上链接选项"-lrt":gcc -lrt mqcreate1.c -o mqcreate1
存在的一个疑问:执行可执行文件后,我也不知道所创建的消息队列跑哪去了……
从系统中删除一个消息队列。
#include <mqueue.h> #include <stdlib.h> #include <stdio.h> int main(int argc, char **argv) { if(argc != 2){ printf("usage:mqunlink <name>.\n"); return 0; } if(mq_unlink(argv[1])){ printf("unlink mq error.\n"); return -1; } exit(0); }
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oattr); // 返回值:成功为0,出错为-1
每个消息队列都有一些属性,mq_getattr返回这些属性,mq_setattr则设置其中的某个属性,这些属性包含在结构体mq_attr里:
struct mq_attr { long mq_flags; /* Flags: 0 or O_NONBLOCK */ long mq_maxmsg; /* Max. # of messages on queue */ long mq_msgsize; /* Max. message size (bytes) */ long mq_curmsgs; /* # of messages currently in queue */ };指向某个mq_attr结构的指针可作为mq_open的第四个参数传递,从而允许在创建新队列时,指定mq_maxmsg和mq_msgsize属性。mq_open忽略该结构的另外两个成员。
mq_getattr把所指定队列的当前属性填入由attr指向的结构。mq_setattr给所指定队列设置属性,但只使用attr指向的mq_attr结构的mq_flags成员,以设置或清除非阻塞标志。该结构的其他三个成员被忽略:每个队列的最大消息数和每个消息的最大字节数只能在创建队列时设置,队列中的当前消息数则只能获取而不能设置。
mqgetattr.c程序
#include <unistd.h> #include <mqueue.h> int main(int argc, char **argv) { mqd_t mqd; struct mq_attr attr; if(argc != 2){ printf("usage:mqgetattr <name>.\n"); return -1; } mqd = mq_open(argv[1], O_RDONLY); mq_getattr(mqd, &attr); printf("max #msgs = %ld, max #bytes/msg = %ld," "#currently on queue = %ld\n", attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs); mq_close(mqd); exit(0); }创建消息队列时指定创建队列的最大消息数和每个消息的最大大小程序如下:
#include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <mqueue.h> #include <sys/stat.h> #define FILE_MODE (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH) struct mq_attr attr; /*mq_maxmsg and mq_msgsize both init to 0*/ int Getopt(int argc, char *const *argv, const char *str) { int opt; if((opt = getopt(argc, argv, str)) == '?') exit(1); return (opt); } int main(int argc, char **argv) { int c, flags; mqd_t mqd; flags = O_RDWR | O_CREAT; while((c = Getopt(argc, argv, "e")) != -1){ switch(c){ case 'e': flags |= O_EXCL; break; case 'm': attr.mq_maxmsg = atol(optarg); break; case 'z'; attr.mq_msgsize = atol(optarg); break; } } if(optind != argc -1){ printf("usage:mqcreate [-e] <name>.\n"); return -1; } if((attr.mq_maxmsg != 0 && attr.mq_msgsize == 0) || attr.mq_maxmsg == 0 && attr.mq_msgsize != 0){ printf("must specify both -m maxmsg and -z msgsize"); return -1; } mqd = mq_open(argv[optind], flags, FILE_MODE, (attr.mq_maxmsg != 0)?&attr:NULL); mq_close(mqd); exit(0); }mq_send和mq_receive函数分别用于往一个队列中放置一个消息和从一个队列中取走一个消息。每个消息有一个优先级,它是一个小于MQ_PRIO_MAX的无符号整数。
mq_receive总是返回所指定队列中最高优先级的最早小子,而且该优先级能随消息的内容及其长度一同返回。
#include <mqueue.h> int mq_send(mqd_t mqdes, const char ptr, size_t len, unisgned int prio); //返回值:成功返回0,出错-1 ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop); //返回值:成功返回0,出错-1mq_receive的len参数的值不能小于能加到所指定队列中的消息的最大大小(该队列mq_attr结构的mq_msgsize成员)。要是len小于该值,mq_receive理解返回EMSGSIZE错误。mq_send的prio参数是待发送消息的优先级,其必须小于MQ_PRIO_MAX。如果mq_receive的priop参数是一个非空指针,所返回消息的优先级就通过该指针存放。
mqsend.c程序:
#include <unistd.h> #include <mqueue.h> #include <stdlib.h> #include <stdio.h> int main(int argc, char **argv) { mqd_t mqd; void *ptr; size_t len; uint_t prio; if(argc != 4){ printf("usage:mqsen <name> <#bytes> <priority>.\n"); return -1; } len = atoi(argv[2]); prio = atoi(argv[3]); mqd = mq_open(argv[1], O_WRONLY); mq_send(mqd, ptr, len, prio); exit(0); }
#include <unistd.h> #include <mqueue.h> #include <stdlib.h> #include <stdio.h> int main(int argc, char **argv) { int c, flags; mqd_t mqd; ssize_t n; uint_t prio; void *buff; struct mq_attr attr; flags = O_RDONLY; while((c = getopt(argc, argv, "n")) != -1){ switch(c){ case 'n'; flags |= O_NONBLOCK; break; } } if(optind != argc -1){ printf("usage:mqreceive [-n] <name>.\n"); return -1; } mqd = mq_open(argv[optind], flags); mq_getattr(mqd, &attr); buff = malloc(attr.mq_msgsize); n = mq_receive(mqd, buff, attr.mq_msgsize, &prio); printf("read %ld bytes, priotiry = %u\n", (long)n, prio); exit(0); }
消息队列的限制:
1、创建消息队列时的队列中的最大消息数限制mq_mqxmsg、给定消息的最大字节数mq_msgsize
2、一个进程能够同时拥有的打开消息队列的最大数目MQ_OPEN_MAX、任意消息的最大优先级值MQ_PRIO_MAX加1(通过调用sysconf函数获取)
mqsysconf.c
#include <mqueue.h> #include <unistd.h> #include <stdio.h> int main(int argc, char **argv) { printf("MQ_OPEN_MAX = %ld, MQ_PRIO_MAX = %ld\n", sysconf(_SC_MQ_OPEN_MAX), sysconf(_SC_MQ_PRIO_MAX)); return 0; }
参考:《UNP2》