百度权重1关键词优化推广排名
问题场景:
当我们使用work queues队列时,默认是轮询消费,此时如果消费者的处理速度不一致(消费者A慢,消费者B快),则会发生消费者B消费完消息后,一直等待着消费者A消费结束,这样比较浪费资源,所以我们需要根据消费者处理速度不同来分配队列中的消息
解决思路:
1.设置每次消费者获取的消息条数为1。
2.设置消费者消息手动确认
分析思路:
1.第一点其实很好理解,如果生产者生产了10条消息,如果不设置消费者每次消费数为1,则在消费队列中的消息前,就会进行平均的消息分配,结果为:
消费者A消费的消息条数为:5条
消费者B消费的消息条数为:5条
这样根本就不会存在根据性能而分配队列中消息的说法了,因为他们已经在消费前做好分配了,所以我们要控制消费者每次消费消息的数量
2.第二点我们就要引入一个概念,消费者消息确认机制。消息确认机制是确认消息有没有成功的被消费,如果消息被确认就认为消息成功被消费,则该消息在消息队列中会被删除,意味着本条消费结束。我们需要关闭自动消息确认,进行手动消息确认。如果不关闭自动消息确认会造成什么情况呢?
如果生产者生产了10条消息:1,2,3...10,是自动消息确认而不是手动消息确认,结果为:
消费者A消费的消息为:1,3,5,7,9
消费者B消费的消息为:2,4,6,8,10
还是不能满足根据消费者处理速度不一致进行消息分配的结果。
为什么会得出以上的结果呢?我们举个例子来说:
一个篮子里有10个苹果,苹果上都写有数字1,2,3...10,有两个孩子(孩子A,孩子B)排队轮流拿苹果,这时候我们如果是自动消息确认且设置每次消费条数为1,则拿苹果的规则为:两个孩子排队拿苹果,每次只能拿一个,拿完一次就需要重新排队,这种规则我们叫做轮询,拿完的结果为:
孩子A手中的苹果为:苹果1,苹果3,苹果5,苹果7,苹果9
孩子B手中的苹果为:苹果2,苹果4,苹果6,苹果8,苹果10
如果我们的条件是手动确认消息且设置每次消费条数为1,则拿苹果的规则为:两个孩子排队拿苹果,每次只能拿一个,孩子吃完了在重新排队,这样如果两个孩子吃苹果的速度不同,拿到苹果的数量就会不一样。假如孩子A吃的慢,孩子B吃得快,可能的结果为(A拿的少,B拿的多):
所以,消息手动确认对比消息自动确认的区别就是:消息手动确认一定要对消息做出应答,否则rabbit认为当前消息没有消费完成,消费者将不能继续消费消息,直到前一个消息确认之后才能恢复消费。这样就可以实现根据消费者不同的消费速度来分配队列中的消息。
代码实现:
1.获取连接工具类
package utils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {private static ConnectionFactory connectionFactory;static {connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.xxx.xxx");//服务器的ipconnectionFactory.setPort(5672);//访问端口号connectionFactory.setVirtualHost("/lln");//虚拟主机名称connectionFactory.setUsername("lln");//账号connectionFactory.setPassword("123");//密码}//提供连接对象的方法public static Connection getConnection(){try{return connectionFactory.newConnection();}catch (Exception e){e.printStackTrace();}return null;}//关闭通道和关闭连接工具的方法public static void closeConnectionAndChanel(Channel channel,Connection connection){try {if (channel!=null){channel.close();}if (connection!=null){connection.close();}}catch (Exception e){e.printStackTrace();}}
}
2.生产者Provider
package workqueue;import com.rabbitmq.client.*;
import utils.RabbitMqUtils;import java.io.IOException;public class ConsumerA {public static void main(String[] args) throws IOException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();// 每次只能消费一个消息channel.basicQos(1);channel.queueDeclare("work",true,false,false,null);//第二个参数:取消自动消息确认channel.basicConsume("work",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try{Thread.sleep(2000);}catch (Exception e){e.printStackTrace();}System.out.println("孩子-A:"+new String(body));//消息手动确认//参数1:确认队列中那个具体消息 参数2:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
package workqueue;import com.rabbitmq.client.*;
import utils.RabbitMqUtils;import java.io.IOException;public class ConsumerB {public static void main(String[] args) throws IOException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();// 每次只能消费一个消息channel.basicQos(1);channel.queueDeclare("work",true,false,false,null);//channel.basicConsume("work",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("孩子-B:"+new String(body));//消息手动确认channel.basicAck(envelope.getDeliveryTag(),false);}});}
}