日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

阿里面試:RabbitMQ如何實現延遲隊列?

來源: 責編: 時間:2024-04-28 08:55:17 155觀看
導讀延遲隊列是指當消息被發送以后,并不是立即執行,而是等待特定的時間后,消費者才會執行該消息。延遲隊列的使用場景有以下幾種:未按時支付的訂單,30 分鐘過期之后取消訂單。給活躍度比較低的用戶間隔 N 天之后推送消息,提高活

Ooc28資訊網——每日最新資訊28at.com

延遲隊列是指當消息被發送以后,并不是立即執行,而是等待特定的時間后,消費者才會執行該消息。延遲隊列的使用場景有以下幾種:Ooc28資訊網——每日最新資訊28at.com

  1. 未按時支付的訂單,30 分鐘過期之后取消訂單。
  2. 給活躍度比較低的用戶間隔 N 天之后推送消息,提高活躍度。
  3. 新注冊會員的用戶,等待幾分鐘之后發送歡迎郵件等。

1.如何實現延遲隊列?

延遲隊列有以下兩種實現方式:Ooc28資訊網——每日最新資訊28at.com

  1. 通過消息過期后進入死信交換器,再由交換器轉發到延遲消費隊列,實現延遲功能;
  2. 使用官方提供的延遲插件實現延遲功能。

早期,大部分公司都會采用第一種方式,而隨著 RabbitMQ 3.5.7(2015 年底發布)的延遲插件的發布,因為其使用更簡單、更方便,所以它現在才是大家普通會采用的,實現延遲隊列的方式,所以本文也只講第二種方式。Ooc28資訊網——每日最新資訊28at.com

2.實現延遲隊列

2.1 安裝并啟動延遲隊列

2.1.1 下載延遲插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releasesOoc28資訊網——每日最新資訊28at.com

注意:需要根據你自己的 RabbitMQ 服務器端版本選擇相同版本的延遲插件,可以在 RabbitMQ 控制臺查看:Ooc28資訊網——每日最新資訊28at.com

圖片圖片Ooc28資訊網——每日最新資訊28at.com

2.1.2 將插件放到插件目錄

接下來,將上一步下載的插件放到 RabbitMQ 服務器安裝目錄,如果是 docker,使用一下命令復制:Ooc28資訊網——每日最新資訊28at.com

docker cp 宿主機文件 容器名稱或ID:容器目錄Ooc28資訊網——每日最新資訊28at.com

如下圖所示:Ooc28資訊網——每日最新資訊28at.com

圖片之后,進入 docker 容器,查看插件中是否包含延遲隊列:Ooc28資訊網——每日最新資訊28at.com

docker exec -it 容器名稱或ID /bin/bash rabbitmq-plugins listOoc28資訊網——每日最新資訊28at.com

如下圖所示:Ooc28資訊網——每日最新資訊28at.com

圖片Ooc28資訊網——每日最新資訊28at.com

2.1.3 啟動插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchangeOoc28資訊網——每日最新資訊28at.com

如下圖所示:Ooc28資訊網——每日最新資訊28at.com

圖片Ooc28資訊網——每日最新資訊28at.com

2.1.4 重啟RabbitMQ服務

安裝完 RabbitMQ 插件之后,需要重啟 RabbitMQ 服務才能生效。如果使用的是 Docker,只需要重啟 Docker 容器即可:Ooc28資訊網——每日最新資訊28at.com

docker restart 容器名稱或IDOoc28資訊網——每日最新資訊28at.com

如下圖所示:Ooc28資訊網——每日最新資訊28at.com

圖片Ooc28資訊網——每日最新資訊28at.com

2.1.5 驗收結果

在 RabbitMQ 控制臺查看,新建交換機時是否有延遲消息選項,如果有就說明延遲消息插件已經正常運行了,如下圖所示:Ooc28資訊網——每日最新資訊28at.com

圖片Ooc28資訊網——每日最新資訊28at.com

2.1.6 手動創建延遲交換器(可選)

此步驟可選(非必須),因為某些版本下通過程序創建延遲交換器可能會出錯,如果出錯了,手動創建延遲隊列即可,如下圖所示:Ooc28資訊網——每日最新資訊28at.com

圖片Ooc28資訊網——每日最新資訊28at.com

2.2 編寫延遲消息實現代碼

2.2.1 配置交換器和隊列

import org.springframework.context.annotation.Configuration;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;/** * 延遲交換器和隊列 */@Configurationpublic class DelayedExchangeConfig {    public static final String EXCHANGE_NAME = "myDelayedExchange";    public static final String QUEUE_NAME = "delayed.queue";    public static final String ROUTING_KEY = "delayed.routing.key";    @Bean    public CustomExchange delayedExchange() {        return new CustomExchange(EXCHANGE_NAME,                "x-delayed-message", // 消息類型                true, // 是否持久化                false); // 是否自動刪除    }    @Bean    public Queue delayedQueue() {        return QueueBuilder.durable(QUEUE_NAME)                .withArgument("x-delayed-type", "direct")                .build();    }    @Bean    public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();    }}

2.1.2 定義消息發送方法

import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;@Componentpublic class DelayedMessageProducer {    @Autowired    private RabbitTemplate rabbitTemplate;    @Scheduled(fixedDelay = 5000)    public void sendDelayedMessage(String message) {        rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,                DelayedExchangeConfig.ROUTING_KEY,                message,                messagePostProcessor -> {                    messagePostProcessor.getMessageProperties().setDelay(10000); // 設置延遲時間,單位毫秒                    return messagePostProcessor;                });    }}

2.1.3 發送延遲消息

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/delayed")public class DelayedMessageController {    @Autowired    private DelayedMessageProducer delayedMessageProducer;    @GetMapping("/send")    public String sendDirectMessage(@RequestParam String message) {        delayedMessageProducer.sendDelayedMessage(message);        return "Delayed message sent to Exchange: " + message;    }}

2.1.4 接收延遲消息

import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class DelayedMessageConsumer {    @RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)    public void receiveDelayedMessage(String message) {        System.out.println("Received delayed message: " + message);    }}

PS:獲取本文延遲隊列的實現 Demo,請加我:GG_Stone【備注:延遲隊列】Ooc28資訊網——每日最新資訊28at.com

小結

實現 RabbitMQ 延遲隊列目前主流的實現方式,是采用官方提供的延遲插件來實現。而延遲插件需要先下載插件、然后配置并重啟 RabbitMQ 服務,之后就可以通過編寫代碼的方式實現延遲隊列了。Ooc28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-86060-0.html阿里面試:RabbitMQ如何實現延遲隊列?

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 電視產業躍升:AI 芯片推動三星電視再度提至新高度

下一篇: 原理剖析| Kafka Exactly Once 語義實現原理:冪等性與事務消息

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 九寨沟县| 兴隆县| 师宗县| 和硕县| 陆丰市| 合阳县| 中宁县| 郴州市| 鸡东县| 清原| 长宁区| 苏尼特右旗| 漳浦县| 屯门区| 乌苏市| 榆林市| 当涂县| 宁都县| 沙坪坝区| 隆子县| 治多县| 项城市| 洞头县| 双峰县| 株洲县| 安福县| 桂东县| 九江市| 赤城县| 南充市| 西乡县| 沙田区| 孝感市| 宁津县| 开阳县| 梨树县| 宁阳县| 祁连县| 温州市| 教育| 沙坪坝区|