日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當(dāng)前位置:首頁 > 科技  > 軟件

RabbitMQ工作模式-Routing路由模式

來源: 責(zé)編: 時間:2023-11-20 17:12:25 327觀看
導(dǎo)讀Routing路由模式1、模式說明路由模式特點:隊列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)。消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的 RoutingKey。Exchange不再把消息交給每一個綁

q2C28資訊網(wǎng)——每日最新資訊28at.com

Routing路由模式

1、模式說明

路由模式特點:q2C28資訊網(wǎng)——每日最新資訊28at.com

  • 隊列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)。
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個綁定的隊列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息。

q2C28資訊網(wǎng)——每日最新資訊28at.com

圖解:q2C28資訊網(wǎng)——每日最新資訊28at.com

  • P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時,會指定一個routing key。
  • X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊列
  • C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
  • C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息

2。案例

在編碼上與 Publish/Subscribe發(fā)布與訂閱模式 的區(qū)別是交換機(jī)的類型為:Direct,還有隊列綁定交換機(jī)的時候需要指定routing key。q2C28資訊網(wǎng)——每日最新資訊28at.com

在寫案例之前,我們首先定義一下需求:q2C28資訊網(wǎng)——每日最新資訊28at.com

  • 生產(chǎn)者:發(fā)送兩條消息,一條消息的用于插入數(shù)據(jù),另一條消息用于更新數(shù)據(jù)。
  • 消費者1:接收插入數(shù)據(jù)的消息,進(jìn)行數(shù)據(jù)插入。
  • 消費者2:接收更新數(shù)據(jù)的消息,進(jìn)行數(shù)據(jù)更新。

(1)生產(chǎn)者

q2C28資訊網(wǎng)——每日最新資訊28at.com

package com.lijw.producer;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/3 8:16 */public class Producer_Routing {    //交換機(jī)名稱    static final String DIRECT_EXCHAGE = "direct_exchange";    //隊列名稱    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";    //隊列名稱    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";    public static void main(String[] args) throws IOException, TimeoutException {        //1.創(chuàng)建連接工廠        ConnectionFactory factory = new ConnectionFactory();        //2. 設(shè)置參數(shù)        factory.setHost("127.0.0.1"); // ip  默認(rèn)值 localhost        factory.setPort(5672); //端口  默認(rèn)值 5672        factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /        factory.setUsername("libai"); // 用戶名 默認(rèn) guest        factory.setPassword("libai"); //密碼 默認(rèn)值 guest        //3. 創(chuàng)建連接 Connection        Connection connection = factory.newConnection();        //4. 創(chuàng)建Channel        Channel channel = connection.createChannel();        //5. 創(chuàng)建交換機(jī)        /*           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)           參數(shù):            1. exchange:交換機(jī)名稱            2. type:交換機(jī)類型                DIRECT("direct"):定向                FANOUT("fanout"):扇形(廣播),發(fā)送消息到每一個與之綁定隊列。                TOPIC("topic") 通配符的方式                HEADERS("headers") 參數(shù)匹配            3. durable:是否持久化            4. autoDelete:自動刪除            5. internal:內(nèi)部使用。 一般false            6. arguments:參數(shù)        */        channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT, true, false, false, null);        // 6.聲明(創(chuàng)建)隊列        /**         * 參數(shù)1:隊列名稱         * 參數(shù)2:是否定義持久化隊列         * 參數(shù)3:是否獨占本次連接         * 參數(shù)4:是否在不使用的時候自動刪除隊列         * 參數(shù)5:隊列其它參數(shù)         */        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);        // 7. 綁定隊列和交換機(jī)        /*            queueBind(String queue, String exchange, String routingKey)            參數(shù):                1. queue:隊列名稱                2. exchange:交換機(jī)名稱                3. routingKey:路由鍵,綁定規(guī)則                    如果交換機(jī)的類型為fanout ,routingKey設(shè)置為""         */        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");        //8. 發(fā)送消息至交換機(jī),由交換機(jī)分發(fā)消息        // 發(fā)送信息        String message = "新增了商品。路由模式;routing key 為 insert " ;        /**         * 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage         * 參數(shù)2:路由key,簡單模式可以傳遞隊列名稱         * 參數(shù)3:消息其它屬性         * 參數(shù)4:消息內(nèi)容         */        channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());        System.out.println("已發(fā)送消息:" + message);        // 發(fā)送信息        message = "修改了商品。路由模式;routing key 為 update" ;        /**         * 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage         * 參數(shù)2:路由key,簡單模式可以傳遞隊列名稱         * 參數(shù)3:消息其它屬性         * 參數(shù)4:消息內(nèi)容         */        channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());        System.out.println("已發(fā)送消息:" + message);        //9. 釋放資源        channel.close();        connection.close();    }}

執(zhí)行發(fā)送消息:q2C28資訊網(wǎng)——每日最新資訊28at.com

q2C28資訊網(wǎng)——每日最新資訊28at.com

發(fā)送消息之后,我們來看看聲明好的交換機(jī):q2C28資訊網(wǎng)——每日最新資訊28at.com

q2C28資訊網(wǎng)——每日最新資訊28at.com

q2C28資訊網(wǎng)——每日最新資訊28at.com

(2)消費者1:專門接收 insert 的消息

q2C28資訊網(wǎng)——每日最新資訊28at.com

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_Routing1 {    //隊列名稱    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";    public static void main(String[] args) throws IOException, TimeoutException {        //1.創(chuàng)建連接工廠        ConnectionFactory factory = new ConnectionFactory();        //2. 設(shè)置參數(shù)        factory.setHost("127.0.0.1"); // ip  默認(rèn)值 localhost        factory.setPort(5672); //端口  默認(rèn)值 5672        factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /        factory.setUsername("libai"); // 用戶名 默認(rèn) guest        factory.setPassword("libai"); //密碼 默認(rèn)值 guest        //3. 創(chuàng)建連接 Connection        Connection connection = factory.newConnection();        //4. 創(chuàng)建Channel        Channel channel = connection.createChannel();        //5. 創(chuàng)建隊列Queue        /*        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)        參數(shù):            1. queue:隊列名稱            2. durable:是否持久化,當(dāng)mq重啟之后,還在            3. exclusive:                * 是否獨占。只能有一個消費者監(jiān)聽這隊列                * 當(dāng)Connection關(guān)閉時,是否刪除隊列            4. autoDelete:是否自動刪除。當(dāng)沒有Consumer時,自動刪除掉            5. arguments:參數(shù)。         */        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);        /*        basicConsume(String queue, boolean autoAck, Consumer callback)        參數(shù):            1. queue:隊列名稱            2. autoAck:是否自動確認(rèn)            3. callback:回調(diào)對象         */        // 接收消息        Consumer consumer = new DefaultConsumer(channel){            /*                回調(diào)方法,當(dāng)收到消息后,會自動執(zhí)行該方法                1. consumerTag:標(biāo)識                2. envelope:獲取一些信息,交換機(jī),路由key...                3. properties:配置信息                4. body:數(shù)據(jù)             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收隊列的數(shù)據(jù) body: " + new String(body));            }        };        channel.basicConsume(DIRECT_QUEUE_INSERT,true,consumer);        //不需要關(guān)閉資源,因為消費者需要持續(xù)監(jiān)聽隊列信息    }}

(3)消費者2:專門接收 update 的消息

q2C28資訊網(wǎng)——每日最新資訊28at.com

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_Routing2 {    //隊列名稱    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";    public static void main(String[] args) throws IOException, TimeoutException {        //1.創(chuàng)建連接工廠        ConnectionFactory factory = new ConnectionFactory();        //2. 設(shè)置參數(shù)        factory.setHost("127.0.0.1"); // ip  默認(rèn)值 localhost        factory.setPort(5672); //端口  默認(rèn)值 5672        factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /        factory.setUsername("libai"); // 用戶名 默認(rèn) guest        factory.setPassword("libai"); //密碼 默認(rèn)值 guest        //3. 創(chuàng)建連接 Connection        Connection connection = factory.newConnection();        //4. 創(chuàng)建Channel        Channel channel = connection.createChannel();        //5. 創(chuàng)建隊列Queue        /*        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)        參數(shù):            1. queue:隊列名稱            2. durable:是否持久化,當(dāng)mq重啟之后,還在            3. exclusive:                * 是否獨占。只能有一個消費者監(jiān)聽這隊列                * 當(dāng)Connection關(guān)閉時,是否刪除隊列            4. autoDelete:是否自動刪除。當(dāng)沒有Consumer時,自動刪除掉            5. arguments:參數(shù)。         */        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);        /*        basicConsume(String queue, boolean autoAck, Consumer callback)        參數(shù):            1. queue:隊列名稱            2. autoAck:是否自動確認(rèn)            3. callback:回調(diào)對象         */        // 接收消息        Consumer consumer = new DefaultConsumer(channel){            /*                回調(diào)方法,當(dāng)收到消息后,會自動執(zhí)行該方法                1. consumerTag:標(biāo)識                2. envelope:獲取一些信息,交換機(jī),路由key...                3. properties:配置信息                4. body:數(shù)據(jù)             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收隊列的數(shù)據(jù) body: " + new String(body));            }        };        channel.basicConsume(DIRECT_QUEUE_UPDATE,true,consumer);        //不需要關(guān)閉資源,因為消費者需要持續(xù)監(jiān)聽隊列信息    }}

3、測試

啟動所有消費者,然后使用生產(chǎn)者發(fā)送消息;在消費者對應(yīng)的控制臺可以查看到生產(chǎn)者發(fā)送對應(yīng)routing key對應(yīng)隊列的消息;到達(dá)按照需要接收的效果。q2C28資訊網(wǎng)——每日最新資訊28at.com

  • 消費者1 收到了 insert 的消息

q2C28資訊網(wǎng)——每日最新資訊28at.com

  • 消費者2 收到了 update 的消息

q2C28資訊網(wǎng)——每日最新資訊28at.com

4、小結(jié)

Routing模式要求隊列在綁定交換機(jī)時要指定routing key,消息會轉(zhuǎn)發(fā)到符合routing key的隊列。q2C28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-31551-0.htmlRabbitMQ工作模式-Routing路由模式

聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 快速了解 CSS @starting-style 規(guī)則

下一篇: 通過實例理解Web應(yīng)用跨域問題

標(biāo)簽:
  • 熱門焦點
  • 一加Ace2 Pro官宣:普及16G內(nèi)存 引領(lǐng)24G

    一加官方今天繼續(xù)為本月發(fā)布的新機(jī)一加Ace2 Pro帶來預(yù)熱,公布了內(nèi)存方面的信息。“淘汰 8GB ,12GB 起步,16GB 普及,24GB 引領(lǐng),還有呢?#一加Ace2Pro#,2023 年 8 月,敬請期待。”同時
  • 俄羅斯:將審查iPhone等外國公司設(shè)備 保數(shù)據(jù)安全

    iPhone和特斯拉都屬于在各自領(lǐng)域領(lǐng)頭羊的品牌,推出的產(chǎn)品也也都是數(shù)一數(shù)二的,但對于一些國家而言,它們的產(chǎn)品可靠性和安全性還是在限制范圍內(nèi)。近日,俄羅斯聯(lián)邦通信、信息技術(shù)
  • 6月iOS設(shè)備好評榜:第一蟬聯(lián)榜首近一年

    作為安兔兔各種榜單里變化最小的那個,2023年6月的iOS好評榜和上個月相比沒有任何排名上的變化,僅僅是部分設(shè)備好評率的下降,長年累月的用戶評價和逐漸退出市場的老款機(jī)器讓這
  • 6月安卓手機(jī)好評榜:魅族20 Pro蟬聯(lián)冠軍

    性能榜和性價比榜之后,我們來看最后的安卓手機(jī)好評榜,數(shù)據(jù)來源安兔兔評測,收集時間2023年6月1日至6月30日,僅限國內(nèi)市場。第一名:魅族20 Pro好評率:95%5月份的時候魅族20 Pro就是
  • 企業(yè)采用CRM系統(tǒng)的11個好處

    客戶關(guān)系管理(CRM)軟件可以為企業(yè)提供很多的好處,從客戶保留到提高生產(chǎn)力。  CRM軟件用于企業(yè)收集客戶互動,以改善客戶體驗和滿意度。  CRM軟件市場規(guī)模如今超過580
  • 三言兩語說透柯里化和反柯里化

    JavaScript中的柯里化(Currying)和反柯里化(Uncurrying)是兩種很有用的技術(shù),可以幫助我們寫出更加優(yōu)雅、泛用的函數(shù)。本文將首先介紹柯里化和反柯里化的概念、實現(xiàn)原理和應(yīng)用
  • 共享單車的故事講到哪了?

    來源丨海克財經(jīng)與共享充電寶相差不多,共享單車已很久沒有被國內(nèi)熱點新聞關(guān)照到了。除了一再漲價和用戶直呼用不起了。近日多家媒體再發(fā)報道稱,成都、天津、鄭州等地多個共享單
  • “又被陳思誠騙了”

    作者|張思齊 出品|眾面(ID:ZhongMian_ZM)如今的國產(chǎn)懸疑電影,成了陳思誠的天下。最近大爆電影《消失的她》票房突破30億斷層奪魁暑期檔,陳思誠再度風(fēng)頭無兩。你可以說陳思誠的
  • 朋友圈可以修改可見范圍了 蘋果用戶可率先體驗

    近日,iOS用戶迎來微信8.0.27正式版更新,除了可更換二維碼背景外,還新增了多項實用功能。在新版微信中,朋友圈終于可以修改可見范圍,簡單來說就是已發(fā)布的朋友圈
Top 主站蜘蛛池模板: 海阳市| 边坝县| 离岛区| 潍坊市| 叶城县| 铅山县| 象山县| 九台市| 耒阳市| 荆州市| 湘乡市| 崇明县| 蓝田县| 周宁县| 博客| 安义县| 清徐县| 凭祥市| 抚松县| 邢台县| 永泰县| 内丘县| 方正县| 县级市| 大化| 金山区| 黄浦区| 沅江市| 石渠县| 凭祥市| 聂拉木县| 蓬安县| 饶平县| 长武县| 于都县| 巴林右旗| 德惠市| 凤庆县| 苏州市| 天全县| 进贤县|