不公平分发
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果
|
||||
要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化
|
||||
```java
|
||||
//让消息风列持久化
|
||||
//让消息队列持久化
|
||||
boolean durable=true;
|
||||
channel.queueDeclare (ACK QUEUE NAME, durable, exclusive: false, autoDelete: false, arguments: null);
|
||||
```
|
||||
@@ -23,4 +23,22 @@ channel.basicPublish("", ACK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN
|
||||
注: 将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是
|
||||
这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没
|
||||
有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要
|
||||
更强有力的持久化策略。
|
||||
更强有力的持久化策略。
|
||||
|
||||
### 不公平分发
|
||||
RabbitMQ 分发消息采用的轮训分发,某种场景下这种策略并不是很好,不不同消费这的处理能力不同。
|
||||
为了避免这种情况,我们可以设置参数 channel.basicQos(1);
|
||||
```java
|
||||
// 设置不公平分发
|
||||
int prefetchCount = 1;
|
||||
channel.basicQos(prefetchCount);
|
||||
```
|
||||

|
||||
|
||||
意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个
|
||||
任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完
|
||||
成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加
|
||||
新的 worker 或者改变其他存储任务的策略。
|
||||
|
||||
### 预取值
|
||||
开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的。**该值定义通道上允许的未确认消息的最大数量**。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。
|
||||
|
@@ -27,6 +27,10 @@ public class Worker03 {
|
||||
};
|
||||
|
||||
System.out.println("Worker03 等待接收消息...");
|
||||
// 设置不公平分发
|
||||
int prefetchCount = 1;
|
||||
channel.basicQos(prefetchCount);
|
||||
|
||||
boolean autoAck = false;
|
||||
channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
|
||||
}
|
||||
|
@@ -27,6 +27,10 @@ public class Worker04 {
|
||||
};
|
||||
|
||||
System.out.println("Worker04 等待接收消息...");
|
||||
// 设置不公平分发
|
||||
int prefetchCount = 1;
|
||||
channel.basicQos(prefetchCount);
|
||||
|
||||
boolean autoAck = false;
|
||||
channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
|
||||
}
|
||||
|
Reference in New Issue
Block a user