topic交换机
This commit is contained in:
10
.idea/runConfigurations.xml
generated
10
.idea/runConfigurations.xml
generated
@@ -1,10 +0,0 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<project version="4">
|
|
||||||
<component name="RunConfigurationProducerService">
|
|
||||||
<option name="ignoredProducers">
|
|
||||||
<set>
|
|
||||||
<option value="com.android.tools.idea.compose.preview.runconfiguration.ComposePreviewRunConfigurationProducer" />
|
|
||||||
</set>
|
|
||||||
</option>
|
|
||||||
</component>
|
|
||||||
</project>
|
|
36
src/main/java/io/jiulinxiri/rabbitmq/seven/EmitLogTopic.java
Normal file
36
src/main/java/io/jiulinxiri/rabbitmq/seven/EmitLogTopic.java
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
package io.jiulinxiri.rabbitmq.seven;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.BuiltinExchangeType;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class EmitLogTopic {
|
||||||
|
private static final String EXCHANGE_NAME = "topic_logs";
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
Channel channel = RabbitMqUtils.getChannel();
|
||||||
|
// 声明交换机类型Topic
|
||||||
|
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
|
||||||
|
Map<String, String> bindingKeyMap = new HashMap<>();
|
||||||
|
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2接收到");
|
||||||
|
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2接收到");
|
||||||
|
bindingKeyMap.put("quick.orange.fox","被队列 Q1接收到");
|
||||||
|
bindingKeyMap.put("lazy.brown.fox","被队列 Q2接收到");
|
||||||
|
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2接收一次");
|
||||||
|
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
|
||||||
|
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
|
||||||
|
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
|
||||||
|
|
||||||
|
|
||||||
|
for (Map.Entry<String, String> bindingKeyEntry:
|
||||||
|
bindingKeyMap.entrySet()) {
|
||||||
|
String bindingKey = bindingKeyEntry.getKey();
|
||||||
|
String message = bindingKeyEntry.getValue();
|
||||||
|
channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes());
|
||||||
|
System.out.println("生产者发出消息:" + message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
23
src/main/java/io/jiulinxiri/rabbitmq/seven/README.md
Normal file
23
src/main/java/io/jiulinxiri/rabbitmq/seven/README.md
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
---
|
||||||
|
gitea: none
|
||||||
|
include_toc: true
|
||||||
|
---
|
||||||
|
|
||||||
|
### Topic Exchange(主体交换机)
|
||||||
|
topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单
|
||||||
|
词列表,以点号分隔开。这些单词可以是任意单词,比如"stock.usd.nyse", "nyse.vmw",
|
||||||
|
"quick.orange.rabbit"
|
||||||
|
|
||||||
|
通配符的使用:
|
||||||
|
* *(星号)可以代替一个单词
|
||||||
|
* \#(井号)可以替代零个或多个单词
|
||||||
|
|
||||||
|
<img alt="g1pm8PDIWKA9475" src="https://vip1.loli.io/2021/12/01/g1pm8PDIWKA9475.png" width="400"/>
|
||||||
|
|
||||||
|
Q1-->绑定的是: 中间带 orange 带 3 个单词的字符串(*.orange.*)
|
||||||
|
|
||||||
|
Q2-->绑定的是 :最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)以及第一个单词是 lazy 的多个单词(lazy.#)
|
||||||
|
|
||||||
|
**注意点**:
|
||||||
|
1. 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
|
||||||
|
2. 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
|
@@ -0,0 +1,30 @@
|
|||||||
|
package io.jiulinxiri.rabbitmq.seven;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.BuiltinExchangeType;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.DeliverCallback;
|
||||||
|
import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
public class ReceiveLogsTopic01 {
|
||||||
|
private static final String EXCHANGE_NAME = "topic_logs";
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
Channel channel = RabbitMqUtils.getChannel();
|
||||||
|
// 声明交换机类型Topic
|
||||||
|
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
|
||||||
|
String queueName = "Q1";
|
||||||
|
channel.queueDeclare(queueName, true, false, false, null);
|
||||||
|
|
||||||
|
// 交换机与队列的绑定
|
||||||
|
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
|
||||||
|
System.out.println("等待接收消息...");
|
||||||
|
DeliverCallback deliverCallback = (consumerTag, message) -> {
|
||||||
|
System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
|
||||||
|
System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
|
||||||
|
};
|
||||||
|
|
||||||
|
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,30 @@
|
|||||||
|
package io.jiulinxiri.rabbitmq.seven;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.BuiltinExchangeType;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.DeliverCallback;
|
||||||
|
import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
public class ReceiveLogsTopic02 {
|
||||||
|
private static final String EXCHANGE_NAME = "topic_logs";
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
Channel channel = RabbitMqUtils.getChannel();
|
||||||
|
// 声明交换机类型Topic
|
||||||
|
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
|
||||||
|
String queueName = "Q2";
|
||||||
|
channel.queueDeclare(queueName, true, false, false, null);
|
||||||
|
// 交换机与队列的绑定
|
||||||
|
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
|
||||||
|
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
|
||||||
|
System.out.println("等待接收消息...");
|
||||||
|
DeliverCallback deliverCallback = (consumerTag, message) -> {
|
||||||
|
System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
|
||||||
|
System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
|
||||||
|
};
|
||||||
|
|
||||||
|
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user