首页 > 代码库 > POSIX 消息队列

POSIX 消息队列

POSIX消息队列相关函数:

mq_open函数
功能:用来创建和访问一个消息队列
原型
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
参数
name: 某个消息队列的名字
oflag:与open函数类似,可以是O_RDONLY、O_WRONLY、O_RDWR,还可以按位或上O_CREAT、O_EXCL、O_NONBLOCK等。
mode:如果oflag指定了O_CREAT,需要设置mode。
返回值:成功返回消息队列文件描述符;失败返回-1

POSIX IPC名字限定
必须以/打头,并且后续不能有其它/ ,形如/somename
长度不能超过NAME_MAX

mq_close函数
功能:关闭消息队列
原型
mqd_t mq_close(mqd_t mqdes);
参数
mqdes : 消息队列描述符
返回值:成功返回0;失败返回-1

mq_unlink函数
功能:删除消息队列
原型
mqd_t mq_unlink(const char *name);
参数
name: 消息队列的名字
返回值:成功返回0;失败返回-1

mq_getattr/mq_setattr
功能:获取/设置消息队列属性
原型
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
返回值:成功返回0;失败返回-1

mq_attr结构体
struct mq_attr {
long mq_flags; /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue */
};

mq_send函数
功能:发送消息
原型
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
参数
mqdes:消息队列描述符
msg_ptr:指向消息的指针
msg_len:消息长度
msg_prio:消息优先级
返回值:成功返回0;失败返回-1

mq_receive函数
功能:接收消息
原型
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
参数
mqdes:消息队列描述符
msg_ptr:返回接收到的消息
msg_len:消息长度
msg_prio:返回接收到的消息优先级
返回值:成功返回接收到的消息字节数;失败返回-1
注意:返回指定消息队列中最高优先级的最早消息

mq_notify函数
功能:建立或者删除消息到达通知事件
原型
mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
参数
mqdes:消息队列描述符
notification:
非空表示当消息到达且消息队列先前为空,那么将得到通知;
NULL表示撤消已注册的通知
返回值:成功返回0;失败返回-1
通知方式:
产生一个信号
创建一个线程执行一个指定的函数


union sigval {          /* Data passed with notification */
int     sival_int; /* Integer value */
void   *sival_ptr; /* Pointer value */
};
struct sigevent {
int          sigev_notify; /* Notification method */
int          sigev_signo; /* Notification signal */
union sigval sigev_value; /* Data passed with notification */
void (*sigev_notify_function) (union sigval);
/* Function for thread notification */
void *sigev_notify_attributes;
/* Thread function attributes */
};

注意点:
1、任何时刻只能有一个进程可以被注册为接收某个给定队列的通知
2、当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接收该队列的通知时,只有没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发出。
3、当通知被发送给它的注册进程时,其注册被撤消。进程必须再次调用mq_notify以重新注册(如果需要的话),重新注册要放在从消息队列读出消息之前而不是之后。

01mq_open.c
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>


#include <stdlib.h>
#include <stdio.h>
#include <errno.h>


#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)




int main(void)
{
mqd_t mqid;
mqid = mq_open("/abc", O_CREAT | O_RDWR, 0666, NULL);
if (mqid == (mqd_t)-1)
ERR_EXIT("mq_open");


printf("mq_open succ\n");
mq_close(mqid);
return 0;
}

02mq_unlink.c
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>


#include <stdlib.h>
#include <stdio.h>
#include <errno.h>


#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)




int main(void)
{
mq_unlink("/abc");
return 0;
}

03mq_getattr.c

#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>


#include <stdlib.h>
#include <stdio.h>
#include <errno.h>


#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)




int main(void)
{
mqd_t mqid;
mqid = mq_open("/abc", O_RDONLY);
if (mqid == (mqd_t)-1)
ERR_EXIT("mq_open");


printf("mq_open succ\n");
struct mq_attr attr;
mq_getattr(mqid, &attr);
printf("max #msg=%ld max #bytes/msg=%ld #currently on queue=%ld\n", attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);


mq_close(mqid);
return 0;
}


04mq_send.c

#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>


#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>


#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)


typedef struct stu
{
char name[32];
int age;
} STU;


int main(int argc, char *argv[])
{
if (argc != 2)
{
fprintf(stderr, "Usage: %s <prio>\n", argv[0]);
exit(EXIT_FAILURE);
}


mqd_t mqid;
mqid = mq_open("/abc", O_WRONLY);
if (mqid == (mqd_t)-1)
ERR_EXIT("mq_open");


STU stu;
strcpy(stu.name, "test");
stu.age = 20;


unsigned prio = atoi(argv[1]);
mq_send(mqid, (const char*)&stu, sizeof(stu), prio);
mq_close(mqid);
return 0;
}

05mq_recv.c

#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>


#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>


#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)


typedef struct stu
{
char name[32];
int age;
} STU;


int main(int argc, char *argv[])
{
mqd_t mqid;
mqid = mq_open("/abc", O_RDONLY);
if (mqid == (mqd_t)-1)
ERR_EXIT("mq_open");




struct mq_attr attr;
        mq_getattr(mqid, &attr);
size_t size = attr.mq_msgsize;
STU stu;


unsigned prio;
if (mq_receive(mqid, (char*)&stu, size, &prio) == (mqd_t)-1)
ERR_EXIT("mq_receive");


printf("name=%s age=%d prio=%u\n", stu.name, stu.age, prio);
mq_close(mqid);
return 0;
}

06mq_notify.c

#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <signal.h>


#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>


#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)


typedef struct stu
{
char name[32];
int age;
} STU;


mqd_t mqid;
size_t size;
struct sigevent sigev;


void handle_sigusr1(int sig)
{
mq_notify(mqid, &sigev);
        STU stu;


        unsigned prio;
        if (mq_receive(mqid, (char*)&stu, size, &prio) == (mqd_t)-1)
                ERR_EXIT("mq_receive");


        printf("name=%s age=%d prio=%u\n", stu.name, stu.age, prio);


}


int main(int argc, char *argv[])
{
mqid = mq_open("/abc", O_RDONLY);
if (mqid == (mqd_t)-1)
ERR_EXIT("mq_open");


struct mq_attr attr;
        mq_getattr(mqid, &attr);
size = attr.mq_msgsize;


signal(SIGUSR1, handle_sigusr1);


sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;


mq_notify(mqid, &sigev);

for (;;)
pause();


mq_close(mqid);
return 0;
}

makefile:

.PHONY:clean all
CC=gcc
CFLAGS=-Wall -g
BIN=01mq_open 02mq_unlink 03mq_getattr 04mq_send 05mq_recv \
06mq_notify
all:$(BIN)
%.o:%.c
$(CC) $(CFLAGS) -c $< -o $@
01mq_open:01mq_open.o
$(CC) $(CFLAGS) $^ -o $@ -lrt
02mq_unlink:02mq_unlink.o
$(CC) $(CFLAGS) $^ -o $@ -lrt
03mq_getattr:03mq_getattr.o
$(CC) $(CFLAGS) $^ -o $@ -lrt
04mq_send:04mq_send.o
$(CC) $(CFLAGS) $^ -o $@ -lrt
05mq_recv:05mq_recv.o
$(CC) $(CFLAGS) $^ -o $@ -lrt
06mq_notify:06mq_notify.o
$(CC) $(CFLAGS) $^ -o $@ -lrt
clean:
rm -f *.o $(BIN)