rocketMq消费过程包括两种,分别是并发消费和有序消费,每个消费方式都可以单独拿出来进行分享,这篇文章单独用来分析并发消费问题。
并发消费需要理解的几个核心点:并发消费的消息拉取,并发消费的消息重试,并发消息的ack机制,消费进度的持久化,这篇分享会就这几个问题分解展开。
其他逻辑
1、consumer会定期向broker同步ack消息偏移量,也就是已经消费的位置。
2、极端情况下consumer会因为一个消息一直失败导致ack消息偏移量无法前进,但是因为会有定时任务去清楚过期消息,所以ack进度正常便宜。
并发消费整体流程
并发消费过程
说明:
1、Rebalance负责生成pullRequest放置到pullRequestQueue当中。
2、PullMessageService负责消费pullRequest来完成数据的拉取。
3、数据拉取后生成ConsumeRequest对象投递到consumeExecutor的线程池当中
4、ConsumeRequest是一个线程实例,负责消费拉取的消息。
5、消费消息成功就从ConsumeRequest的ProcessQueue中删除,消费失败就投递到broker的重试队列中,重试次数和延迟粒度在broker端处理。
6、consumeRequest内部维持的processQueue作为一个TreeMap对象可以维持消息的有序性,用于判断消费进度。
7、pullRequest在消费完以后还是再次投递到pullRequestQueue当中。
pullRequest执行过程
consumer消费入口
说明:参见PullMessageService类
1、单线程循环消费pullRequest。
消费流速控制
说明:参见PullMessageService类
1、消费过程中进行一些状态判断以及流速控制
有序消费和无须消费处理逻辑
说明:参见DefaultMQPushConsumerImpl类
1、区分有序消费和无须消费
2、无序消费会判断消费偏移量是否差别过大
拉取消息的回调函数
说明:参见DefaultMQPushConsumerImpl类
1、处理拉取消息的后续操作
2、处理完以后再次投递pullRequest请求
消息拉取执行部分
说明:参见PullAPIWrapper类
真正执行拉取的地方
说明:参见PullAPIWrapper类
处理拉取的消息结果
说明:参见ConsumeMessageConcurrentlyService类。
1、拉取消息成功后设置下一次拉取的偏移量。
2、更新拉取的消息到processQueue当中。
3、再次投递pullRequest发起下一次拉取。
处理拉取消息的分配处理
说明:参见ConsumeMessageConcurrentlyService类
1、分一次能够处理完成和分多次能够处理完成。
consumer消费对象的核心
说明:
1、processQueue是待处理消息保存位置,里面核心数据结构之一为TreeMap
2、messageQueue就是这个ConsumeRequest负责处理的messageQueue
回调函数消费并进行结果处理
说明:参见ConsumeMessageConcurrentlyService类
1、consumer消费拉取消息的逻辑及后续处理
持久化消费位移
说明:参见ConsumeMessageConcurrentlyService类
1、消费成功就删除所有拉取的消息
broker端存储重试消息
说明:参见SendMessageProcessor类
1、处理逻辑在consumerSendMsgBack方法中
2、里面涉及到延迟粒度和重试次数的设置
3、消息是被投递到延迟队列当中的
定期持久化消费位移
说明:参见MQClientInstance类
1、在persistAllConsumerOffset定期持久化消费偏移量
2、消费偏移量由ConsumerRequest请求在处理的过程中变更的
重新发送拉取请求
说明:参见DefaultMQPushConsumerImpl类
1、处理没有从broker拉取消息的过程
2、再次投递pullRequest请求
技术
今日推荐