首页 > 代码库 > 进程间通信之消息队列通信

进程间通信之消息队列通信

概念

消息队列

  • 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法
  • 每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值
  • 消息队列也有管道一样的不足,就是每条消息的最大长度是有上限的(MSGMAX),每个消息队列的总字节数(内核缓冲上限)是有上限的(MSGMNB),系统上消息队列的总数(消息条目数)也有一个上限(MSGMNI)

对比:

管道 消息
流管道 有边界
先进先出 可以后进入、先出来

消息大小三大限制
cat /proc/sys/kernel/msgmax最大消息长度限制
cat /proc/sys/kernel/msgmnb 消息队列总的字节数
cat /proc/sys/kernel/msgmni 消息条目数

IPC对象数据结构

IPC对象数据结构
内核为每个IPC对象维护一个数据结构

struct ipc_perm {
    key_t          __key;       /* Key supplied to xxxget(2) */
    uid_t          uid;         /* Effective UID of owner */
    gid_t          gid;         /* Effective GID of owner */
    uid_t          cuid;        /* Effective UID of creator */
    gid_t          cgid;        /* Effective GID of creator */
    unsigned short mode;        /* Permissions */
    unsigned short __seq;       /* Sequence number */
};

struct msqid_ds {
    struct ipc_perm msg_perm;     /* Ownership and permissions */
    time_t       msg_stime;    /* Time of last msgsnd(2) */
    time_t       msg_rtime;    /* Time of last msgrcv(2) */
    time_t       msg_ctime;    /* Time of last change */
    unsigned long    __msg_cbytes; /* Current number of bytes in
                        queue (nonstandard) */
    msgqnum_t        msg_qnum;     /* Current number of messages
                                                            in queue */
    msglen_t         msg_qbytes;   /* Maximum number of bytes
                                                allowed in queue */
    pid_t                     msg_lspid;      /* PID of last msgsnd(2) */
    pid_t                  msg_lrpid;      /* PID of last msgrcv(2) */
};

消息队列在内核中的表示

技术分享

基本API

消息队列函数

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

msgget函数

  • 功能:用来创建和访问一个消息队列
  • 原型
    int msgget(key_t key, int msgflg);
  • 参数
    key: 某个消息队列的名字
    msgflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的
  • 返回值:成功返回一个非负整数,即该消息队列的标识码;失败返回-1
  • 关系图
    技术分享

  • 示例代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>


int main00()
{
  int msg_id;

  //不存在就报错--必须存在
  //消息队列的id代表一个队列--队列里面可以有多个元素--具体的消息条目
  msg_id = msgget(0x1234,0666);//类似于打开文件,第一个参数表示消息队列的名字(文件名)--只不过用数字表示
  if(-1 == msg_id)//如果errno==ENOENT--不存在消息队列
  {
    perror("msgget");
    return -1;
  }

  printf("Create msg successfully msg_id:%d\n",msg_id);

  return 0;
}

int main01()
{
  int msg_id;
  //若存在使用旧队列,不会报错
  //不存在就创建
  //消息队列的id代表一个队列--队列里面可以有多个元素--具体的消息条目
  msg_id = msgget(0x1234,0666|IPC_CREAT);//类似于打开文件,第一个参数表示消息队列的名字(文件名)--只不过用数字表示
  if(-1 == msg_id)//如果errno==ENOENT--不存在消息队列
  {
    perror("msgget");
    return -1;
  }

  printf("Create msg successfully msg_id:%d\n",msg_id);

  return 0;
}


//IPC_CREAT|IPC_EXCL在一起--会检查消息队列是否已存在,如果存在不会创建,提示已存在
//单独使用IPC_EXCL无意义
int main02()
{
  int msg_id;
  //若存在使用旧队列
  //不存在就创建
  msg_id = msgget(0x1234,0666|IPC_CREAT|IPC_EXCL);//类似于打开文件,第一个参数表示消息队列的名字(文件名)
  if(-1 == msg_id)//如果errno==EEXIST--消息队列已存在
  {
    perror("msgget");
    return -1;
  }

  printf("Create msg successfully msg_id:%d\n",msg_id);

  return 0;
}


//创建的消息队列只在自己的进程家族(哪怕是同一个用户的进程都不一定可以,这里的家族针对的是进程关系,不是系统里的用户关系)中使用,
//不在没有血缘关系的进程之间使用,
//key_t最后是0,使用IPC_PRIVATE以后,IPC_CREAT|IPC_EXCL不会检查到已存在的消息队列,没有实质性的作用
//每次重新调用以后都会创建新的消息队列,msg_id都不一样
//意味着即使msg_id传送给其他进程,其他进程也不能用(可以通过fork--血缘关系使用)
//IPC_PRIVATE是一个宏定义--0
int main03()
{
  int msg_id;
  //若存在使用旧队列
  //不存在就创建
  //msg_id = msgget(IPC_PRIVATE,0666|IPC_CREAT|IPC_EXCL);//类似于打开文件,第一个参数表示消息队列的名字(文件名)
  msg_id = msgget(IPC_PRIVATE,0666);//和上一句话一样的效果
  if(-1 == msg_id)
  {
    perror("msgget");
    return -1;
  }

  printf("Create msg successfully msg_id:%d\n",msg_id);

  return 0;
}  

//针对消息队列的所属者,会进行权限检查,低权限创建的,不可用高权限打开
//至于同组用户和其他用户不进行检查
int main()
{
  int msg_id;
  //若存在使用旧队列
  //不存在就创建
  //msg_id = msgget(IPC_PRIVATE,0666|IPC_CREAT|IPC_EXCL);//类似于打开文件,第一个参数表示消息队列的名字(文件名)
  msg_id = msgget(0x1236,0666|IPC_CREAT);//和上一句话一样的效果
  if(-1 == msg_id)//如果errno==EEXIST--消息队列已存在
  {
    perror("msgget");
    return -1;
  }

  printf("Create msg successfully msg_id:%d\n",msg_id);

  msg_id = msgget(0x1236,0666);//和上一句话一样的效果
  if(-1 == msg_id)//如果errno==EEXIST--消息队列已存在
  {
    perror("msgget");
    return -1;
  }
  printf("Create msg successfully msg_id:%d\n",msg_id);

  return 0;
}

msgctl函数

  • 功能:消息队列的控制函数
  • 原型
    int msgctl(int msqid, int cmd, struct msqid_ds *buf);
  • 参数
    msqid: 由msgget函数返回的消息队列标识码
    cmd:是将要采取的动作,(有三个可取值)
  • 返回值:成功返回0,失败返回-1
  • 将要采取的动作(有三个可取值),分别如下:

技术分享

  • 示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>

//IPC对象是Linux内核实现持久化--从内核获取消息队列信息以及修改消息队列

int main()
{
  int msg_id;
  int ret = 0;
  struct msqid_ds buf;

  msg_id = msgget(0x1236,0666);
  if(-1 == msg_id)
  {
    perror("msgget");
    return -1;
  }

  memset(&buf,0,sizeof(struct msqid_ds));
  ret = msgctl(msg_id,IPC_STAT,&buf);//IPC_STAT获取消息队列的状态
  if(-1 == ret)
  {
    perror("msgctl");
    return -1;
  }

  printf("perm:%o\n",buf.msg_perm.mode);//权限
  printf("bytes:%d\n",(int)buf.__msg_cbytes);//消息队列中的字节数
  printf("number of msg:%d\n",(int)buf.msg_qnum);//消息条目数

  printf("Input any key to modify the perm...\n");
  getchar();
  buf.msg_perm.mode = 0644;
  ret = msgctl(msg_id,IPC_SET,&buf);//IPC_SET修改消息队列--修改之前先获取
  if(-1 == ret)
  {
    perror("msgctl");
    return -1;
  }

  printf("Input any key to delete the msg...\n");
  getchar();
  ret = msgctl(msg_id,IPC_RMID,&buf);//IPC_RMID从内核删除消息队列
  if(-1 == ret)
  {
    perror("msgctl");
    return -1;
  }
  printf("delete successfully\n");

  return 0;
}

msgsnd函数

  • 功能:把一条消息添加到消息队列中
  • 原型
    int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
  • 参数
    msgid: 由msgget函数返回的消息队列标识码
    msgp:是一个指针,指针指向准备发送的消息,
    msgsz:是msgp指向的消息长度,这个长度不含保存消息类型的那个long int长整型
    msgflg:控制着当前消息队列满或到达系统上限时将要发生的事情
  • 返回值:成功返回0;失败返回-1
  • msgflg=IPC_NOWAIT表示队列满不等待,返回EAGAIN错误。
  • 消息结构在两方面受到制约。首先,它必须小于系统规定的上限值;其次,它必须以一个long int长整数开始,接收者函数将利用这个长整数确定消息的类型
  • 消息结构参考形式如下:
    struct msgbuf {
        long  mtype;
        char mtext[100];
    }
  • 示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>

/*
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
*/

//定义消息结构体类型--只要long mtype; 开头就好,后面mtext的长度可以自由定义
typedef struct msgbuf {
   long mtype;       /* message type, must be > 0 */
   char mtext[1024];    /* message data */
}msgbuf_t;

int main(int argc,char **argv)
{
  int ret = 0;
  int msg_id;//消息id
  long type;//消息类型,对应struct msgbuf里的mtype
    int len;//消息长度,对应要发送到消息队列里的消息长度,不包括type的长度

  msgbuf_t buf;//消息结构体

  if (argc != 4)
    {
        fprintf(stderr, "Usage: %s <msg id> <msg type> <length bytes> \n", argv[0]);
        exit(EXIT_FAILURE);
    }

  //进行参数转换
  msg_id = atoi(argv[1]);
  type = atol(argv[2]);
    len = atoi(argv[3]);

  //填充消息结构体
  memset(&buf,0,sizeof(msgbuf_t));
  buf.mtype = type;
  strcpy(buf.mtext,"1122334455");//有效消息--发送的时候可以指定要发送的长度,也就是不一定全部发送完,但是注意不能越界--否则出现未定义行为

  //开始发送
  ret = msgsnd(msg_id,&buf,len,IPC_NOWAIT);//msg_id非法会自动记录错误类型到errno
  if(-1 == ret)
  {
    perror("msgsnd");
    return -1;
  }

  return 0;
}

msgrcv函数

  • 功能:是从一个消息队列接收消息
  • 原型
    ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
  • 参数
    msgid: 由msgget函数返回的消息队列标识码
    msgp:是一个指针,指针指向准备接收的消息,
    msgsz:是msgp指向的消息长度,这个长度不含保存消息类型的那个long int长整型
    msgtype:它可以实现接收优先级的简单形式
    msgflg:控制着队列中没有相应类型的消息可供接收时将要发生的事
  • 返回值:成功返回实际放到接收缓冲区里去的字符个数,失败返回-1
  • 关于 msgtype
    msgtype=0返回队列第一条信息
    msgtype>0返回队列第一条类型等于msgtype的消息 
    msgtype<0返回队列第一条类型小于等于msgtype绝对值的消息,并且是满足条件的消息类型最小的消息
  • 关于 msgflg
    msgflg=IPC_NOWAIT,队列没有可读消息不等待,返回ENOMSG错误。
    msgflg=MSG_NOERROR,消息大小超过msgsz时被截断
  • msgtype>0且msgflg=MSG_EXCEPT,接收类型不等于msgtype的第一条消息。
  • 示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>

/*
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
                      int msgflg);
*/


//定义消息结构体类型--只要long mtype; 开头就好,后面mtext的长度可以自由定义
typedef struct msgbuf {
   long mtype;       /* message type, must be > 0 */
   char mtext[1024];    /* message data */
}msgbuf_t;

int main(int argc,char **argv)
{
  int ret = 0;
  int msg_id;//消息id
  long type;//消息类型,对应struct msgbuf里的mtype
    int len;//消息长度,对应要要从消息队列里读取的消息长度,不包括type的长度

  msgbuf_t buf;//消息结构体--存储接收到的消息条目

  if (argc != 4)
    {
        fprintf(stderr, "Usage: %s <msg id> <msg type> <length bytes> \n", argv[0]);
        exit(EXIT_FAILURE);
    }

  //进行参数转换
  msg_id = atoi(argv[1]);
  type = atol(argv[2]);
    len = atoi(argv[3]);

  //初始化消息结构体
  memset(&buf,0,sizeof(msgbuf_t));

  //开始接收--会自动填充msgbuf_t的mtype成员,将指定长度len的数据从消息队列的消息条目里复制到缓冲区buf的mtext
  ret = msgrcv(msg_id,&buf,len,type,IPC_NOWAIT);//msg_id非法会自动记录错误类型到errno
  if(-1 == ret) 
  {
    perror("msgsnd");
    return -1;
  }
  buf.mtext[ret] = ‘\0‘;//补充尾部的字符串结束符
  printf("buf.mtext:%s\nbuf.mtype:%ld\n",buf.mtext,buf.mtype);

  return 0;
}

综合实例

  • API集成使用–自己给自己发消息
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>


#define MSG_BUF 256
typedef struct msgbuf
{
  long mtype;
  char mtext[MSG_BUF];
}msgbuf_t;
int main()
{
  int ret = 0;
  int msg_id = 0;
  msgbuf_t my_msg;
  key_t key;
  long type = getpid();

  key = ftok("./",‘a‘);

  msg_id = msgget(key,IPC_CREAT|IPC_EXCL|0666);//创建
  if(msg_id == -1)
  {
    if(errno == EEXIST)
    {
      printf("EEXIST\n");
      key = ftok("./",‘a‘);
      msg_id = msgget(key,IPC_CREAT|0666);
    }
    else
    {
      perror("msget");
      exit(-1);
    }
  }

  my_msg.mtype = type;//填充类型

  strcpy(my_msg.mtext,"asdfghjkl");//填充数据/消息

  ret = msgsnd(msg_id,&my_msg,sizeof(my_msg.mtext),IPC_NOWAIT);//发送
  if(ret == -1)
  {
    perror("msgsnd");
    exit(-1);
  }



  sleep(1);

  memset(&my_msg,0,sizeof(my_msg));

  ret = msgrcv(msg_id,&my_msg,sizeof(my_msg.mtext),type,IPC_NOWAIT);//接收/获取消息
  if(ret == -1)
  {
    perror("msgsnd");
    exit(-1);
  }

  printf("%s\n",my_msg.mtext);

  return 0;
}
  • C/S模型的多进程之间消息队列通信
    技术分享

服务器

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>


#define MSG_BUF 256
typedef struct msgbuf
{
  long mtype;
  char mtext[MSG_BUF];
}msgbuf_t;


#define MSGTOSRV 1
int main()
{
  int ret = 0;
  int msg_id = 0;
  msgbuf_t my_msg;
  key_t key;
  long type;

  key = ftok("./",‘a‘);

  msg_id = msgget(key,IPC_CREAT|IPC_EXCL|0666);//创建
  if(msg_id == -1)
  {
    if(errno == EEXIST)
    {
      printf("EEXIST\n");
      key = ftok("./",‘a‘);
      msg_id = msgget(key,IPC_CREAT|0666);
    }
    else
    {
      perror("msget");
      exit(-1);
    }
  }

  while(1)
  {
    memset(&my_msg,0,sizeof(my_msg));

    //取消息的时候要指定type
    ret = msgrcv(msg_id,&my_msg,sizeof(my_msg.mtext),MSGTOSRV,0);//接收/获取消息
    if(ret == -1)
    {
      perror("msgsnd");
      exit(-1);
    }

    printf("%d\t%s\n",(int)my_msg.mtype,my_msg.mtext+sizeof(long));

    type = (*(long *)my_msg.mtext);//填充类型
    my_msg.mtype = type;

    //发消息之前要封装type
    ret = msgsnd(msg_id,&my_msg,sizeof(my_msg.mtext),IPC_NOWAIT);//发送
    if(ret == -1)
    {
      perror("msgsnd");
      exit(-1);
    }

  }

  return 0;
}

客户端

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>


#define MSG_BUF 256
typedef struct msgbuf
{
  long mtype;
  char mtext[MSG_BUF];
}msgbuf_t;


#define MSGTOSRV 1
int main()
{
  int ret = 0;
  int msg_id = 0;
  msgbuf_t my_msg;
  key_t key;
  long type = getpid();

  key = ftok("./",‘a‘);

  msg_id = msgget(key,IPC_CREAT|IPC_EXCL|0666);//创建
  if(msg_id == -1)
  {
    if(errno == EEXIST)
    {
      printf("EEXIST\n");
      key = ftok("./",‘a‘);
      msg_id = msgget(key,IPC_CREAT|0666);
    }
    else
    {
      perror("msget");
      exit(-1);
    }
  }


  memset(&my_msg,0,sizeof(my_msg));


//思路:
//客户端发给服务器消息类型总是1--MSGTOSRV
//服务器端回给客户端的type是对方进程号
//相当于服务器端 从消息队列中收消息,然后服务器端分类型回复客户端(通过消息队列)


//n个进程通过消息队列进行交换
//有没有产生死锁的可能
//n个客户端向服务器发送消息(本质上是向内核消息队列发送消息),若消息队列满了;服务区回射时,会阻塞。。造成程序死锁
//即使,非阻塞。。。仍然可能阻塞。。
  while(1)
  {   
    my_msg.mtype = MSGTOSRV;
    strcpy(my_msg.mtext+sizeof(long),"TEST\n");
    (*(long *)my_msg.mtext) = type;
    ret = msgsnd(msg_id,&my_msg,sizeof(my_msg.mtext),IPC_NOWAIT);//发送
    if(ret == -1)
    {
      perror("msgsnd");
      exit(-1);
    }

    memset(&my_msg,0,sizeof(my_msg));

    ret = msgrcv(msg_id,&my_msg,sizeof(my_msg.mtext),type,0);//接收/获取消息
    if(ret == -1)
    {
      perror("msgsnd");
      exit(-1);
    }

    printf("%d\t%s\n",(int)my_msg.mtype,my_msg.mtext+sizeof(long));
  }

  return 0;
}

上述模型有待优化,因为可能会由于内核的消息缓冲区满了以后发生阻塞,导致死锁!!服务器端可以用多进程的方式实现消息队列单一方向的回流
技术分享

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    进程间通信之消息队列通信