diff --git a/src/main/java/io/jiulinxiri/rabbitmq/three/README.md b/src/main/java/io/jiulinxiri/rabbitmq/three/README.md index a69b6bd..dd1d0b6 100644 --- a/src/main/java/io/jiulinxiri/rabbitmq/three/README.md +++ b/src/main/java/io/jiulinxiri/rabbitmq/three/README.md @@ -2,7 +2,7 @@ 之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果 要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化 ```java -//让消息风列持久化 +//让消息队列持久化 boolean durable=true; channel.queueDeclare (ACK QUEUE NAME, durable, exclusive: false, autoDelete: false, arguments: null); ``` @@ -23,4 +23,22 @@ channel.basicPublish("", ACK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN 注: 将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没 有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要 -更强有力的持久化策略。 \ No newline at end of file +更强有力的持久化策略。 + +### 不公平分发 +RabbitMQ 分发消息采用的轮训分发,某种场景下这种策略并不是很好,不不同消费这的处理能力不同。 +为了避免这种情况,我们可以设置参数 channel.basicQos(1); +```java +// 设置不公平分发 +int prefetchCount = 1; +channel.basicQos(prefetchCount); +``` +![8cqDZhg4rKFAUj2](https://vip2.loli.io/2021/11/30/8cqDZhg4rKFAUj2.png) + +意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个 +任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完 +成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加 +新的 worker 或者改变其他存储任务的策略。 + +### 预取值 +开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的。**该值定义通道上允许的未确认消息的最大数量**。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。 diff --git a/src/main/java/io/jiulinxiri/rabbitmq/three/Worker03.java b/src/main/java/io/jiulinxiri/rabbitmq/three/Worker03.java index c3d6d96..86beb67 100644 --- a/src/main/java/io/jiulinxiri/rabbitmq/three/Worker03.java +++ b/src/main/java/io/jiulinxiri/rabbitmq/three/Worker03.java @@ -27,6 +27,10 @@ public class Worker03 { }; System.out.println("Worker03 等待接收消息..."); + // 设置不公平分发 + int prefetchCount = 1; + channel.basicQos(prefetchCount); + boolean autoAck = false; channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback); } diff --git a/src/main/java/io/jiulinxiri/rabbitmq/three/Worker04.java b/src/main/java/io/jiulinxiri/rabbitmq/three/Worker04.java index 21c95fb..e81b27f 100644 --- a/src/main/java/io/jiulinxiri/rabbitmq/three/Worker04.java +++ b/src/main/java/io/jiulinxiri/rabbitmq/three/Worker04.java @@ -27,6 +27,10 @@ public class Worker04 { }; System.out.println("Worker04 等待接收消息..."); + // 设置不公平分发 + int prefetchCount = 1; + channel.basicQos(prefetchCount); + boolean autoAck = false; channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback); }