当前位置: 首页 > news >正文

西安大网站建设公司/拓客软件排行榜

西安大网站建设公司,拓客软件排行榜,电脑培训零基础培训班,英文企业网站带后台有数据库企业中一般都会封装rocketmq 同步 异步 单向方法&#xff0c;你只需要配置好nameserver地址 topic tag 消息体等&#xff0c;然后调用封装方法进行发送即可。 流程差不多如下 1、导入mq依赖 <dependency><groupId>org.apache.rocketmq</groupId><artifa…

企业中一般都会封装rocketmq 同步 异步 单向方法,你只需要配置好nameserver地址 topic tag 消息体等,然后调用封装方法进行发送即可。

流程差不多如下

1、导入mq依赖

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>

2、消息发送者步骤
创建消息生成者producer,指定组名
指定Nameserver mq地址
启动生产者
创建消息对象,指定Topic、Tag和消息体
发送消息
关闭生产者

3、消费者步骤
创建消费者Consumer,指定组名
指定Nameserver地址
订阅主题Topic和Tag
(listener监听消息,去消费先到的消息)
设置回调函数,处理消息
启动消费者

同步 异步 单向底层区别

//同步SendResult send = producer.send(msg);//异步producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("===" + sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.println("e:" + throwable);}});//单向producer.sendOneway(msg);

异步相对同步来说send方法多了些参数SendCallback sendCallback。而这个sendCallback重写的方法有成功和失败的方法。
单向相对同步来说换了一个方法sendOneway.
在跟踪源码去看到
异步
在这里插入图片描述
同步
在这里插入图片描述

具体实现方法
一下源码传参异步,单向的区别在
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
以及switch里面
switch(communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
return sendResult;
}
default:
break label122;
}

具体如下
sendDefaultImpl方法(核心:所有同步、异步、单向)

    private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//校验 Producer 处于运行状态this.makeSureStateOK();//校验消息格式Validators.checkMessage(msg, this.defaultMQProducer);long invokeID = this.random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;//获取Topic路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;//发送的队列MessageQueue mq = null;//异常Exception exception = null;//最后一次发送结果SendResult sendResult = null;//计算调用发送消息到成功为止的最大次数:同步、单向调用一次 ;异步3次int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;//第几次发送int times = 0;//存储每次发送消息现在brokerString[] brokersSent = new String[timesTotal];while(true) {label122: {String info;if (times < timesTotal) {info = null == mq ? null : mq.getBrokerName();//选择要发送的消息队列MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mqSelected.getBrokerName();long endTimestamp;try {beginTimestampPrev = System.currentTimeMillis();long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout >= costTime) {//Kernel内核的意思,调用发送消息核心方法sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();//更新Broker可用性信息,在选择发送到的消息队列时,会参考Broker发送消息的延迟。源码多次操作Broker,可见不懂Broker就无法真正理解mq源码。this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch(communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {return sendResult;}default:break label122;}}callTimeout = true;} catch (RemotingException var26) {//当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。//这也是我们在生产环境看到可能出现多次重复发送的情况。//更新Broker可用信息,更新继续循环endTimestamp = System.currentTimeMillis();this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var26);this.log.warn(msg.toString());exception = var26;break label122;} catch (MQClientException var27) {//更新Broker可用信息,继续循环endTimestamp = System.currentTimeMillis();this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var27);this.log.warn(msg.toString());exception = var27;break label122;} catch (MQBrokerException var28) {//更新Broker可用信息,部分情况下异常返回,结束循环endTimestamp = System.currentTimeMillis();this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var28);this.log.warn(msg.toString());exception = var28;switch(var28.getResponseCode()) {case 1:case 14:case 16:case 17:case 204:case 205:break label122;//如果有发送结果,返回,没有,抛出异常default:if (sendResult != null) {return sendResult;}throw var28;}} catch (InterruptedException var29) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, false);this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var29);this.log.warn(msg.toString());this.log.warn("sendKernelImpl exception", var29);this.log.warn(msg.toString());throw var29;}}}if (sendResult != null) {return sendResult;}info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");MQClientException mqClientException = new MQClientException(info, (Throwable)exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(10001);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(10002);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(10003);}throw mqClientException;}++times;}} else {List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();if (null != nsList && !nsList.isEmpty()) {throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);} else {throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);}}}

但是异步是在该sendDefaultImpl方法之前做一个异步处理
ExecutorService :那么mq异步用了线程池来提高异步请求的效率!!

    @Deprecatedpublic void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException {final long beginStartTime = System.currentTimeMillis();ExecutorService executor = this.getCallbackExecutor();try {executor.submit(new Runnable() {public void run() {long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {DefaultMQProducerImpl.this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);} catch (Exception var4) {sendCallback.onException(var4);}} else {sendCallback.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}});} catch (RejectedExecutionException var9) {throw new MQClientException("executor rejected ", var9);}}

在这里插入图片描述
NettyRemotingClient定义了多线程

在这里插入图片描述
并没有发现是哪种特殊的多线程,因此是通用的多线程。就好比map和hashmap,最后用了map一样。

倒回来看,这里是多线程执行,大大提高了运行效率,并且代码严谨和规范,有判定条件,不同程度的try catch
在这里插入图片描述

线程池执行sendSelectImpl方法,传了msg,async异步,sendCallback(成功和失败的方法,发送消息不用等待响应,不管有没有成功或失败,生产者还会发送消息),timeout - costTime(最大等待时间默认3000-消耗时间)
大概看一下入参的情况
在这里插入图片描述
这里timeTotal默认是1,异步的时候是3,影响后面循环次数。
yi
那么for循环里面要做什么事情呢
选择消息要发送的队列,准备发送
调用核心发送消息方法
更新Broker可用性信息
上面这个类sendDefaultImpl,有详细注释。
在这里插入图片描述
其实还可以继续深入下去,可以模仿rocketmq,自己写一个自定义mq框架那也是很厉害了!!!

http://www.lbrq.cn/news/1446715.html

相关文章:

  • 上海网站设计外包/seo入门书籍推荐
  • 深圳建设网站上市/企业网站seo
  • 酒店网站建设便宜/平台seo什么意思
  • 武汉 网站设计公司/百度搜索引擎技巧
  • 鞍山网站建设/宁波网络推广方式
  • 莱芜招聘的网站/种子搜索神器
  • 有网站开发专业吗/中国国家人事人才培训网
  • 公司刚做网站在那里找图片做/软文标题
  • 呼伦贝尔做网站/seo站长工具是什么
  • 晚上网站推荐靠谱的2021/学大教育培训机构怎么样
  • 能在线做实验的网站/找回今日头条
  • 乐清做网站的公司/专业制作网页的公司
  • 乐陵市住房和城乡建设局网站/搜狗seo排名软件
  • 哪个网站做任务赚钱/百度收录网站要多久
  • 给小学生做家教的网站/百度企业
  • web手机版网站开发框架/如何建立一个自己的网站啊
  • 如何做网站反链/全国疫情最新报告
  • 做网站的公司需要什么资质/百度网站关键词排名查询
  • 南博会官方网站建设投入/代码编程教学入门
  • 外链都没有的网站如何做排名的/google搜索
  • app和网站开发哪个难/门户网站制作
  • ps做网站页面设置为多大/制作网页设计公司
  • 网站备案号如何获得/互联网项目推广是什么
  • 福建银瑞建设工程有限公司网站/营销型网站策划书
  • 对其网站建设进行了考察调研/个人如何注册网站
  • 网站开发的目的及意义/营销策划方案模板
  • 网站的种类有哪些/宁波seo关键词如何优化
  • 长沙的汽车网站建设/营销方案怎么写
  • 网站网页设计在哪找/企业seo整站优化方案
  • 做设计用的素材下载网站有哪些/关键词推广软件
  • Cherryusb UAC例程对接STM32内置ADC和DAC播放音乐和录音(中)=>UAC+STM32 ADC+DAC实现录音和播放
  • 【Python 工具人快餐 · 第 2 份】
  • 【Linux】Tomcat
  • 微信原生小程序 Timeline 组件实现
  • MyBatis注解开发与接口映射:现代化ORM开发的技术革新
  • 用户组权限及高级权限管理:从基础到企业级 sudo 提权实战