日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

Springboot實現Rabbitmq死信隊列以及延遲隊列的優化

來源: 責編: 時間:2023-10-10 18:30:35 295觀看
導讀導入依賴:后續延遲隊列優化用Springboot整合,先理解死信隊列<!--RabbitMQ依賴--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <ver

導入依賴:

后續延遲隊列優化用Springboot整合,先理解死信隊列75D28資訊網——每日最新資訊28at.com

<!--RabbitMQ依賴-->        <dependency>            <groupId>com.rabbitmq</groupId>            <artifactId>amqp-client</artifactId>            <version>5.12.0</version>        </dependency>

死信隊列

由于特定原因導致隊列中的消息不能被消費,這樣的消息如果沒有后續處理就可以放入死信隊列中,例如一個訂單如果超時未被支付從而自動失效,就將這個訂單放到死信隊列中。(死信隊列中的消息是可以被消費的)75D28資訊網——每日最新資訊28at.com

75D28資訊網——每日最新資訊28at.com

死信隊列產生的原因

消息TTL過期

就是在規定的時間內消息沒有被消費,(和延遲隊列不同,延遲隊列時表示到達時間消息才可以被消費)75D28資訊網——每日最新資訊28at.com

在生產者代碼中設置消息過期時間:75D28資訊網——每日最新資訊28at.com

//生產者發送消息,將消息設置為TTL消息        AMQP.BasicProperties properties =                new AMQP.BasicProperties().builder().expiration("10000").build();

修改隊列參數argument的特殊屬性:75D28資訊網——每日最新資訊28at.com

arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT_DEAD);//死信交換機arguments.put("x-dead-letter-routing-key", "routingkey_direct-dead");//死信rotingkeyarguments.put("x-message-TTL", 10000);//設置過期時間(單位毫秒)  //將死信交換機與死信隊列綁定

模擬代碼:

消費者175D28資訊網——每日最新資訊28at.com

public class Consumer01 {    public static final String EXCHANGE_DIRECT = "exchange_direct";//普通交換機的名稱    public static final String EXCHANGE_DIRECT_DEAD = "exchange_direct_dead";//死信交換機的名稱    public static final String QUEUE_PLAIN = "queue_plain";//普通隊列的名稱    public static final String QUEUE_PLAIN_DEAD = "queue_plain_dead";//死信隊列的名稱    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        Channel channel = RabbitMqUtils.createChannel();        //聲明死信交換機和普通交換機        channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);        channel.exchangeDeclare(EXCHANGE_DIRECT_DEAD, BuiltinExchangeType.DIRECT);        //聲明普通隊列(綁定普通隊列與死信交換機的關系,在通過rotingkey綁定死信隊列        Map<String, Object> arguments = new HashMap<>();        arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT_DEAD);//死信交換機        arguments.put("x-dead-letter-routing-key", "routingkey_direct-dead");//死信rotingkey        //設置過期時間(單位毫秒)        arguments.put("x-message-TTL", 10000);        channel.queueDeclare(QUEUE_PLAIN, false, false, false, arguments);        //聲明死信隊列        channel.queueDeclare(QUEUE_PLAIN_DEAD, false, false, false, null);        //普通交換機和隊列的綁定        channel.queueBind(QUEUE_PLAIN, EXCHANGE_DIRECT, "routingkey_direct");        //死信交換機和死信隊列的綁定        channel.queueBind(QUEUE_PLAIN_DEAD, EXCHANGE_DIRECT_DEAD, "routingkey_direct-dead");        //模擬超時時間消息未被消費        Thread.sleep(1000000);        channel.basicConsume(QUEUE_PLAIN, true, (consumerTag, message) -> {            System.out.println("Consumer01.main接受到消息:" + new String(message.getBody()));        }, (consumerTag, sig) -> {        });    }}

生產者75D28資訊網——每日最新資訊28at.com

public class Produce {    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtils.createChannel();        //生產者發送消息,將消息設置為TTL消息        AMQP.BasicProperties properties =                new AMQP.BasicProperties().builder().expiration("10000").build();        for (int i = 0; i < 10; i++) {            String message = i + "";            channel.basicPublish(Consumer01.EXCHANGE_DIRECT,"routingkey_direct",properties,message.getBytes(StandardCharsets.UTF_8));        }    }}

消費者275D28資訊網——每日最新資訊28at.com

public class Consumer2 {    public static void main(String[] args) throws IOException, TimeoutException {        Channel channel = RabbitMqUtils.createChannel();        channel.basicConsume(Consumer01.QUEUE_PLAIN_DEAD, true, (consumerTag, message) -> {            System.out.println("Consumer2.main接受死信隊列的消息:" + new String(message.getBody()));        }, (consumerTag, sig) -> {        });    }}/**輸出結果:Consumer2.main接受死信隊列的消息:0Consumer2.main接受死信隊列的消息:1Consumer2.main接受死信隊列的消息:2Consumer2.main接受死信隊列的消息:3Consumer2.main接受死信隊列的消息:4Consumer2.main接受死信隊列的消息:5Consumer2.main接受死信隊列的消息:6Consumer2.main接受死信隊列的消息:7Consumer2.main接受死信隊列的消息:8Consumer2.main接受死信隊列的消息:9    */

隊列達到了最大長度

將RabbiMQ的隊列的argument屬性的鍵設置為 x-max-length 表示隊列可以容納的最大條數75D28資訊網——每日最新資訊28at.com

消息被拒絕

將自動應答設為false75D28資訊網——每日最新資訊28at.com

在消費者調一個Channel.basicReject,設置參數requeue為false,表示不重新排隊,將消息丟到死信隊列75D28資訊網——每日最新資訊28at.com

延遲隊列優化

延遲隊列就是講一個消息延遲發送,例如消息在隊列中10s后才能被取出,可以通過RabbitMQ的插件或者死信隊列來實現75D28資訊網——每日最新資訊28at.com

用死信隊列實現延遲隊列的思路:75D28資訊網——每日最新資訊28at.com

在于死信隊列綁定的普通隊列不設置消費者,利用TTL延遲消息,當TTL時間過期后,到達死信隊列被消費這樣就形成一個延遲隊列。75D28資訊網——每日最新資訊28at.com

延遲隊列的使用場景:①典型的就是流量削峰,對于不重要的消息,可以延遲消費,有助于減輕數據庫的壓力,強化分布式系統的高可用和并發性能。②還可以實現一個消息提醒,例如用戶三天未登錄發送一個消息提醒。75D28資訊網——每日最新資訊28at.com

75D28資訊網——每日最新資訊28at.com

在實際生產中可能存在很多不同的延遲時間要求,不可能每一個延遲要求就創造一個隊列,我們可以用生產者實現延遲信息,而隊列不設置TTL就可以根據生產的延遲消息進行延遲發送。75D28資訊網——每日最新資訊28at.com

但是此方法雖然實現了一個隊列就可以轉發不同延時時間的消息,但是有缺陷,隊列中的消息是排隊發送的,也就是說如果我第一條消息發送20s延時,接著第二條消息發送2s延時。最后卻是20s消息先消費,而2s消息后消費,因為RabbitMQ在檢測一條消息時發生了20s的阻塞。如下:75D28資訊網——每日最新資訊28at.com

###GET http://localhost:8080/ttl/sendExpirationMessage/aaaaa/20000###GET http://localhost:8080/ttl/sendExpirationMessage/bbbbb/2000最后輸出結果是先消費aaaa后消費bbbb

可以通過RabbitMQ的插件實現延時隊列,此方法沒有這缺陷75D28資訊網——每日最新資訊28at.com

從官網上下載對應版本的延遲插件,下載后如圖:交換機類型會多出一個 x-delayed-message75D28資訊網——每日最新資訊28at.com

75D28資訊網——每日最新資訊28at.com


75D28資訊網——每日最新資訊28at.com

在我們自定義的交換機中,這是一種新的交換機類型,該類型消息支持延遲投遞機制,消息傳遞后并不會立即投遞到目標隊列中,而是存儲在mnesia(一個分布式數據系統)表中,當達到投遞時間時,才會投遞到目標隊列中。75D28資訊網——每日最新資訊28at.com

代碼實例:75D28資訊網——每日最新資訊28at.com

配置類:75D28資訊網——每日最新資訊28at.com

@Configurationpublic class RabbitDelayedConfig {    //延遲交換機    public static final String DELAYED_EXCHANGE = "delayed.exchange";    //延遲隊列b    public static final String DELAYED_QUEUE = "delayed.queue";    //延遲交換機和隊列的routingkey    public static final String DELAYED_ROTINGKEY = "delayed.routingkey";    //public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) {    //		super(name, durable, autoDelete, arguments);    //		this.type = type;    //	}    @Bean    public CustomExchange delayedExchange() {        Map<String, Object> arguments = new HashMap<>();        //定義延遲消息類型由那種交換機規則處置        arguments.put("x-delayed-type", "direct");        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments);    }    @Bean    public Queue delayedQueue() {        return QueueBuilder                .nonDurable(DELAYED_QUEUE)                .build();    }    @Bean    public Binding delayedBinding() {        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROTINGKEY).noargs();    }}

生產者:75D28資訊網——每日最新資訊28at.com

/*延遲交換機發送消息*/    @GetMapping("/sendDelayedMessage/{message}/{delayedTTL}")    public void sendDelayedMessage(@PathVariable String message, @PathVariable Integer delayedTTL) {        log.info("當前時間:{},發送一條延遲時間為{}的延遲消息給延遲隊列:{}", new Date().toString(), delayedTTL, message);        rabbitTemplate.convertAndSend(RabbitDelayedConfig.DELAYED_EXCHANGE,                RabbitDelayedConfig.DELAYED_ROTINGKEY,                message,                msg -> {                    msg.getMessageProperties().setDelay(delayedTTL);//設置消息的延遲消息時間                    return msg;                });    }

消費者:75D28資訊網——每日最新資訊28at.com

@Slf4j@Componentpublic class DelayedQueueConsumer {    @RabbitListener(queues = RabbitDelayedConfig.DELAYED_QUEUE)    public void queue(Message message) {        log.info("接受到延遲隊列的消息,當前時間:{},消息:{}",new Date().toString(),new String(message.getBody()));    }}

本文鏈接:http://www.www897cc.com/showinfo-26-12687-0.htmlSpringboot實現Rabbitmq死信隊列以及延遲隊列的優化

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 福利來啦,一鍵部署:輕松掌握Docker及Docker-Compose的安裝方法

下一篇: 自己動手用Python實現一個保衛果實小游戲【完整版】

標簽:
  • 熱門焦點
  • Mate60手機殼曝光 致敬自己的經典設計

    8月3日消息,今天下午博主數碼閑聊站帶來了華為Mate60的第三方手機殼圖,可以讓我們在真機發布之前看看這款華為全新旗艦的大致輪廓。從曝光的圖片看,Mate 60背后攝像頭面積依然
  • 小米官宣:2023年上半年出貨量中國第一!

    今日早間,小米電視官方微博帶來消息,稱2023年小米電視上半年出貨量達到了中國第一,同時還表示小米電視的巨屏風暴即將開始。“公布一個好消息2023年#小米電視上半年出貨量中國
  • 7月安卓手機好評榜:三星S23Ultra好評率第一

    性能榜和性價比榜之后,我們來看最后的安卓手機好評榜,數據來源安兔兔評測,收集時間2023年7月1日至7月31日,僅限國內市場。第一名:三星Galaxy S23 Ultra好評率:95.71%在即將迎來新
  • 得物效率前端微應用推進過程與思考

    一、背景效率工程隨著業務的發展,組織規模的擴大,越來越多的企業開始意識到協作效率對于企業團隊的重要性,甚至是決定其在某個行業競爭中突圍的關鍵,是企業長久生存的根本。得物
  • 這款新興工具平臺,讓你的電腦效率翻倍

    隨著信息技術的發展,我們獲取信息的渠道越來越多,但是處理信息的效率卻成為一個瓶頸。于是各種工具應運而生,都在爭相解決我們的工作效率問題。今天我要給大家介紹一款效率
  • 東方甄選單飛:有些鳥注定是關不住的

    作者:彭寬鴻來源:華爾街科技眼&zwj;&zwj;&zwj;&zwj;&zwj;&zwj;&zwj;&zwj;&zwj;&zwj;東方甄選創始人俞敏洪帶隊的&ldquo;7天甘肅行&rdquo;直播活動已在近日順利收官。成立后一
  • 信通院:小米、華為等11家應用商店基本完成APP簽名及驗簽工作

    中國信通院表示,目前,小米、華為、OPPO、vivo、360手機助手、百度手機助手、應用寶、豌豆莢和努比亞等9家應用商店,以及抖音和快手2家新型應用分發平
  • 消息稱小米汽車開始篩選交付中心:需至少120個車位

    IT之家 7 月 7 日消息,日前,有微博簡介為“汽車行業從業者、長三角一體化擁護者”的微博用戶 @長三角行健者 發文表示,據經銷商集團反饋,小米汽車目前
  • 2299元起!iQOO Pad明晚首銷:性能最強天璣平板

    5月23日,iQOO如期舉行了新品發布會,除了首發安卓最強旗艦處理器的iQOO Neo8系列新機外,還在發布會上推出了旗下首款平板電腦——iQOO Pad,其最大的賣點
Top 主站蜘蛛池模板: 周宁县| 南木林县| 海宁市| 黑龙江省| 左权县| 中江县| 万宁市| 和平县| 监利县| 望谟县| 安泽县| 雷山县| 晋中市| 南召县| 若尔盖县| 阳城县| 义乌市| 汤阴县| 西林县| 晴隆县| 丹江口市| 万盛区| 龙山县| 墨脱县| 仁化县| 英山县| 朔州市| 大英县| 长沙县| 涿鹿县| 永寿县| 岳阳市| 赣榆县| 遵义县| 舒城县| 玉溪市| 乐陵市| 罗平县| 称多县| 万安县| 邯郸县|