交换机 Exchange
This commit is contained in:
27
src/main/java/io/jiulinxiri/rabbitmq/five/EmitLog.java
Normal file
27
src/main/java/io/jiulinxiri/rabbitmq/five/EmitLog.java
Normal file
@@ -0,0 +1,27 @@
|
||||
package io.jiulinxiri.rabbitmq.five;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Scanner;
|
||||
|
||||
/**
|
||||
* 生产者
|
||||
*/
|
||||
public class EmitLog {
|
||||
public static final String EXCHANGE_NAME = "logs";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Channel channel = RabbitMqUtils.getChannel();
|
||||
|
||||
// 声明交换机
|
||||
// channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
||||
Scanner scanner = new Scanner(System.in);
|
||||
while (scanner.hasNext()) {
|
||||
String message = scanner.next();
|
||||
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
|
||||
System.out.println("生产者发送消息: " + message);
|
||||
}
|
||||
}
|
||||
}
|
||||
49
src/main/java/io/jiulinxiri/rabbitmq/five/README.md
Normal file
49
src/main/java/io/jiulinxiri/rabbitmq/five/README.md
Normal file
@@ -0,0 +1,49 @@
|
||||
---
|
||||
gitea: none
|
||||
include_toc: true
|
||||
---
|
||||
|
||||
## 交换机Exchanges
|
||||
### Exchange 概念
|
||||
RabbitMQ 消息传递模型的核心思想是: **生产者生产的消息从不会直接发送到队列**。实际上,通常生产
|
||||
者甚至都不知道这些消息传递传递到了哪些队列中。
|
||||
|
||||
相反,**生产者只能将消息发送到交换机(exchange)**,交换机工作的内容非常简单,一方面它接收来
|
||||
自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消
|
||||
息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
|
||||
|
||||
<img alt="Exchange" src="https://vip1.loli.io/2021/11/30/8P1NCc4gqWbBSDy.png" title="Exchange" width="300px"/>
|
||||
|
||||
### Exchange 的类型
|
||||
交换机有以下的四种类型
|
||||
* 直接(direct)
|
||||
* 主题(topic)
|
||||
* 标题(headers)
|
||||
* 扇出(fanout) [发布/订阅]
|
||||
|
||||
#### 无名 exchange
|
||||
```java
|
||||
channel.basicPublish("", queueName, null, message.getBytes());
|
||||
```
|
||||
exchange的名称为空字符串表示默认或无名称交换机
|
||||
|
||||
### 临时队列
|
||||
断开消费者连接,能自动删除的队列。临时队列没有进行持久化的队列。
|
||||
<img alt="754ueDFzNOZgyA3" src="https://vip2.loli.io/2021/11/30/754ueDFzNOZgyA3.png" width="400"/>
|
||||
|
||||
```java
|
||||
// 创建临时队列的方式
|
||||
String queueName = channel.queueDeclare().getQueue();
|
||||
```
|
||||
|
||||
### 绑定(bindings)
|
||||
binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队
|
||||
列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
|
||||
|
||||
<img alt="qKJkT5ec4pYzDmQ" src="https://vip1.loli.io/2021/11/30/qKJkT5ec4pYzDmQ.png" width="300px"/>
|
||||
|
||||
### Fanout
|
||||
发布订阅模式,将接收到的所有消息广播到它知道的所有队列中
|
||||
|
||||
#### Fanout 实战
|
||||
<img alt="Vt8fjZL4QgeR69S" src="https://vip1.loli.io/2021/11/30/Vt8fjZL4QgeR69S.png" width="400"/>
|
||||
33
src/main/java/io/jiulinxiri/rabbitmq/five/ReceiveLogs01.java
Normal file
33
src/main/java/io/jiulinxiri/rabbitmq/five/ReceiveLogs01.java
Normal file
@@ -0,0 +1,33 @@
|
||||
package io.jiulinxiri.rabbitmq.five;
|
||||
|
||||
import com.rabbitmq.client.CancelCallback;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils;
|
||||
|
||||
public class ReceiveLogs01 {
|
||||
public static final String EXCHANGE_NAME = "logs";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Channel channel = RabbitMqUtils.getChannel();
|
||||
|
||||
// 声明交换机
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
||||
|
||||
// 声明临时队列
|
||||
String queueName = channel.queueDeclare().getQueue();
|
||||
|
||||
// 绑定交换机与队列
|
||||
channel.queueBind(queueName, EXCHANGE_NAME, "");
|
||||
System.out.println("等待接收消息,接收到的消息打印到屏幕上...");
|
||||
|
||||
DeliverCallback ackCallback = (consumerTag, message) -> {
|
||||
System.out.println("ReceiveLogs01 控制太打印接收到的消息:" + new String(message.getBody(), "utf-8"));
|
||||
};
|
||||
|
||||
CancelCallback cancelCallback = consumerTag -> {};
|
||||
|
||||
// 消费消息
|
||||
channel.basicConsume(queueName, true, ackCallback, cancelCallback);
|
||||
}
|
||||
}
|
||||
33
src/main/java/io/jiulinxiri/rabbitmq/five/ReceiveLogs02.java
Normal file
33
src/main/java/io/jiulinxiri/rabbitmq/five/ReceiveLogs02.java
Normal file
@@ -0,0 +1,33 @@
|
||||
package io.jiulinxiri.rabbitmq.five;
|
||||
|
||||
import com.rabbitmq.client.CancelCallback;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils;
|
||||
|
||||
public class ReceiveLogs02 {
|
||||
public static final String EXCHANGE_NAME = "logs";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Channel channel = RabbitMqUtils.getChannel();
|
||||
|
||||
// 声明交换机
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
||||
|
||||
// 声明临时队列
|
||||
String queueName = channel.queueDeclare().getQueue();
|
||||
|
||||
// 绑定交换机与队列
|
||||
channel.queueBind(queueName, EXCHANGE_NAME, "");
|
||||
System.out.println("等待接收消息,接收到的消息打印到屏幕上...");
|
||||
|
||||
DeliverCallback ackCallback = (consumerTag, message) -> {
|
||||
System.out.println("ReceiveLogs02 控制太打印接收到的消息:" + new String(message.getBody(), "utf-8"));
|
||||
};
|
||||
|
||||
CancelCallback cancelCallback = consumerTag -> {};
|
||||
|
||||
// 消费消息
|
||||
channel.basicConsume(queueName, true, ackCallback, cancelCallback);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user