河北建设集团有限公司 信息化网站网络营销软文范文
遇到的问题:消费者的topic一直注册不成功
解决:发现我JAVA Web工程中用的rocketMQ的版本是4.3.0,而我用的rocketmq服务端版本是4.2.0。然后把工程中的版本统一成4.2.0就可以了。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
XML模式的RocketMQ见这篇
一. 启动RocketMQ环境,如上几篇博客的配置。
1. 启动mqnamesrv
cd /Users/sunww/Documents/JAVA/MQ/rocketmq-all-4.2.0-bin-release
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
启动成功打印:INFO main - The Name Server boot success
2. 启动broker
cd /Users/sunww/Documents/JAVA/MQ/rocketmq-all-4.2.0-bin-release
nohup sh bin/mqbroker -n localhost:9876 &
老版本 nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
tail -f ~/logs/rocketmqlogs/broker.log // 查看broker日志
启动成功打印:The broker[TF012778.local, 10.50.62.53:10911] boot success
3. 关闭服务器
sh bin/mqshutdown broker //停止 broker
sh bin/mqshutdown namesrv //停止 nameserver
以上见下图:
二。消费端MqConsumer源码
package com.JXWork.util.MQ;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** 接口说明** @author TF12778 2019/10/28 16:48*/
@Component
public class MqConsumer implements InitializingBean {private static final Logger log = LoggerFactory.getLogger(MqConsumer.class);@Value("${consumer.group}")private String consumerGroup;@Value("${producer.connect.list}")private String PRODUCER_CONNECT_LIST;@Value("${party.topic}")private String PARTY_TOPIC;@Value("${test.topic}")private String TEST_TOPIC;private static DefaultMQPushConsumer consumer = null;@Overridepublic void afterPropertiesSet() throws Exception {init();}private void init() {try {consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(PRODUCER_CONNECT_LIST);consumer.setInstanceName("Consumer");consumer.subscribe(PARTY_TOPIC, "");consumer.subscribe(TEST_TOPIC, "*");//注册消费的监听//在此监听中消费信息,并返回消费的状态信息consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(getMessageListenerConcurrently());//Launch the consumer instance.consumer.start();log.info("MsgConsumer init - success.");} catch (MQClientException e) {e.printStackTrace();}}private MessageListenerConcurrently getMessageListenerConcurrently() {return (msgs, context) -> {for (MessageExt msg : msgs) {log.info("收到的Topic:{},消息内容:{}" ,msg.getTopic() ,new String(msg.getBody()));if (PARTY_TOPIC.equals(msg.getTopic())) {handleChangePartyPhoneResult(msg);} else if (TEST_TOPIC.equals(msg.getTopic())) {handleChangePartyPhoneResult(msg);}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;};}private ConsumeConcurrentlyStatus handleChangePartyPhoneResult(MessageExt msg) {log.info("handleChangePartyPhoneResult handle,:{}", JSON.toJSONString(msg));String msgStr = new String(msg.getBody());if (msgStr.length() == 0) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 解析json字符串JSONObject jsonObject = JSONObject.parseObject(msgStr);if (StringUtils.isBlank(jsonObject.getString("mobilenumber"))|| StringUtils.isBlank(jsonObject.getString("partyid"))|| StringUtils.isBlank(jsonObject.getString("event"))|| !"CHANGE_BINDING".equals(jsonObject.getString("event"))) {log.info("MessageListener handleChangePartyPhoneResult 参数异常, jsonObject={}", jsonObject);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 具体业务处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}
三。生产者MqProducer源码
package com.JXWork.util.MQ;import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** 接口说明** @author TF12778 2019/10/28 16:43*/@Component
public class MqProducer implements InitializingBean {private static final Logger LOGGER = LoggerFactory.getLogger(MqProducer.class);@Value("${producer.group}")private String PRODUCER_GROUP;@Value("${producer.connect.list}")private String PRODUCER_CONNECT_LIST;public static DefaultMQProducer rocketMqProducer = null;@Overridepublic void afterPropertiesSet() throws Exception {try {rocketMqProducer = new DefaultMQProducer(PRODUCER_GROUP);rocketMqProducer.setNamesrvAddr(PRODUCER_CONNECT_LIST);rocketMqProducer.start();LOGGER.info("RocketMqProducer init - success.");} catch (MQClientException e) {LOGGER.info("RocketMqProducer init - error");}}/*** 方法 sendMsg 作用描述: rocketmq发送消息方法** @param topic 组名* @param tagName 同一个topic下的不同 分支,同一个消费者只能取一个组的下的不同的tag分支* @param key 保持唯一* @param msgBody 消息体*/public static void sendMsgIntime(String topic, String tagName, String key, byte[] msgBody) {//,int levelMessage msg = new Message(topic, tagName, key, msgBody);try {String result = rocketMqProducer.send(msg).toString();LOGGER.info("send rockmq uuid:" + new String(msgBody, "UTF-8") + " " + result);} catch (Exception e) {LOGGER.error("msgBody:" + new String(msgBody), e);}}public static void sendMsgIntimeNoUUID(String topic, String tagName, String key, byte[] msgBody) {//,int levelMessage msg = new Message(topic, tagName, key, msgBody);try {String result = rocketMqProducer.send(msg).toString();LOGGER.info("生产者 send msg 2 rockmq " + new String(msgBody) + " " + result);} catch (Exception e) {LOGGER.error("msgBody:" + new String(msgBody), e);}}public static void main(String[] xx) throws Exception {try {rocketMqProducer = new DefaultMQProducer("ljMQProducerGroup");rocketMqProducer.setNamesrvAddr("127.0.0.1:9876");rocketMqProducer.start();LOGGER.info("RocketMqProducer init - success.");} catch (MQClientException e) {LOGGER.info("RocketMqProducer init - error");}Map<String, Object> msg = new HashMap<>();msg.put("carNum", "浙A888888");msg.put("code", "778882222");// consumer.subscribe("partyTopic", "partyTopicTags");MqProducer.sendMsgIntime("PARTYTOPIC", "", "",(JSON.toJSONString(msg)).getBytes());}}
application.properties内容:
producer.group=ljMQProducerGroup
producer.connect.list=localhost:9876consumer.group=ljMQConsumerGroupparty.topic=partyTopic
test.topic=testTopic
四。测试
@Testpublic void mqTest() {Map<String, Object> msg = new HashMap<>();msg.put("carNum", "浙A888888");msg.put("code", "778882222");MqProducer.sendMsgIntime("partyTopic", "", "",(JSON.toJSONString(msg)).getBytes());MqProducer.sendMsgIntime("testTopic", "", "",(JSON.toJSONString("Test RocketMQ")).getBytes());}
测试结果 :