RocketMQ has two consumption modes: concurrent consumption and orderly consumption. Each mode deserves its own write-up, so this article analyzes concurrent consumption only.

The core points of concurrent consumption are message pulling, message retry, the message ack mechanism, and persistence of the consumption progress. This article walks through each of them.

Other logic

    1, The consumer periodically synchronizes the ack offset, that is, the consumption position, to the broker.

    2, In extreme cases a message that keeps failing can stop the consumer's ack offset from advancing. However, a scheduled task clears expired messages, so the ack offset still advances normally.
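
The second point can be illustrated with a minimal sketch. It assumes, as described later in this article, that in-flight messages are kept in a TreeMap keyed by offset and that the ack offset is the smallest offset still in flight. The class InFlightMessages, its methods, and the timeout handling are hypothetical names chosen for illustration, not RocketMQ's API.

```java
import java.util.TreeMap;

// Hypothetical model of the in-flight messages of one queue (not RocketMQ's real class).
class InFlightMessages {
    // offset -> time (ms) at which consumption of that message started
    private final TreeMap<Long, Long> startTimeByOffset = new TreeMap<>();
    private long nextOffset = 0; // offset right after the last message ever added

    void add(long offset, long startMillis) {
        startTimeByOffset.put(offset, startMillis);
        nextOffset = Math.max(nextOffset, offset + 1);
    }

    void ack(long offset) {
        startTimeByOffset.remove(offset);
    }

    // The ack offset is the smallest offset still in flight,
    // or the next offset when nothing is in flight.
    long ackOffset() {
        return startTimeByOffset.isEmpty() ? nextOffset : startTimeByOffset.firstKey();
    }

    // Drops messages at the head that have been in flight longer than timeoutMillis.
    // A real client would hand them back to the broker for retry before dropping them.
    int clearExpired(long nowMillis, long timeoutMillis) {
        int removed = 0;
        while (!startTimeByOffset.isEmpty()
                && nowMillis - startTimeByOffset.firstEntry().getValue() > timeoutMillis) {
            startTimeByOffset.pollFirstEntry();
            removed++;
        }
        return removed;
    }
}
```

In this sketch, a message at offset 10 that is never acked pins ackOffset() at 10 even when 11 and 12 are done; once clearExpired removes it, the offset can move past all three.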

Concurrent consumption overall process

Concurrent consumption process
Notes:

    1, Rebalance generates pullRequest objects and places them in pullRequestQueue.

    2, PullMessageService takes pullRequest objects from the queue and pulls data.

    3, After data is pulled, a ConsumeRequest object is created and submitted to the consumeExecutor thread pool.

    4, ConsumeRequest is a Runnable task that runs in the thread pool and consumes the pulled messages.

    5, Messages that are consumed successfully are deleted from the ConsumeRequest's ProcessQueue. Messages that fail are sent to the broker's retry queue; the retry count and delay level are handled on the broker side.

    6, The processQueue held by the ConsumeRequest uses a TreeMap, which keeps the messages in order and is used to determine consumption progress.

    7, After processing, the pullRequest is put back into pullRequestQueue.
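
The flow above can be sketched end to end in a few lines. This is a simplified model under stated assumptions, not RocketMQ's implementation: MiniPipeline and brokerLog are hypothetical names, a pullRequest is reduced to "the next offset to pull", and the loop stops once the log is exhausted so that the example terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical pipeline: pull request queue -> pull loop -> consume thread pool.
class MiniPipeline {
    // Each entry is the next offset to pull; it stands in for a pullRequest.
    final BlockingQueue<Long> pullRequestQueue = new LinkedBlockingQueue<>();
    final ExecutorService consumeExecutor = Executors.newFixedThreadPool(4);
    final List<String> consumed = new CopyOnWriteArrayList<>();
    private final List<String> brokerLog; // stands in for one queue on the broker

    MiniPipeline(List<String> brokerLog) {
        this.brokerLog = brokerLog;
    }

    // Pulls until the log is exhausted, then waits for the consume pool to finish.
    // A real pull loop never ends; this one stops so the sketch terminates.
    void run(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        pullRequestQueue.offer(0L);                               // step 1
        Long offset;
        while ((offset = pullRequestQueue.poll()) != null) {      // step 2
            int from = offset.intValue();
            int to = Math.min(from + batchSize, brokerLog.size());
            List<String> batch = new ArrayList<>(brokerLog.subList(from, to));
            consumeExecutor.execute(() -> consumed.addAll(batch)); // steps 3 and 4
            if (to < brokerLog.size()) {
                pullRequestQueue.offer((long) to);                // step 7
            }
        }
        consumeExecutor.shutdown();
        try {
            consumeExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because batches run on different pool threads, the order in which they finish is not guaranteed. That is why consumption progress is tracked with an ordered structure rather than by completion order.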

pullRequest execution process
Consumer consumption entry point
Note: see the PullMessageService class.

    1, A single thread takes pullRequest objects from the queue and processes them in a loop.

Consumption flow control
Note: see the PullMessageService class.

    1, Performs state checks and flow control during the consumption process.
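
A minimal sketch of what such checks can look like before each pull. The class PullFlowControl, the Decision enum, and the threshold values in the comments are assumptions for illustration; the real client has its own fields and defaults.

```java
// Hypothetical pre-pull check: state checks first, then flow control.
class PullFlowControl {
    enum Decision { PULL_NOW, DELAY, DROP }

    private final int maxCachedCount;  // e.g. 1000 unconsumed messages per queue (assumed)
    private final long maxCachedBytes; // e.g. 100 MiB of unconsumed messages per queue (assumed)

    PullFlowControl(int maxCachedCount, long maxCachedBytes) {
        this.maxCachedCount = maxCachedCount;
        this.maxCachedBytes = maxCachedBytes;
    }

    Decision decide(boolean queueDropped, boolean consumerPaused,
                    int cachedCount, long cachedBytes) {
        if (queueDropped) {
            return Decision.DROP;   // the queue is no longer assigned to this consumer
        }
        if (consumerPaused) {
            return Decision.DELAY;  // state check: try again later
        }
        if (cachedCount > maxCachedCount) {
            return Decision.DELAY;  // too many messages waiting to be consumed
        }
        if (cachedBytes > maxCachedBytes) {
            return Decision.DELAY;  // too much memory held by unconsumed messages
        }
        return Decision.PULL_NOW;
    }
}
```

A DELAY decision would typically mean putting the pullRequest back into the queue after a short wait instead of pulling immediately.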

Orderly and non-orderly consumption processing logic
Note: see the DefaultMQPushConsumerImpl class.

    1, Distinguishes between orderly and non-orderly consumption.

    2, For non-orderly consumption, checks whether the span between consumption offsets is too large.
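
The span check in point 2 can be sketched as follows, assuming the locally cached messages are kept in a TreeMap keyed by offset. SpanCheck and maxSpan are hypothetical names; the limit passed in is whatever the consumer is configured with.

```java
import java.util.TreeMap;

// Hypothetical helper for the "offset span too large" check.
class SpanCheck {
    // Span = distance between the largest and smallest offsets still cached locally.
    static long span(TreeMap<Long, String> cached) {
        return cached.isEmpty() ? 0 : cached.lastKey() - cached.firstKey();
    }

    // A large span means an old message is stuck while newer ones keep arriving,
    // so the next pull should be postponed.
    static boolean shouldDelayPull(TreeMap<Long, String> cached, long maxSpan) {
        return span(cached) > maxSpan;
    }
}
```

For example, with offsets 100 and 2200 cached and a limit of 2000, the span is 2100 and the pull would be delayed until the message at offset 100 leaves the cache.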

Callback function for pulled messages
Note: see the DefaultMQPushConsumerImpl class.

    1, Handles the follow-up operations after messages are pulled.

    2, Re-submits the pullRequest after processing.

Message pull execution
Note: see the PullAPIWrapper class.

Where the pull is actually performed
Note: see the PullAPIWrapper class.

Handling pulled message results
Note: see the ConsumeMessageConcurrentlyService class.

    1, After a pull succeeds, sets the offset for the next pull.

    2, Adds the pulled messages to processQueue.

    3, Re-submits the pullRequest to start the next pull.
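
The three steps map onto very little code. The sketch below is a simplified model: PullResultSketch and its nested PullRequest and PullResult types are hypothetical stand-ins, and the step that hands the messages to the consume thread pool is left out.

```java
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

// Hypothetical handling of a successful pull.
class PullResultSketch {
    static class PullRequest {
        long nextOffset; // where the next pull should start
    }

    static class PullResult {
        final long nextBeginOffset;
        final Map<Long, String> messagesByOffset;

        PullResult(long nextBeginOffset, Map<Long, String> messagesByOffset) {
            this.nextBeginOffset = nextBeginOffset;
            this.messagesByOffset = messagesByOffset;
        }
    }

    static void onPullSuccess(PullRequest request, PullResult result,
                              TreeMap<Long, String> processQueue,
                              Queue<PullRequest> pullRequestQueue) {
        request.nextOffset = result.nextBeginOffset;  // 1: offset of the next pull
        processQueue.putAll(result.messagesByOffset); // 2: cache the pulled messages
        pullRequestQueue.add(request);                // 3: schedule the next pull
    }
}
```

Reusing the same pullRequest object for every round is what keeps the pull loop going without any extra scheduling state.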

Dispatching pulled messages for processing
Note: see the ConsumeMessageConcurrentlyService class.

    1, The pulled messages can be processed in a single batch or split across several batches.
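
One way to picture this: if the pull returns no more messages than the batch size, everything goes into one task; otherwise the list is cut into several tasks. The helper below is a sketch under that assumption; BatchSplitter and consumeBatchSize are illustrative names, and each returned sub-list would become one ConsumeRequest.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical splitter: one sub-list per consume task.
class BatchSplitter {
    static <T> List<List<T>> split(List<T> msgs, int consumeBatchSize) {
        if (consumeBatchSize <= 0) {
            throw new IllegalArgumentException("consumeBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < msgs.size(); i += consumeBatchSize) {
            int end = Math.min(i + consumeBatchSize, msgs.size());
            // Copy the slice so each task owns its own list.
            batches.add(new ArrayList<>(msgs.subList(i, end)));
        }
        return batches;
    }
}
```

With five messages and a batch size of 2, this yields three batches of sizes 2, 2, and 1; with two messages and a batch size of 32, it yields a single batch.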

Core objects of consumer consumption
Notes:

    1, processQueue is where messages are stored; its core data structure is a TreeMap.

    2, messageQueue is the message queue that the ConsumeRequest is responsible for handling.

Consumption callback and result processing
Note: see the ConsumeMessageConcurrentlyService class.

    1, The consumer's logic for consuming pulled messages and the follow-up processing.

Persisting the consumption offset
Note: see the ConsumeMessageConcurrentlyService class.

    1, If consumption succeeds, all of the pulled messages are deleted.
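
How deleting a batch relates to the offset that gets persisted can be sketched as below. This is a simplified model with hypothetical names (ConsumeResultHandler, sendBackToBroker): it assumes the batch is present in the TreeMap, that failed messages are handed to the broker for retry and then dropped locally as well, and that the offset to persist is the smallest offset still cached.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical result handling for one consumed batch.
class ConsumeResultHandler {
    // Returns the offset that may be persisted after this batch has been handled.
    static long handle(TreeMap<Long, String> processQueue, List<Long> batchOffsets,
                       boolean success, Consumer<Long> sendBackToBroker) {
        if (!success) {
            for (long offset : batchOffsets) {
                sendBackToBroker.accept(offset); // the broker takes over the retry
            }
        }
        // Either way the batch leaves the local cache.
        long maxSeen = processQueue.isEmpty() ? -1 : processQueue.lastKey();
        for (long offset : batchOffsets) {
            processQueue.remove(offset);
        }
        // Smallest offset still cached, or one past the largest when the cache is empty.
        return processQueue.isEmpty() ? maxSeen + 1 : processQueue.firstKey();
    }
}
```

Note that finishing offsets 7 and 8 while 5 and 6 are still cached returns 5: the persisted position never jumps over a message that has not been handled yet.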

Storing retry messages on the broker side
Note: see the SendMessageProcessor class.

    1, The processing logic is in the consumerSendMsgBack method.

    2, It involves setting the delay level and the retry count.

    3, The messages are delivered to the delay queue.
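
The routing decision behind points 2 and 3 might look roughly like the sketch below. Everything specific here is an assumption for illustration and should be checked against the broker source for the version in use: the class RetryRouting, the limit of 16 retries, the "3 + retry count" delay rule, and the %RETRY% and %DLQ% topic prefixes.

```java
// Hypothetical routing of a message sent back by a consumer.
class RetryRouting {
    static final int MAX_RECONSUME_TIMES = 16; // assumed default limit

    // Below the limit the message goes to the group's retry topic;
    // at or above it, to the group's dead-letter topic.
    static String targetTopic(String consumerGroup, int reconsumeTimes) {
        return reconsumeTimes >= MAX_RECONSUME_TIMES
                ? "%DLQ%" + consumerGroup
                : "%RETRY%" + consumerGroup;
    }

    // When the consumer did not request a specific delay level (0),
    // the level grows with the number of retries so far.
    static int delayLevel(int requestedLevel, int reconsumeTimes) {
        return requestedLevel == 0 ? 3 + reconsumeTimes : requestedLevel;
    }
}
```

The growing delay level is what makes each retry wait longer than the previous one before the message becomes visible to the consumer again.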

Periodic persistence of the consumption offset
Note: see the MQClientInstance class.

    1, persistAllConsumerOffset periodically persists the consumption offset.

    2, The consumption offset is updated while a ConsumeRequest is being processed.
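
The split between "update in memory during consumption" and "persist on a timer" can be sketched with a scheduled executor. OffsetPersister, its two maps, and the forward-only update rule are assumptions made for this example; the brokerOffsets map merely stands in for the remote call to the broker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical offset store: consume threads update memory, a timer persists it.
class OffsetPersister {
    final Map<String, Long> memoryOffsets = new ConcurrentHashMap<>();
    final Map<String, Long> brokerOffsets = new ConcurrentHashMap<>(); // stands in for the broker

    // Called while a consume task finishes. Only moves forward:
    // a late update with a smaller offset is ignored.
    void updateOffset(String queue, long offset) {
        memoryOffsets.merge(queue, offset, Long::max);
    }

    // Copies the current in-memory offsets to the "broker".
    void persistAll() {
        brokerOffsets.putAll(memoryOffsets);
    }

    // Runs persistAll at a fixed rate; the caller must shut the returned executor down.
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::persistAll, periodMillis, periodMillis,
                TimeUnit.MILLISECONDS);
        return timer;
    }
}
```

Because persistence is periodic, a consumer that crashes between two runs may repeat the messages consumed since the last persisted offset, so message handlers should tolerate duplicates.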

Re-sending the pull request
Note: see the DefaultMQPushConsumerImpl class.

    1, Handles the case where no messages were pulled from the broker.

    2, Re-submits the pullRequest.
