国内h5 css3网站什么是搜索引擎优化推广
RabbitMq的六种通讯模式分析详解
RabbitMQ 实现消息可靠性实例【原版】
实现消息可靠性原理
1、交换机持久化
2、队列持久化
3、消息持久化
4、设置消费模式为手动消费【如果设置自动消费消息在异常的情况下会丢失消息,设置成手动可以在异常的时候让他进行重试处理】
代码实现
RabbitMQAction
enum {ACCEPT, // 处理成功RETRY, // 可以重试的错误REJECT, // 无需重试的错误
}
AbsRabbitMQConfig
@Data
@Slf4j
public abstract class AbsRabbitMQConfig {private String username;private String password;private String virtualHost;// private String host;private String addresses;private int port;protected abstract ConnectionFactory createConnectionFactory() throws Exception;protected abstract RabbitTemplate createRabbitTemplate() throws Exception;protected abstract DirectExchange createDirectExchange();protected abstract DirectExchange createDirectExchangeForDelayed();/*** 初始化connectionFactory** @return* @throws Exception*/protected ConnectionFactory connectionFactory() throws Exception {log.info("开始初始化connectionFactory,addresses:" + addresses);CachingConnectionFactory connectionFactory = new CachingConnectionFactory(addresses, port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setAddresses(addresses);connectionFactory.setPublisherReturns(true); //设置返回回调connectionFactory.setPublisherConfirms(true); // 必须要设置,设置确认回调,保证消息推送到交换机log.info("connectionFactory初始化完成,addresses:" + addresses);return connectionFactory;}/*** 创建rabbitTemplate** @return* @throws Exception*/protected RabbitTemplate rabbitTemplate() throws Exception {log.info("开始创建rabbit template,addresses:" + addresses);RabbitTemplate template = new RabbitTemplate(connectionFactory());log.info("rabbit template创建完成,addresses:" + addresses);return template;}/*** 创建directExchange** @param name* @param durable* @param autoDelete* @return*/protected DirectExchange directExchange(String name, boolean durable, boolean autoDelete) {log.info("开始创建directExchange,addresses:" + addresses);DirectExchange directExchange = new DirectExchange(name, durable, autoDelete);log.info("directExchange创建完成,addresses:" + addresses);return directExchange;}/*** 创建延迟交换机** @param name* @param durable* @param autoDelete* @return*/protected DirectExchange directExchangeForDelayed(String name, boolean durable, boolean autoDelete) {log.info("开始创建directExchangeForDelayed,addresses:" + addresses);DirectExchange directExchange = new DirectExchange(name, durable, autoDelete);directExchange.setDelayed(true);log.info("directExchangeForDelayed创建完成,addresses:" + addresses);return directExchange;}
}
RabbitMQConfig
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMQConfig extends AbsRabbitMQConfig {public static final String DIRECT_EXCHANGE = "DIRECT_EXCHANGE";public static final String DIRECT_QUEUE_A = "queue";public static final String DIRECT_ROUTE_KEY_A = "direct.a.key";@Override@Beanprotected ConnectionFactory createConnectionFactory() throws Exception {return super.connectionFactory();}@Override@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)protected RabbitTemplate createRabbitTemplate() throws Exception {return super.rabbitTemplate();}@Override@Beanprotected DirectExchange createDirectExchange() {// String name,交换机名字// boolean durable , 交换机是否持久化// boolean autoDelete, 交换机未使用时是否就删除return super.directExchange(DIRECT_EXCHANGE, true, false);}@Overrideprotected DirectExchange createDirectExchangeForDelayed() {return super.directExchangeForDelayed(null, true, false);}@Beanpublic Queue directQuery() {//设置队列持久化// String queue,// boolean durable , 为true时server重启队列不会消失,是否持久化// boolean exclusive,如果为true只能被一个connection使用,其他连接建立时会抛出异常// boolean autoDelete,当没有任何消费者使用时,自动删除该队列// Map<String, Object> argumentsreturn new Queue(DIRECT_QUEUE_A, true, false, false);}/*** 队列和交换机进行绑定** @param directQuery 名字要和上面方法一致* @param createDirectExchange 名字要和上面方法一致* @return*/@Beanpublic Binding binding(Queue directQuery, DirectExchange createDirectExchange) {return BindingBuilder.bind(directQuery).to(createDirectExchange).with(DIRECT_ROUTE_KEY_A);}/*** 接受消息的监听* 针对消费者配置* 消费者实现ChannelAwareMessageListener的onMessage方法即可监听到directQuery()的消息** @return*/@Beanpublic SimpleMessageListenerContainer QueueMessageContainer() throws Exception {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(createConnectionFactory());//设置容器队列container.setQueues(directQuery());container.setExposeListenerChannel(true);//设置最大消费者数量container.setMaxConcurrentConsumers(3);//设置最小消费者数量container.setConcurrentConsumers(2);//设置确认模式手工确认container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置消息监听container.setMessageListener(new DirectConsumer());return container;}
}
RabbitMQCallback
@Component
@Slf4j
public class RabbitMQCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {/***yml配置publisher-confirms: true #消息发送到交换机确认机制,是否确认回调publisher-returns: true #消息发送到交换机确认机制,是否返回回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {//消息发送到交换机成功代码会走到这里log.info("消息发送到交换机成功,id:{}", correlationData.getId());} else {//发消息设置错误的交换机代码会走到这里log.error("消息发送到交换机失败,原因:{}", cause);}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//发消息设置错误的路由代码会走到这里log.debug("消息:{},应答码:{},原因:{},交换机:{},路由:{}", message, replyCode, replyText, exchange, routingKey);}protected RabbitTemplate rabbitTemplate;/*** 构造方法初始化RabbitTemplate,实现多例,RabbitMQConfig配置@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)* 如果使用自动注入RabbitTemplate将抛出Only one ConfirmCallback is supported by each RabbitTemplate异常** @param rabbitTemplate*/public RabbitMQCallback(RabbitTemplate rabbitTemplate) {/*如果消息没有到exchange,则confirm回调,ack=false如果消息到达exchange,则confirm回调,ack=trueexchange到queue成功,则不回调returnexchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)*///消息成功发送到交换机时触发ConfirmCallbackthis.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this);//消息处理失败返回给队列rabbitTemplate.setMandatory(true);//设置返回回调rabbitTemplate.setReturnCallback(this);}
}
DirectProducer
@Component
@Slf4j
public class DirectProducer extends RabbitMQCallback {public DirectProducer(RabbitTemplate rabbitTemplate) {super(rabbitTemplate);}public void send(Object obj) {String message = JSON.toJSONString(obj);log.info(">>>>>>发送消息:{}", message);//生成消息idCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-", ""));rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.DIRECT_ROUTE_KEY_A, message, msgConstruction -> {//设置消息持久化msgConstruction.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msgConstruction;}//消息id, correlationData);//设置错误的交换机【为了测试回调】//rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE + "c", RabbitMQConfig.DIRECT_ROUTE_KEY_A, message, correlationData);}
}
DirectConsumer
@Component
@Slf4j
public class DirectConsumer implements ChannelAwareMessageListener {/*@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_A)public void consume(Message message, Channel channel) {}*/@Overridepublic void onMessage(Message message, Channel channel) throws Exception {RabbitMQAction rabbitMQAction = RabbitMQAction.ACCEPT;//消息在队列中的索引Long tag = 0L;try {//MessageProperties// String contentType, String contentEncoding, Map<String, Object> headers, Integer deliveryMode, Integer priority, String correlationId, String replyTo, String expiration, String messageId, Date timestamp, String type, String userId, String appId, String clusterId)message.getMessageProperties().setContentType("text");tag = message.getMessageProperties().getDeliveryTag();log.info(">>>>>>>>>>接收消息:{}", new String(message.getBody()));
// int i = 1 / 0;} catch (Exception e) {log.error(">>>>>>>发生异常准备重试");rabbitMQAction = RabbitMQAction.RETRY;e.printStackTrace();} finally {try {// 通过finally块来保证Ack/Nack会且只会执行一次if (rabbitMQAction == RabbitMQAction.ACCEPT) {log.info(">>>>>>>手动消费消息成功!");channel.basicAck(tag, true);// 重试} else if (rabbitMQAction == RabbitMQAction.RETRY) {//long deliveryTag,tag// boolean multiple, 表示不提交但是requeue之后会从新处理// boolean requeue,是否从新入队,true表示是,否表示从队列删除log.error(">>>>>>>重试中。。。");channel.basicNack(tag, false, true);Thread.sleep(2000L);// 拒绝消息也相当于主动删除mq队列的消息} else {log.error(">>>>>>消息被拒!消息从mq中删除");channel.basicNack(tag, false, false);}} catch (Exception e) {e.printStackTrace();}}}
}
TestController
@RestController
@Slf4j
public class TestController extends BaseController {@Autowiredprivate DirectProducer directProducer;@GetMapping("test")public BaseResult test() {BaseResult<Object> vo = new BaseResult<>();vo.setRespCode("000");vo.setRespMsg("测试成功");directProducer.send("测试成功");return vo;}}
yml
server:servlet:context-path: /port: 8088spring:application:rabbitmq:#rabbitmq配置port: 15672username: rootpassword: rootaddresses: localhostvirtual-host: /
pom
多加如下依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
测试【前提下载rabbitMQ–依赖erlang环境】
http://localhost:8088/test
小知识
RabbitMQ可以解决高并发问题,jmeter模拟1000000个高并发请求毫无问题,注意模拟高并发你的电脑会异常发热哦!注意你的电脑不要被烧坏了,一般十万是OK的,一百万的话处理要一定时间,期间你的电脑会持续发热 如何使用jmeter模拟并发请求