首页 > 代码库 > ZeroMQ源码分析之Message

ZeroMQ源码分析之Message

使用ZeroMQ创建消息时的代码通常如下:


zmq_msg_tmsgName;	//1
zmq_msg_init(&msgName);	//2


这两条代码做了什么呢?

首先对第1行代码,在zmq.h中有如下定义:


typedef structzmq_msg_t {unsigned char _ [32];} zmq_msg_t;


what?消息体就这样定义?也许这不是它的真面目。

看第2行代码,在zmq.cpp找到zmq_msg_init的实现方式。


<p style="margin-bottom: 0in; line-height: 100%">int zmq_msg_init(zmq_msg_t *msg_)</p><p style="margin-bottom: 0in; line-height: 100%">{</p><p style="margin-bottom: 0in; line-height: 100%">    return((zmq::msg_t*) msg_)->init ();</p><p style="margin-bottom: 0in; line-height: 100%">}</p>

晕,居然强制类型转换,把原本指向32字节空间的zmq_msg_t类型指针转换成了指向msg_t类型的指针。


现在来看一下这个神秘的msg_t,也就是ZeroMQ的消息类。

首先看msg.cppinit()的实现


<p style="margin-bottom: 0in; line-height: 100%">int zmq::msg_t::init()</p><p style="margin-bottom: 0in; line-height: 100%">{</p><p style="margin-bottom: 0in; line-height: 100%">    u.vsm.type =type_vsm;</p><p style="margin-bottom: 0in; line-height: 100%">    u.vsm.flags =0;</p><p style="margin-bottom: 0in; line-height: 100%">    u.vsm.size = 0;</p><p style="margin-bottom: 0in; line-height: 100%">    return 0;</p><p style="margin-bottom: 0in; line-height: 100%">}</p>

可以大致看出就是初始化了消息的类型、标志和消息内容大小。对应的看一下在msg.hpp是如何定义这个消息结构体的:

msg.hppmsg_t类中有:


union {
            struct {
                unsigned char unused [max_vsm_size + 1];
                unsigned char type;
                unsigned char flags;
            } base;
            struct {
                unsigned char data [max_vsm_size];
                unsigned char size;
                unsigned char type;
                unsigned char flags;
            } vsm;
            struct {
                content_t *content;
                unsigned char unused [max_vsm_size + 1 - sizeof (content_t*)];
                unsigned char type;
                unsigned char flags;
            } lmsg;
            struct {
                void* data;
                size_t size;
                unsigned char unused
                    [max_vsm_size + 1 - sizeof (void*) - sizeof (size_t)];
                unsigned char type;
                unsigned char flags;
            } cmsg;
            struct {
                unsigned char unused [max_vsm_size + 1];
                unsigned char type;
                unsigned char flags;
            } delimiter;
        } u;

可见这里利用union来压缩空间。union维护足够的空间来置放多个数据成员中的“一种”,而不是为每一个数据成员配置空间,在union中所有的数据成员共用一个空间,同一时间只能储存其中一个数据成员,所有的数据成员具有相同的起始地址。


从这里可以看出ZeroMQ的几种消息类型:vsm(verysmall message?), lmsg(long message?), cmsg(constant message) delimiter .


每个struct人为地控制为等长,其中unused数组就是用来控制每个struct的长度,使得后面的typeflags在每个struct中的存储位置是一样的.这样就可以做到,无论该消息是vsm或者lmsg或其他类型,只要调用u.base.type就能获取到这个消息的类型了.


enum {max_vsm_size =29};

通过vsm类型和lmsg类型的对比可以知道,ZeroMQ对短消息和长消息是区别对待的.对于短的消息,即不超过29字节的消息,直接复制赋值;而对于长消息,则需要在内存中分配空间,如下面代码所示:


//初始化消息大小

int zmq::msg_t::init_size (size_t size_)
{
    if (size_ <= max_vsm_size) {
	//当消息为小消息时
        u.vsm.type = type_vsm;
        u.vsm.flags = 0;
        u.vsm.size = (unsigned char) size_;
    }
    else {
		
        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
        u.lmsg.content =
            (content_t*) malloc (sizeof (content_t) + size_);
		//消息为长消息,需要分配内存空间
        if (unlikely (!u.lmsg.content)) {
            errno = ENOMEM;
            return -1;
        }

        u.lmsg.content->data = http://www.mamicode.com/u.lmsg.content + 1;>

//初始化消息内容

int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
    void *hint_)
{
    //  If data is NULL and size is not 0, a segfault
    //  would occur once the data is accessed
    assert (data_ != NULL || size_ == 0);
    
    //  Initialize constant message if there's no need to deallocate
    if(ffn_ == NULL) {
	//如果销毁函数为空,则该消息为常量消息
        u.cmsg.type = type_cmsg;
        u.cmsg.flags = 0;
        u.cmsg.data = http://www.mamicode.com/data_;>

这里有必要看一下上面出现的content的结构:


 struct content_t
        {
            void *data;
            size_t size;
            msg_free_fn *ffn;
            void *hint;
            zmq::atomic_counter_t refcnt;
        };


其中ffn为销毁消息时使用的函数指针,refcnt则是该消息被共享次数的计数器,当该计算器计数为0,即该消息以及没有被使用时,则该消息销毁.


在上面msg_t::init_data()中出现了这么一行:


new(&u.lmsg.content->refcnt) zmq::atomic_counter_t ();

使用了placementnew的写法.placementnew是用来实现定位构造的,也就是在取得了一块可以容纳指定类型对象的内存后,在这块内存上构造一个对象.new的深入了解,可以参考这个博客:http://blog.csdn.net/songthin/article/details/1703966.


再回过头来看最开头的地方,好像还有一个问题没解决:


typedef structzmq_msg_t {unsigned char _ [32];} zmq_msg_t;

int zmq_msg_init (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->init ();
}

这里做的强制类型转换,把原本指向32字节空间的zmq_msg_t类型指针转换成了指向msg_t类型的指针,为什么是32位呢,通过下面代码,对消息结构进行字节计算,不难发现每个消息结构就是占了32个字节的.只不过长消息中使用了指针指向了用于存储长消息数据的内存空间而已.所以不要被外表所蒙骗,要看到内在,才知道她的心是怎样的.


struct {
                unsigned char data [max_vsm_size];
                unsigned char size;
                unsigned char type;
                unsigned char flags;
            } vsm;
struct {
                content_t *content;
                unsigned char unused [max_vsm_size + 1 - sizeof (content_t*)];
                unsigned char type;
                unsigned char flags;
            } lmsg;

enum {max_vsm_size =29}; 


ZeroMQ源码分析之Message