1, Consumer code :
package com.ryfchina.ipay.regionalpay.mq; import
com.ryfchina.ipay.regionalpay.vo.MqMessageVO; import
org.apache.rocketmq.client.producer.DefaultMQProducer; import
org.apache.rocketmq.client.producer.SendResult; import
org.apache.rocketmq.client.producer.SendStatus; import
org.apache.rocketmq.common.message.Message; import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient; import org.slf4j.Logger; import
org.slf4j.LoggerFactory; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.beans.factory.annotation.Value; import
org.springframework.stereotype.Service; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import java.util.concurrent.TimeUnit; /** *
Send Message Queuing service * */ @Service("mqProducer") public class MqProducer { private static
final Logger LOGGER = LoggerFactory.getLogger(MqProducer.class);
@Value("${rocketmq.url}") private String namesrvAddr;
@Value("${rocketmq.producer.group}") private String producerGroup;
@Value("${rocketmq.producer.instance}") private String producerInstance;
private DefaultMQProducer producer; @Autowired private RedissonClient
redissonClient; @PostConstruct public void start() throws Exception {
LOGGER.info(" Initialize producer start ……"); producer = new DefaultMQProducer();
producer.setNamesrvAddr(namesrvAddr); producer.setProducerGroup(producerGroup);
producer.setInstanceName(producerInstance); // Send message maximum 6M
producer.setMaxMessageSize(6 * 1024 * 1024); producer.start();
LOGGER.info(" Initialize producer complete "); } @PreDestroy public void destroy() {
LOGGER.info(" close rocketmq-producer connect ."); try { producer.shutdown(); } catch
(Exception e) { LOGGER.error(" close rocketmq-producer abnormal .", e); } } /** * send message * *
@param tag Message ID * @param body Message content */ public void send(String topic, String
tag, String key, String body) { try { Message msg = new Message(topic, tag,
key, body.getBytes("utf-8")); /*RMap<String, MqMessageVO> rMap =
redissonClient.getMap("RM:MESSAGE"); rMap.put(key, new MqMessageVO(msg));*/
RBucket<Object> bucket = redissonClient.getBucket("RM:MESSAGE:" + key);
bucket.set(new MqMessageVO(msg), 1, TimeUnit.DAYS); SendResult result =
producer.send(msg); if (!SendStatus.SEND_OK.equals(result.getSendStatus())) {
LOGGER.error(" Message push failed ,topic : {}, tag : {}, key : {}, body : {}", topic, tag,
key, body); } LOGGER.info(" send out MQ state ==> msgId:{}, key:{}, state :{}",
result.getMsgId(), msg.getKeys(), result.getSendStatus()); } catch (Exception
e) { LOGGER.error(" Abnormal delivery message .", e); } } /** * mq default :messageDelayLevel:1s 5s 10s
30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h * @param topic * @param tag *
@param key * @param body * @param delayTimeLevel 1-18 Respectively corresponding to the above time */ public void
send(String topic, String tag, String key, String body, int delayTimeLevel) {
try { Message msg = new Message(topic, tag, key, body.getBytes("utf-8"));
msg.setDelayTimeLevel(delayTimeLevel); RBucket<Object> bucket =
redissonClient.getBucket("RM:MESSAGE:" + key); bucket.set(new MqMessageVO(msg),
1, TimeUnit.DAYS); SendResult result = producer.send(msg); if
(!SendStatus.SEND_OK.equals(result.getSendStatus())) {
LOGGER.error(" Message push failed ,topic : {}, tag : {}, key : {}, body : {}", topic, tag,
key, body); } LOGGER.info(" send out MQ state ==> msgId:{}, key:{}, state :{}",
result.getMsgId(), msg.getKeys(), result.getSendStatus()); } catch (Exception
e) { LOGGER.error(" Delivery message exception .", e); } } }
2, Consumer code :
package com.ryfchina.ipay.regionalpay.mq; import
com.ryfchina.ipay.regionalpay.common.kit.SpringUtil; import
com.ryfchina.ipay.regionalpay.vo.MqMessageVO; import jodd.util.StringUtil;
import org.apache.commons.lang3.StringUtils; import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import
org.apache.rocketmq.common.message.MessageExt; import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import
org.redisson.api.RBucket; import org.redisson.api.RedissonClient; import
org.slf4j.Logger; import org.slf4j.LoggerFactory; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.beans.factory.annotation.Value; import
org.springframework.core.annotation.Order; import
org.springframework.stereotype.Service; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import java.lang.reflect.Method; import
java.util.List; import java.util.concurrent.TimeUnit; /** * Message middleware receives messages
This service must be loaded after initialization of each business module * @since JDK 1.6 */ @Service @Order(10) public class
MqConsumer { private static final Logger logger =
LoggerFactory.getLogger(MqConsumer.class); /** * Message middleware service address */
@Value("${rocketmq.url}") private String namesrvAddr;
@Value("${rocketmq.consumer.group}") private String consumerGroup;
@Value("${rocketmq.consumer.instance}") private String consumerInstance; /** *
Subscribed topic */ @Value("${rocketmq.consumer.subscribeTopics}") private String
subscribeTopics; @Autowired private RedissonClient redissonClient; @Autowired
private SpringUtil springUtil; private DefaultMQPushConsumer consumer; /*** *
Execute thread method */ @PostConstruct public void start() {
logger.info("******** Initialize message middleware consumer start **************"); consumer = new
DefaultMQPushConsumer(); consumer.setConsumerGroup(consumerGroup);
consumer.setInstanceName(consumerInstance);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setMessageModel(MessageModel.CLUSTERING); try { String[] topics =
subscribeTopics.split(","); for (String topic : topics) {
if(StringUtil.isNotBlank(topic.trim())){ consumer.subscribe(topic.trim(), "*");
} } // The first time the program starts fetching data from the message queue header
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(1); consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(100); consumer.registerMessageListener(new
MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext Context) {
MessageExt messageExt = list.get(0); String keys = messageExt.getKeys();
logger.info(" Message delay level :{}, Message resend times :{}", messageExt.getDelayTimeLevel(),
messageExt.getReconsumeTimes()); logger.info("mq consumption , orderId : {}, msgId : {}",
keys, messageExt.getMsgId()); try { // Business logic processing return
ConsumeConcurrentlyStatus.RECONSUME_LATER;// For testing , Intentional return ConsumeConcurrentlyStatus.RECONSUME_LATER
} catch (Exception e) { logger.error("MQ Abnormal consumption .", e); return
ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start();
logger.info("******** Initialize message middleware consumer complete **************"); } catch (Exception e) {
logger.error(" Message middleware consumption exception .", e); } } @PreDestroy public void destroy() {
logger.info(" close rocketmq-consumer connect ."); try { consumer.shutdown(); } catch
(Exception e) { logger.error(" close rocketmq-consumer abnormal .", e); } } }
3, test :
package com.ryfchina.ipay.regionalpay; import
com.ryfchina.ipay.regionalpay.mq.MqProducer; import org.junit.Test; import
org.junit.runner.RunWith; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.boot.test.context.SpringBootTest; import
org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * Unit
test for simple App. */ @RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = CmsApplication.class) public class AppTest {
@Autowired private MqProducer mqProducer; @Test public void test() {
mqProducer.send("System_WithDrawOrderQuery_Topic",
"com.ryfchina.ipay.regionalpay.service.WithdrawOrderXysjService:withdrawQuery",
"654321", "654321", 9); } }
4, test result :
Producer send log :
[2019-04-10 16:33:10.306] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqProducer.send(MqProducer.java:111)
send out MQ state ==> msgId:C0A80D652E6818B4AAC231E682260000, key:654321, state :SEND_OK
Consumer consumption log :
First consumption , Because when the producer sends , Set the delay level to 9, Find the gap 5m after , Consumer consumption .
[2019-04-10 16:38:10.368] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :9, Message resend times :0

Here is the consumer Failure Return ConsumeConcurrentlyStatus.RECONSUME_LATER,broker retransmission , Consumer re consumption log :
Last interval 10s
[2019-04-10 16:38:20.600] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :3, Message resend times :1
Last interval 30s
[2019-04-10 16:38:50.641] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :4, Message resend times :2
Last interval 1m
[2019-04-10 16:39:50.675] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :5, Message resend times :3
Last interval 2m
[2019-04-10 16:41:50.775] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :6, Message resend times :4
Last interval 3m
[2019-04-10 16:44:50.830] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :7, Message resend times :5
Last interval 4m
[2019-04-10 16:48:50.873] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :8, Message resend times :6
Last interval 5m
[2019-04-10 16:53:50.919] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :9, Message resend times :7
Last interval 6m
[2019-04-10 16:59:50.968] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :10, Message resend times :8
Last interval 7m
[2019-04-10 17:06:51.083] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :11, Message resend times :9
Last interval 8m
[2019-04-10 17:14:51.141] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :12, Message resend times :10
Last interval 9m
[2019-04-10 17:23:51.209] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :13, Message resend times :11
Last interval 10m
[2019-04-10 17:33:51.238] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :14, Message resend times :12
Last interval 20m
[2019-04-10 17:53:51.313] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :15, Message resend times :13
Last interval 30m
[2019-04-10 18:23:51.370] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :16, Message resend times :14
Last interval 1h
[2019-04-10 19:23:51.429] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :17, Message resend times :15
Last interval 2h
[2019-04-10 21:23:51.469] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
Message delay level :18, Message resend times :16

5, Test results can also be obtained from external see

summary :
Send delay message :
rocketmq The default delay level is :
messageDelayLevel:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
When sending a message , Set delay level , It's going to take a while , Consumer can consume this message .

When consumers consume news , Because of business logic or exception return ConsumeConcurrentlyStatus.RECONSUME_LATER after , If not Context.setDelayLevelWhenNextConsume(); that broker By default, the delay level of retry message will be set as 3, Every retry , Delay level +1, Default retry 16 second , If the 16 Times still return ConsumeConcurrentlyStatus.RECONSUME_LATER, that broker Will put this message in the delete queue !
Through consumer.setMaxReconsumeTimes(); Set number of retries . Or by logic , for example :if(messageExt.getReconsumeTimes()
> 3){return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}

Technology
©2019-2020 Toolsou All rights reserved,
One and a half years JAVA Summary of work experience Jsp+Ajax+Servlet+Mysql Add, delete, modify and query ( one ) cartoon | CPU Warfare 40 year , The real king finally appeared ! Don't annoy the panda with any cat !「 Kung Fu Panda 」20 It's the year of man 4 blood IAR Installation and use tutorial Classical algorithm - recursion ( The case of raw rabbit ) Thorough explanation from Zhongtai Random forest R Language implementation R Language cluster analysis case These songs , Programmers, don't listen !