当前位置: 首页 > news >正文

perl网站开发建立免费个人网站

perl网站开发,建立免费个人网站,网站建设实训 课程标准,网站没有备案 合法吗RocketMQ发送三种类型消息:同步消息,异步消息和单向消息,其中前两种是可靠,因为会有发送是否成功的应答,而最后一种没有应答,一般用于可靠性要求比较低的情况,例如日志收集。 1.Produce端发送同…

RocketMQ发送三种类型消息:同步消息,异步消息和单向消息,其中前两种是可靠,因为会有发送是否成功的应答,而最后一种没有应答,一般用于可靠性要求比较低的情况,例如日志收集。

1.Produce端发送同步消息:这种使用比较普遍,比如重要的消息通知,短信通知。

public class SyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

2.发送异步消息:异步消息通常在对响应时间敏感的业务场景,即发送端不能容忍长时间等待Broker的响应。

public class AsyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);for (int i = 0; i < 100; i++) {final int index = i;// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback接收异步返回结果的回调producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

3.单向发送消息:这种方式主要用在不特别关心发送结果的场景,例如日志发送。

public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送单向消息,没有任何返回结果producer.sendOneway(msg);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

4.顺序消息发送:消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。 下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/**
* Producer,发送顺序消息
*/
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();String[] tags = new String[]{"TagA", "TagC", "TagD"};// 订单列表List<OrderStep> orderList = new Producer().buildOrders();Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(date);for (int i = 0; i < 10; i++) {// 加个时间前缀String body = dateStr + " Hello RocketMQ " + orderList.get(i);Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg;  //根据订单id选择发送queuelong index = id % mqs.size();return mqs.get((int) index);}}, orderList.get(i).getOrderId());//订单idSystem.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body));}producer.shutdown();}
  /*** 订单的步骤*/private static class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}}/*** 生成模拟订单数据*/private List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}

5.发送延迟消息:现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}
}

6.发送批量消息:批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {producer.send(messages);
} catch (Exception e) {e.printStackTrace();//处理error
}

7.Logappender样例:RocketMQ日志提供log4j、log4j2和logback日志框架作为业务应用,下面是配置样例

按下面样例使用log4j属性配置

log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=yourTag
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n

按下面样例使用log4j xml配置来使用异步添加日志

<appender name="mqAppender1"class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender"><param name="Tag" value="yourTag" /><param name="Topic" value="yourLogTopic" /><param name="ProducerGroup" value="yourLogGroup" /><param name="NameServerAddress" value="yourRocketmqNameserverAddress"/><layout class="org.apache.log4j.PatternLayout"><param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" /></layout>
</appender>
<appender name="mqAsyncAppender1"class="org.apache.log4j.AsyncAppender"><param name="BufferSize" value="1024" /><param name="Blocking" value="false" /><appender-ref ref="mqAppender1"/>
</appender>

log4j2样例

用log4j2时,配置如下,如果想要非阻塞,只需要使用异步添加引用即可

<RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"topic="yourLogTopic" tag="yourTag"><PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>

logback样例

<appender name="mqAppender1"class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender"><tag>yourTag</tag><topic>yourLogTopic</topic><producerGroup>yourLogGroup</producerGroup><nameServerAddress>yourRocketmqNameserverAddress</nameServerAddress><layout><pattern>%date %p %t - %m%n</pattern></layout>
</appender>
<appender name="mqAsyncAppender1"class="ch.qos.logback.classic.AsyncAppender"><queueSize>1024</queueSize><discardingThreshold>80</discardingThreshold><maxFlushTime>2000</maxFlushTime><neverBlock>true</neverBlock><appender-ref ref="mqAppender1"/>
</appender>

 

 

http://www.lbrq.cn/news/2598823.html

相关文章:

  • 用六类网站做电话可以吗沈阳今日新闻头条
  • 凡科建网站怎么做阴影立体军事新闻今日最新消息
  • 什么网站上做推广效果比较好百度竞价排名广告定价鲜花
  • 益阳建设局网站北京新闻最新消息
  • php网站制作商品结算怎么做网站优化方法
  • 织梦整形医院网站开发网站搭建策略与方法
  • 家用电脑如何做网站服务器小学生摘抄新闻2024
  • 如何确认建设银行网站不是假的淘宝关键词排名查询工具免费
  • wordpress 获取子页面如何点击优化神马关键词排名
  • 自己有网站 做app凡科建站代理
  • 搭建网站 阿里云网站建设优化的技巧
  • 鞍山网站建设鞍山百度推广代理怎么加盟
  • dreamweaver安装包株洲seo快速排名
  • 网站建设公司机构爱站网长尾关键词搜索
  • 网站建设题目以及答案营销方式方案案例
  • 网站开发及企业推广51趣优化网络seo工程师教程
  • 西宁企业网站营销推广软文营销文章
  • 中国能建平台淘宝seo排名优化的方法
  • 网站开发工程师简历百度竞价推广方案范文
  • 如何外贸seo网站建设电商网站设计
  • 惠州建设局官方网站淘宝标题优化网站
  • web网站开发流程全国各大新闻网站投稿
  • 个人备案网站能做商城吗整合营销方案怎么写
  • asp网站伪静态教程上海推广seo
  • 启迪网站建设重庆seo教程搜索引擎优化
  • 站群网站内容恢复正常百度
  • 本地唐山网站建设windows优化大师要会员
  • 网站代百度指数平台
  • 嘉定区做网站网店培训
  • c 博客网站开发教程宁波网站推广网站优化
  • 数据结构04 栈和队列
  • 检索召回率优化探究四:基于LangChain0.3集成Milvu2.5向量数据库构建的智能问答系统
  • 数据集相关类代码回顾理解 | np.mean\transforms.Normalize\transforms.Compose\xxx.transform
  • Apache IoTDB(3):时序数据库 IoTDB Docker部署实战
  • 源代码本地安装funasr
  • Python篇---环境变量软件安装