whatever MQ There may be various abnormalities in products , These exceptions may prevent messages from being sent to Broker, Or the message cannot be received by the consumer , So most of them MQ Products will provide retry mechanism for message failure .RocketMQ No exception , stay RocketMQ There are two types of message retries: producer retries and consumer retries .


Producer retry

Producer retries are when the producer Broker When sending a message , If the current network jitter and other reasons cause message sending failure , At this time, you can manually set the number of send failure retries to resend the message .
@RunWith(SpringRunner.class) @SpringBootTest public class
RockermqproducerApplicationTests {
@Value("${apache.rocketmq.producer.producerGroup}") private String
producerGroup; /** * NameServer address */ @Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr; @Test public void contextLoads() { // Group name of the producer
DefaultMQProducer producer=new DefaultMQProducer(producerGroup);
// appoint NameServer address , Multiple addresses to ; separate producer.setNamesrvAddr(namesrvAddr); // Message send failure retries
producer.setRetryTimesWhenSendFailed(3); // Number of asynchronous send failure retries
producer.setRetryTimesWhenSendAsyncFailed(3); // Message not sent successfully , Send to another Broker in
producer.setRetryAnotherBrokerWhenNotStoreOK(true); try { /** *
Producer Object must be called before it can be used start initialization , Initialization once * be careful : Remember not to send a message every time , All calls start method */
producer.start(); for (int i=0;i<=10000;i++) { Message msg=new
Message("topic_example_java","TagA",("Hello Java Demo
RocketMQ:"+i).getBytes(Charset.defaultCharset())); SendResult
result=producer.send(msg); System.out.println(" Message sending results :"+result); } }catch
(Exception e) { e.printStackTrace(); }finally { producer.shutdown(); } } }
You can see through org.apache.rocketmq.client.producer.DefaultMQProducer Class setRetryTimesWhenSendFailed and setRetryTimesWhenSendAsyncFailed Method sets the number of retries . Here, the main code to realize retry logic is DefaultMQProducerImpl Class sendDefaultImpl In the method .


Consumer retry

The failure of the consumer side is generally divided into two situations , One is that messages cannot be sent from the Internet Broker Send to consumer , At this time RocketMQ Internal will keep trying to send this message , Until sending is successful ( For example, to one of the clusters Broker Instance send failed , Try to send it to another Broker example ); Second, the consumer side has received the message normally , But an exception occurred while executing the subsequent message processing logic , Final feedback to MQ Consumer processing failed , For example, the received message data may not meet its own business requirements , If the current card number is not activated, business cannot be performed, etc , At this time, we need to control by returning different states of message consumption through business code .

Next, take ordinary consumption as an example , Take a look at how to retry when the business message consumption exception occurs on the consumer side . Next, register the message listener in the consumer code consumeMessage Message consumption status returned by method ConsumeConcurrentlyStatus Definition of :
public enum ConsumeConcurrentlyStatus { CONSUME_SUCCESS, RECONSUME_LATER;
private ConsumeConcurrentlyStatus() { } }
CONSUME_SUCCESS Indicates successful consumption , This is the status returned in the normal business code .RECONSUME_LATER Indicates current consumption failure , Need to retry later . stay RocketMQ Only the consumer side of the business has returned CONSUME_SUCCESS It will be considered successful when the message is consumed , If you return RECONSUME_LATER,RocketMQ It will be regarded as a failure of consumption , Redelivery required . To ensure that messages are successfully consumed at least once ,RocketMQ Send back the message that the consumption fails Broker, At some point in time ( The default is 10 second , Modifiable ) Deliver to consumers again . If it fails to repeat the message all the time , When failures accumulate to a certain number of times ( default 16 second ) Post message to dead letter queue (Dead
Letter Queue) in , At this time, we need to monitor the dead letter queue for manual intervention .
@Bean("consumer1") public DefaultMQPushConsumer consumer1() {
// Create a message consumer , And set up a message consumer group DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("niwei_consumer_group"); // appoint NameServer address
consumer.setInstanceName("PushConsumer1"); // set up Consumer
Start consumption from the queue head or the queue tail at the first startup
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); try {
// Subscription assignments Topic All messages below consumer.subscribe("topic_example_java", "*"); // Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently() { public
ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext context) { // default list There's only one message in it , You can receive messages in batches by setting parameters if
(list != null) { for (MessageExt ext : list) { String msgBody=new
String(ext.getBody()); if (ext.getReconsumeTimes()==3) {
saveReconsumeStillMessage(ext); return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }else { try { doBusiness(msgBody);
}catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }
} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // Consumer objects must be called before they can be used
start initialization consumer.start(); System.out.println(" Message consumer started "); }catch (Exception
e) { e.printStackTrace(); } return consumer; }
In the example , Judge whether the current message has been retried 3 Issued after , No more business code execution , Directly record message data and return consumption success status . If the exception return status is not handled well in the callback of early business , Instead, throw an exception during method execution , that RocketMQ Think consumption is also a failure , Will be treated as RECONSUME_LATER To deal with .

  Callback when using sequential consumption ( Achieved MessageListenerOrderly Interface ) Time , Because sequential consumption can only continue if the previous message is successfully consumed , So in its message state definition (ConsumeOrderlyStatus) Not in RECONSUME_LATER state , But with SUSPEND_CURRENT_QUEUE_A_MOMENT To pause the consumption action of the current queue , Until the message is successfully retried .

©2019-2020 Toolsou All rights reserved,
Python+OpenCV Detailed explanation of face recognition technology SpringBoot practice ( five ):mybatis-plus In BaseMapper,Iservice and ServiceImpl Google says home office affects work efficiency !2021 Return to offline office in 2010 C Language programming to find a student's grade vue Of v-if And v-show The difference between C Language console games , Make bricks These songs , Programmers, don't listen ! What should I do if I suddenly encounter a question I can't answer during the interview ?python To solve the problem of dictionary writing list in c Linguistic 5 Three common sorting methods