当前位置: 首页 > news >正文

武进网站制作公司/seo排名点击软件

武进网站制作公司,seo排名点击软件,做a视频在线观看网站,西宁今天最新官方消息Springboot 配置使用 RabbitMQ 并实现延时队列前言一、安装 RabbitMQWindows安装Linux安装二、新建项目1、引入依赖2、配置 yml3、启动类开启 Rabbitmq 注解4、配置 provider 的 RabbitmqConfig5、provider 发送消息6、consumer 接收消息7、演示三、延时队列1、在 provider 的 …

Springboot 配置使用 RabbitMQ 并实现延时队列

  • 前言
  • 一、安装 RabbitMQ
    • Windows安装
    • Linux安装
  • 二、新建项目
    • 1、引入依赖
    • 2、配置 yml
    • 3、启动类开启 Rabbitmq 注解
    • 4、配置 provider 的 RabbitmqConfig
    • 5、provider 发送消息
    • 6、consumer 接收消息
    • 7、演示
  • 三、延时队列
    • 1、在 provider 的 RabbitmqConfig 中增加配置
    • 2、在 provider 增加一个接口
    • 3、consumer 接收延时消息
  • 总结


前言

RabbitMQ作用:举几个例子,1、系统解耦,A系统无需关心B系统是否执行成功,无需等待B系统响应,直接把操作扔给mq就可以干其他事情了。2、系统使用高峰期,每秒产生10000条消息需要存储,一次性存入数据库恐怕不太行,所以先把数据发送到 RabbitMQ ,然后设置延时队列,每秒从队列取出1000条存入数据库,这样可以减少数据库压力。3、购买商品下订单以后,发送到延时队列,如果20分钟后没有付款,则从队列删除订单,也就是自动取消订单,如果支付了,则取出存入数据库,下单成功。


一、安装 RabbitMQ

Windows安装

太简单,自己bing一下

Linux安装

rabbitmq需要erlang语言环境
更新 apt 库,安装 erlang 环境,然后执行 erl 查看是否安装成功

apt update
apt install erlang
erl

安装 rabbitmq

apt install rabbitmq-server

查看 rabbitmq 运行状态

systemctl status rabbitmq-server

开启图形化管理界面,然后就可以访问 ip:15672,默认账号密码是 guest

rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

默认的guest用户是只能通过本机访问的,所以远程管理后台界面登录需要配置个用户,才能通过外网浏览器访问

#账号root,密码root
rabbitmqctl add_user root root
# 设置为管理员账户
rabbitmqctl set_user_tags root administrator
# 分配所有权限
rabbitmqctl set_permissions -p / root “.*” “.*” “.*”

开放防火墙 5672 和 15672 端口

# Debian/Ubuntu ufw
ufw allow 5672
ufw allow 15672
ufw reload
# Debian/Ubuntu iptables(这个叼毛防火墙好麻烦,我没用过,不知道是不是这样)
iptables -A INPUT -p tcp --dport 5672 -j ACCEPT
iptables -A INPUT -p tcp --dport 15672 -j ACCEPT
iptables-restore
# CentOS
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload

二、新建项目

新建一个 provider 一个 consumer,两个 springboot 项目,都需要引入下面的依赖,或者新建的时候勾选自动添加 rabbitmq 的依赖
在这里插入图片描述

1、引入依赖

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope>
</dependency>

2、配置 yml

provider 和 consumer 都这样配置,端口改成不一样就行了

server:port: 8081
spring:rabbitmq:host: 192.168.0.105port: 5672username: rootpassword: rootvirtualHost: /# 确认机制publisher-confirm-type: correlated# 发布确认,如果不配置确认机制,发布确认也不用配置publisher-returns: true

3、启动类开启 Rabbitmq 注解

consumer 和 provider 都需要这个注解

在这里插入图片描述

4、配置 provider 的 RabbitmqConfig

大家可以根据 15672 那个图形化管理界面看看下面的一些概念

  • Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息的载体,每个消息都会被投到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
  • Producer:消息生产者,就是投递消息的程序.
  • Consumer:消息消费者,就是接受消息的程序.
  • Channel:消息通道,在客户端的每个连接里,可建立多个channel.
package icu.xuyijie.provider.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.web.filter.CharacterEncodingFilter;import java.util.HashMap;
import java.util.Map;/*** @author 徐一杰* @date 2022/9/30 9:38* @description*/
@SpringBootConfiguration
public class RabbitmqConfig {public static final String QUEUE_MESSAGE = "queue_message";public static final String QUEUE_ORDER = "queue_order";public static final String EXCHANGE_A = "exchange_A";/*** # 是通配符,可以匹配任意,如下面的可以匹配到 aa.bb.message.cc.dd* 还有 * 是匹配一个.里面的字符,如 .*.message 只能匹配 .aa.message,不能匹配 .aa.bb.message*/public static final String ROUTING_KEY_MESSAGE = "#.message.#";public static final String ROUTING_KEY_ORDER = "#.order.#";/*** 这个可以不写,写了的话,以这个为准,会覆盖掉 yml 的配置* @return*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.0.105", 5672);connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");//确认机制connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);//发布确认,如果不配置确认机制,发布确认也不用配置connectionFactory.setPublisherReturns(true);return connectionFactory;}/*** 使用 yml 连接的话这个可以不写,这个是配合 connectionFactory 的* RabbitMQ的使用入口。scope必须是prototype类型* @return*/@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate(this.connectionFactory());template.setMessageConverter(this.jsonMessageConverter());template.setMandatory(true);return template;}/*** 序列化json* @return*/@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic CharacterEncodingFilter characterEncodingFilter() {CharacterEncodingFilter filter = new CharacterEncodingFilter();filter.setEncoding("UTF-8");filter.setForceEncoding(true);return filter;}/*** 声明交换机*  针对消费者配置*  设置交换机类型*  将队列绑定到交换机*    FanoutExchange: 将消息分发到所有的绑定队列,无routing key的概念*    HeadersExchange:通过添加属性key-value匹配*    DirectExchange: 按照routing key分发到指定队列*    TopicExchange: 多关键字匹配* @return*/@Beanpublic Exchange exchangeA(){//durable(true) 持久化,mq重启之后交换机还在return ExchangeBuilder.topicExchange(EXCHANGE_A).durable(true).build();}/*** 声明QUEUE_MESSAGE队列* durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。* 一般设置一下队列的持久化就好,其余两个就是默认false* @return*/@Beanpublic Queue queueMessage(){return new Queue(QUEUE_MESSAGE, true, false, false);}/*** 声明QUEUE_ORDER队列* @return*/@Beanpublic Queue queueOrder(){return new Queue(QUEUE_ORDER);}/*** 队列绑定交换机,指定routingKey* @return*/@Beanpublic Binding bindingQueueMessage(){return BindingBuilder.bind(queueMessage()).to(exchangeA()).with(ROUTING_KEY_MESSAGE).noargs();}/*** 队列绑定交换机,指定routingKey* @return*/@Beanpublic Binding bindingQueueOrder(){return BindingBuilder.bind(queueOrder()).to(exchangeA()).with(ROUTING_KEY_ORDER).noargs();}}

5、provider 发送消息

我们因为配置了确认机制,所以我们配置了回调方法,这里使用构造器注入 rabbitTemplate,如果不配置回调方法,则可以使用 @Autowired 注入,并且类无需实现 RabbitTemplate.ConfirmCallback,sendExchange 方法没有使用回调方法,使用回调方法的话需要像 sendCallback 方法一样多传一个值 correlationId

package icu.xuyijie.provider.controller;import icu.xuyijie.provider.config.RabbitmqConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;
import java.util.concurrent.ExecutionException;/*** @author 徐一杰* @date 2022/9/30 10:08* @description*/
@RestController
@RequestMapping("/provider")
public class ProviderController implements RabbitTemplate.ConfirmCallback {/*** 我们因为配置了确认机制,所以我们配置了回调方法,这里使用构造器注入 rabbitTemplate,如果不配置* 回调方法,则可以使用 @Autowired 注入,并且类无需实现 RabbitTemplate.ConfirmCallback*/private final RabbitTemplate rabbitTemplate;public ProviderController(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;//设置回调为当前类对象this.rabbitTemplate.setConfirmCallback(this);}@RequestMapping("/sendExchange")public void sendExchange(){//使用rabbitTemplate发送消息String message = "这是一条发送到exchangeA的消息";/*** 参数:* 1、交换机名称* 2、routingKey* 3、消息内容*/rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_A, "a.message", message);}/*** 如果使用回调方法,则需要多传一个参数 correlationId*/@RequestMapping("/sendCallback")public void sendCallback(){String message = "这是一条发送到exchangeA的消息";//构建回调id为uuidString callBackId = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(callBackId);//发送消息到消息队列rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_A, "a.message", message, correlationId);System.out.println("发送回调id: " + callBackId);}/*** 消息回调确认方法* @param correlationData 请求数据对象* @param ack 是否发送成功* @param s*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {assert correlationData != null;System.out.println("这是回调方法打印的:回调id: " + correlationData.getId());try {System.out.println("这是回调方法打印的:回调message: " + correlationData.getFuture().get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}if (ack) {System.out.println("这是回调方法打印的:消息发送成功");} else {System.out.println("消息发送失败" + s);}}}

6、consumer 接收消息

@RabbitListener就是监听的队列,可以监听多个

package icu.xuyijie.consumer.consumer;import com.rabbitmq.client.Channel;
import icu.xuyijie.consumer.config.RabbitmqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 徐一杰* @date 2022/9/30 10:10* @description*/
@Component
public class ReceiveHandler {@RabbitListener(queues = {"queue_message", "queue_order"})public void receiveMessage(Message message, Channel channel){System.out.println("接收 queue_message 和 queue_order 队列的消息 " + message);System.out.println("对应Channel " + channel);}@RabbitListener(queues = {"queue_order"})public void receiveOrder(Message message, Channel channel){System.out.println("接收 queue_order  队列的消息" + message);System.out.println("对应Channel " + channel);}}

7、演示

我们直接调用 sendCallback 这个接口

在这里插入图片描述

consumer 接收到消息

在这里插入图片描述

provider 触发回调方法

在这里插入图片描述


三、延时队列

1、在 provider 的 RabbitmqConfig 中增加配置

增加了一个一个交换机、一个队列、一个路由键的配置,注意 delayQueue() 方法,我的注释有解释

	public static final String QUEUE_DELAY = "queue_delay";public static final String EXCHANGE_DELAY = "exchange_delay";public static final String ROUTER_DELAY_KEY = "router_delay_key";/*** 延迟交换机** @return*/@Beanpublic DirectExchange delayExchange() {return new DirectExchange(EXCHANGE_DELAY);}/*** 延迟队列* map 的设置意思是接收此队列的延迟消息需要监听 EXCHANGE_RECEIVE 队列,直接监听 QUEUE_DELAY 无法实现延时队列** @return*/@Beanpublic Queue delayQueue() {Map<String, Object> map = new HashMap<>(16);map.put("x-dead-letter-exchange", EXCHANGE_A);map.put("x-dead-letter-routing-key", ROUTING_KEY_ORDER);return new Queue(QUEUE_DELAY, true, false, false, map);}/*** 给延迟队列绑定交换机** @return*/@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(ROUTER_DELAY_KEY);}

2、在 provider 增加一个接口

发送消息到我们刚刚配置的延时交换机

	/*** 给延迟队列发送消息*/@RequestMapping("/sendDelay")public void sendDelay(){rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_DELAY, RabbitmqConfig.ROUTER_DELAY_KEY, "这是一条延时队列的消息", message -> {message.getMessageProperties().setExpiration("3000");return message;}, new CorrelationData(UUID.randomUUID().toString()));System.out.println("延时队列发送成功");}

3、consumer 接收延时消息

注意,上面我们把延时消息发送到了延时队列 EXCHANGE_DELAY,但是我们接收,要监听 RabbitmqConfig中 delayQueue() 方法 配置的 EXCHANGE_A,路由键为 ROUTING_KEY_ORDER,也就是说无需改动 consumer ,consumer 的两个方法都能收到延时消息,因为他们都监听了 ROUTING_KEY_ORDER 对应的队列

直接调用 sendDelay 方法,2次(因为两个方法都监听 queue_order,所以他们会交替获得消息)

在这里插入图片描述
3秒后 consumer 的两个方法都能接收到延时消息

在这里插入图片描述


总结

http://www.lbrq.cn/news/1573417.html

相关文章:

  • 邯郸做移动网站费用/网站搜索引擎优化
  • 打开网站后直接做跳转页面吗/推广联系方式
  • 昆山张浦做网站/产品策划方案怎么做
  • 织梦建站教程视频/seo是什么地方
  • 做pc端网站咨询/南京百度搜索优化
  • 网站推广服务合同/免费的网页制作软件
  • 做视频网站怎么赚钱/广州网站优化多少钱
  • 如何制作动态网页/南宁seo优势
  • 网站开发 脚本怎么写/深圳seo优化排名
  • 网站建设华科技/百度网站制作联系方式
  • wordpress内容里的图片大小/怎么优化关键词
  • 网站建设的几点体会/郑州seo线上推广系统
  • 政府网站建设文案/百度top排行榜
  • 网站开发域名注册功能/个人接app推广单去哪里接
  • 网站推广软件破解版/市场营销培训课程
  • 合肥公司建设网站/网络营销策划书总结
  • 徐州集团网站建设方案/网络营销的特点和优势
  • ppt做的最好的网站有哪些/合肥网站制作
  • 电子商务网站建设主要内容/百度注册网站
  • 建什么网站可以赚钱/河南网站建站推广
  • 网站怎么做百度百科/网站友链查询接口
  • 广州网站建设找新际/企业网站推广方法实验报告
  • 网站添加qq在线客服/网站搜索引擎优化的基本内容
  • 给视频做特效的网站/网站运营及推广方案
  • 做的比较好的法律实务培训网站/百度推广销售员好做吗
  • 网站需要多少钱/如何自建网站?
  • 正规网站建设团队是什么/网站seo关键词优化
  • 个人网页设计作品简约/广州排前三的seo公司
  • java 就是做网站的吗/上海网站推广广告
  • 网站换了服务器/网站优化塔山双喜
  • GPT-o3回归Plus用户,GPT5拆分三种模式,对标Grok
  • Cookies和Sessions
  • 计算机毕设不知道选什么题目?基于Spark的糖尿病数据分析系统【Hadoop+Spark+python】
  • 91、23种经典设计模式
  • 1 JQ6500语音播报模块详解(STM32)
  • Java数据结构之ArrayList