首页 > 代码库 > RocketMQ3.2.6安装部署及调用

RocketMQ3.2.6安装部署及调用

RocketMQ3.2.6安装部署及调用
1、RocketMQ部署架构
所有IP都是127.0.0.1,其中NameServer一个,Broker一个,Producer一个,Consumer一个
技术分享
2、 RocketMQ环境搭建
(1).下载安装包 下载地址:https://github.com/alibaba/RocketMQ/releases
技术分享
(2).解压,并进入bin目录
技术分享
(3).启动name server
技术分享
(4).启动broker
技术分享
(5).启动完成
技术分享
技术分享
3、 RocketMQ调用使用JAR:
技术分享
4、 RocketMQ一些概念
RocketMQ以Topic来管理不同应用的消息。对于生产者而言,发送消息是,需要指定消息的Topic,对于消费者而言,在启动后,需要订阅相应的Topic,然后可以消费相应的消息。Topic是逻辑上的概念,在物理实现上,一个Topic由多个Queue组成,采用多个Queue的好处是可以将Broker存储分布式化,提高系统性能。

RocketMQ中,producer将消息发送给Broker时,需要制定发送到哪一个队列中,默认情况下,producer会轮询的将消息发送到每个队列中(所有broker下的Queue合并成一个List去轮询)。

对于consumer而言,会为每个consumer分配固定的队列(如果队列总数没有发生变化),consumer从固定的队列中去拉取没有消费的消息进行处理。

Producer

Producer端(属于client)的逻辑概述:

producer端的逻辑都比较简单,将消息发送到某个Queue中即可,具体发送到那个Queue可以由用户控制(MessageQueueSelector接口),默认情况下,将轮询方式选择Queue。在producer端,会从NameServer将所有Broker的Topic及对应的Queue信息(即:TopicRoute信息)拉取到本地,然后根据(brokerName, queueId)组建成一个List。因此在MessageQueueSelector,可以看到所有的Queue信息。

RocketMQ将topic的消息以多个Queue来管理,使得其较为容易的就可以进行水平扩展,提供系统吞吐力。这样分布带来的问题,就是从全局上不能做到顺序性(很多时候也并不需要全局上的顺序性)。

RocketMQ提到支持顺序消息,实际上是指基于Queue级别的顺序。用户将某些需要满足顺序的一批消息(比如电商某个订单号的一系列后续操作、比如数据库的某个主键的insert、delete、update等操作)发送到固定的某个Queue中,则从这个Queue消费消息的consumer,针对这一批消息是顺序消费。

问题1:针对顺序消息的队列,是否可以做到不停服务下的集群动态扩展?

Consumer

consumer逻辑稍微复杂一点。初步思考,consumer端至少需要处理:

(1) 消息的获取

(2) offset(消费进度)的管理与存储

(3) 集群消费模式下,Queue的分配问题(rebalance)

RocketMQ对外提供了两种不同形式的Consumer:PushConsumer和PullConsumer。顾名思义,对于PullConsumer而言,用户需要主动调用相应的接口去拉取未消费的消息。对于PushConsumer而言,用户提供消息处理的CallBack,有未曾消费的消息时,会主动回调这个CallBack来处理消息。虽从用户角度而言,Consumer存在主动(pull)和被动(push),但RocketMQ本身的broker端仅仅保存所有的消息,并不负责push消息,因此PushConsumer的底层实现也是有一个长连接主动去broker上拉取未消费的消息,然后回调用户的callback逻辑。
5、 RocketMQ如何使用
A. 生产者
技术分享
技术分享
B. 消费者
技术分享
技术分享
技术分享
C. 控制器中调用
技术分享
D. 启动消费者任务。注意:切记不可以在每次发送消息时,都调用start方法
技术分享
6、 RocketMQ工作原理
技术分享

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    RocketMQ3.2.6安装部署及调用