首页 > 代码库 > linux进程间通信之消息队列
linux进程间通信之消息队列
我们已经知道进程通信的方式是有多种的,在上一篇博客中讲述了通过管道实现简单的进程间通信,那么接下来我们看看与之类似的另一种方式,通过消息队列来实现进程间通信。
什么是消息队列
消息队列提供了一种由一个进程向另一个进程发送块数据的方法。另外,每一个数据块被看作有一个类型,而接收进程可以独立接收具有不同类型的数据块。消息队列的好处在于我们几乎可以完全避免同步问题,并且可以通过发送消息屏蔽有名管道的问题。更好的是,我们可以使用某些紧急方式发送消息。坏处在于,与管道类似,在每一个数据块上有一个最大尺寸限制,同时在系统中所有消息队列上的块尺寸上也有一个最大尺寸限制。
如何使用消息队列
我们先来了解一下System V IPC
System V IPC
System V IPC指的是AT&T在System V.2发行版中引入的三种进程间通信工具:(1)信号量,用来管理对共享资源的访问 (2)共享内存,用来高效地实现进程间的数据共享 (3)消息队列,用来实现进程间数据的传递。我们把这三种工具统称为System V IPC的对象,每个对象都具有一个唯一的IPC标识符(identifier)。要保证不同的进程能够获取同一个IPC对象,必须提供一个IPC关键字(IPC key),内核负责把IPC关键字转换成IPC标识符。
System V IPC具有相似的语法,一般操作如下:
(1)选择IPC关键字,可以使用如下三种方式:
a)IPC_PRIVATE。由内核负责选择一个关键字然后生成一个IPC对象并把IPC标识符直接传递给另一个进程。
b)直接选择一个关键字。
c)使用ftok()函数生成一个关键字。
(2)使用semget()/shmget()/msgget()函数根据IPC关键字key和一个标志flag创建或访问IPC对象。如果key是IPC_PRIVATE;或者key尚未与已经存在的IPC对象相关联且flag中包含IPC_CREAT标志,那么就会创建一个全新的IPC对象。
(3)使用semctl()/shmctl()/msgctl()函数修改IPC对象的属性。
(4)使用semctl()/shmctl()/msgctl()函数和IPC_RMID标志销毁IPC实例。
System V IPC为每个IPC对象设置了一个ipc_perm结构体并在创建IPC对象的时候进行初始化。这个结构体中定义了IPC对象的访问权限和所有者:
struct ipc_perm{
uid_t uid; //所有者的用户id
gid_t gid; //所有者的组id
uid_t cuid; //创建者的用户id
gid_t cgid; //创建者的组id
mode_t mode; //访问模式
…
};
可以在/usr/include/linux/ipc.h目录进行查看。
shell中管理IPC对象的命令是ipcs、ipcmk和ipcrm。
很多人看到这里估计就看不下去了,我也是,怎么这么麻烦啊,这么多相关的东西。别急,我们再来继续分析,我们要使用消息队列、信号量和共享内存就必须知道上面这些,具体怎样实际的应用呢,我们继续来看。
在上面提到了IPC关键字,这里我们采用ftok函数来生成关键字,下面是一些在使用中会涉及的函数
》ftok函数
函数ftok把一个已存在的路径名和一个整数标识得转换成一个key_t值,称为IPC键:
# include <sys/types.h> ?
# include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
参数介绍:*pathname, 路径名 proj_id,项目号,不为0
》int msgget(key_t key, int msgflg); 创建消息队列
参数介绍:
>key,IPC关键字
>msgflag,IPC_CREAT 如果IPC不存在,则创建一个IPC资源,否则打开操作。 IPC_EXCL:只有在共享内存不存在的时候,新的共享内存才建立,否则就产生错误。
如果单独使用IPC_CREAT, XXXget()函数要么返回一个已经存在的共享内存的操作符,要
么返回一个新建的共享内存的标识符。
如果将IPC_CREAT和IPC_EXCL标志一起使用, XXXget()将返回一个新建的IPC标识符
;如果该IPC资源已存在,或者返回-1。 IPC_EXEL标志本来并没有太大的意义,但是和IPC_CREAT标志一起使用可以来保证
》向队列读/写消息
msgrcv从队列中取?消息:
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
msgsnd将数据放到消息队列中:
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数: msqid:消息队列的标识码 msgp:指向消息缓冲区的指针,此位置?来暂时存储发送和接收的消息,是一个用户可定义的通用结构,形态如下:
struct msgstru
{ long mtype; //大于于0
char mtext[用户指定大小]; };
msgsz:消息的大小。
msgtyp:从消息队列内读取的消息形态。如果值为零,则表示消息队列中的所有消息都会被读取。
msgflg:用来指明核心程序在队列没有数据的情况下所应采取的行动。如果msgflg和常数IPC_NOWAIT合用,则在msgsnd()执行时若是消息队列已满,则msgsnd()将不会阻塞,而会立即返回-1,如果执行的是msgrcv(),则在消息队列呈空时,不做等待马上返回-1,并设定错误码为ENOMSG。当msgflg为0时, msgsnd()及msgrcv()在队列呈满或呈空的情形时,采取阻塞等待的处理模式。
》设置消息队列属性
原型: int msgctl ( int msgqid, int cmd, struct msqid_ds *buf );
参数: msgctl 系统调用对 msgqid 标识的消息队列执行 cmd 操作,系统定义了 3 种 cmd 操作: IPC_STAT , IPC_SET , IPC_RMID?
IPC_STAT : 该命令用来获取消息队列对应的 msqid_ds 数据结构,并将其保存到 buf 指
定的地址空间。 ?
IPC_SET : 该命令用来设置消息队列的属性,要设置的属性存储在buf中。
IPC_RMID : 从内核中删除 msqid 标识的消息队列
示例代码:
comm.h:定义和函数声明
1 #pragma once 2 #include<stdio.h> 3 #include<sys/ipc.h> 4 #include<sys/msg.h> 5 #include<stdlib.h> 6 #include<string.h> 7 #include<unistd.h> 8 #include <sys/types.h> 9 #include<time.h>10 extern const int ser_send_type;//server11 extern const int cli_send_type;//client12 #define _PATH_NAME_ "/tmp"13 #define _PROJ_ID_ 0x11114 #define _SIZE_ 51215 16 int create_msg_queue();17 int get_msg_queue();18 int recv_msg(int msg_id,int type,char* out);19 int send_mag(int msg_id,int type,char* in);20 21 typedef struct msgbuf22 {23 long mtype;24 char mtext[_SIZE_];25 } msg_t;
comm.c:对server和client需要调用的一些函数的一些封装
1 #include"comm.h" 2 const int ser_send_type=1;//server 3 const int cli_send_type=2;//client 4 int comm_msg_queue(int flag) 5 { 6 key_t key = ftok(_PATH_NAME_,_PROJ_ID_); 7 if(key == -1) 8 { 9 perror("ftok");10 return -2;11 }12 int msg_id=msgget(key,flag);13 if(msg_id<0)14 perror("msgget");15 return msg_id;16 }17 int create_msg_queue()18 {19 int flag=IPC_CREAT|IPC_EXCL|0664;20 return comm_msg_queue(flag);21 }22 int get_msg_queue()23 {24 int flag=IPC_CREAT;25 return comm_msg_queue(flag);26 }27 28 int recv_msg(int msg_id,int type,char* out)29 {30 msg_t msg;31 msg.mtype=type;32 size_t ret=msgrcv(msg_id,&msg,sizeof(msg.mtext),type,0);33 if(ret<0)34 {35 perror("msgrcv");36 return 1;37 }38 strcpy(out,msg.mtext);39 return 0;40 }41 int send_msg(int msg_id,int type,char* msg_in)42 {43 msg_t msg;44 msg.mtype=type;45 strncpy(msg.mtext,msg_in, strlen(msg_in)+1);46 size_t ret=msgsnd(msg_id,&msg,sizeof(msg.mtext),0);47 if(ret<0)48 {49 perror("msgsnd");50 return 2;51 }52 return 0;53 }54 int destroy_msg(int msg_id)55 {56 msgctl(msg_id,IPC_RMID,NULL);57 }
server.c:
1 #include "comm.h" 2 3 int main() 4 { 5 int msg_id=create_msg_queue(); 6 char buf[_SIZE_]; 7 while(1) 8 { 9 memset(buf,0,sizeof(buf));10 printf("please enter >:");11 fgets(buf,sizeof(buf)-1,stdin);12 if( strncmp(buf,"quit",4)==0)13 {14 printf("server bye!\n");15 break;16 }17 send_msg(msg_id,ser_send_type,buf);18 memset(buf,0,sizeof(buf));19 recv_msg(msg_id,cli_send_type,buf);20 printf("client:%s",buf);21 }22 destroy_msg(msg_id);23 return 0;24 }
client.c:
1 #include"comm.h" 2 int main() 3 { 4 int msg_id=get_msg_queue(); 5 if(msg_id < 0){ 6 return 1; 7 } 8 char buf[_SIZE_]; 9 while(1)10 {11 recv_msg(msg_id,ser_send_type,buf);12 printf("server:%s",buf);13 memset(buf,0,sizeof(buf));14 15 printf("please enter:");16 fgets(buf,sizeof(buf)-1,stdin);17 if(strncmp(buf,"quit",4)==0)18 {19 printf("client bye!\n");20 break;21 } 22 send_msg(msg_id,cli_send_type,buf);23 } 24 destroy_msg(msg_id);25 return 0;26 27 }
最后再贴出makefile文件:
.PHONY: allall:server client server:server.o comm.o gcc -o $@ $^client:client.o comm.o gcc -o $@ $^%.o:%.c gcc -c $<.PHONY:cleanclean: rm -f server client *.o
最后看一下我们高大上的结果:
linux进程间通信之消息队列