网站开发代理报价表/网站制作公司有哪些
文章目录
- 1. 没有交换机 和 有交换机的 情况对比
- 2. 交换机(exchanges)的 概念
- 3. 交换机 绑定(bindings)
- 4. 交换机 Fanout模式(发布、订阅模式)
- 5. 交换机 直接交换机(Direct exchange)
- 6. 交换机 主题交换机(Topics)
- 6.1 主题交换机 概述
- 6.2 主题交换机 案例
- 7. 死信的 概念
- 8. 死信的 代码架构图
- 9. 死信 代码案例(模拟消息TTL过期,进入死信队列)
- 10. 死信 代码案例(模拟队列达到最大长度,进入死信队列)
- 11. 死信 代码案例(消息被拒绝 进入死信队列)
1. 没有交换机 和 有交换机的 情况对比
没有交换机的情况下:
- 这种模式是
简单模式(也有叫工作模式,简单队列,工作队列)
。 - 消息会基于策略(像轮询)发给消费者,并且一个消息只能被消费一次。也就是C1拿到了一个消息,C2就没法获得这个消息。
有交换机的情况下:
- 这种模式成为
发布、订阅模式
。 - 同样一个消息只能被消费一次,但是交换机可以多个相同的消息,给多个消费者。
2. 交换机(exchanges)的 概念
RabbitMQ消息传递模型的核心思想是:
- 生产者的消息从不会直接发送到队列。
- 就是生产者都不知道这些消息传递到了哪些队列中。
其实之前写过的代码并不是违背了这个原则,之前的代码交换机参数为“”空字符串
,使用的是默认的交换机。
交换机作用:
- 两个作用:一方面接受来自生产者的消息,另一方面将它们推入队列。
- 交换机必须确切知道如何处理收到的消息:是应该传给特定队列,还是许多队列,还是丢弃它们。
交换机类型:
- 直接(direct)类型,也叫路由类型。
- 主题(topic)类型。
- 扇出(fanout)类型。
- 无名类型(默认类型,通常用""空字符串标识)。
临时队列概念:
- 就是一旦我们断开了消费者连接,队列将被自动删除。
声明临时队列的方式:
String queueName = schannel.queueDeclare().getQueue();
3. 交换机 绑定(bindings)
可以在网页上面添加交换机:
通过使用RoutingKey来进行绑定,每个RoutingKey都会有一个对应值,根据对应值来判断应该发送给那个队列:
4. 交换机 Fanout模式(发布、订阅模式)
Fanout模式其实就类似于广播。
消费者代码演示:
- 代码相同,copy两份使用就可以。
package com.itholmes.rabbitmq.five;import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** 消费者:* 消息接受*/
public class ReceiveLogs01 {//交换机名称public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();/*** 声明一个交换机。*/channel.exchangeDeclare(EXCHANGE_NAME,"fanout");/*** 生成一个临时队列:* 队列的名称是随机的。* 当消费者断开与队列的连接的时候,队列就会自动删除。*/String queueName = channel.queueDeclare().getQueue();/*** 绑定交换机与队列* 绑定的队列名,交换机名,routingKey值*/channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接受消息,巴接受到的消息打印到屏幕上....");//接受消息回调DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("ReceiveLogs01控制台打印接受到的消息:"+new String(message.getBody(),"UTF-8"));};//接受消息channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});//这里直接匿名lamba表达式简单写一下。}}
生产者代码演示:
package com.itholmes.rabbitmq.five;import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;import java.util.Scanner;/*** 生产者:* 发消息给交换机*/
public class EmitLog {//交换机名称public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明交换机,如果后台rabbitmq服务中已经声明了交换机就不用再次声明。channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();//这里就要指定交换机名称和routingkey(消费者后面指定的是""空字符串,这里同样。)channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:"+message);}}
}
因为是广播式,所以我们的RoutingKey都统一设置为一样的,这里我们统一设置为了“”空字符串。
5. 交换机 直接交换机(Direct exchange)
直接交换机就是RoutingKey不同。
消费者代码:
- 修改绑定routingKey就行。
package com.itholmes.rabbitmq.six;import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiverLogsRirect01 {//交换机名public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();/*** 声明一个交换机。* 这里就要声明为直接模式* BuiltinExchangeType是枚举,一般都是这样表示。*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//声明一个队列名channel.queueDeclare("console",false,false,false,null);/*** 绑定交换机与队列* 绑定的队列名,交换机名,routingKey值。*///一个队列可以绑定多个RoutingKey。channel.queueBind("console",EXCHANGE_NAME,"info");channel.queueBind("console",EXCHANGE_NAME,"warning");System.out.println("等待接受消息,巴接受到的消息打印到屏幕上....");//接受消息回调DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("ReceiverLogsRirect01控制台打印接受到的消息:"+new String(message.getBody(),"UTF-8"));};//接受消息channel.basicConsume("console",true,deliverCallback,consumerTag ->{});//这里直接匿名lamba表达式简单写一下。}}
生产者代码:
- 同样发送给哪个交换机,并且告诉交换机走哪个RoutingKey。
package com.itholmes.rabbitmq.six;import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;/*** 生产者:* 发消息给交换机*/
public class DirectLogs {//交换机名称public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();//这里就要指定交换机名称和routingkey(消费者后面指定的是""空字符串,这里同样。)channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:"+message);}}
}
6. 交换机 主题交换机(Topics)
6.1 主题交换机 概述
主题交换机要比上面两个模式更加灵活高效,方便。
主题交换机的注意事项:
- 主题交换机的routing_key必须是一个单词列表,以点号分隔开。
- 单词列表最多不能超过255个字节。
- *星号可以代替一个单词。
- #井号可以替代另个或多个单词。
意思就是如下图:
需要注意的一点:
(不匹配任何绑定不会被任何队列接受到会被丢弃。)
主题交换机也可以做到直接绑定和扇形绑定的效果:
- 因此,主题交换机是最强大的,它包含了直接绑定和扇形绑定,也弥补了他们的缺点。
6.2 主题交换机 案例
消费者代码:
package com.itholmes.rabbitmq.seven;import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** 声明主题交换价 以及相关队列*/
public class ReceivelLogsTopic02 {//交换机名称public static final String EXCHANGE_NAME = "topic_logs";//接受消息public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();/*** 声明主题交换机*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//声明队列String queueName = "Q2";channel.queueDeclare(queueName,false,false,false,null);/*** 配置RoutingKey绑定* 绑定两个*/channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");System.out.println("等待接受消息...");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody(),"UTF-8"));/*** 可以通过message.getEnvelope().getRoutingKey()获取到对应的RoutingKey*/System.out.println("接受队列:"+queueName+",绑定键:"+message.getEnvelope().getRoutingKey());};channel.basicConsume(queueName,true,deliverCallback,(s)->{});}}
生产者代码:
package com.itholmes.rabbitmq.seven;import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;/*** 生产者代码*/
public class EmitLogTopic {//交换机名称public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();/*** 切换不同的binding来测验。*/channel.basicPublish(EXCHANGE_NAME,"lazy.aa.zz",null,"message".getBytes());}
}
7. 死信的 概念
死信就是无法被消费的消息
,生产者将消息投递到了broker(中间人,就是rabbitmq服务器)或者queue里面,但是由于某些原因queue中的某些消息无法被消费。
这样无法被消费的消息,如果没有后续的处理,就变成了死信,有死信就有了死信队列。
死信队列作用很大,当消息消费发生异常时,将消息投入死信队列中,防止消息丢失
。此外还有很多场景也都用到了死信。
死信的来源:
- 消息TTL(Time To Live,存活时间)过期。
- 队列达到最大长度(队列满了,无法再添加数据到mq中)。
- 消息被拒绝(basic.reject拒绝应答或basic.nack否定应答)并且requeue=false不放回队列中。
8. 死信的 代码架构图
从图中看出,多了一个专门处理死信的交换机
和该交换机对应的死信队列
,以及处理这些死信消息的C2消费者
。
- 特别的地方就是这个死信交换机和正常队列要有绑定关系,正常队列将死信消息发给死信交换机。
9. 死信 代码案例(模拟消息TTL过期,进入死信队列)
设置存活时间可以在生产者或者正常队列配置死信相关信息设置。
按照图上设计架构代码:
C1消费者代码案例:
package com.itholmes.rabbitmq.eight;import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;/*** 消费者C1*/
public class Consumer01 {//普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机的名称public static final String DEAD_EXCHANGE = "dead_exchange";//普通队列的名称public static final String NORMAL_QUEUE = "normal_queue";//死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();/*** 声明死信交换机和普通交换机类型为direct*/channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);/*** 声明普通队列* 想要普通队列指定到死信交换机,就需要配置相关map信息。*///声明普通队列HashMap<String, Object> arguments = new HashMap<>();//过期时间 这里对应是毫秒//arguments.put("x-message-ttl",10000); 可以不用在这里设置ttl,回头再生产者设置。//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");//声明普通队列,并且将相关配置信息map,加上参数。channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);/*** 声明死信队列*/channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通的交换机与普通的队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接受消息...");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("Consume01接受的消息是:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,(s)->{});}
}
可以在页面查询到哪个队列配置绑定了死信交换机以及告诉死信交换机走哪个RoutingKey:
- DLX:x-dead-letter-exchange 。
- DLK:x-dead-letter-routing-key(下图写错了!!)。
C2消费者代码案例:
package com.itholmes.rabbitmq.eight;import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;/*** 消费者C1*/
public class Consumer02 {//死信队列的名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接受消息...");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("Consume02(接受死信队列)接受的消息是:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(s)->{});}
}
Producer生产者代码案例:
package com.itholmes.rabbitmq.eight;import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;/*** 生产者代码*/
public class Producer {//普通交换机的名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();/*** 设置TTL时间*/AMQP.BasicProperties properties =new AMQP.BasicProperties().builder().expiration("10000").build();//发送死信消息 ,设置TTL事件 ,模拟测试死信for (int i = 0; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());}}
}
10. 死信 代码案例(模拟队列达到最大长度,进入死信队列)
同上配置C1的map来操作就可以了:
/*** 设置队列最大长度*/
arguments.put("x-max-length",6);
测试效果:
11. 死信 代码案例(消息被拒绝 进入死信队列)
消息拒绝要在C1消费者上面进行设置:
- 开启消息手动应答,关闭自动应答。
- channel.basicReject 和 channel.basicAck参数设置。
DeliverCallback deliverCallback = (consumerTag,message)->{
String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){System.out.println("拒绝消息"+message);/*** channel.basicReject的两个参数:* 第一个参数:message.getEnvelope().getDeliveryTag()(该消息的标识)* 第二个参数:requeue(是否重新放回队列中)*/channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else {System.out.println("Consume01接受的消息是:"+new String(message.getBody(),"UTF-8"));/*** channel.basicAck的两个参数:* 第一个参数:message.getEnvelope().getDeliveryTag()(该消息的标识)* 第二个参数:multiple是否批量响应。*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}};
/**
* 这里要关闭自动应答,开启手动应答。 参数设置为false
*/
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(s)->{});
这样消息被拒绝后,并且不会requeue发送给死信交换机,进入死信队列。