路由模式特點:
圖解:
在編碼上與 Publish/Subscribe發(fā)布與訂閱模式 的區(qū)別是交換機的類型為:Direct,還有隊列綁定交換機的時候需要指定routing key。
在寫案例之前,我們首先定義一下需求:
package com.lijw.producer;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/3 8:16 */public class Producer_Routing { //交換機名稱 static final String DIRECT_EXCHAGE = "direct_exchange"; //隊列名稱 static final String DIRECT_QUEUE_INSERT = "direct_queue_insert"; //隊列名稱 static final String DIRECT_QUEUE_UPDATE = "direct_queue_update"; public static void main(String[] args) throws IOException, TimeoutException { //1.創(chuàng)建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //2. 設置參數(shù) factory.setHost("127.0.0.1"); // ip 默認值 localhost factory.setPort(5672); //端口 默認值 5672 factory.setVirtualHost("/test"); //虛擬機 默認值 / factory.setUsername("libai"); // 用戶名 默認 guest factory.setPassword("libai"); //密碼 默認值 guest //3. 創(chuàng)建連接 Connection Connection connection = factory.newConnection(); //4. 創(chuàng)建Channel Channel channel = connection.createChannel(); //5. 創(chuàng)建交換機 /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 參數(shù): 1. exchange:交換機名稱 2. type:交換機類型 DIRECT("direct"):定向 FANOUT("fanout"):扇形(廣播),發(fā)送消息到每一個與之綁定隊列。 TOPIC("topic") 通配符的方式 HEADERS("headers") 參數(shù)匹配 3. durable:是否持久化 4. autoDelete:自動刪除 5. internal:內(nèi)部使用。 一般false 6. arguments:參數(shù) */ channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT, true, false, false, null); // 6.聲明(創(chuàng)建)隊列 /** * 參數(shù)1:隊列名稱 * 參數(shù)2:是否定義持久化隊列 * 參數(shù)3:是否獨占本次連接 * 參數(shù)4:是否在不使用的時候自動刪除隊列 * 參數(shù)5:隊列其它參數(shù) */ channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null); channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null); // 7. 綁定隊列和交換機 /* queueBind(String queue, String exchange, String routingKey) 參數(shù): 1. queue:隊列名稱 2. exchange:交換機名稱 3. routingKey:路由鍵,綁定規(guī)則 如果交換機的類型為fanout ,routingKey設置為"" */ channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert"); channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update"); //8. 發(fā)送消息至交換機,由交換機分發(fā)消息 // 發(fā)送信息 String message = "新增了商品。路由模式;routing key 為 insert " ; /** * 參數(shù)1:交換機名稱,如果沒有指定則使用默認Default Exchage * 參數(shù)2:路由key,簡單模式可以傳遞隊列名稱 * 參數(shù)3:消息其它屬性 * 參數(shù)4:消息內(nèi)容 */ channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes()); System.out.println("已發(fā)送消息:" + message); // 發(fā)送信息 message = "修改了商品。路由模式;routing key 為 update" ; /** * 參數(shù)1:交換機名稱,如果沒有指定則使用默認Default Exchage * 參數(shù)2:路由key,簡單模式可以傳遞隊列名稱 * 參數(shù)3:消息其它屬性 * 參數(shù)4:消息內(nèi)容 */ channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes()); System.out.println("已發(fā)送消息:" + message); //9. 釋放資源 channel.close(); connection.close(); }}
執(zhí)行發(fā)送消息:
發(fā)送消息之后,我們來看看聲明好的交換機:
package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_Routing1 { //隊列名稱 static final String DIRECT_QUEUE_INSERT = "direct_queue_insert"; public static void main(String[] args) throws IOException, TimeoutException { //1.創(chuàng)建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //2. 設置參數(shù) factory.setHost("127.0.0.1"); // ip 默認值 localhost factory.setPort(5672); //端口 默認值 5672 factory.setVirtualHost("/test"); //虛擬機 默認值 / factory.setUsername("libai"); // 用戶名 默認 guest factory.setPassword("libai"); //密碼 默認值 guest //3. 創(chuàng)建連接 Connection Connection connection = factory.newConnection(); //4. 創(chuàng)建Channel Channel channel = connection.createChannel(); //5. 創(chuàng)建隊列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 參數(shù): 1. queue:隊列名稱 2. durable:是否持久化,當mq重啟之后,還在 3. exclusive: * 是否獨占。只能有一個消費者監(jiān)聽這隊列 * 當Connection關閉時,是否刪除隊列 4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉 5. arguments:參數(shù)。 */ channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 參數(shù): 1. queue:隊列名稱 2. autoAck:是否自動確認 3. callback:回調(diào)對象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回調(diào)方法,當收到消息后,會自動執(zhí)行該方法 1. consumerTag:標識 2. envelope:獲取一些信息,交換機,路由key... 3. properties:配置信息 4. body:數(shù)據(jù) */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收隊列的數(shù)據(jù) body: " + new String(body)); } }; channel.basicConsume(DIRECT_QUEUE_INSERT,true,consumer); //不需要關閉資源,因為消費者需要持續(xù)監(jiān)聽隊列信息 }}
package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_Routing2 { //隊列名稱 static final String DIRECT_QUEUE_UPDATE = "direct_queue_update"; public static void main(String[] args) throws IOException, TimeoutException { //1.創(chuàng)建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //2. 設置參數(shù) factory.setHost("127.0.0.1"); // ip 默認值 localhost factory.setPort(5672); //端口 默認值 5672 factory.setVirtualHost("/test"); //虛擬機 默認值 / factory.setUsername("libai"); // 用戶名 默認 guest factory.setPassword("libai"); //密碼 默認值 guest //3. 創(chuàng)建連接 Connection Connection connection = factory.newConnection(); //4. 創(chuàng)建Channel Channel channel = connection.createChannel(); //5. 創(chuàng)建隊列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 參數(shù): 1. queue:隊列名稱 2. durable:是否持久化,當mq重啟之后,還在 3. exclusive: * 是否獨占。只能有一個消費者監(jiān)聽這隊列 * 當Connection關閉時,是否刪除隊列 4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉 5. arguments:參數(shù)。 */ channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 參數(shù): 1. queue:隊列名稱 2. autoAck:是否自動確認 3. callback:回調(diào)對象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回調(diào)方法,當收到消息后,會自動執(zhí)行該方法 1. consumerTag:標識 2. envelope:獲取一些信息,交換機,路由key... 3. properties:配置信息 4. body:數(shù)據(jù) */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收隊列的數(shù)據(jù) body: " + new String(body)); } }; channel.basicConsume(DIRECT_QUEUE_UPDATE,true,consumer); //不需要關閉資源,因為消費者需要持續(xù)監(jiān)聽隊列信息 }}
啟動所有消費者,然后使用生產(chǎn)者發(fā)送消息;在消費者對應的控制臺可以查看到生產(chǎn)者發(fā)送對應routing key對應隊列的消息;到達按照需要接收的效果。
Routing模式要求隊列在綁定交換機時要指定routing key,消息會轉(zhuǎn)發(fā)到符合routing key的隊列。
本文鏈接:http://www.www897cc.com/showinfo-26-31551-0.htmlRabbitMQ工作模式-Routing路由模式
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: 快速了解 CSS @starting-style 規(guī)則
下一篇: 通過實例理解Web應用跨域問題