首页 > 代码库 > 消息队列
消息队列
什么是nmq
消息中间件是一个完备的、易于使用的消息队列系统,替代现有cm/transfer所有的功能,力求解决当前社区提交系统难运维、不通用等弊病,提供一个全流程支持、功能完善、性能可扩展、运维方便、可靠的消息队列及整套提交系统解决方案。 开发代号是NMQ。
背景
长期以来,社区几大产品线(贴吧、空间、知道、ks等)都独立维护着自己的提交系统。虽然产品逻辑和规模不同,但从实现上来讲,面对的问题相近,解决的思 路相似,维护重复、运维分散、人力浪费。为避免重复工作和促进提交集群收敛,我们希望完成一个大社区范畴下消息中间件的统一解决方案。
和cm/transfer区别
-
nmq是一套包含服务、运维平台的整体解决方案。
-
模型上,nmq 推的模型和cm/transfer一样。topic等同于cm,pusher等同于transfer。
-
除了推模式之外,nmq还支持拉的模型。
-
nmq自身支持提交消息的按产品线、按业务、按性能拆分。cm里用于标示命令号的cmd_no被扩展为product、topic、cmd。可以按product、topic将产品线和业务进行拆分。如果单机性能出现瓶颈,可以将同一个topic拆 分为不同的子topic。
-
nmq不分配自增id;nmq没有和业务相关的字段填充和字段检查的功能。
-
nmq自身支持发送消息时的并发、时序。
-
专门为nmq设计开发的mqp运维平台让运维上更方便。
-
pusher 支持transfer的延时发送功能,只需简单配置 sending_delay_time
名词解释
topic:按业务划分的消息序列,逻辑上的概念。产品线可以根据自己的业务特点划分,可大 可小。一般情况下,大产品线按各个子业务划分为不同的topic(例如:空间可以划分为博客、相册、用户等topic);小产品线可以整体作为一个 topic,如果业务复杂也可以划分为不同的topic。
子topic: 按性能划分的消息序列,实体上的概念。topic可以按partition-key的自定义规则,拆分为若干个子topic,每个子topic内的消息是严格保证时序的。
partition-key:消息划分或归属的依据,用于topic拆分子topic、竞争 消费时保证时序、多IDC时主IDC的归属。消息需自己指定partition-key,且只能有一个partition-key。如果不拆分子 topic或不关心时序,可以不指定partition-key。
竞争模式:同一个模块部署很多机器,但所有机器竞争消费同一份消息,比如c模块+mysql 主从模式下,c模块多机部署,但由于不同机器上的c模块都更新同一个主库,因此每条消息只需要被一个机器上的模块消费即可,不需要每条消息都分发给所有机 器;竞争模式,为以后的主流更新模式。
多主模式:同竞争模式不同,同一个模块的各个机器,都需要消费全量消息;一般模块内维护全量数据副本的场景下使用;大产品线的专有实现模块,一般是这种模式。
架构图和数据流图
上游模块向nmqproxy发送消息,消息经过消息中间件的nmqproxy转发到对应的topic server模块,topicserver将消息序列化到磁盘上,pusher扫描并读取到最新的消息,然后发送给下游模块。 除了主流的推模型外,nmq也支持“拉”模式,在拉模式下,模块所有的信息都保存在模块本地,模块通过nmq提供的pulllib同nmq保持心跳。
如何提交消息
1.接入
1.1怎么接入nmq
产品线之前接入cm/transfer有2种方式:
c/c++通过nshead+mcpack、php通过ral等方式。 产品线接入nmq与接入cm/transfer一般没有太大变化,一般来说只需要加入以下字段:
-
_product(产品线名称,7个字节以内)
-
_topic(产品线业务名称,7个字节以内)
-
_cmd(需要做的事,与之前cmd_no含义类似,只是_cmd为string,例如"addblog"等)
1.2注意项
nmq不再具有cm的id分配功能。解决方法,产品线调idalloc(arch或者space的idalloc)模块进行id分配,然后再提交到nmq;因此接入前先判断是否需要id分配功能。
下游转发不支持mysql.so和comdb.so。因此如果之前的业务是transfer下游直接是mysql或者comdb的话,需要产品线自行开发下游c或者php模块,实现更新db和comdb功能。
2.提交请求和响应
请求
向nmqproxy提交消息,使用nshead+mcpack2协议,同时兼任mcpack1格式
//请求包 { _product: "space", _topic: "album", _cmd: "album_add", _partition_key: "10001", //可选 _msg_unique_key:12333234, //可选 // 消息自定义的内容 }
响应
响应包中含_error_no和_error_msg字段,表明发送是否成功。注意,响应包中不含消息的原有字段和nmqproxy填充的字段。
//响应包 { _error_no: (int32)0, _error_msg: "OK", }
3.字段和规范
字段名约束:一个下划线开头的字段为保留字段。应用不能使用一个下划线开头的字段名。
字段值约束:只能使用大小写字母、数字、和下划线。
必须填的字段:_product、_topic、_cmd。
长度限制(含‘\0‘):product字段8个字符,topic字段8个字符,cmd字段16个字符。
-
product是产品线的唯一标示。
-
topic是业务划分的消息序列的表示,详见"名词解释"。
-
cmd是用有意义的字符串表示的命令号。
-
可选填的字段:_idc。长度限制(含‘\0‘).product字段8个字符
【二期才支持】idc是机房的标示,用于多IDC提交。
可选填的字段:_partition_key,_msg_unique_key。长度无限制。
partition_key用于拆分子topic、多IDC转发,一般使用用户id、吧id作为partition_key。
【暂不支持】msg_unique_key用于提交去重,建议使用随机性和区分度非常好的uuid。
4.配置实例
1.nmqproxy的ip是用资源定位发布的,因此需要上游模块配置上nmqproxy在zk的服务器和路径。
注:整个集群公用一组nmqproxy,所以使用的nmqproxy的资源定位是一样。
2.在nmqproxy中配置本应用的消息的路由信息。
修改nmqproxy/conf/nmqproxy.conf文件,在[topics]中增加:
#每个topic的配置 [.@topic] #产品线名称 product : tieba #topic名称 topic : post #是否启用,1或0。可选,默认为1 enable : 1 #是否走多IDC提交流程,1或0。可选,默认为0 multi_idc : 0 #拆分子topic的个数。可选,默认为1,表示不拆分 sub_topic_num : 1 #topic server的url,名称需要和ubclient配置里面的名称的一致,拆分时,自动在后面补0/1… topic_server_name_prefix : topic-tieba-post-commit pusher_server_name_prefix : pusher-tieba-post
product和topic是和提交消息里的字段保持一致。
最后的topic_server_name_prefix是topic_group的名称,需要和nmqproxy/conf/ubclient.conf保持一致的。
3.topic server的配置不需要修改
如何接收消息
一个后端模块,想从nmq接受消息,对rd来说,只需要提供该模块的2个配置文件即可。
建议该模块的模块命名为$(product)_$(module),那么需要下面2个配置文件:
-
module_$(product)_$(module).conf
-
machine_$(product)_$(module).conf
模块文件命名规范: 以下游名字来命名,比如下游叫abc模块,如果abc模块只用了某个product的数据,建议名为module_$(product)_abc.conf.如果abc是个很大的系统,用到很多模块的数据,允许module_abc.conf
module_$(product)_$(module).conf主要配置该模块的模块级配置,包括发送协议、接受的消息类型、并发时序控制、字段复制等等。
machine_$(product)_$(module).conf主要配置该模块的后端机器具体访问设置,包括ip、端口、超时等配置。
目前支持本地配置,也支持资源定位(zk/webfoot),该配置文件,同ubclient的配置文件基本一致,但在针对多主模块的配置时,针对每个机器增加了identifer和flag字段。另外多主模式的模块,不支持资源定位方式。
pusher配置范例
module_$(product)_$(module).conf + 竞争模式+ mcpack协议
下面是pusher的下游是c模块,mcpack接收的范例配置, 参考space_follow点击下载
[modules] [.@module] #模块名,需要跟ubclient配置里面的名称一致; name : space_follow #是否启用 1启用 0停用 默认0 flag : 1 #是竞争模式还是多主模式;0:竞争,1:多主 sending_type : 0 #转发协议配置; sending_protocol : mcpack #发送窗口大小,需要比线程数大; sending_window_size : 10 #发送线程数。竞争模式为竞争线程数,多主模式下,为每个机器的并发线程数 sending_thread_num : 8 #出错后重试的时间间隔(单位MS) sending_retry_time: 500 #命令过滤规则 ,接受的命令配置; # *表示通配,只能出现在字符串的最后,不能出现在中间 [..msg_filter] @filter:space.follow.* #时序控制规则,支持按照某个partition key的时序支持 [..sequence_control] #制定mutex key,必须是整形数类型的字段; mutex_key : qid #指定对于 没有PK的命令,是否强制串行处理; #1:是:该命令必须等待所有靠前的命令都执行完成,再执行。该命令执行前,靠后的命令也不能执行。(主要用于存在批量命令时,保证时序)。 #0:否:该命令完全无序£??到了就执行。 force_sequence_when_no_mutex_key : 1 #支持任意模块自定义配置、so自定义配置 [..ext_config] [..mcpack] #发送模式,0:nshead+mcpack, 1:ubrpc send_type : 0 #mcpack字段复制 #在消息中间件中,内部填充字段都是以"_"开头,比如_trans_id, _log_id等,为了兼容,可以在so中将这些字段复制改名; [...@mcpack_key_copy] from : _transid to : trans_id [...@mcpack_key_copy] from : _log_id to : log_id
machine_$(product)_$(module).conf + mcpack协议
下面是pusher的下游是c模块,mcpack接收的范例配置, 参考space_follow点击下载
[UbClientConfig] [.UbClient] [..@Service] Name : space_follow ConnectAll : 0 DefaultConnectTimeOut : 300 DefaultReadTimeOut : 1000 DefaultWriteTimeOut : 1000 DefaultMaxConnect : 10 #DefaultRetry : 5 #LONG / SHORT DefaultConnectType : SHORT #DefaultLinger : 0 #ReqBuf : 100 #ResBuf : 100 #DefaultAsyncWaitingNum : 100 [...CurrStrategy] ClassName : UbClientStrategy [...CurrHealthy] ClassName : UbClientHealthyChecker [...@Server] IP : 10.32.52.30 Port : 29003 [...@Server] IP : 10.32.52.31 Port : 29003
module_$(product)_$(module).conf + 竞争模式 + 本地配置
下面是pusher的下游是php模块,http接收的范例配置. 参考lbs_attent 点击下载
[modules] #下面是一个模块的配置 [.@module] #模块名,需要跟ubclient配置里面的名称一致; name : lbs_attent #是否启用 1启用 0停用 flag : 1 #是竞争模式还是多主模式;0:竞争,1:多主 sending_type : 0 #转发协议配置; sending_protocol : http #发送窗口大小,需要比线程数大; sending_window_size : 10 #发送线程数。竞争模式为竞争线程数,多主模式下,为每个机器的并发线程数 sending_thread_num : 8 #出错后重试的时间间隔(单位MS) sending_retry_time: 500 #命令过滤规则 ,接受的命令配置; # *表示通配,只能出现在字符串的最后,不能出现在中间 [..msg_filter] @filter : promo.atten.* #@filter : promo.* #时序控制规则,支持按照某个partition key的时序支持 [..sequence_control] #制定mutex key,必须是整形数类型的字段; mutex_key : qid #指定对于 没有PK的命令,是否强制串行处理; #1:是:该命令必须等待所有靠前的命令都执行完成,再执行。该命令执行前,靠后的命令也不能执行。(主要用于存在批量命令时,保证时序)。 #0:否:该命令完全无序,到了就执行。 force_sequence_when_no_mutex_key : 1 #支持任意模块自定义配置、so自定义配置 [..ext_config] custom_key : sample [..http] #第二部分 #用户自定义配置内容 [...default_conf] #重试次数,默认为-1,一直重试; 0:不重试; max_retry_times : -1 #是否将提交的mcpack作为post数据发送,1发送0不发送,缺省1,0的情况下只往后端发url send_pack : 1 #发送mcpack的方式:0:二进制模式直接post, 1:转化成text,使用post $key=$pack_str 的方式发送; # 2: 转换成json,使用post $key=$pack_str 的方式发送;默认为0模式; send_pack_type : 0 #在send_pack_type=1/2的时候,配置post的key;默认"data”; send_pack_key : data #server 冗余均衡策略: 0:random,对多个ip进行轮询;1:master-slave,只在出错时切换到下一个ip; #默认0; server_redundancy_policy : 0 #uri的模版,其中{{}}里面的字段将从mcpack里面获取替换; 支持req_download.uid这种多级的字段; uri : /promov1/commit/attention?mod=commit&transid={{_transid}} #http method, 0:get, 1:post, default: 1 #http_method : 0 #过滤器,会判断mcpack中的uid字段的值%2后是否为1,只有为1时才发送 #目前只支持"=" 和 "%" 这两种; #filter : {{user_id}}%2=1 #http header, 多个header用\\r\\n隔开; http_header : User-Agent: NuSOAP/0.6.6\r\ncharset=UTF-8 #命令号相关配置 #针对每个不同的命令号,可以重设上面的uri/http_method/filter/send_pack/http_header,覆盖上面的默认配置; #[...space_photo_add] #uri : /so/test22?service=Commit&pid=test&tk=test&transid={{_transid}} #filter : {{is_xxx}}=1 #[...space_post_add] #send_pack : 1 #uri : /so/test33?service=Commit&pid=test&tk=test&transid={{_transid}} #filter : {{uid}}%2=1 #mcpack字段复制 #在消息中间件中,内部填充字段都是以"_"开头,比如_transid, _log_id等,为了兼容,可以在so中将这些字段复制改名; #[...@mcpack_key_copy] #from : _transid #to : trans_id
machine_$(product)_$(module).conf + 竞争模式 + 本地配置
下面是pusher的下游是php模块,http接收的范例配置. 参考lbs_attent 点击下载
[UbClientConfig] [.UbClient] #竞争的例子(不用zk时) [..@Service] Name : lbs_attent ConnectAll : 0 DefaultConnectTimeOut : 300 DefaultReadTimeOut : 1000 DefaultWriteTimeOut : 1000 DefaultMaxConnect : 10 #DefaultRetry : 5 #LONG / SHORT DefaultConnectType : SHORT #DefaultLinger : 0 #ReqBuf : 100 #ResBuf : 100 #DefaultAsyncWaitingNum : 100 #声明将要使用的策略类及属性 [...CurrStrategy] ClassName : UbClientStrategy [...CurrHealthy] ClassName : UbClientHealthyChecker [...@Server] IP : 10.40.71.100 Port : 8016 [...@Server] IP : 10.40.71.101 Port : 8016
从cm/transfer迁移
每个产品线可能有自己特殊的情况,以下仅列出一些常见的问题,供参考。 cm/transfer到nmq不能做到透明的迁移,需要产品线做上下游的兼容。
1.迁移方式
双写或顺切。一般考虑到平滑过渡和不丢数据的可回滚,会使用双写的方案。
2.上游兼容
上游将消息提交从cm/transfer迁移到nmq,需要做几类事情的改变:
-
命令号从cmd_no迁移到product+topic+cmd
-
和id分配服务交互需要自行实现
-
其他在cm单点做的事情,比如字段检查等,需要自行实现
可选的方案有两种:升级上游模块,或者增加单独的adapter层。视产品线的情况而定。
3.下游兼容
下游兼容最主要的是发送协议的兼容。简单的讲,下游的兼容主要通过so来做。
- nmq不支持transfer的default协议(协议中带transid,并且下游可以使 用transid反向控制)。最佳建议:升级下游支持裸的mcpack方式,自己无需保存transid,也不再使用下游反向控制transid的功能。 为兼容default协议使用伪造的方式很trick,无论是在so中实现,还是加一个中间层。
- 其他mysql、comdb的发送方式。 a) 建议不要再使用transfer直接更新mysql的方式,改用下游自己去更新。 b) comdb等so视情况,在pusher下实现新的so即可。
- 一些细节的兼容,可以在so中支持,比如transid等字段的兼容。
自定义转发流程
如果协议使用mcpack和http,但默认so不能满足的功能,请联系nmq维护者。 如果使用其他的转发协议如comdb、memcache,请联系nmq维护者,如果不是通用的需求可能需要产品线自己来开发。 自定义so需要实现若干个回调函数,定义在talk_ext.h中。
/* * @brief 初始化操作,成功返回0,失败返回-1 * * @param server 服务器配置 * @param [out] handle_out 自定义处理的句柄,如果有的话 * @param [in] module_conf 自定义处理的命令文件 * * @return 成功返回0, 失败返回-1 */ typedef int (*np_init_f)(talk_svr_t * server, void **handle_out, comcfg::Configure *module_conf); /* * @brief 发送命令逻辑 * * @param server 服务器配置 * @param handle 自定义处理的句柄,如果有的话 * @param logdi logid * @param req_head 同步请求头 * @param buf 数据 * @param buf_len 数据的长度 * * @return 成功返回0,失败返回-1, 命令格式错误返回1 */ typedef int (*np_send_f)( talk_svr_t * server, void *handle, unsigned logdi, ub::UbClientManager *ubmgr, const char *mod_name, void *buf, unsigned buf_len); /* * @brief 析构操作 * * @param server server信息 * @param handle 自定义处理的句柄,如果有的话 * * @return 成功返回0,失败返回-1 */ typedef int (*np_free_f)(talk_svr_t * server, void **handle); /* * @brief 重设连接 * * @param server server信息 * @param handle 自定义句柄 */ typedef void (*np_reset_f)(talk_svr_t * server, void *handle);
消息队列