当前位置: 首页 > news >正文

河北建设集团有限公司 信息化网站网络营销软文范文

河北建设集团有限公司 信息化网站,网络营销软文范文,什么网站能赚钱,邮箱下不了wordpress遇到的问题&#xff1a;消费者的topic一直注册不成功 解决&#xff1a;发现我JAVA Web工程中用的rocketMQ的版本是4.3.0&#xff0c;而我用的rocketmq服务端版本是4.2.0。然后把工程中的版本统一成4.2.0就可以了。 <dependency> <groupId>org.apache…

遇到的问题:消费者的topic一直注册不成功

解决:发现我JAVA Web工程中用的rocketMQ的版本是4.3.0,而我用的rocketmq服务端版本是4.2.0。然后把工程中的版本统一成4.2.0就可以了。

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.2.0</version>
        </dependency>

 

XML模式的RocketMQ见这篇


一. 启动RocketMQ环境,如上几篇博客的配置。

1. 启动mqnamesrv

cd /Users/sunww/Documents/JAVA/MQ/rocketmq-all-4.2.0-bin-release  

nohup sh bin/mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

启动成功打印:INFO main - The Name Server boot success

2. 启动broker

cd /Users/sunww/Documents/JAVA/MQ/rocketmq-all-4.2.0-bin-release  

nohup sh bin/mqbroker -n localhost:9876 &

老版本 nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &

tail -f ~/logs/rocketmqlogs/broker.log // 查看broker日志

启动成功打印:The broker[TF012778.local, 10.50.62.53:10911] boot success

3. 关闭服务器

sh bin/mqshutdown broker    //停止 broker

sh bin/mqshutdown namesrv   //停止 nameserver

以上见下图:

二。消费端MqConsumer源码

package com.JXWork.util.MQ;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** 接口说明** @author TF12778 2019/10/28 16:48*/
@Component
public class MqConsumer implements InitializingBean {private static final Logger log = LoggerFactory.getLogger(MqConsumer.class);@Value("${consumer.group}")private String consumerGroup;@Value("${producer.connect.list}")private String PRODUCER_CONNECT_LIST;@Value("${party.topic}")private String PARTY_TOPIC;@Value("${test.topic}")private String TEST_TOPIC;private static DefaultMQPushConsumer consumer = null;@Overridepublic void afterPropertiesSet() throws Exception {init();}private void init() {try {consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(PRODUCER_CONNECT_LIST);consumer.setInstanceName("Consumer");consumer.subscribe(PARTY_TOPIC, "");consumer.subscribe(TEST_TOPIC, "*");//注册消费的监听//在此监听中消费信息,并返回消费的状态信息consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(getMessageListenerConcurrently());//Launch the consumer instance.consumer.start();log.info("MsgConsumer init - success.");} catch (MQClientException e) {e.printStackTrace();}}private MessageListenerConcurrently getMessageListenerConcurrently() {return (msgs, context) -> {for (MessageExt msg : msgs) {log.info("收到的Topic:{},消息内容:{}" ,msg.getTopic() ,new String(msg.getBody()));if (PARTY_TOPIC.equals(msg.getTopic())) {handleChangePartyPhoneResult(msg);} else if (TEST_TOPIC.equals(msg.getTopic())) {handleChangePartyPhoneResult(msg);}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;};}private ConsumeConcurrentlyStatus handleChangePartyPhoneResult(MessageExt msg) {log.info("handleChangePartyPhoneResult handle,:{}", JSON.toJSONString(msg));String msgStr = new String(msg.getBody());if (msgStr.length() == 0) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 解析json字符串JSONObject jsonObject = JSONObject.parseObject(msgStr);if (StringUtils.isBlank(jsonObject.getString("mobilenumber"))|| StringUtils.isBlank(jsonObject.getString("partyid"))|| StringUtils.isBlank(jsonObject.getString("event"))|| !"CHANGE_BINDING".equals(jsonObject.getString("event"))) {log.info("MessageListener handleChangePartyPhoneResult 参数异常, jsonObject={}", jsonObject);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 具体业务处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}

三。生产者MqProducer源码

package com.JXWork.util.MQ;import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** 接口说明** @author TF12778 2019/10/28 16:43*/@Component
public class MqProducer implements InitializingBean {private static final Logger LOGGER = LoggerFactory.getLogger(MqProducer.class);@Value("${producer.group}")private String PRODUCER_GROUP;@Value("${producer.connect.list}")private String PRODUCER_CONNECT_LIST;public static DefaultMQProducer rocketMqProducer = null;@Overridepublic void afterPropertiesSet() throws Exception {try {rocketMqProducer = new DefaultMQProducer(PRODUCER_GROUP);rocketMqProducer.setNamesrvAddr(PRODUCER_CONNECT_LIST);rocketMqProducer.start();LOGGER.info("RocketMqProducer init - success.");} catch (MQClientException e) {LOGGER.info("RocketMqProducer init - error");}}/*** 方法 sendMsg 作用描述: rocketmq发送消息方法** @param topic      组名* @param tagName    同一个topic下的不同 分支,同一个消费者只能取一个组的下的不同的tag分支* @param key        保持唯一* @param msgBody    消息体*/public static void sendMsgIntime(String topic, String tagName, String key, byte[] msgBody) {//,int levelMessage msg = new Message(topic, tagName, key, msgBody);try {String result = rocketMqProducer.send(msg).toString();LOGGER.info("send rockmq uuid:" + new String(msgBody, "UTF-8") + " " + result);} catch (Exception e) {LOGGER.error("msgBody:" + new String(msgBody), e);}}public static void sendMsgIntimeNoUUID(String topic, String tagName, String key, byte[] msgBody) {//,int levelMessage msg = new Message(topic, tagName, key, msgBody);try {String result = rocketMqProducer.send(msg).toString();LOGGER.info("生产者 send msg 2 rockmq " + new String(msgBody) + " " + result);} catch (Exception e) {LOGGER.error("msgBody:" + new String(msgBody), e);}}public static void main(String[] xx) throws Exception {try {rocketMqProducer = new DefaultMQProducer("ljMQProducerGroup");rocketMqProducer.setNamesrvAddr("127.0.0.1:9876");rocketMqProducer.start();LOGGER.info("RocketMqProducer init - success.");} catch (MQClientException e) {LOGGER.info("RocketMqProducer init - error");}Map<String, Object> msg = new HashMap<>();msg.put("carNum", "浙A888888");msg.put("code", "778882222");// consumer.subscribe("partyTopic", "partyTopicTags");MqProducer.sendMsgIntime("PARTYTOPIC", "", "",(JSON.toJSONString(msg)).getBytes());}}

application.properties内容:

producer.group=ljMQProducerGroup
producer.connect.list=localhost:9876consumer.group=ljMQConsumerGroupparty.topic=partyTopic
test.topic=testTopic

四。测试

    @Testpublic void mqTest() {Map<String, Object> msg = new HashMap<>();msg.put("carNum", "浙A888888");msg.put("code", "778882222");MqProducer.sendMsgIntime("partyTopic", "", "",(JSON.toJSONString(msg)).getBytes());MqProducer.sendMsgIntime("testTopic", "", "",(JSON.toJSONString("Test RocketMQ")).getBytes());}

测试结果 :

http://www.lbrq.cn/news/2778337.html

相关文章:

  • 蓝鸟E4A做网站程序网页制作官方网站
  • 佛山java web网站开发营销型网站建设专家
  • 百浪科技做网站怎么样html网页制作软件有哪些
  • 企业网站的建立联系方式东莞有哪些做推广的网站
  • 成品网站价格表b2b平台有哪些平台
  • 哈尔滨网站建设模板策划电商运营培训课程
  • 男科免费咨询短视频搜索优化
  • 网站数据统计怎么做搜索引擎营销方法
  • 免费建设个人手机网站系统优化工具
  • 做网站用什么字体比较好企业网站seo推广
  • 做网站能赚钱吗知乎网络营销推广平台有哪些
  • 成都有哪些网站建设的公司网推技巧
  • 网站建设协议需要注意的问题百度广告优化师
  • php动态网站怎么做的如何注册网站平台
  • 库尔勒网站全免费建立自己的网站
  • 手机装修设计软件app云南seo网络优化师
  • 如何选择网站建设企业网络推广软件
  • 做网站 过程长沙的seo网络公司
  • 南山区做网站公司宁波seo教程网
  • 做赚钱的网站今日国际军事新闻最新消息
  • 做p2p网站多少钱aso优化重要吗
  • 做网站需要编码吗百度中心
  • 沈阳德泰诺网站建设公司 概况学营销app哪个更好
  • 建设拍卖网站google安卓版下载
  • 网站制作软件有哪些seo站长工具下载
  • 重庆市建设工程造价信息网爬虫上海牛巨微seo关键词优化
  • 网站建设目标责任深圳seo技术
  • vps可以做wordpress和ssr唐山seo排名
  • 济南高新区 网站建设太原网站快速排名优化
  • 做的最好的相亲网站有哪些独立站谷歌seo
  • 案例分享:BRAV-7123助力家用型人形机器人,智能生活未来已来
  • 4位量化:常规的线性层被替换成了4位线性层(48)
  • 四川方言语音识别数据集,1500小时合规真人采集,高质量标注助力ASR与大模型训练
  • ISIS高级特性
  • Qt密码生成器项目开发教程 - 安全可靠的随机密码生成工具
  • 深度学习必然用到的概率知识