1,消费者代码:
package com.ryfchina.ipay.regionalpay.mq; import
com.ryfchina.ipay.regionalpay.vo.MqMessageVO; import
org.apache.rocketmq.client.producer.DefaultMQProducer; import
org.apache.rocketmq.client.producer.SendResult; import
org.apache.rocketmq.client.producer.SendStatus; import
org.apache.rocketmq.common.message.Message; import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient; import org.slf4j.Logger; import
org.slf4j.LoggerFactory; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.beans.factory.annotation.Value; import
org.springframework.stereotype.Service; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import java.util.concurrent.TimeUnit; /** *
发送消息队列服务 * */ @Service("mqProducer") public class MqProducer { private static
final Logger LOGGER = LoggerFactory.getLogger(MqProducer.class);
@Value("${rocketmq.url}") private String namesrvAddr;
@Value("${rocketmq.producer.group}") private String producerGroup;
@Value("${rocketmq.producer.instance}") private String producerInstance;
private DefaultMQProducer producer; @Autowired private RedissonClient
redissonClient; @PostConstruct public void start() throws Exception {
LOGGER.info("初始化生产者开始……"); producer = new DefaultMQProducer();
producer.setNamesrvAddr(namesrvAddr); producer.setProducerGroup(producerGroup);
producer.setInstanceName(producerInstance); // 发送报文最大6M
producer.setMaxMessageSize(6 * 1024 * 1024); producer.start();
LOGGER.info("初始化生产者完成"); } @PreDestroy public void destroy() {
LOGGER.info("关闭rocketmq-producer连接。"); try { producer.shutdown(); } catch
(Exception e) { LOGGER.error("关闭rocketmq-producer异常。", e); } } /** * 发送消息 * *
@param tag 消息标识 * @param body 消息内容 */ public void send(String topic, String
tag, String key, String body) { try { Message msg = new Message(topic, tag,
key, body.getBytes("utf-8")); /*RMap<String, MqMessageVO> rMap =
redissonClient.getMap("RM:MESSAGE"); rMap.put(key, new MqMessageVO(msg));*/
RBucket<Object> bucket = redissonClient.getBucket("RM:MESSAGE:" + key);
bucket.set(new MqMessageVO(msg), 1, TimeUnit.DAYS); SendResult result =
producer.send(msg); if (!SendStatus.SEND_OK.equals(result.getSendStatus())) {
LOGGER.error("消息推送失败,topic : {}, tag : {}, key : {}, body : {}", topic, tag,
key, body); } LOGGER.info("发送MQ状态==> msgId:{}, key:{}, 状态:{}",
result.getMsgId(), msg.getKeys(), result.getSendStatus()); } catch (Exception
e) { LOGGER.error("投递消息异常。", e); } } /** * mq默认的:messageDelayLevel:1s 5s 10s
30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h * @param topic * @param tag *
@param key * @param body * @param delayTimeLevel 1-18分别对应上面的时间 */ public void
send(String topic, String tag, String key, String body, int delayTimeLevel) {
try { Message msg = new Message(topic, tag, key, body.getBytes("utf-8"));
msg.setDelayTimeLevel(delayTimeLevel); RBucket<Object> bucket =
redissonClient.getBucket("RM:MESSAGE:" + key); bucket.set(new MqMessageVO(msg),
1, TimeUnit.DAYS); SendResult result = producer.send(msg); if
(!SendStatus.SEND_OK.equals(result.getSendStatus())) {
LOGGER.error("消息推送失败,topic : {}, tag : {}, key : {}, body : {}", topic, tag,
key, body); } LOGGER.info("发送MQ状态==> msgId:{}, key:{}, 状态:{}",
result.getMsgId(), msg.getKeys(), result.getSendStatus()); } catch (Exception
e) { LOGGER.error("投递消息异常。", e); } } }
2,消费者代码:
package com.ryfchina.ipay.regionalpay.mq; import
com.ryfchina.ipay.regionalpay.common.kit.SpringUtil; import
com.ryfchina.ipay.regionalpay.vo.MqMessageVO; import jodd.util.StringUtil;
import org.apache.commons.lang3.StringUtils; import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import
org.apache.rocketmq.common.message.MessageExt; import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import
org.redisson.api.RBucket; import org.redisson.api.RedissonClient; import
org.slf4j.Logger; import org.slf4j.LoggerFactory; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.beans.factory.annotation.Value; import
org.springframework.core.annotation.Order; import
org.springframework.stereotype.Service; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import java.lang.reflect.Method; import
java.util.List; import java.util.concurrent.TimeUnit; /** * 消息中间件接收消息
此服务一定要在各业务模块的初始化完成后加载 * @since JDK 1.6 */ @Service @Order(10) public class
MqConsumer { private static final Logger logger =
LoggerFactory.getLogger(MqConsumer.class); /** * 消息中间件服务地址 */
@Value("${rocketmq.url}") private String namesrvAddr;
@Value("${rocketmq.consumer.group}") private String consumerGroup;
@Value("${rocketmq.consumer.instance}") private String consumerInstance; /** *
订阅的topic */ @Value("${rocketmq.consumer.subscribeTopics}") private String
subscribeTopics; @Autowired private RedissonClient redissonClient; @Autowired
private SpringUtil springUtil; private DefaultMQPushConsumer consumer; /*** *
执行线程方法 */ @PostConstruct public void start() {
logger.info("********初始化消息中间件消费者开始**************"); consumer = new
DefaultMQPushConsumer(); consumer.setConsumerGroup(consumerGroup);
consumer.setInstanceName(consumerInstance);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setMessageModel(MessageModel.CLUSTERING); try { String[] topics =
subscribeTopics.split(","); for (String topic : topics) {
if(StringUtil.isNotBlank(topic.trim())){ consumer.subscribe(topic.trim(), "*");
} } // 程序第一次启动从消息队列头取数据
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(1); consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(100); consumer.registerMessageListener(new
MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext Context) {
MessageExt messageExt = list.get(0); String keys = messageExt.getKeys();
logger.info("消息延时等级:{}, 消息重发次数:{}", messageExt.getDelayTimeLevel(),
messageExt.getReconsumeTimes()); logger.info("mq消费, orderId : {}, msgId : {}",
keys, messageExt.getMsgId()); try { //业务逻辑处理 return
ConsumeConcurrentlyStatus.RECONSUME_LATER;//为了测试,故意返回ConsumeConcurrentlyStatus.RECONSUME_LATER
} catch (Exception e) { logger.error("MQ消费异常。", e); return
ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start();
logger.info("********初始化消息中间件消费者完成**************"); } catch (Exception e) {
logger.error("消息中间件消费异常。", e); } } @PreDestroy public void destroy() {
logger.info("关闭rocketmq-consumer连接。"); try { consumer.shutdown(); } catch
(Exception e) { logger.error("关闭rocketmq-consumer异常。", e); } } }
3,测试:
package com.ryfchina.ipay.regionalpay; import
com.ryfchina.ipay.regionalpay.mq.MqProducer; import org.junit.Test; import
org.junit.runner.RunWith; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.boot.test.context.SpringBootTest; import
org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * Unit
test for simple App. */ @RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = CmsApplication.class) public class AppTest {
@Autowired private MqProducer mqProducer; @Test public void test() {
mqProducer.send("System_WithDrawOrderQuery_Topic",
"com.ryfchina.ipay.regionalpay.service.WithdrawOrderXysjService:withdrawQuery",
"654321", "654321", 9); } }
4,测试结果:
生产者发送日志:
[2019-04-10 16:33:10.306] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqProducer.send(MqProducer.java:111)
发送MQ状态==> msgId:C0A80D652E6818B4AAC231E682260000, key:654321, 状态:SEND_OK
消费者消费日志:
第一次消费,由于生产者发送的时候,设置了延时等级为9,发现相隔5m之后,消费者消费。
[2019-04-10 16:38:10.368] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:9, 消息重发次数:0

以下是消费者消费失败返回ConsumeConcurrentlyStatus.RECONSUME_LATER,broker重发,消费者重新消费日志:
与上一次间隔10s
[2019-04-10 16:38:20.600] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:3, 消息重发次数:1
与上一次间隔30s
[2019-04-10 16:38:50.641] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:4, 消息重发次数:2
与上一次间隔1m
[2019-04-10 16:39:50.675] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:5, 消息重发次数:3
与上一次间隔2m
[2019-04-10 16:41:50.775] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:6, 消息重发次数:4
与上一次间隔3m
[2019-04-10 16:44:50.830] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:7, 消息重发次数:5
与上一次间隔4m
[2019-04-10 16:48:50.873] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:8, 消息重发次数:6
与上一次间隔5m
[2019-04-10 16:53:50.919] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:9, 消息重发次数:7
与上一次间隔6m
[2019-04-10 16:59:50.968] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:10, 消息重发次数:8
与上一次间隔7m
[2019-04-10 17:06:51.083] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:11, 消息重发次数:9
与上一次间隔8m
[2019-04-10 17:14:51.141] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:12, 消息重发次数:10
与上一次间隔9m
[2019-04-10 17:23:51.209] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:13, 消息重发次数:11
与上一次间隔10m
[2019-04-10 17:33:51.238] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:14, 消息重发次数:12
与上一次间隔20m
[2019-04-10 17:53:51.313] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:15, 消息重发次数:13
与上一次间隔30m
[2019-04-10 18:23:51.370] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:16, 消息重发次数:14
与上一次间隔1h
[2019-04-10 19:23:51.429] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:17, 消息重发次数:15
与上一次间隔2h
[2019-04-10 21:23:51.469] [ INFO] Caller+0 at
com.ryfchina.ipay.regionalpay.mq.MqConsumer$1.consumeMessage(MqConsumer.java:107)
消息延时等级:18, 消息重发次数:16

5,测试结果也可以从external看

总结:
发送延时消息:
rocketmq默认的延时等级是:
messageDelayLevel:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
发送消息的时候,设置延时等级,要等一段时间,消费者才能消费此消息。

当消费者消费消息的时候,因为业务逻辑或者异常返回ConsumeConcurrentlyStatus.RECONSUME_LATER后,如果没有通过Context.setDelayLevelWhenNextConsume();那么broker会默认设置重试消息的延时等级是3,每重试一次,延时等级+1,默认重试16次,如果第16次仍然返回ConsumeConcurrentlyStatus.RECONSUME_LATER,那么broker会吧这个消息放入删除队列!
可以通过consumer.setMaxReconsumeTimes();设置重试次数。或者自己通过逻辑处理,例如:if(messageExt.getReconsumeTimes()
> 3){return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}

技术
©2019-2020 Toolsou All rights reserved,
vue项目中对axios的全局封装单个按键控制多种流水灯状态软件测试之BUG描述随机森林篇 R语言实现TP6验证器的使用示例及正确验证数据C语言编程之查找某学号学生成绩一文揭秘阿里、腾讯、百度的薪资职级c语言的5种常用排序方法2021年1月程序员工资统计,平均14915元Bug数能否做为技术人员考核的KPI?