首页 > 代码库 > rocketmq源码分析3-consumer消息获取

rocketmq源码分析3-consumer消息获取

使用rocketmq的大体消息发送过程如下:

技术分享

在前面已经分析过MQ的broker接收生产者客户端发过来的消息的过程,此文主要讲述订阅者获取消息的过程,或者说broker是怎样将消息传递给消费者客户端的,即上面时序图中拉取消息(pull message)动作。。

 

1. 如何找到入口(MQ-broker端)

分析一个机制或者功能时,我们首先希望的是找到入口,前一篇我们是通过端口号方式顺藤摸瓜的方式找到了入口。但是此篇略微不同,涉及到consumer客户端与broker的两边分析,最终发现逻辑还是比较绕的,主要有很多异步动作,还有循环调用(当然不是一个线程上,而且中间有阻塞队列缓冲),这对调试式分析代码造成了一些不方便。
回到正题,怎么找到这里入口?在具备上篇分析的基础上,我直接分析broker的代码,broker接收消息的时候是靠SendMessageProcessor,那么在消息传递给消费端的时候是不是也是靠某个processor完成的?据这些processor的命名观察,猜测PullMessageProcessor比较像。
为了验证这一想法,注释掉BrokerController中使用这个processor的地方,再重新测试,发现consumer就收不到producer发过来的消息了。想法初步正确。

2. 调试PullMessageProcessor(MQ-broker端)

RemotingServer在注册processor的时候,是根据RequestCode进行注册的。
PullMessageProcessor 对应的RequestCode的PULL_MESSAGE,即11。猜测:consumer客户端不断(或定时轮询,或循环调用,或其他方式)发起pull message请求给broker,broker会处理这些请求,后面会验证这个猜测。
PullMessageProcessor在注册的时候对应的线程池是pullMessageExecutor,线程池的corePoolSize以及maxPoolSize都可以在broker中进行config,字段名是pullMessageThreadPoolNums。默认值16+处理器个数*2。

3. 哪里发送了RequestCode为PULL_MESSAGE的请求(consumer客户端)

通过全局搜索,很容易发现是MQClientAPIImpl.pullMessage422行,发送了PULL_MESSAGE类型的请求。
加断点(consumer客户端需要debug方式启动),看调用堆栈。

技术分享

很容易发现 是 PullMessageService.run()发出了PULL_MESSAGE的request。 run的代码如下:

while (!this.isStoped()) {    try {        PullRequest pullRequest = this.pullRequestQueue.take();        if (pullRequest != null) {            this.pullMessage(pullRequest);        }    }    catch (InterruptedException e) {    }    catch (Exception e) {        log.error("Pull Message Service Run Method exception", e);    }}

在线程没有停止的情况下,一直循环发拉取消息的请求,过程中被pullRequestQueue阻塞队列阻塞。
分析谁向pullRequestQueue put了元素?是PullMessageService.executePullRequestImmediately(PullRequest)方法。
谁调了上面的方法,同样断点分析,调用堆栈如下图:

技术分享

划蓝色线的ResponseFuture地方,是阿里对这种通过发送网络请求调用后还能回调回来的一个特性封装,值得学习。 划红色线的 MQClientAPIImpl$2地方是在处理业务逻辑,位于方法pullMessageAsync(String, RemotingCommand, long, PullCallback)内。此处又是一个异步。

private void pullMessageAsync(//        final String addr,// 1        final RemotingCommand request,//        final long timeoutMillis,//        final PullCallback pullCallback//) throws RemotingException, InterruptedException {    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {        @Override        public void operationComplete(ResponseFuture responseFuture) {            RemotingCommand response = responseFuture.getResponseCommand();            if (response != null) {                try {                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);                    assert pullResult != null;                    pullCallback.onSuccess(pullResult);// 457行对应此处                }                catch (Exception e) {                    pullCallback.onException(e);                }            }            else {                //... 省略            }        }    });}

4. 调试MQClientAPIImpl.pullMessageAsync(consumer客户端)

谁调用了MQClientAPIImpl.pullMessageAsync?
449行打断点,堆栈如下:

技术分享

发现又回到上面的PullMessageService的run中。:-(
回头去看看3段落中pullCallback.onSuccess(pullResult);// 457行对应此处
这一行代码,跟进去就会发现玄机,在这里面又调用了PullMessageService.executePullRequestImmediately(PullRequest)方法。 是一个匿名内部类,位于DefaultMQPushConsumerImpl.pullMessage(PullRequest)中。这个方法太长,贴一些简略的,

final long beginTimestamp = System.currentTimeMillis();PullCallback pullCallback = new PullCallback() {    @Override    public void onSuccess(PullResult pullResult) {        if (pullResult != null) {            pullResult =                    DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(                        pullRequest.getMessageQueue(), pullResult, subscriptionData);            switch (pullResult.getPullStatus()) {            case FOUND:                //...省略                long firstMsgOffset = Long.MAX_VALUE;                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                }                else {                    //...省略                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());                    }                    else {                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                    }                }                //...省略                break;            case NO_NEW_MSG:                //...省略                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                break;            case NO_MATCHED_MSG:                //...省略                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                break;            case OFFSET_ILLEGAL:                //...省略

上述代码基本上很清楚地看出来拉取代码的发起逻辑了。在case FOUND分支打个断点,当你从producer的客户端发一条消息过来的时候,能看到断点被命中。当没有producer没有发消息的时候,一直走的就是case NO_NEW_MSG分支。

未完,待分析拉取实际过程。

rocketmq源码分析3-consumer消息获取