perl网站开发建立免费个人网站
RocketMQ发送三种类型消息:同步消息,异步消息和单向消息,其中前两种是可靠,因为会有发送是否成功的应答,而最后一种没有应答,一般用于可靠性要求比较低的情况,例如日志收集。
1.Produce端发送同步消息:这种使用比较普遍,比如重要的消息通知,短信通知。
public class SyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}
2.发送异步消息:异步消息通常在对响应时间敏感的业务场景,即发送端不能容忍长时间等待Broker的响应。
public class AsyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);for (int i = 0; i < 100; i++) {final int index = i;// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback接收异步返回结果的回调producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}
3.单向发送消息:这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送单向消息,没有任何返回结果producer.sendOneway(msg);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}
4.顺序消息发送:消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。 下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/**
* Producer,发送顺序消息
*/
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();String[] tags = new String[]{"TagA", "TagC", "TagD"};// 订单列表List<OrderStep> orderList = new Producer().buildOrders();Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(date);for (int i = 0; i < 10; i++) {// 加个时间前缀String body = dateStr + " Hello RocketMQ " + orderList.get(i);Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg; //根据订单id选择发送queuelong index = id % mqs.size();return mqs.get((int) index);}}, orderList.get(i).getOrderId());//订单idSystem.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body));}producer.shutdown();}
/*** 订单的步骤*/private static class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}}/*** 生成模拟订单数据*/private List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}
5.发送延迟消息:现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}
}
6.发送批量消息:批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {producer.send(messages);
} catch (Exception e) {e.printStackTrace();//处理error
}
7.Logappender样例:RocketMQ日志提供log4j、log4j2和logback日志框架作为业务应用,下面是配置样例
按下面样例使用log4j属性配置
log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=yourTag
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n
按下面样例使用log4j xml配置来使用异步添加日志
<appender name="mqAppender1"class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender"><param name="Tag" value="yourTag" /><param name="Topic" value="yourLogTopic" /><param name="ProducerGroup" value="yourLogGroup" /><param name="NameServerAddress" value="yourRocketmqNameserverAddress"/><layout class="org.apache.log4j.PatternLayout"><param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" /></layout>
</appender>
<appender name="mqAsyncAppender1"class="org.apache.log4j.AsyncAppender"><param name="BufferSize" value="1024" /><param name="Blocking" value="false" /><appender-ref ref="mqAppender1"/>
</appender>
log4j2样例
用log4j2时,配置如下,如果想要非阻塞,只需要使用异步添加引用即可
<RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"topic="yourLogTopic" tag="yourTag"><PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>
logback样例
<appender name="mqAppender1"class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender"><tag>yourTag</tag><topic>yourLogTopic</topic><producerGroup>yourLogGroup</producerGroup><nameServerAddress>yourRocketmqNameserverAddress</nameServerAddress><layout><pattern>%date %p %t - %m%n</pattern></layout>
</appender>
<appender name="mqAsyncAppender1"class="ch.qos.logback.classic.AsyncAppender"><queueSize>1024</queueSize><discardingThreshold>80</discardingThreshold><maxFlushTime>2000</maxFlushTime><neverBlock>true</neverBlock><appender-ref ref="mqAppender1"/>
</appender>