From 17c190033416bf6f2f665dcfd085bb6e30a4a49e Mon Sep 17 00:00:00 2001 From: jiulinxiri Date: Wed, 1 Dec 2021 16:38:32 +0800 Subject: [PATCH] =?UTF-8?q?topic=E4=BA=A4=E6=8D=A2=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/runConfigurations.xml | 10 ------ .../rabbitmq/seven/EmitLogTopic.java | 36 +++++++++++++++++++ .../io/jiulinxiri/rabbitmq/seven/README.md | 23 ++++++++++++ .../rabbitmq/seven/ReceiveLogsTopic01.java | 30 ++++++++++++++++ .../rabbitmq/seven/ReceiveLogsTopic02.java | 30 ++++++++++++++++ 5 files changed, 119 insertions(+), 10 deletions(-) delete mode 100644 .idea/runConfigurations.xml create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/seven/EmitLogTopic.java create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/seven/README.md create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/seven/ReceiveLogsTopic01.java create mode 100644 src/main/java/io/jiulinxiri/rabbitmq/seven/ReceiveLogsTopic02.java diff --git a/.idea/runConfigurations.xml b/.idea/runConfigurations.xml deleted file mode 100644 index 797acea..0000000 --- a/.idea/runConfigurations.xml +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/src/main/java/io/jiulinxiri/rabbitmq/seven/EmitLogTopic.java b/src/main/java/io/jiulinxiri/rabbitmq/seven/EmitLogTopic.java new file mode 100644 index 0000000..a509ecc --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/seven/EmitLogTopic.java @@ -0,0 +1,36 @@ +package io.jiulinxiri.rabbitmq.seven; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; + +import java.util.HashMap; +import java.util.Map; + +public class EmitLogTopic { + private static final String EXCHANGE_NAME = "topic_logs"; + + public static void main(String[] args) throws Exception { + Channel channel = RabbitMqUtils.getChannel(); + // 声明交换机类型Topic + channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); + Map bindingKeyMap = new HashMap<>(); + bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2接收到"); + bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2接收到"); + bindingKeyMap.put("quick.orange.fox","被队列 Q1接收到"); + bindingKeyMap.put("lazy.brown.fox","被队列 Q2接收到"); + bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2接收一次"); + bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); + bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); + bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2"); + + + for (Map.Entry bindingKeyEntry: + bindingKeyMap.entrySet()) { + String bindingKey = bindingKeyEntry.getKey(); + String message = bindingKeyEntry.getValue(); + channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes()); + System.out.println("生产者发出消息:" + message); + } + } +} diff --git a/src/main/java/io/jiulinxiri/rabbitmq/seven/README.md b/src/main/java/io/jiulinxiri/rabbitmq/seven/README.md new file mode 100644 index 0000000..76f60e1 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/seven/README.md @@ -0,0 +1,23 @@ +--- +gitea: none +include_toc: true +--- + +### Topic Exchange(主体交换机) +topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单 +词列表,以点号分隔开。这些单词可以是任意单词,比如"stock.usd.nyse", "nyse.vmw", +"quick.orange.rabbit" + +通配符的使用: +* *(星号)可以代替一个单词 +* \#(井号)可以替代零个或多个单词 + +g1pm8PDIWKA9475 + +Q1-->绑定的是: 中间带 orange 带 3 个单词的字符串(*.orange.*) + +Q2-->绑定的是 :最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)以及第一个单词是 lazy 的多个单词(lazy.#) + +**注意点**: +1. 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了 +2. 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了 \ No newline at end of file diff --git a/src/main/java/io/jiulinxiri/rabbitmq/seven/ReceiveLogsTopic01.java b/src/main/java/io/jiulinxiri/rabbitmq/seven/ReceiveLogsTopic01.java new file mode 100644 index 0000000..4cb4774 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/seven/ReceiveLogsTopic01.java @@ -0,0 +1,30 @@ +package io.jiulinxiri.rabbitmq.seven; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; + +import java.nio.charset.StandardCharsets; + +public class ReceiveLogsTopic01 { + private static final String EXCHANGE_NAME = "topic_logs"; + + public static void main(String[] args) throws Exception { + Channel channel = RabbitMqUtils.getChannel(); + // 声明交换机类型Topic + channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); + String queueName = "Q1"; + channel.queueDeclare(queueName, true, false, false, null); + + // 交换机与队列的绑定 + channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); + System.out.println("等待接收消息..."); + DeliverCallback deliverCallback = (consumerTag, message) -> { + System.out.println(new String(message.getBody(), StandardCharsets.UTF_8)); + System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey()); + }; + + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); + } +} diff --git a/src/main/java/io/jiulinxiri/rabbitmq/seven/ReceiveLogsTopic02.java b/src/main/java/io/jiulinxiri/rabbitmq/seven/ReceiveLogsTopic02.java new file mode 100644 index 0000000..9c9ba28 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/seven/ReceiveLogsTopic02.java @@ -0,0 +1,30 @@ +package io.jiulinxiri.rabbitmq.seven; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; + +import java.nio.charset.StandardCharsets; + +public class ReceiveLogsTopic02 { + private static final String EXCHANGE_NAME = "topic_logs"; + + public static void main(String[] args) throws Exception { + Channel channel = RabbitMqUtils.getChannel(); + // 声明交换机类型Topic + channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); + String queueName = "Q2"; + channel.queueDeclare(queueName, true, false, false, null); + // 交换机与队列的绑定 + channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); + channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); + System.out.println("等待接收消息..."); + DeliverCallback deliverCallback = (consumerTag, message) -> { + System.out.println(new String(message.getBody(), StandardCharsets.UTF_8)); + System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey()); + }; + + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); + } +}