From 96c71755ed4598a2f9a5274b099b874859845df8 Mon Sep 17 00:00:00 2001 From: jiulinxiri Date: Wed, 1 Dec 2021 10:29:15 +0800 Subject: [PATCH] =?UTF-8?q?direct=20=E7=9B=B4=E6=8E=A5=E4=BA=A4=E6=8D=A2?= =?UTF-8?q?=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/jiulinxiri/rabbitmq/five/README.md | 2 ++ .../jiulinxiri/rabbitmq/six/DirectLogs.java | 25 ++++++++++++++ .../java/io/jiulinxiri/rabbitmq/six/README.md | 33 +++++++++++++++++++ .../rabbitmq/six/ReceiveLogsDirect01.java | 30 +++++++++++++++++ .../rabbitmq/six/ReceiveLogsDirect02.java | 29 ++++++++++++++++ 5 files changed, 119 insertions(+) create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/six/DirectLogs.java create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/six/README.md create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/six/ReceiveLogsDirect01.java create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/six/ReceiveLogsDirect02.java diff --git a/src/main/java/io/jiulinxiri/rabbitmq/five/README.md b/src/main/java/io/jiulinxiri/rabbitmq/five/README.md index 6298a25..fc3c439 100644 --- a/src/main/java/io/jiulinxiri/rabbitmq/five/README.md +++ b/src/main/java/io/jiulinxiri/rabbitmq/five/README.md @@ -50,3 +50,5 @@ binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchang Logs 和临时队列的绑定关系如下图 uDo2IqbMv3jF1CH + + diff --git a/src/main/java/io/jiulinxiri/rabbitmq/six/DirectLogs.java b/src/main/java/io/jiulinxiri/rabbitmq/six/DirectLogs.java new file mode 100644 index 0000000..f9c4c69 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/six/DirectLogs.java @@ -0,0 +1,25 @@ +package io.jiulinxiri.rabbitmq.six; + +import com.rabbitmq.client.Channel; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; + +import java.nio.charset.StandardCharsets; +import java.util.Scanner; + +/** + * 生产者 + */ +public class DirectLogs { + public static final String EXCHANGE_NAME = "direct_logs"; + + public static void main(String[] args) throws Exception { + Channel channel = RabbitMqUtils.getChannel(); + + Scanner scanner = new Scanner(System.in); + while (scanner.hasNext()) { + String message = scanner.next(); + channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(StandardCharsets.UTF_8)); + System.out.println("生产者发送消息: " + message); + } + } +} diff --git a/src/main/java/io/jiulinxiri/rabbitmq/six/README.md b/src/main/java/io/jiulinxiri/rabbitmq/six/README.md new file mode 100644 index 0000000..10c07ae --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/six/README.md @@ -0,0 +1,33 @@ +--- +gitea: none +include_toc: true +--- + +### direct 直接交换机 + +#### Direct exchange 介绍 + +Direct exchange类型的工作方式是,**消息只去到它绑定的 routingKey 队列中去**。 + +5OkBSxQI7ecwEFP + +在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, +队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green. + +在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 +Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。 + +#### 多重绑定 + +jiulinxiri_20211201092706.png + +当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情 +况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了 + +#### 实战 + +D6nCZhpf8wAul1P + +交换机与队列的绑定关系图 + +D6nCZhpf8wAul1P \ No newline at end of file diff --git a/src/main/java/io/jiulinxiri/rabbitmq/six/ReceiveLogsDirect01.java b/src/main/java/io/jiulinxiri/rabbitmq/six/ReceiveLogsDirect01.java new file mode 100644 index 0000000..1b123e4 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/six/ReceiveLogsDirect01.java @@ -0,0 +1,30 @@ +package io.jiulinxiri.rabbitmq.six; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; + +import java.nio.charset.StandardCharsets; + +public class ReceiveLogsDirect01 { + private static final String EXCHANGE_NAME = "direct_logs"; + + public static void main(String[] args) throws Exception { + Channel channel = RabbitMqUtils.getChannel(); + // 声明直接交换机 + channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); + // 声明队列 + channel.queueDeclare("console", false, false, false, null); + // 交换机与队列的绑定 + channel.queueBind("console", EXCHANGE_NAME, "info"); + channel.queueBind("console", EXCHANGE_NAME, "warning"); + + DeliverCallback deliverCallback = (consumerTag, message) -> { + System.out.println("ReceiveLogsDirect01 控制台打印接收到的消息: " + new String(message.getBody(), StandardCharsets.UTF_8)); + }; + + // 消息的消费 + channel.basicConsume("console", true, deliverCallback, consumerTag -> {}); + } +} diff --git a/src/main/java/io/jiulinxiri/rabbitmq/six/ReceiveLogsDirect02.java b/src/main/java/io/jiulinxiri/rabbitmq/six/ReceiveLogsDirect02.java new file mode 100644 index 0000000..332df59 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/six/ReceiveLogsDirect02.java @@ -0,0 +1,29 @@ +package io.jiulinxiri.rabbitmq.six; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; + +import java.nio.charset.StandardCharsets; + +public class ReceiveLogsDirect02 { + private static final String EXCHANGE_NAME = "direct_logs"; + + public static void main(String[] args) throws Exception { + Channel channel = RabbitMqUtils.getChannel(); + // 声明直接交换机 + channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); + // 声明队列 + channel.queueDeclare("disk", false, false, false, null); + // 交换机与队列的绑定 + channel.queueBind("disk", EXCHANGE_NAME, "error"); + + DeliverCallback deliverCallback = (consumerTag, message) -> { + System.out.println("ReceiveLogsDirect02 控制台打印接收到的消息: " + new String(message.getBody(), StandardCharsets.UTF_8)); + }; + + // 消息的消费 + channel.basicConsume("disk", true, deliverCallback, consumerTag -> {}); + } +}