陕西省建设八大员官方网站网站建设黄页免费观看
如果想 发 一个消息, 能 被多个消费者消费, 这时候 就得用到发布订阅模型
举列: 类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)
解读:
1、1 个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
假如需要实现以下需求,注册完用户 发短信 和 发邮件
注意:发布订阅模式下,routingKey没有用,所以直接全部设置成""
生产者核心代码
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.basicPublish(exchange,"",null,msg.getBytes());
消费者核心代码,新建多个消息者,同时队列的名称改下,其余的代码一样
channel.queueBind(queue_name, EXCHANGE_NAME,"");
1 开发生产者代码:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitmqConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private static final String EXCHANGE_NAME ="test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {//获取一个连接Connection connection = RabbitmqConnection.getConnection();//从连接中获取一个通道Channel channel = connection.createChannel();//声明交换机(fanout分发:发布/订阅模式)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//每次只向消费者发送一条消息,消费者使用后,手动确认后,才会发送另外一条channel.basicQos(1);for (int i = 0; i <10 ; i++) {String msg="消息-"+i;//默认情况下代理服务器端是存在一个""名字的exchange的,String exchange= EXCHANGE_NAME;//路由键String routingKey="";AMQP.BasicProperties props=null;//发送消息channel.basicPublish(exchange,routingKey,props,msg.getBytes());}channel.close();connection.close();}
}
2 开发消费者1的代码
import com.rabbitmq.client.*;
import util.RabbitmqConnection;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class Recv_1 {private static final String queue_name="queue_PublishSubscribe_email";private static final String EXCHANGE_NAME ="test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = RabbitmqConnection.getConnection();// 创建通道final Channel channel = connection.createChannel();//是否持久化Boolean durable=false;//是否排外的Boolean exclusive=false;//是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除Boolean autoDelete=false;Map<String, Object> arguments=null;//创建队列声明,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。channel.queueDeclare(queue_name,durable,exclusive,autoDelete,arguments);//声明交换机(fanout分发:发布/订阅模式)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//将队列绑定到交换机,routingKey设置为""channel.queueBind(queue_name, EXCHANGE_NAME,"");channel.basicQos(1);//定义消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//获取到达消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body, "utf-8"));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {//手动确认,// false表示只确认当前这条消息已收到,// ture表示在当前这条消息及之前(小于 DelivertTag )的所有未确认的消息都已收到.channel.basicAck(envelope.getDeliveryTag(),false);}}};//消费者确认,// false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,boolean autoAck=false;//监听队列channel.basicConsume(queue_name,autoAck,defaultConsumer);}
}
3 开发消费者2的代码
import com.rabbitmq.client.*;
import util.RabbitmqConnection;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class Recv_2 {private static final String queue_name="queue_PublishSubscribe_phone";private static final String EXCHANGE_NAME ="test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = RabbitmqConnection.getConnection();// 创建通道final Channel channel = connection.createChannel();//是否持久化Boolean durable=false;//是否排外的Boolean exclusive=false;//是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除Boolean autoDelete=false;Map<String, Object> arguments=null;//创建队列声明,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。channel.queueDeclare(queue_name,durable,exclusive,autoDelete,arguments);//声明交换机(fanout分发:发布/订阅模式)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//将队列绑定到交换机,routingKey设置为""channel.queueBind(queue_name, EXCHANGE_NAME,"");channel.basicQos(1);//定义消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//获取到达消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body, "utf-8"));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {//手动确认,// false表示只确认当前这条消息已收到,// ture表示在当前这条消息及之前(小于 DelivertTag )的所有未确认的消息都已收到.channel.basicAck(envelope.getDeliveryTag(),false);}}};//消费者确认,// false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,boolean autoAck=false;//监听队列channel.basicConsume(queue_name,autoAck,defaultConsumer);}
}
测试
一个消息 可以被多个消费者获取