diff --git a/src/main/java/io/jiulinxiri/rabbitmq/five/EmitLog.java b/src/main/java/io/jiulinxiri/rabbitmq/five/EmitLog.java new file mode 100644 index 0000000..5b7e273 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/five/EmitLog.java @@ -0,0 +1,27 @@ +package io.jiulinxiri.rabbitmq.five; + +import com.rabbitmq.client.Channel; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; + +import java.nio.charset.StandardCharsets; +import java.util.Scanner; + +/** + * 生产者 + */ +public class EmitLog { + public static final String EXCHANGE_NAME = "logs"; + + public static void main(String[] args) throws Exception { + Channel channel = RabbitMqUtils.getChannel(); + + // 声明交换机 +// channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); + Scanner scanner = new Scanner(System.in); + while (scanner.hasNext()) { + String message = scanner.next(); + channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); + System.out.println("生产者发送消息: " + message); + } + } +} diff --git a/src/main/java/io/jiulinxiri/rabbitmq/five/README.md b/src/main/java/io/jiulinxiri/rabbitmq/five/README.md new file mode 100644 index 0000000..33d2d8a --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/five/README.md @@ -0,0 +1,49 @@ +--- +gitea: none +include_toc: true +--- + +## 交换机Exchanges +### Exchange 概念 +RabbitMQ 消息传递模型的核心思想是: **生产者生产的消息从不会直接发送到队列**。实际上,通常生产 +者甚至都不知道这些消息传递传递到了哪些队列中。 + +相反,**生产者只能将消息发送到交换机(exchange)**,交换机工作的内容非常简单,一方面它接收来 +自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 +息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。 + +Exchange + +### Exchange 的类型 +交换机有以下的四种类型 +* 直接(direct) +* 主题(topic) +* 标题(headers) +* 扇出(fanout) [发布/订阅] + +#### 无名 exchange +```java +channel.basicPublish("", queueName, null, message.getBytes()); +``` +exchange的名称为空字符串表示默认或无名称交换机 + +### 临时队列 +断开消费者连接,能自动删除的队列。临时队列没有进行持久化的队列。 +754ueDFzNOZgyA3 + +```java +// 创建临时队列的方式 +String queueName = channel.queueDeclare().getQueue(); +``` + +### 绑定(bindings) +binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队 +列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定 + +qKJkT5ec4pYzDmQ + +### Fanout +发布订阅模式,将接收到的所有消息广播到它知道的所有队列中 + +#### Fanout 实战 +Vt8fjZL4QgeR69S \ No newline at end of file diff --git a/src/main/java/io/jiulinxiri/rabbitmq/five/ReceiveLogs01.java b/src/main/java/io/jiulinxiri/rabbitmq/five/ReceiveLogs01.java new file mode 100644 index 0000000..4358856 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/five/ReceiveLogs01.java @@ -0,0 +1,33 @@ +package io.jiulinxiri.rabbitmq.five; + +import com.rabbitmq.client.CancelCallback; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; + +public class ReceiveLogs01 { + public static final String EXCHANGE_NAME = "logs"; + + public static void main(String[] args) throws Exception { + Channel channel = RabbitMqUtils.getChannel(); + + // 声明交换机 + channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); + + // 声明临时队列 + String queueName = channel.queueDeclare().getQueue(); + + // 绑定交换机与队列 + channel.queueBind(queueName, EXCHANGE_NAME, ""); + System.out.println("等待接收消息,接收到的消息打印到屏幕上..."); + + DeliverCallback ackCallback = (consumerTag, message) -> { + System.out.println("ReceiveLogs01 控制太打印接收到的消息:" + new String(message.getBody(), "utf-8")); + }; + + CancelCallback cancelCallback = consumerTag -> {}; + + // 消费消息 + channel.basicConsume(queueName, true, ackCallback, cancelCallback); + } +} diff --git a/src/main/java/io/jiulinxiri/rabbitmq/five/ReceiveLogs02.java b/src/main/java/io/jiulinxiri/rabbitmq/five/ReceiveLogs02.java new file mode 100644 index 0000000..d6594c2 --- /dev/null +++ b/src/main/java/io/jiulinxiri/rabbitmq/five/ReceiveLogs02.java @@ -0,0 +1,33 @@ +package io.jiulinxiri.rabbitmq.five; + +import com.rabbitmq.client.CancelCallback; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; +import io.jiulinxiri.rabbitmq.utils.RabbitMqUtils; + +public class ReceiveLogs02 { + public static final String EXCHANGE_NAME = "logs"; + + public static void main(String[] args) throws Exception { + Channel channel = RabbitMqUtils.getChannel(); + + // 声明交换机 + channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); + + // 声明临时队列 + String queueName = channel.queueDeclare().getQueue(); + + // 绑定交换机与队列 + channel.queueBind(queueName, EXCHANGE_NAME, ""); + System.out.println("等待接收消息,接收到的消息打印到屏幕上..."); + + DeliverCallback ackCallback = (consumerTag, message) -> { + System.out.println("ReceiveLogs02 控制太打印接收到的消息:" + new String(message.getBody(), "utf-8")); + }; + + CancelCallback cancelCallback = consumerTag -> {}; + + // 消费消息 + channel.basicConsume(queueName, true, ackCallback, cancelCallback); + } +}