深圳做网站的公司的区域成都百度seo优化公司
异步投递
activemq支持同步和异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间),主要受发送延时的影响,使用异步发送能够显著的提高发送的性能。
activemq默认使用异步发送的模式:
除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化消息,这两种情况是同步发送到。
如果没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞producer直到broker返回一个确认,表示消息已经被安全的持久化到了磁盘。确认机制提供了消息安全的保障,但同时阻塞客户端也带来了很大的延迟。
所以允许在失败的情况下丢失少量的数据,可以使用异步发送来提高生产率。
异步发送可以最大化producer的发送效率。通常在发送消息量比较密集的情况下使用异步发送,可以提高producer的性能,但也会有额外的问题:
需要消耗较多的client内存,也会使broker端性能消耗增加,且不能有效的保证消息的发送成功。
所以在userAsyncSend=true
的情况下客户端需要容忍消息丢失的可能。
配置方式:有3种
tcp://106.13.187.36:61616?jms.useAsyncSend=true
activeMQConnectionFactory.setUseAsyncSend(true);
((ActiveMQConnection) connection).setUseAsyncSend(true);
异步发送如何确保成功
异步发送消息丢失的场景:
生产者设置useAsyncSend=true
,使用producer.send(msg)持续发送消息。
由于消息是不阻塞的,生产者会认为所有send的消息都是发送成功到mq的。但是如果mq突然宕机后,此时在生产者端内存中未发送的消息将会丢失。
所以,正确的异步发送是需要接收回调的。
同步发送和异步发送的区别:
同步发送完之后,send不阻塞了就表示发送成功了
异步发送完之后,需要接收回执并由客户端再次判断是否发送成功
通过ActiveMQMessageProducer实现的回调,之前的代码使用MessageProducer发送的消息
public class JmsProducerAsyncSend {public static final String ACTIVEMQ_URL = "tcp://localhost61616";public static final String USERNAME = "admin";public static final String PASSWORD = "hll123";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws Exception {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);// 设置异步发送activeMQConnectionFactory.setUseAsyncSend(true);Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);// 使用ActiveMQMessageProducer实现回调ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);TextMessage textMessage = null;for (int i = 1; i <= 3; i++) {textMessage = session.createTextMessage("message=" + i);// 设置消息idtextMessage.setJMSMessageID(IdUtil.fastSimpleUUID() + "-callback");String msgId = textMessage.getJMSMessageID();// 实现异步回调,确认消息发送成功activeMQMessageProducer.send(textMessage, new AsyncCallback() {@Overridepublic void onSuccess() {System.out.println(msgId + " success");}@Overridepublic void onException(JMSException e) {System.out.println(msgId + " error");}});}activeMQMessageProducer.close();session.close();connection.close();System.out.println(" **** 消息发送到MQ完成 **** ");}
}