訂閱模式示例圖:
前面2個案例中,只有3個角色:
而在訂閱模型中,多了一個exchange角色,而且過程略有變化:
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
發布訂閱模式:
每個消費者監聽自己的隊列。
生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收 到消息
package com.lijw.producer;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/3 8:16 */public class Producer_PubSub { public static void main(String[] args) throws IOException, TimeoutException { //1.創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //2. 設置參數 factory.setHost("127.0.0.1"); // ip 默認值 localhost factory.setPort(5672); //端口 默認值 5672 factory.setVirtualHost("/test"); //虛擬機 默認值 / factory.setUsername("libai"); // 用戶名 默認 guest factory.setPassword("libai"); //密碼 默認值 guest //3. 創建連接 Connection Connection connection = factory.newConnection(); //4. 創建Channel Channel channel = connection.createChannel(); //5. 創建交換機 /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 參數: 1. exchange:交換機名稱 2. type:交換機類型 DIRECT("direct"):定向 FANOUT("fanout"):扇形(廣播),發送消息到每一個與之綁定隊列。 TOPIC("topic") 通配符的方式 HEADERS("headers") 參數匹配 3. durable:是否持久化 4. autoDelete:自動刪除 5. internal:內部使用。 一般false 6. arguments:參數 */ String exchangeName = "test_fanout"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null); //6. 創建隊列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); // 7. 綁定隊列和交換機 /* queueBind(String queue, String exchange, String routingKey) 參數: 1. queue:隊列名稱 2. exchange:交換機名稱 3. routingKey:路由鍵,綁定規則 如果交換機的類型為fanout ,routingKey設置為"" */ channel.queueBind(queue1Name, exchangeName, ""); channel.queueBind(queue2Name, exchangeName, ""); //8. 發送消息至交換機,由交換機分發消息 String body = "日志信息: 肥仔白調用了findAll方法...日志級別: INFO...."; channel.basicPublish(exchangeName, "", null, body.getBytes()); //9. 釋放資源 channel.close(); connection.close(); }}
執行生產者,我們可以查看一下創建的 交換機 以及 隊列信息:
下面再來看看隊列,如下:
下面我們繼續來寫兩個消費者接收消息。
package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_PubSub1 { //定義接收隊列的名稱 final static String queueName = "test_fanout_queue1"; public static void main(String[] args) throws IOException, TimeoutException { //1.創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //2. 設置參數 factory.setHost("127.0.0.1"); // ip 默認值 localhost factory.setPort(5672); //端口 默認值 5672 factory.setVirtualHost("/test"); //虛擬機 默認值 / factory.setUsername("libai"); // 用戶名 默認 guest factory.setPassword("libai"); //密碼 默認值 guest //3. 創建連接 Connection Connection connection = factory.newConnection(); //4. 創建Channel Channel channel = connection.createChannel(); //5. 創建隊列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 參數: 1. queue:隊列名稱 2. durable:是否持久化,當mq重啟之后,還在 3. exclusive: * 是否獨占。只能有一個消費者監聽這隊列 * 當Connection關閉時,是否刪除隊列 4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉 5. arguments:參數。 */ channel.queueDeclare(queueName, true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 參數: 1. queue:隊列名稱 2. autoAck:是否自動確認 3. callback:回調對象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回調方法,當收到消息后,會自動執行該方法 1. consumerTag:標識 2. envelope:獲取一些信息,交換機,路由key... 3. properties:配置信息 4. body:數據 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收隊列的數據 body: " + new String(body)); } }; channel.basicConsume(queueName,true,consumer); //不需要關閉資源,因為消費者需要持續監聽隊列信息 }}
package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_PubSub2 { //定義接收隊列的名稱 final static String queueName = "test_fanout_queue2"; public static void main(String[] args) throws IOException, TimeoutException { //1.創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //2. 設置參數 factory.setHost("127.0.0.1"); // ip 默認值 localhost factory.setPort(5672); //端口 默認值 5672 factory.setVirtualHost("/test"); //虛擬機 默認值 / factory.setUsername("libai"); // 用戶名 默認 guest factory.setPassword("libai"); //密碼 默認值 guest //3. 創建連接 Connection Connection connection = factory.newConnection(); //4. 創建Channel Channel channel = connection.createChannel(); //5. 創建隊列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 參數: 1. queue:隊列名稱 2. durable:是否持久化,當mq重啟之后,還在 3. exclusive: * 是否獨占。只能有一個消費者監聽這隊列 * 當Connection關閉時,是否刪除隊列 4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉 5. arguments:參數。 */ channel.queueDeclare(queueName, true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 參數: 1. queue:隊列名稱 2. autoAck:是否自動確認 3. callback:回調對象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回調方法,當收到消息后,會自動執行該方法 1. consumerTag:標識 2. envelope:獲取一些信息,交換機,路由key... 3. properties:配置信息 4. body:數據 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收隊列的數據 body: " + new String(body)); } }; channel.basicConsume(queueName,true,consumer); //不需要關閉資源,因為消費者需要持續監聽隊列信息 }}
啟動所有消費者,然后使用生產者發送消息;在每個消費者對應的控制臺可以查看到生產者發送的所有消息;到達廣播的效果。
從結果來看,生產者只需要發送一條消息,其余的消費者全部收到了消息,達到了廣播的效果。
交換機需要與隊列進行綁定,綁定之后;一個消息可以被多個消費者都收到。
發布訂閱模式與工作隊列模式的區別:
本文鏈接:http://www.www897cc.com/showinfo-26-20057-0.htmlRabbitMQ工作模式-Publish/Subscribe發布與訂閱模式
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: Oracle數據庫調優實戰:優化SQL查詢的黃金法則!
下一篇: 學會使用Java的遠程調試工具,解決難題