RabbitMQ 手动应答
This commit is contained in:
23
src/main/java/io/jiulinxiri/rabbitmq/three/Task02.java
Normal file
23
src/main/java/io/jiulinxiri/rabbitmq/three/Task02.java
Normal file
@@ -0,0 +1,23 @@
|
||||
package io.jiulinxiri.rabbitmq.three;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Scanner;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class Task02 {
|
||||
public static final String ACK_QUEUE_NAME = "ack_queue";
|
||||
|
||||
public static void main(String[] args) throws IOException, TimeoutException {
|
||||
Channel channel = RabbitMqUtils.getChannel();
|
||||
channel.queueDeclare(ACK_QUEUE_NAME,false, false,false,null);
|
||||
Scanner scanner = new Scanner(System.in);
|
||||
while (scanner.hasNext()) {
|
||||
String message = scanner.next();
|
||||
channel.basicPublish("", ACK_QUEUE_NAME, null, message.getBytes());
|
||||
System.out.println(message + " 消息发送完成!");
|
||||
}
|
||||
}
|
||||
}
|
33
src/main/java/io/jiulinxiri/rabbitmq/three/Worker03.java
Normal file
33
src/main/java/io/jiulinxiri/rabbitmq/three/Worker03.java
Normal file
@@ -0,0 +1,33 @@
|
||||
package io.jiulinxiri.rabbitmq.three;
|
||||
|
||||
import com.rabbitmq.client.CancelCallback;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils;
|
||||
import io.jiulinxiri.rabbitmq.utils.SleepUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class Worker03 {
|
||||
public static final String ACK_QUEUE_NAME = "ack_queue";
|
||||
|
||||
|
||||
public static void main(String[] args) throws IOException, TimeoutException {
|
||||
Channel channel = RabbitMqUtils.getChannel();
|
||||
DeliverCallback deliverCallback = (coustomerTag, message) -> {
|
||||
SleepUtils.sleep(1);
|
||||
System.out.println("Worker03 接收到的消息: " + new String(message.getBody()));
|
||||
//手动应答
|
||||
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
|
||||
};
|
||||
|
||||
CancelCallback cancelCallback = (coustomerTag) -> {
|
||||
System.out.println("Worker03 消费中断");
|
||||
};
|
||||
|
||||
System.out.println("Worker03 等待接收消息...");
|
||||
boolean autoAck = false;
|
||||
channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
|
||||
}
|
||||
}
|
33
src/main/java/io/jiulinxiri/rabbitmq/three/Worker04.java
Normal file
33
src/main/java/io/jiulinxiri/rabbitmq/three/Worker04.java
Normal file
@@ -0,0 +1,33 @@
|
||||
package io.jiulinxiri.rabbitmq.three;
|
||||
|
||||
import com.rabbitmq.client.CancelCallback;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils;
|
||||
import io.jiulinxiri.rabbitmq.utils.SleepUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class Worker04 {
|
||||
public static final String ACK_QUEUE_NAME = "ack_queue";
|
||||
|
||||
|
||||
public static void main(String[] args) throws IOException, TimeoutException {
|
||||
Channel channel = RabbitMqUtils.getChannel();
|
||||
DeliverCallback deliverCallback = (coustomerTag, message) -> {
|
||||
SleepUtils.sleep(30);
|
||||
System.out.println("Worker04 接收到的消息: " + new String(message.getBody()));
|
||||
//手动应答
|
||||
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
|
||||
};
|
||||
|
||||
CancelCallback cancelCallback = (coustomerTag) -> {
|
||||
System.out.println("Worker04 消费中断");
|
||||
};
|
||||
|
||||
System.out.println("Worker04 等待接收消息...");
|
||||
boolean autoAck = false;
|
||||
channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
|
||||
}
|
||||
}
|
11
src/main/java/io/jiulinxiri/rabbitmq/utils/SleepUtils.java
Normal file
11
src/main/java/io/jiulinxiri/rabbitmq/utils/SleepUtils.java
Normal file
@@ -0,0 +1,11 @@
|
||||
package io.jiulinxiri.rabbitmq.utils;
|
||||
|
||||
public class SleepUtils {
|
||||
public static void sleep(int second) {
|
||||
try {
|
||||
Thread.sleep(1000*second);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user