日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

RabbitMQ工作模式-Publish/Subscribe發布與訂閱模式

來源: 責編: 時間:2023-11-10 17:08:11 341觀看
導讀訂閱模式類型訂閱模式示例圖:前面2個案例中,只有3個角色:P:生產者,也就是要發送消息的程序C:消費者:消息的接受者,會一直等待消息到來。queue:消息隊列,圖中紅色部分而在訂閱模型中,多了一個exchange角色,而且過程略有變化:P:生產者

HHm28資訊網——每日最新資訊28at.com

訂閱模式類型

訂閱模式示例圖:HHm28資訊網——每日最新資訊28at.com

HHm28資訊網——每日最新資訊28at.com

前面2個案例中,只有3個角色:HHm28資訊網——每日最新資訊28at.com

  • P:生產者,也就是要發送消息的程序
  • C:消費者:消息的接受者,會一直等待消息到來。
  • queue:消息隊列,圖中紅色部分

而在訂閱模型中,多了一個exchange角色,而且過程略有變化:HHm28資訊網——每日最新資訊28at.com

  • P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
  • C:消費者,消息的接受者,會一直等待消息到來。
  • Queue:消息隊列,接收消息、緩存消息。
  • Exchange:交換機,圖中的X。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
  • Fanout:廣播,將消息交給所有綁定到交換機的隊列
  • Direct:定向,把消息交給符合指定routing key 的隊列
  • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!HHm28資訊網——每日最新資訊28at.com

Publish/Subscribe發布與訂閱模式

1、模式說明

HHm28資訊網——每日最新資訊28at.com

發布訂閱模式:HHm28資訊網——每日最新資訊28at.com

每個消費者監聽自己的隊列。HHm28資訊網——每日最新資訊28at.com

生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收 到消息HHm28資訊網——每日最新資訊28at.com

2、案例

(1)生產者

HHm28資訊網——每日最新資訊28at.com

package com.lijw.producer;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/3 8:16 */public class Producer_PubSub {    public static void main(String[] args) throws IOException, TimeoutException {        //1.創建連接工廠        ConnectionFactory factory = new ConnectionFactory();        //2. 設置參數        factory.setHost("127.0.0.1"); // ip  默認值 localhost        factory.setPort(5672); //端口  默認值 5672        factory.setVirtualHost("/test"); //虛擬機 默認值 /        factory.setUsername("libai"); // 用戶名 默認 guest        factory.setPassword("libai"); //密碼 默認值 guest        //3. 創建連接 Connection        Connection connection = factory.newConnection();        //4. 創建Channel        Channel channel = connection.createChannel();        //5. 創建交換機        /*           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)           參數:            1. exchange:交換機名稱            2. type:交換機類型                DIRECT("direct"):定向                FANOUT("fanout"):扇形(廣播),發送消息到每一個與之綁定隊列。                TOPIC("topic") 通配符的方式                HEADERS("headers") 參數匹配            3. durable:是否持久化            4. autoDelete:自動刪除            5. internal:內部使用。 一般false            6. arguments:參數        */        String exchangeName = "test_fanout";        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);        //6. 創建隊列        String queue1Name = "test_fanout_queue1";        String queue2Name = "test_fanout_queue2";        channel.queueDeclare(queue1Name, true, false, false, null);        channel.queueDeclare(queue2Name, true, false, false, null);        // 7. 綁定隊列和交換機        /*            queueBind(String queue, String exchange, String routingKey)            參數:                1. queue:隊列名稱                2. exchange:交換機名稱                3. routingKey:路由鍵,綁定規則                    如果交換機的類型為fanout ,routingKey設置為""         */        channel.queueBind(queue1Name, exchangeName, "");        channel.queueBind(queue2Name, exchangeName, "");        //8. 發送消息至交換機,由交換機分發消息        String body = "日志信息: 肥仔白調用了findAll方法...日志級別: INFO....";        channel.basicPublish(exchangeName, "", null, body.getBytes());        //9. 釋放資源        channel.close();        connection.close();            }}

執行生產者,我們可以查看一下創建的 交換機 以及 隊列信息:HHm28資訊網——每日最新資訊28at.com

HHm28資訊網——每日最新資訊28at.com

HHm28資訊網——每日最新資訊28at.com

下面再來看看隊列,如下:HHm28資訊網——每日最新資訊28at.com

HHm28資訊網——每日最新資訊28at.com

HHm28資訊網——每日最新資訊28at.com

下面我們繼續來寫兩個消費者接收消息。HHm28資訊網——每日最新資訊28at.com

(2)消費者1:讀取隊列1的消息

HHm28資訊網——每日最新資訊28at.com

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_PubSub1 {    //定義接收隊列的名稱    final static String queueName = "test_fanout_queue1";    public static void main(String[] args) throws IOException, TimeoutException {        //1.創建連接工廠        ConnectionFactory factory = new ConnectionFactory();        //2. 設置參數        factory.setHost("127.0.0.1"); // ip  默認值 localhost        factory.setPort(5672); //端口  默認值 5672        factory.setVirtualHost("/test"); //虛擬機 默認值 /        factory.setUsername("libai"); // 用戶名 默認 guest        factory.setPassword("libai"); //密碼 默認值 guest        //3. 創建連接 Connection        Connection connection = factory.newConnection();        //4. 創建Channel        Channel channel = connection.createChannel();        //5. 創建隊列Queue        /*        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)        參數:            1. queue:隊列名稱            2. durable:是否持久化,當mq重啟之后,還在            3. exclusive:                * 是否獨占。只能有一個消費者監聽這隊列                * 當Connection關閉時,是否刪除隊列            4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉            5. arguments:參數。         */        channel.queueDeclare(queueName, true, false, false, null);        /*        basicConsume(String queue, boolean autoAck, Consumer callback)        參數:            1. queue:隊列名稱            2. autoAck:是否自動確認            3. callback:回調對象         */        // 接收消息        Consumer consumer = new DefaultConsumer(channel){            /*                回調方法,當收到消息后,會自動執行該方法                1. consumerTag:標識                2. envelope:獲取一些信息,交換機,路由key...                3. properties:配置信息                4. body:數據             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收隊列的數據 body: " + new String(body));            }        };        channel.basicConsume(queueName,true,consumer);        //不需要關閉資源,因為消費者需要持續監聽隊列信息    }}

(3)消費者2:讀取隊列2的消息

HHm28資訊網——每日最新資訊28at.com

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_PubSub2 {    //定義接收隊列的名稱    final static String queueName = "test_fanout_queue2";    public static void main(String[] args) throws IOException, TimeoutException {        //1.創建連接工廠        ConnectionFactory factory = new ConnectionFactory();        //2. 設置參數        factory.setHost("127.0.0.1"); // ip  默認值 localhost        factory.setPort(5672); //端口  默認值 5672        factory.setVirtualHost("/test"); //虛擬機 默認值 /        factory.setUsername("libai"); // 用戶名 默認 guest        factory.setPassword("libai"); //密碼 默認值 guest        //3. 創建連接 Connection        Connection connection = factory.newConnection();        //4. 創建Channel        Channel channel = connection.createChannel();        //5. 創建隊列Queue        /*        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)        參數:            1. queue:隊列名稱            2. durable:是否持久化,當mq重啟之后,還在            3. exclusive:                * 是否獨占。只能有一個消費者監聽這隊列                * 當Connection關閉時,是否刪除隊列            4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉            5. arguments:參數。         */        channel.queueDeclare(queueName, true, false, false, null);        /*        basicConsume(String queue, boolean autoAck, Consumer callback)        參數:            1. queue:隊列名稱            2. autoAck:是否自動確認            3. callback:回調對象         */        // 接收消息        Consumer consumer = new DefaultConsumer(channel){            /*                回調方法,當收到消息后,會自動執行該方法                1. consumerTag:標識                2. envelope:獲取一些信息,交換機,路由key...                3. properties:配置信息                4. body:數據             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收隊列的數據 body: " + new String(body));            }        };        channel.basicConsume(queueName,true,consumer);        //不需要關閉資源,因為消費者需要持續監聽隊列信息    }}

3、測試

啟動所有消費者,然后使用生產者發送消息;在每個消費者對應的控制臺可以查看到生產者發送的所有消息;到達廣播的效果。HHm28資訊網——每日最新資訊28at.com

  • 消費者1接收到的消息:

HHm28資訊網——每日最新資訊28at.com

  • 消費者2接收到的消息:

HHm28資訊網——每日最新資訊28at.com

從結果來看,生產者只需要發送一條消息,其余的消費者全部收到了消息,達到了廣播的效果。HHm28資訊網——每日最新資訊28at.com

4、小結

交換機需要與隊列進行綁定,綁定之后;一個消息可以被多個消費者都收到。HHm28資訊網——每日最新資訊28at.com

發布訂閱模式與工作隊列模式的區別:HHm28資訊網——每日最新資訊28at.com

  • 工作隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機。
  • 發布/訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發送消息(底層使用默認交換機)。
  • 發布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁 定到默認的交換機 。

本文鏈接:http://www.www897cc.com/showinfo-26-20057-0.htmlRabbitMQ工作模式-Publish/Subscribe發布與訂閱模式

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: Oracle數據庫調優實戰:優化SQL查詢的黃金法則!

下一篇: 學會使用Java的遠程調試工具,解決難題

標簽:
  • 熱門焦點
  • 一加Ace2 Pro官宣:普及16G內存 引領24G

    一加官方今天繼續為本月發布的新機一加Ace2 Pro帶來預熱,公布了內存方面的信息。“淘汰 8GB ,12GB 起步,16GB 普及,24GB 引領,還有呢?#一加Ace2Pro#,2023 年 8 月,敬請期待。”同時
  • 俄羅斯:將審查iPhone等外國公司設備 保數據安全

    iPhone和特斯拉都屬于在各自領域領頭羊的品牌,推出的產品也也都是數一數二的,但對于一些國家而言,它們的產品可靠性和安全性還是在限制范圍內。近日,俄羅斯聯邦通信、信息技術
  • 7月安卓手機好評榜:三星S23Ultra好評率第一

    性能榜和性價比榜之后,我們來看最后的安卓手機好評榜,數據來源安兔兔評測,收集時間2023年7月1日至7月31日,僅限國內市場。第一名:三星Galaxy S23 Ultra好評率:95.71%在即將迎來新
  • 7月安卓手機性價比榜:努比亞+紅魔兩款新機入榜

    7月登場的新機有努比亞Z50S Pro和紅魔8S Pro,除了三星之外目前唯二的兩款搭載超頻版驍龍8Gen2處理器的產品,而且努比亞和紅魔也一貫有著不錯的性價比,所以在本次的性價比榜單
  • 5月安卓手機好評榜:魅族20 Pro奪冠

    性能榜和性價比榜之后,我們來看最后的安卓手機好評榜,數據來源安兔兔評測,收集時間2023年5月1日至5月31日,僅限國內市場。第一名:魅族20 Pro好評率:97.50%不得不感慨魅族老品牌還
  • 三萬字盤點 Spring 九大核心基礎功能

    大家好,我是三友~~今天來跟大家聊一聊Spring的9大核心基礎功能。話不多說,先上目錄:圖片友情提示,本文過長,建議收藏,嘿嘿嘿!一、資源管理資源管理是Spring的一個核心的基礎功能,不
  • 大廠卷向扁平化

    來源:新熵作者丨南枝 編輯丨月見大廠職級不香了。俗話說,兵無常勢,水無常形,互聯網企業調整職級體系并不稀奇。7月13日,淘寶天貓集團啟動了近年來最大的人力制度改革,目前已形成一
  • 三星Galaxy Z Fold5官方渲染圖曝光:13.4mm折疊厚度依舊感人

    據官方此前宣布,三星將于7月26日在韓國首爾舉辦Unpacked活動,屆時將帶來帶來包括Galaxy Buds 3、Galaxy Watch 6、Galaxy Tab S9、Galaxy Z Flip 5、
  • 微軟發布Windows 11新版 引入全新任務欄狀態

    近日,微軟發布了Windows 11新版,而Build 22563更新主要引入了幾周前曝光的平板模式任務欄等,系統更流暢了。更新中,Windows 11加入了專門針對平板優化的任務欄
Top 主站蜘蛛池模板: 雷波县| 子长县| 孙吴县| 安康市| 武威市| 铁岭市| 临海市| 曲靖市| 鹤壁市| 海晏县| 舟曲县| 桐梓县| 阳原县| 鄢陵县| 同心县| 应用必备| 临清市| 出国| 松江区| 宁陵县| 成都市| 吴忠市| 兴海县| 高陵县| 和顺县| 论坛| 夏河县| 邵阳市| 南靖县| 邵武市| 和田县| 临高县| 逊克县| 吴忠市| 五常市| 云南省| 西宁市| 吉林省| 远安县| 昭通市| 吴堡县|