From d0695f724f3d1d8affb8ff5dffa880ccdccdc9cb Mon Sep 17 00:00:00 2001 From: jiulinxiri Date: Mon, 29 Nov 2021 15:43:42 +0800 Subject: [PATCH] =?UTF-8?q?RabbitMQ=20=E6=89=8B=E5=8A=A8=E5=BA=94=E7=AD=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/jiulinxiri/rabbitmq/three/Task02.java | 23 +++++++++++++ .../jiulinxiri/rabbitmq/three/Worker03.java | 33 +++++++++++++++++++ .../jiulinxiri/rabbitmq/three/Worker04.java | 33 +++++++++++++++++++ .../jiulinxiri/rabbitmq/utils/SleepUtils.java | 11 +++++++ 4 files changed, 100 insertions(+) create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/three/Task02.java create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/three/Worker03.java create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/three/Worker04.java create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/utils/SleepUtils.java diff --git a/src/main/java/io/jiulinxiri/rabbitmq/three/Task02.java b/src/main/java/io/jiulinxiri/rabbitmq/three/Task02.java new file mode 100644 index 0000000..cd57b06 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/three/Task02.java @@ -0,0 +1,23 @@ +package io.jiulinxiri.rabbitmq.three; + +import com.rabbitmq.client.Channel; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; + +import java.io.IOException; +import java.util.Scanner; +import java.util.concurrent.TimeoutException; + +public class Task02 { + public static final String ACK_QUEUE_NAME = "ack_queue"; + + public static void main(String[] args) throws IOException, TimeoutException { + Channel channel = RabbitMqUtils.getChannel(); + channel.queueDeclare(ACK_QUEUE_NAME,false, false,false,null); + Scanner scanner = new Scanner(System.in); + while (scanner.hasNext()) { + String message = scanner.next(); + channel.basicPublish("", ACK_QUEUE_NAME, null, message.getBytes()); + System.out.println(message + " 消息发送完成!"); + } + } +} diff --git a/src/main/java/io/jiulinxiri/rabbitmq/three/Worker03.java b/src/main/java/io/jiulinxiri/rabbitmq/three/Worker03.java new file mode 100644 index 0000000..c3d6d96 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/three/Worker03.java @@ -0,0 +1,33 @@ +package io.jiulinxiri.rabbitmq.three; + +import com.rabbitmq.client.CancelCallback; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; +import io.jiulinxiri.rabbitmq.utils.SleepUtils; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class Worker03 { + public static final String ACK_QUEUE_NAME = "ack_queue"; + + + public static void main(String[] args) throws IOException, TimeoutException { + Channel channel = RabbitMqUtils.getChannel(); + DeliverCallback deliverCallback = (coustomerTag, message) -> { + SleepUtils.sleep(1); + System.out.println("Worker03 接收到的消息: " + new String(message.getBody())); + //手动应答 + channel.basicAck(message.getEnvelope().getDeliveryTag(), false); + }; + + CancelCallback cancelCallback = (coustomerTag) -> { + System.out.println("Worker03 消费中断"); + }; + + System.out.println("Worker03 等待接收消息..."); + boolean autoAck = false; + channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback); + } +} diff --git a/src/main/java/io/jiulinxiri/rabbitmq/three/Worker04.java b/src/main/java/io/jiulinxiri/rabbitmq/three/Worker04.java new file mode 100644 index 0000000..21c95fb --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/three/Worker04.java @@ -0,0 +1,33 @@ +package io.jiulinxiri.rabbitmq.three; + +import com.rabbitmq.client.CancelCallback; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; +import io.jiulinxiri.rabbitmq.utils.SleepUtils; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class Worker04 { + public static final String ACK_QUEUE_NAME = "ack_queue"; + + + public static void main(String[] args) throws IOException, TimeoutException { + Channel channel = RabbitMqUtils.getChannel(); + DeliverCallback deliverCallback = (coustomerTag, message) -> { + SleepUtils.sleep(30); + System.out.println("Worker04 接收到的消息: " + new String(message.getBody())); + //手动应答 + channel.basicAck(message.getEnvelope().getDeliveryTag(), false); + }; + + CancelCallback cancelCallback = (coustomerTag) -> { + System.out.println("Worker04 消费中断"); + }; + + System.out.println("Worker04 等待接收消息..."); + boolean autoAck = false; + channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback); + } +} diff --git a/src/main/java/io/jiulinxiri/rabbitmq/utils/SleepUtils.java b/src/main/java/io/jiulinxiri/rabbitmq/utils/SleepUtils.java new file mode 100644 index 0000000..2509c8e --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/utils/SleepUtils.java @@ -0,0 +1,11 @@ +package io.jiulinxiri.rabbitmq.utils; + +public class SleepUtils { + public static void sleep(int second) { + try { + Thread.sleep(1000*second); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +}