当前位置: 首页 > news >正文

陕西省建设八大员官方网站网站建设黄页免费观看

陕西省建设八大员官方网站,网站建设黄页免费观看,liunx做网站跳转服务器,北京企业做网站费用如果想 发 一个消息, 能 被多个消费者消费, 这时候 就得用到发布订阅模型 举列: 类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者) 解读: 1、1 个生产者,多个消费者 2、每一个消费者都有自己的一个队列 3、生产…

如果想 发 一个消息, 能 被多个消费者消费, 这时候 就得用到发布订阅模型

举列: 类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)
 
解读:
1、1 个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的  

假如需要实现以下需求,注册完用户 发短信 和 发邮件

注意:发布订阅模式下,routingKey没有用,所以直接全部设置成""

生产者核心代码

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.basicPublish(exchange,"",null,msg.getBytes());

消费者核心代码,新建多个消息者,同时队列的名称改下,其余的代码一样
channel.queueBind(queue_name, EXCHANGE_NAME,"");

1 开发生产者代码:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitmqConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private static final String EXCHANGE_NAME ="test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {//获取一个连接Connection connection = RabbitmqConnection.getConnection();//从连接中获取一个通道Channel channel = connection.createChannel();//声明交换机(fanout分发:发布/订阅模式)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//每次只向消费者发送一条消息,消费者使用后,手动确认后,才会发送另外一条channel.basicQos(1);for (int i = 0; i <10 ; i++) {String msg="消息-"+i;//默认情况下代理服务器端是存在一个""名字的exchange的,String exchange= EXCHANGE_NAME;//路由键String routingKey="";AMQP.BasicProperties props=null;//发送消息channel.basicPublish(exchange,routingKey,props,msg.getBytes());}channel.close();connection.close();}
}

2 开发消费者1的代码

import com.rabbitmq.client.*;
import util.RabbitmqConnection;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class Recv_1 {private static final String queue_name="queue_PublishSubscribe_email";private static final String EXCHANGE_NAME ="test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = RabbitmqConnection.getConnection();// 创建通道final Channel channel = connection.createChannel();//是否持久化Boolean durable=false;//是否排外的Boolean exclusive=false;//是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除Boolean autoDelete=false;Map<String, Object> arguments=null;//创建队列声明,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。channel.queueDeclare(queue_name,durable,exclusive,autoDelete,arguments);//声明交换机(fanout分发:发布/订阅模式)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//将队列绑定到交换机,routingKey设置为""channel.queueBind(queue_name, EXCHANGE_NAME,"");channel.basicQos(1);//定义消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//获取到达消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body, "utf-8"));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {//手动确认,// false表示只确认当前这条消息已收到,// ture表示在当前这条消息及之前(小于 DelivertTag )的所有未确认的消息都已收到.channel.basicAck(envelope.getDeliveryTag(),false);}}};//消费者确认,// false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,boolean autoAck=false;//监听队列channel.basicConsume(queue_name,autoAck,defaultConsumer);}
}

3 开发消费者2的代码

import com.rabbitmq.client.*;
import util.RabbitmqConnection;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class Recv_2 {private static final String queue_name="queue_PublishSubscribe_phone";private static final String EXCHANGE_NAME ="test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = RabbitmqConnection.getConnection();// 创建通道final Channel channel = connection.createChannel();//是否持久化Boolean durable=false;//是否排外的Boolean exclusive=false;//是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除Boolean autoDelete=false;Map<String, Object> arguments=null;//创建队列声明,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。channel.queueDeclare(queue_name,durable,exclusive,autoDelete,arguments);//声明交换机(fanout分发:发布/订阅模式)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//将队列绑定到交换机,routingKey设置为""channel.queueBind(queue_name, EXCHANGE_NAME,"");channel.basicQos(1);//定义消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//获取到达消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body, "utf-8"));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {//手动确认,// false表示只确认当前这条消息已收到,// ture表示在当前这条消息及之前(小于 DelivertTag )的所有未确认的消息都已收到.channel.basicAck(envelope.getDeliveryTag(),false);}}};//消费者确认,// false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,boolean autoAck=false;//监听队列channel.basicConsume(queue_name,autoAck,defaultConsumer);}
}

测试 
一个消息 可以被多个消费者获取 

http://www.lbrq.cn/news/2727091.html

相关文章:

  • 可以做样机图的网站seo线上培训班
  • 公司网站制作注意什么卡一卡二卡三入口2021
  • 网站如何做长尾词排名谷歌广告联盟官网
  • 品牌的佛山网站建设怎么找拉新推广平台
  • 如何自己做门户网站益阳网站seo
  • 建站快车官网如何做百度关键词推广
  • 找公司做网站注意事项优化系统的软件
  • 安微省建设厅网站seo推广优化外包公司
  • 个人备案能做什么网站电商培训有用吗
  • 网站访问量大打不开舆情视频
  • 用php做动态网站google海外推广
  • 微信做兼职什么网站好6个好用的bt种子搜索引擎
  • 手机wap网站制作免费最全的百度网盘搜索引擎
  • 网站怎么做适配谷歌网站网址
  • 营口网站开发建网站哪个平台好
  • 网站制作样板网站seo检测
  • 网站怎么做隐藏内容seo和sem的联系
  • 郑州最近新闻事件汕头seo快速排名
  • 西安谁家做网站湖南seo
  • 网站建设教育类旧式网站seo网络推广是什么意思
  • 做黄页网站要告我上海百度搜索优化
  • 万江区网站仿做北京seo优化厂家
  • 网站制作预付款会计分录百度下载安装 官方
  • 建设银行确认参加面试网站怎么做营销推广
  • 制作网站的程序如何做宣传推广效果最好
  • 厦门关键词seo排名网站最新国内新闻重大事件
  • wordpress 时间线seo关键词排名报价
  • 知名室内设计网站谷歌排名算法
  • 怎么设计海报图片郑州seo排名工具
  • 网站推广培训哪里好seo整站优化费用
  • Linux软件编程-进程(2)及线程(1)
  • SaltStack 基础
  • 架构需求规格说明(ARD):项目成功的隐形引擎
  • 【新手入门】Android基础知识(一):系统架构
  • 【C#】PNG 和 JPG、JPEG的应用以及三种格式的区别?
  • anaconda创建pytorch1.10.0和pytorch2.0.0的GPU环境