首页 > 代码库 > 消息队列

消息队列

什么是nmq

消息中间件是一个完备的、易于使用的消息队列系统,替代现有cm/transfer所有的功能,力求解决当前社区提交系统难运维、不通用等弊病,提供一个全流程支持、功能完善、性能可扩展、运维方便、可靠的消息队列及整套提交系统解决方案。 开发代号是NMQ。

背景

长期以来,社区几大产品线(贴吧、空间、知道、ks等)都独立维护着自己的提交系统。虽然产品逻辑和规模不同,但从实现上来讲,面对的问题相近,解决的思 路相似,维护重复、运维分散、人力浪费。为避免重复工作和促进提交集群收敛,我们希望完成一个大社区范畴下消息中间件的统一解决方案。

和cm/transfer区别

  • nmq是一套包含服务、运维平台的整体解决方案。

  • 模型上,nmq 推的模型和cm/transfer一样。topic等同于cm,pusher等同于transfer。

  • 除了推模式之外,nmq还支持拉的模型。

  • nmq自身支持提交消息的按产品线、按业务、按性能拆分。cm里用于标示命令号的cmd_no被扩展为product、topic、cmd。可以按product、topic将产品线和业务进行拆分。如果单机性能出现瓶颈,可以将同一个topic拆 分为不同的子topic。

  • nmq不分配自增id;nmq没有和业务相关的字段填充和字段检查的功能。

  • nmq自身支持发送消息时的并发、时序。

  • 专门为nmq设计开发的mqp运维平台让运维上更方便。

  • pusher 支持transfer的延时发送功能,只需简单配置 sending_delay_time

 

名词解释

topic:按业务划分的消息序列,逻辑上的概念。产品线可以根据自己的业务特点划分,可大 可小。一般情况下,大产品线按各个子业务划分为不同的topic(例如:空间可以划分为博客、相册、用户等topic);小产品线可以整体作为一个 topic,如果业务复杂也可以划分为不同的topic。
子topic: 按性能划分的消息序列,实体上的概念。topic可以按partition-key的自定义规则,拆分为若干个子topic,每个子topic内的消息是严格保证时序的。
partition-key:消息划分或归属的依据,用于topic拆分子topic、竞争 消费时保证时序、多IDC时主IDC的归属。消息需自己指定partition-key,且只能有一个partition-key。如果不拆分子 topic或不关心时序,可以不指定partition-key。
竞争模式:同一个模块部署很多机器,但所有机器竞争消费同一份消息,比如c模块+mysql 主从模式下,c模块多机部署,但由于不同机器上的c模块都更新同一个主库,因此每条消息只需要被一个机器上的模块消费即可,不需要每条消息都分发给所有机 器;竞争模式,为以后的主流更新模式。
多主模式:同竞争模式不同,同一个模块的各个机器,都需要消费全量消息;一般模块内维护全量数据副本的场景下使用;大产品线的专有实现模块,一般是这种模式。

架构图和数据流图

技术分享

上游模块向nmqproxy发送消息,消息经过消息中间件的nmqproxy转发到对应的topic server模块,topicserver将消息序列化到磁盘上,pusher扫描并读取到最新的消息,然后发送给下游模块。 除了主流的推模型外,nmq也支持“拉”模式,在拉模式下,模块所有的信息都保存在模块本地,模块通过nmq提供的pulllib同nmq保持心跳。

如何提交消息

1.接入

1.1怎么接入nmq

产品线之前接入cm/transfer有2种方式:
c/c++通过nshead+mcpack、php通过ral等方式。 产品线接入nmq与接入cm/transfer一般没有太大变化,一般来说只需要加入以下字段:

  • _product(产品线名称,7个字节以内)

  • _topic(产品线业务名称,7个字节以内)

  • _cmd(需要做的事,与之前cmd_no含义类似,只是_cmd为string,例如"addblog"等)

1.2注意项

nmq不再具有cm的id分配功能。解决方法,产品线调idalloc(arch或者space的idalloc)模块进行id分配,然后再提交到nmq;因此接入前先判断是否需要id分配功能。
下游转发不支持mysql.so和comdb.so。因此如果之前的业务是transfer下游直接是mysql或者comdb的话,需要产品线自行开发下游c或者php模块,实现更新db和comdb功能。

2.提交请求和响应

请求

向nmqproxy提交消息,使用nshead+mcpack2协议,同时兼任mcpack1格式

	//请求包
	{
	_product: "space",
	_topic: "album",
	_cmd: "album_add",
	_partition_key: "10001", //可选
	_msg_unique_key:12333234, //可选
	//	消息自定义的内容
	}
									

响应

响应包中含_error_no和_error_msg字段,表明发送是否成功。注意,响应包中不含消息的原有字段和nmqproxy填充的字段。

	//响应包
	{
	_error_no: (int32)0,
	_error_msg: "OK",
	}
									

3.字段和规范

字段名约束:一个下划线开头的字段为保留字段。应用不能使用一个下划线开头的字段名。
字段值约束:只能使用大小写字母、数字、和下划线。
必须填的字段:_product、_topic、_cmd。
长度限制(含‘\0‘):product字段8个字符,topic字段8个字符,cmd字段16个字符。

  • product是产品线的唯一标示。

  • topic是业务划分的消息序列的表示,详见"名词解释"。

  • cmd是用有意义的字符串表示的命令号。

  • 可选填的字段:_idc。长度限制(含‘\0‘).product字段8个字符

【二期才支持】idc是机房的标示,用于多IDC提交。 
可选填的字段:_partition_key,_msg_unique_key。长度无限制。 
partition_key用于拆分子topic、多IDC转发,一般使用用户id、吧id作为partition_key。 
【暂不支持】msg_unique_key用于提交去重,建议使用随机性和区分度非常好的uuid。

4.配置实例

1.nmqproxy的ip是用资源定位发布的,因此需要上游模块配置上nmqproxy在zk的服务器和路径。
注:整个集群公用一组nmqproxy,所以使用的nmqproxy的资源定位是一样。
2.在nmqproxy中配置本应用的消息的路由信息。
修改nmqproxy/conf/nmqproxy.conf文件,在[topics]中增加:

#每个topic的配置
[.@topic]
#产品线名称
product : tieba
#topic名称
topic : post
#是否启用,1或0。可选,默认为1
enable : 1
#是否走多IDC提交流程,1或0。可选,默认为0
multi_idc : 0
#拆分子topic的个数。可选,默认为1,表示不拆分
sub_topic_num : 1
#topic server的url,名称需要和ubclient配置里面的名称的一致,拆分时,自动在后面补0/1…
topic_server_name_prefix : topic-tieba-post-commit
pusher_server_name_prefix : pusher-tieba-post
									

product和topic是和提交消息里的字段保持一致。 
最后的topic_server_name_prefix是topic_group的名称,需要和nmqproxy/conf/ubclient.conf保持一致的。
3.topic server的配置不需要修改

如何接收消息

一个后端模块,想从nmq接受消息,对rd来说,只需要提供该模块的2个配置文件即可。
建议该模块的模块命名为$(product)_$(module),那么需要下面2个配置文件: 

  • module_$(product)_$(module).conf

  • machine_$(product)_$(module).conf

模块文件命名规范: 以下游名字来命名,比如下游叫abc模块,如果abc模块只用了某个product的数据,建议名为module_$(product)_abc.conf.如果abc是个很大的系统,用到很多模块的数据,允许module_abc.conf

module_$(product)_$(module).conf主要配置该模块的模块级配置,包括发送协议、接受的消息类型、并发时序控制、字段复制等等。
machine_$(product)_$(module).conf主要配置该模块的后端机器具体访问设置,包括ip、端口、超时等配置。
目前支持本地配置,也支持资源定位(zk/webfoot),该配置文件,同ubclient的配置文件基本一致,但在针对多主模块的配置时,针对每个机器增加了identifer和flag字段。另外多主模式的模块,不支持资源定位方式。

pusher配置范例

module_$(product)_$(module).conf + 竞争模式+ mcpack协议

下面是pusher的下游是c模块,mcpack接收的范例配置, 参考space_follow点击下载

[modules]
[.@module]
#模块名,需要跟ubclient配置里面的名称一致;
name : space_follow
#是否启用 1启用 0停用 默认0
flag : 1
#是竞争模式还是多主模式;0:竞争,1:多主
sending_type : 0
#转发协议配置;
sending_protocol : mcpack
#发送窗口大小,需要比线程数大;
sending_window_size : 10
#发送线程数。竞争模式为竞争线程数,多主模式下,为每个机器的并发线程数
sending_thread_num : 8
#出错后重试的时间间隔(单位MS)
sending_retry_time: 500

#命令过滤规则 ,接受的命令配置;
# *表示通配,只能出现在字符串的最后,不能出现在中间
[..msg_filter]
@filter:space.follow.*

#时序控制规则,支持按照某个partition key的时序支持
[..sequence_control]
#制定mutex key,必须是整形数类型的字段;
mutex_key : qid

#指定对于 没有PK的命令,是否强制串行处理;
#1:是:该命令必须等待所有靠前的命令都执行完成,再执行。该命令执行前,靠后的命令也不能执行。(主要用于存在批量命令时,保证时序)。
#0:否:该命令完全无序£??到了就执行。
force_sequence_when_no_mutex_key : 1



#支持任意模块自定义配置、so自定义配置
[..ext_config]

[..mcpack]
#发送模式,0:nshead+mcpack, 1:ubrpc
send_type : 0

#mcpack字段复制
#在消息中间件中,内部填充字段都是以"_"开头,比如_trans_id, _log_id等,为了兼容,可以在so中将这些字段复制改名;
[...@mcpack_key_copy]
from : _transid
to : trans_id

[...@mcpack_key_copy]
from : _log_id
to : log_id
									

machine_$(product)_$(module).conf + mcpack协议

下面是pusher的下游是c模块,mcpack接收的范例配置, 参考space_follow点击下载

[UbClientConfig]
[.UbClient]

[..@Service]
Name  : space_follow
ConnectAll :  0
DefaultConnectTimeOut  :  300
DefaultReadTimeOut  :  1000
DefaultWriteTimeOut  :  1000
DefaultMaxConnect  :  10
#DefaultRetry  :  5
#LONG / SHORT
DefaultConnectType  :  SHORT
#DefaultLinger  :  0
#ReqBuf  :  100
#ResBuf  :  100
#DefaultAsyncWaitingNum  :  100
[...CurrStrategy]
ClassName  :  UbClientStrategy
[...CurrHealthy]
ClassName  :  UbClientHealthyChecker

[...@Server]
IP : 10.32.52.30
Port : 29003 
[...@Server]
IP : 10.32.52.31
Port : 29003 
									

module_$(product)_$(module).conf + 竞争模式 + 本地配置

下面是pusher的下游是php模块,http接收的范例配置. 参考lbs_attent 点击下载

[modules]
#下面是一个模块的配置

[.@module]
#模块名,需要跟ubclient配置里面的名称一致;
name : lbs_attent
#是否启用 1启用 0停用
flag : 1 
#是竞争模式还是多主模式;0:竞争,1:多主
sending_type : 0 
#转发协议配置;
sending_protocol : http
#发送窗口大小,需要比线程数大;
sending_window_size : 10
#发送线程数。竞争模式为竞争线程数,多主模式下,为每个机器的并发线程数
sending_thread_num : 8
#出错后重试的时间间隔(单位MS)
sending_retry_time: 500 

#命令过滤规则 ,接受的命令配置;
# *表示通配,只能出现在字符串的最后,不能出现在中间
[..msg_filter]
@filter : promo.atten.*
#@filter : promo.*

#时序控制规则,支持按照某个partition key的时序支持
[..sequence_control]
#制定mutex key,必须是整形数类型的字段;
mutex_key : qid

#指定对于 没有PK的命令,是否强制串行处理;
#1:是:该命令必须等待所有靠前的命令都执行完成,再执行。该命令执行前,靠后的命令也不能执行。(主要用于存在批量命令时,保证时序)。
#0:否:该命令完全无序,到了就执行。
force_sequence_when_no_mutex_key : 1


#支持任意模块自定义配置、so自定义配置                                                                                                      
[..ext_config]
custom_key : sample

[..http]
#第二部分
#用户自定义配置内容
[...default_conf]

#重试次数,默认为-1,一直重试; 0:不重试;
max_retry_times : -1

#是否将提交的mcpack作为post数据发送,1发送0不发送,缺省1,0的情况下只往后端发url
send_pack : 1 

#发送mcpack的方式:0:二进制模式直接post, 1:转化成text,使用post $key=$pack_str 的方式发送;
# 2: 转换成json,使用post $key=$pack_str 的方式发送;默认为0模式; 
send_pack_type : 0 

#在send_pack_type=1/2的时候,配置post的key;默认"data”;
send_pack_key : data

#server 冗余均衡策略: 0:random,对多个ip进行轮询;1:master-slave,只在出错时切换到下一个ip;
#默认0;
server_redundancy_policy : 0 

#uri的模版,其中{{}}里面的字段将从mcpack里面获取替换; 支持req_download.uid这种多级的字段;
uri : /promov1/commit/attention?mod=commit&transid={{_transid}}

#http method, 0:get, 1:post, default: 1
#http_method : 0

#过滤器,会判断mcpack中的uid字段的值%2后是否为1,只有为1时才发送
#目前只支持"=" 和 "%" 这两种;
#filter : {{user_id}}%2=1

#http header, 多个header用\\r\\n隔开;
http_header : User-Agent: NuSOAP/0.6.6\r\ncharset=UTF-8


#命令号相关配置
#针对每个不同的命令号,可以重设上面的uri/http_method/filter/send_pack/http_header,覆盖上面的默认配置;
#[...space_photo_add]
#uri : /so/test22?service=Commit&pid=test&tk=test&transid={{_transid}}
#filter : {{is_xxx}}=1

#[...space_post_add]
#send_pack : 1
#uri : /so/test33?service=Commit&pid=test&tk=test&transid={{_transid}}
#filter : {{uid}}%2=1

#mcpack字段复制
#在消息中间件中,内部填充字段都是以"_"开头,比如_transid, _log_id等,为了兼容,可以在so中将这些字段复制改名;
#[...@mcpack_key_copy]
#from : _transid
#to : trans_id
									

machine_$(product)_$(module).conf + 竞争模式 + 本地配置

下面是pusher的下游是php模块,http接收的范例配置. 参考lbs_attent 点击下载

[UbClientConfig]                                                                                                                           
[.UbClient]

#竞争的例子(不用zk时)

[..@Service]
Name  : lbs_attent
ConnectAll :  0
DefaultConnectTimeOut  :  300 
DefaultReadTimeOut  :  1000
DefaultWriteTimeOut  :  1000
DefaultMaxConnect  :  10  
#DefaultRetry  :  5
#LONG / SHORT
DefaultConnectType  :  SHORT
#DefaultLinger  :  0
#ReqBuf  :  100
#ResBuf  :  100
#DefaultAsyncWaitingNum  :  100
#声明将要使用的策略类及属性
[...CurrStrategy]
ClassName  :  UbClientStrategy
[...CurrHealthy]
ClassName  :  UbClientHealthyChecker
[...@Server]
IP : 10.40.71.100
Port : 8016
[...@Server]
IP : 10.40.71.101
Port : 8016
									

从cm/transfer迁移

每个产品线可能有自己特殊的情况,以下仅列出一些常见的问题,供参考。 cm/transfer到nmq不能做到透明的迁移,需要产品线做上下游的兼容。

1.迁移方式

双写或顺切。一般考虑到平滑过渡和不丢数据的可回滚,会使用双写的方案。

2.上游兼容

上游将消息提交从cm/transfer迁移到nmq,需要做几类事情的改变:

  • 命令号从cmd_no迁移到product+topic+cmd

  • 和id分配服务交互需要自行实现

  • 其他在cm单点做的事情,比如字段检查等,需要自行实现

可选的方案有两种:升级上游模块,或者增加单独的adapter层。视产品线的情况而定。

3.下游兼容

下游兼容最主要的是发送协议的兼容。简单的讲,下游的兼容主要通过so来做。

  • nmq不支持transfer的default协议(协议中带transid,并且下游可以使 用transid反向控制)。最佳建议:升级下游支持裸的mcpack方式,自己无需保存transid,也不再使用下游反向控制transid的功能。 为兼容default协议使用伪造的方式很trick,无论是在so中实现,还是加一个中间层。
  • 其他mysql、comdb的发送方式。 a) 建议不要再使用transfer直接更新mysql的方式,改用下游自己去更新。 b) comdb等so视情况,在pusher下实现新的so即可。
  • 一些细节的兼容,可以在so中支持,比如transid等字段的兼容。

自定义转发流程

如果协议使用mcpack和http,但默认so不能满足的功能,请联系nmq维护者。 如果使用其他的转发协议如comdb、memcache,请联系nmq维护者,如果不是通用的需求可能需要产品线自己来开发。 自定义so需要实现若干个回调函数,定义在talk_ext.h中。

/*
 * @brief 初始化操作,成功返回0,失败返回-1
 *
 * @param server 服务器配置
 * @param [out] handle_out 自定义处理的句柄,如果有的话
 * @param [in] module_conf 自定义处理的命令文件
 *
 * @return 成功返回0, 失败返回-1
 */
typedef int (*np_init_f)(talk_svr_t * server, void **handle_out, comcfg::Configure *module_conf);
/*
 * @brief 发送命令逻辑
 *
 * @param server 服务器配置
 * @param handle 自定义处理的句柄,如果有的话
 * @param logdi  logid
 * @param req_head  同步请求头
 * @param buf 数据
 * @param buf_len  数据的长度
 *
 * @return 成功返回0,失败返回-1, 命令格式错误返回1
 */
typedef int (*np_send_f)(
    talk_svr_t * server,
    void *handle,
    unsigned logdi,
    ub::UbClientManager *ubmgr,
    const char *mod_name,
    void *buf,
    unsigned buf_len);
/*
 * @brief 析构操作
 *
 * @param server  server信息
 * @param handle 自定义处理的句柄,如果有的话
 *
 * @return 成功返回0,失败返回-1
 */
typedef int (*np_free_f)(talk_svr_t * server, void **handle);

/*
 * @brief 重设连接
 *
 * @param server server信息
 * @param handle    自定义句柄
 */
typedef void (*np_reset_f)(talk_svr_t * server, void *handle);

消息队列