日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當(dāng)前位置:首頁 > 科技  > 軟件

Spring Boot項(xiàng)目集成RabbitMQ實(shí)戰(zhàn)以及坑點(diǎn)講解

來源: 責(zé)編: 時(shí)間:2024-02-01 12:51:51 239觀看
導(dǎo)讀本文給大家介紹一下在 Spring Boot 項(xiàng)目中如何集成消息隊(duì)列 RabbitMQ,包含對 RibbitMQ 的架構(gòu)介紹、應(yīng)用場景、坑點(diǎn)解析以及代碼實(shí)戰(zhàn)。最后文末有免費(fèi)領(lǐng)取龍年紅包封面以及騰訊云社區(qū)答題領(lǐng)獎福利,歡迎大家領(lǐng)取。我將使

本文給大家介紹一下在 Spring Boot 項(xiàng)目中如何集成消息隊(duì)列 RabbitMQ,包含對 RibbitMQ 的架構(gòu)介紹、應(yīng)用場景、坑點(diǎn)解析以及代碼實(shí)戰(zhàn)。最后文末有免費(fèi)領(lǐng)取龍年紅包封面以及騰訊云社區(qū)答題領(lǐng)獎福利,歡迎大家領(lǐng)取。yEI28資訊網(wǎng)——每日最新資訊28at.com

我將使用 waynboot-mall 項(xiàng)目作為代碼講解,項(xiàng)目地址:https://github.com/wayn111/waynboot-mall。本文大綱如下,yEI28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片yEI28資訊網(wǎng)——每日最新資訊28at.com

RabbitMQ 架構(gòu)介紹

圖片圖片yEI28資訊網(wǎng)——每日最新資訊28at.com

RibbitMQ 是一個(gè)基于 AMQP 協(xié)議的開源消息隊(duì)列系統(tǒng),具有高性能、高可用、高擴(kuò)展等特點(diǎn)。通常作為在系統(tǒng)間傳遞消息的中間件,它可以實(shí)現(xiàn)異步處理、應(yīng)用解耦、流量削峰等功能。yEI28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片yEI28資訊網(wǎng)——每日最新資訊28at.com

RibbitMQ 的主要組件介紹如下,yEI28資訊網(wǎng)——每日最新資訊28at.com

  • producter:生產(chǎn)者,創(chuàng)建消息,然后將消息發(fā)布(發(fā)送)到 RabbitMQ。
  • channel: 信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的 TCP 連接內(nèi)地虛擬鏈接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動作都是通過信道完成。因?yàn)閷τ诓僮飨到y(tǒng)來說,建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。
  • broker: 標(biāo)識消息隊(duì)列服務(wù)器實(shí)體 rabbitmq-server。
  • 連接器:這是負(fù)責(zé)接收客戶端連接請求和建立連接的組件。RabbitMQ 支持多種連接器,如 AMQP 0-9-1, AMQP 1.0, MQTT, STOMP 等。
  • v-host:虛擬主機(jī),這是 RabbitMQ 的邏輯隔離單元,每個(gè)虛擬主機(jī)相當(dāng)于一個(gè)獨(dú)立的代理,擁有自己的交換器、隊(duì)列、綁定、權(quán)限等。不同的虛擬主機(jī)之間是相互隔離的,不能共享資源。一個(gè) RabbitMQ 實(shí)例可以創(chuàng)建多個(gè)虛擬主機(jī),以滿足不同的業(yè)務(wù)需求。
  • exchange:交換機(jī),這是負(fù)責(zé)接收生產(chǎn)者發(fā)送的消息,并根據(jù)路由規(guī)則將消息分發(fā)到相應(yīng)的隊(duì)列或者其他交換器的組件。RabbitMQ 支持多種類型的交換器,如 fanout, direct, topic, headers 等。
  • binding:綁定,這是負(fù)責(zé)將交換器和隊(duì)列之間建立關(guān)聯(lián)關(guān)系的組件。綁定可以指定一個(gè)路由鍵或者模式匹配規(guī)則,以決定哪些消息可以被路由到哪些隊(duì)列。
  • queue:隊(duì)列,這是負(fù)責(zé)存儲消費(fèi)者需要消費(fèi)的消息的組件。隊(duì)列可以有多種屬性和特性,如持久化、排他性、自動刪除、死信隊(duì)列、優(yōu)先級隊(duì)列等。隊(duì)列可以綁定到一個(gè)或多個(gè)交換器上,并指定一個(gè)或多個(gè)路由鍵或者模式匹配規(guī)則。
  • consuemer:消費(fèi)者,連接到 RabbitMQ 服務(wù)器,并訂閱到隊(duì)列上,接收來自隊(duì)列的消息。

應(yīng)用場景

圖片圖片yEI28資訊網(wǎng)——每日最新資訊28at.com

RabbitMQ 是一個(gè)非常強(qiáng)大和靈活的消息中間件,它可以應(yīng)用于多種場景和需求。以下是一些常見的 RabbitMQ 應(yīng)用場景和實(shí)戰(zhàn)經(jīng)驗(yàn):yEI28資訊網(wǎng)——每日最新資訊28at.com

  • 異步處理:當(dāng)系統(tǒng)需要執(zhí)行一些耗時(shí)或者不重要的任務(wù)時(shí),可以使用 RabbitMQ 將任務(wù)封裝成消息發(fā)送到隊(duì)列中,然后由專門的消費(fèi)者來異步地執(zhí)行這些任務(wù)。這樣可以提高系統(tǒng)的響應(yīng)速度和用戶體驗(yàn),同時(shí)也可以避免因?yàn)槿蝿?wù)失敗或超時(shí)而影響主流程的執(zhí)行。例如在 waynboot-mall 項(xiàng)目中,用戶下單后需要發(fā)送郵件通知,這個(gè)任務(wù)就可以使用 RabbitMQ 異步處理。
  • 流量削峰:當(dāng)系統(tǒng)面臨突發(fā)的高并發(fā)請求時(shí),如果直接讓所有請求打到后端服務(wù)器上,可能會導(dǎo)致服務(wù)器崩潰或者響應(yīng)緩慢。這時(shí)可以使用 RabbitMQ 作為一個(gè)緩沖層,將請求先發(fā)送到隊(duì)列中,然后由后端服務(wù)器按照自己的處理能力從隊(duì)列中拉取請求進(jìn)行處理。這樣可以平滑地分?jǐn)傉埱髩毫?,避免系統(tǒng)崩潰或者服務(wù)降級。例如,在 waynboot-mall 項(xiàng)目中,每天晚上八點(diǎn)有秒殺活動,這時(shí)可以使用 RabbitMQ 來削峰限流,保證系統(tǒng)的穩(wěn)定運(yùn)行。
  • 消息廣播:當(dāng)系統(tǒng)需要將消息發(fā)送到多個(gè)接收方時(shí),可以使用 RabbitMQ 的發(fā)布/訂閱模式,將消息發(fā)送到一個(gè) fanout 類型的交換器上,然后由多個(gè)隊(duì)列綁定到這個(gè)交換器上,從而實(shí)現(xiàn)消息的廣播功能。這樣可以實(shí)現(xiàn)一對多的消息通信,同時(shí)也可以根據(jù)不同的業(yè)務(wù)需求,訂閱不同的消息內(nèi)容。例如,在 waynboot-mall 項(xiàng)目中,當(dāng)商品信息發(fā)生變化時(shí),需要通知搜索系統(tǒng)、推薦系統(tǒng)、緩存系統(tǒng)等多個(gè)系統(tǒng),這時(shí)可以使用 RabbitMQ 的消息廣播功能。
  • 消息路由:當(dāng)系統(tǒng)需要根據(jù)不同的條件將消息發(fā)送到不同的接收方時(shí),可以使用 RabbitMQ 的路由模式,將消息發(fā)送到一個(gè) direct 或者 topic 類型的交換器上,然后由多個(gè)隊(duì)列綁定到這個(gè)交換器上,并指定不同的路由鍵或者模式匹配規(guī)則,從而實(shí)現(xiàn)消息的路由功能。這樣可以實(shí)現(xiàn)多對多的消息通信,同時(shí)也可以靈活地控制消息的分發(fā)和消費(fèi)。例如,在 waynboot-mall 項(xiàng)目中,當(dāng)訂單狀態(tài)發(fā)生變化時(shí),需要通知不同的系統(tǒng)進(jìn)行不同的處理,這時(shí)可以使用 RabbitMQ 的消息路由功能。

坑點(diǎn)解析

圖片圖片yEI28資訊網(wǎng)——每日最新資訊28at.com

在使用 RabbitMQ 的過程中,有一些常見的問題需要注意:yEI28資訊網(wǎng)——每日最新資訊28at.com

  • 消息確認(rèn):消息確認(rèn)是 RabbitMQ 保證消息可靠傳遞的機(jī)制。消息確認(rèn)分為生產(chǎn)者確認(rèn)和消費(fèi)者確認(rèn)。生產(chǎn)者確認(rèn)是指生產(chǎn)者發(fā)送消息后,等待 RabbitMQ 返回一個(gè)確認(rèn)消息,表明消息已經(jīng)被正確接收和存儲。消費(fèi)者確認(rèn)是指消費(fèi)者接收消息后,向 RabbitMQ 發(fā)送一個(gè)確認(rèn)消息,表明消息已經(jīng)被正確處理和消費(fèi)。在 waynboot-mall 項(xiàng)目中,消費(fèi)者開啟了手動消息確認(rèn)。
  • 消息持久化:消息持久化是指將消息存儲到磁盤上,以防止 RabbitMQ 重啟或者崩潰時(shí)丟失消息。消息持久化需要滿足以下三個(gè)條件:交換器、隊(duì)列和消息都需要設(shè)置為持久化。持久化會影響 RabbitMQ 的性能,因?yàn)樾枰M(jìn)行磁盤 IO 操作。建議根據(jù)業(yè)務(wù)需求選擇是否需要持久化消息,并合理地配置磁盤空間和清理策略。在 waynboot-mall 項(xiàng)目中,交換器、隊(duì)列設(shè)置了持久化,消息沒有設(shè)置持久化(消息設(shè)置持久化會對 RabbitMQ 的性能造成較大影響)。
  • 死信隊(duì)列:死信隊(duì)列是指存儲那些因?yàn)槟承┰驘o法被正常消費(fèi)的消息的隊(duì)列。死信隊(duì)列可以用來處理一些異常或者失敗的情況,如消息過期、隊(duì)列達(dá)到最大長度、消費(fèi)者拒絕等。建議使用死信隊(duì)列來監(jiān)控和處理這些情況,并根據(jù)業(yè)務(wù)需求選擇合適的重試或者補(bǔ)償策略。在 waynboot-mall 項(xiàng)目中,當(dāng)訂單消費(fèi)者處理消息失敗重試三次后,會將訂單消息發(fā)送到死信隊(duì)列。
  • 集群和鏡像:集群和鏡像是 RabbitMQ 實(shí)現(xiàn)高可用和高擴(kuò)展的兩種方式。集群是指將多個(gè) RabbitMQ 實(shí)例組成一個(gè)邏輯單元,共享元數(shù)據(jù)和負(fù)載均衡。鏡像是指將同一個(gè)隊(duì)列在多個(gè)節(jié)點(diǎn)上創(chuàng)建副本,實(shí)現(xiàn)數(shù)據(jù)冗余和容錯(cuò)。建議根據(jù)業(yè)務(wù)需求選擇合適的集群模式和鏡像類型,并注意集群中的網(wǎng)絡(luò)分區(qū)、腦裂等問題。

代碼實(shí)戰(zhàn)

在 waynboot-mall 項(xiàng)目中,消息層包含兩個(gè)模塊 waynboot-message-core 以及 waynboot-message-consumer,目錄結(jié)構(gòu)如下,yEI28資訊網(wǎng)——每日最新資訊28at.com

|-- waynboot-message-core     // 核心消息配置,供其他服務(wù)集成使用|   |-- config|   |-- constant|   |-- dto|-- waynboot-message-consumer // 消息消費(fèi)服務(wù),訂閱隊(duì)列接收消息,調(diào)用其他服務(wù)執(zhí)行一些具體的業(yè)務(wù)邏輯|   |-- api|   |-- config|   |-- consumer

waynboot-message-core 包目錄說明如下,yEI28資訊網(wǎng)——每日最新資訊28at.com

  • config:核心消息配置目錄,包含業(yè)務(wù)上使用的訂單消息、郵件消息、死信消息、延遲消息的交換機(jī)、隊(duì)列、路由綁定配置以及 RabbitTemplate 配置。
  • constants:核心消息配置的相關(guān)常量目錄,包含 MQ 的常量類,這里面會定義訂單、郵件、死信、延遲消息的交換機(jī)名稱、隊(duì)列名稱、路由鍵名稱等。
  • dto:核心消息配置的數(shù)據(jù)轉(zhuǎn)換實(shí)體目錄,包含 OrderDTO 等。

waynboot-message-consumer 包目錄說明如下,yEI28資訊網(wǎng)——每日最新資訊28at.com

  • api:消息消費(fèi)服務(wù)調(diào)用其他服務(wù)定義的 api 包目錄,包含 MobileApi 類用來調(diào)用 moibile-api。
  • config:消息消費(fèi)服務(wù)的核心配置目錄,包含 RestTemplate 配置類。
  • consumer:消息消費(fèi)服務(wù)的消費(fèi)者包目錄,包含下單、發(fā)送郵件、未支付訂單超時(shí)取消等消費(fèi)者。

添加 POM 依賴

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId>    <version>${spring-boot.version}</version></dependency>

指定虛擬主機(jī)

圖片圖片yEI28資訊網(wǎng)——每日最新資訊28at.com

在 waynboot-mall 項(xiàng)目中,通過 yml 文件的 spring.rabbitmq.virtual-host=“/” 屬性來指定虛擬主機(jī)名稱。yEI28資訊網(wǎng)——每日最新資訊28at.com

建議大家在使用 RabbitMQ 時(shí)都配置好自己項(xiàng)目的虛擬主機(jī)名稱,來達(dá)到各系統(tǒng)資源隔離的目的。當(dāng)然如果 RabbitMQ 服務(wù)只有一個(gè)項(xiàng)目在用,那就用默認(rèn)的 / 作為虛擬主機(jī)名稱也是可以的。yEI28資訊網(wǎng)——每日最新資訊28at.com

小知識:出于多租戶和安全因素設(shè)計(jì)的,vhost 把 AMQP 的基本組件劃分到一個(gè)虛擬的分組中。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換機(jī)、綁定和權(quán)限機(jī)制。當(dāng)多個(gè)不同的用戶使用同一個(gè) RabbitMQ 服務(wù)器時(shí),可以劃分出多個(gè)虛擬主機(jī)。RabbitMQ 默認(rèn)的虛擬主機(jī)路徑是 /。yEI28資訊網(wǎng)——每日最新資訊28at.com

生產(chǎn)者發(fā)送消息

在 waynboot-mall 項(xiàng)目中,用訂單消息來舉例,生產(chǎn)者發(fā)送消息需要經(jīng)過三個(gè)步驟yEI28資訊網(wǎng)——每日最新資訊28at.com

1. 創(chuàng)建訂單消息的交換機(jī)、隊(duì)列以及路由綁定

public class MQConstants {    public static final String ORDER_DIRECT_QUEUE = "order_direct_queue";    public static final String ORDER_DIRECT_EXCHANGE = "order_direct_exchange";    public static final String ORDER_DIRECT_ROUTING = "order_direct_routing";}@Configurationpublic class BusinessRabbitConfig {    @Bean    public Queue orderDirectQueue() {        return new Queue(MQConstants.ORDER_DIRECT_QUEUE);    }    @Bean    DirectExchange orderDirectExchange() {        return new DirectExchange(MQConstants.ORDER_DIRECT_EXCHANGE);    }    @Bean    Binding bindingOrderDirect() {        return BindingBuilder.bind(orderDirectQueue()).to(orderDirectExchange()).with(MQConstants.ORDER_DIRECT_ROUTING);    }}

在 BusinessRabbitConfig 中,我們創(chuàng)建了訂單交換機(jī)、隊(duì)列以及路由綁定關(guān)系。在 Spring 項(xiàng)目中,項(xiàng)目啟動時(shí),就會自動在 RabbitMQ 服務(wù)器上創(chuàng)建好這些東西。yEI28資訊網(wǎng)——每日最新資訊28at.com

交換機(jī)列表交換機(jī)列表yEI28資訊網(wǎng)——每日最新資訊28at.com

隊(duì)列列表隊(duì)列列表yEI28資訊網(wǎng)——每日最新資訊28at.com

2. 生產(chǎn)者配置

圖片yEI28資訊網(wǎng)——每日最新資訊28at.com

生產(chǎn)者的消息發(fā)送確認(rèn)主要包含兩部分,yEI28資訊網(wǎng)——每日最新資訊28at.com

producter -> rabbitmq broker exchange -> queueyEI28資訊網(wǎng)——每日最新資訊28at.com

  • 消息從 producte( 生產(chǎn)者)發(fā)送到 rabbitmq broker(RabbitMQ 服務(wù)器)的交換機(jī)中,發(fā)送后會觸發(fā) confirmCallBack 回調(diào)
  • 消息從 exchange 發(fā)送到 queue,投遞失敗則會調(diào)用 returnCallBack 回調(diào)

waynboot-mall 項(xiàng)目的 yml 中關(guān)于 RabbitMQ 的相關(guān)配置如下,yEI28資訊網(wǎng)——每日最新資訊28at.com

spring:  # 配置rabbitMq 服務(wù)器  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest    # 消息確認(rèn)配置項(xiàng)    # 確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)    publisher-confirm-type: correlated    # 確認(rèn)消息已發(fā)送到隊(duì)列(Queue)    publisher-returns: true    # 虛擬主機(jī)名稱    virtual-host: /
publisher-confirm-type 屬性

可以看到,我們設(shè)置了 publisher-confirm-type 屬性為 correlated,表示開啟發(fā)布確認(rèn)模式,用來確認(rèn)消息已發(fā)送到交換機(jī),publisher-confirm-type 有三個(gè)選項(xiàng):yEI28資訊網(wǎng)——每日最新資訊28at.com

  • NONE:禁用發(fā)布確認(rèn)模式,是默認(rèn)值
  • CORRELATED:發(fā)布消息成功到交換器后會觸發(fā)回 confirmCallBack 回調(diào)方法
  • SIMPLE:經(jīng)測試有兩種效果,其一效果和 CORRELATED 值一樣會觸發(fā)回調(diào)方法,其二在發(fā)布消息成功后使用 rabbitTemplate 調(diào)用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節(jié)點(diǎn)返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來判定下一步的邏輯,要注意的點(diǎn)是 waitForConfirmsOrDie 方法如果返回 false 則會關(guān)閉 channel,則接下來無法發(fā)送消息到 broker。
publisher-returns 屬性

在 RabbitMQ 中,消息發(fā)送到交換機(jī)中也不代表消費(fèi)者一定能接收到消息,所以我們還需要設(shè)置 publisher-returns 為 true 來表示確認(rèn)交換機(jī)中消息已經(jīng)發(fā)送到隊(duì)列里。true 表示開啟失敗回調(diào),開啟后當(dāng)消息無法路由到指定隊(duì)列時(shí)會觸發(fā) ReturnCallback 回調(diào)。yEI28資訊網(wǎng)——每日最新資訊28at.com

接著是 RabbitTemplateConfig 的代碼,這里面會定義前面提到的 confirmCallBack、returnCallBack 相關(guān)代碼,yEI28資訊網(wǎng)——每日最新資訊28at.com

@Slf4j@Componentpublic class RabbitTemplateConfig {    @Bean    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);        // 設(shè)置開啟Mandatory,才能觸發(fā)回調(diào)函數(shù),無論消息推送結(jié)果怎么樣都強(qiáng)制調(diào)用回調(diào)函數(shù)        rabbitTemplate.setMandatory(true);        // 交換機(jī)收到消息回調(diào)        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));        // 隊(duì)列收到消息回調(diào),如果失敗的話會進(jìn)行 returnCallback 的回調(diào)處理,反之成功就不會回調(diào)。        rabbitTemplate.setReturnsCallback(returned -> {            log.info("returnCallback:     " + "消息:" + returned.getMessage());            log.info("returnCallback:     " + "回應(yīng)碼:" + returned.getReplyCode());            log.info("returnCallback:     " + "回應(yīng)信息:" + returned.getReplyText());            log.info("returnCallback:     " + "交換機(jī):" + returned.getExchange());            log.info("returnCallback:     " + "路由鍵:" + returned.getRoutingKey());        });        return rabbitTemplate;    }}

在 RabbitTemplateConfig 類代碼里,我們可以設(shè)置 confirmCallBack、returnCallBack 回調(diào)函數(shù)后,監(jiān)控生產(chǎn)者發(fā)送消息是否被交換機(jī)接收、以及交換機(jī)是否把消息發(fā)送到隊(duì)列中。yEI28資訊網(wǎng)——每日最新資訊28at.com

3. 使用 RabbitTemplate 發(fā)送消息

在 Spring Boot 項(xiàng)目中,集成了 spring-boot-starter-amqp 依賴后,就可以直接注入 RabbitTemplate 來發(fā)送消息。yEI28資訊網(wǎng)——每日最新資訊28at.com

這里用 waynboot-mall 項(xiàng)目中的異步下單流程舉例,代碼如下,yEI28資訊網(wǎng)——每日最新資訊28at.com

@Slf4j@Service@AllArgsConstructorpublic class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {    private RabbitTemplate rabbitTemplate;    @Override    public R asyncSubmit(OrderVO orderVO) {        OrderDTO orderDTO = new OrderDTO();        ...        // 開始異步下單        String uid = IdUtil.getUid();        // 1. 創(chuàng)建消息ID,確認(rèn)機(jī)制發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一 id,以區(qū)分不同消息,避免 ack 沖突        CorrelationData correlationData = new CorrelationData(uid);        // 2. 創(chuàng)建消息載體 Message ,AMQP 規(guī)范中定義的消息承載類,用來在生產(chǎn)者和消費(fèi)者之前傳遞消息        Map<String, Object> map = new HashMap<>();        map.put("order", orderDTO);        map.put("notifyUrl", WaynConfig.getMobileUrl() + "/callback/order/submit");        try {            Message message = MessageBuilder                    .withBody(JSON.toJSONString(map).getBytes(Constants.UTF_ENCODING))                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)                    .build();            // 3. 發(fā)送消息到 RabbitMQ 服務(wù)器,需要指定交換機(jī)、路由鍵、消息載體以及消息ID            rabbitTemplate.convertAndSend(MQConstants.ORDER_DIRECT_EXCHANGE, MQConstants.ORDER_DIRECT_ROUTING, message, correlationData);        } catch (UnsupportedEncodingException e) {            log.error(e.getMessage(), e);        }        return R.success().add("actualPrice", actualPrice).add("orderSn", orderSn);    }}

waynboot-mall 項(xiàng)目中在使用 rabbitTemplate 發(fā)送消息時(shí),按照如下步驟,大家可以參考yEI28資訊網(wǎng)——每日最新資訊28at.com

  1. 創(chuàng)建消息 ID,確認(rèn)機(jī)制發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一 id,以區(qū)分不同消息,消費(fèi)者消費(fèi)時(shí)出現(xiàn) ack 沖突。
  2. 創(chuàng)建消息載體 Message ,AMQP 規(guī)范中定義的消息承載類,用來在生產(chǎn)者和消費(fèi)者之前傳遞消息。
  3. 發(fā)送消息到 RabbitMQ 服務(wù)器,需要指定交換機(jī)、路由鍵、消息載體以及消息 ID。

以上就是生產(chǎn)者發(fā)送消息時(shí)所有相關(guān)代碼了,接著我們看下消費(fèi)者處理消息的相關(guān)代碼。yEI28資訊網(wǎng)——每日最新資訊28at.com

消費(fèi)者處理消息

在 waynboot-mall 項(xiàng)目中,還是用訂單消息來舉例,消費(fèi)者 yml 配置如下,yEI28資訊網(wǎng)——每日最新資訊28at.com

1. 消費(fèi)者配置

在 RabbitMQ 的消息消費(fèi)環(huán)節(jié),需要注意的一點(diǎn)就是,如果需要確保消費(fèi)者不出現(xiàn)漏消費(fèi),則需要開啟消費(fèi)者的手動 ack 模式。yEI28資訊網(wǎng)——每日最新資訊28at.com

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest    ...    listener:      simple:        # 消息確認(rèn)方式,其有三種配置方式,分別是none、manual(手動ack) 和auto(自動ack) 默認(rèn)auto        acknowledge-mode: manual        # 一個(gè)消費(fèi)者最多可處理的nack(未確認(rèn))消息數(shù)量,默認(rèn)是250        prefetch: 250        # 設(shè)置消費(fèi)者數(shù)量        concurrency: 1
acknowledge-mode 屬性

在 yml 文件的消費(fèi)者配置中,acknowledge-mode 屬性用于指定消息確認(rèn)模式,有三種模式:yEI28資訊網(wǎng)——每日最新資訊28at.com

  1. 手動確認(rèn) manual,在該模式下,消費(fèi)者消費(fèi)消息后需要根據(jù)消費(fèi)情況給 Broker 返回一個(gè)回執(zhí),是確認(rèn) ack 使 Broker 刪除該條已消費(fèi)的消息,還是失敗確認(rèn)返回 nack,還是拒絕該消息。開啟手動確認(rèn)后,如果消費(fèi)者接收到消息后還沒有返回 ack 就宕機(jī)了,這種情況下消息也不會丟失,只有 RabbitMQ 接收到返回 ack 后,消息才會從隊(duì)列中被刪除。
  2. 自動確認(rèn) none,rabbitmq 默認(rèn)消費(fèi)者正確處理所有請求(不設(shè)置時(shí)的默認(rèn)方式)。
  3. 根據(jù)請況確認(rèn) auto,主要分成以下幾種情況:

如果消費(fèi)者在消費(fèi)的過程中沒有拋出異常,則自動確認(rèn)。yEI28資訊網(wǎng)——每日最新資訊28at.com

當(dāng)消費(fèi)者消費(fèi)的過程中拋出 AmqpRejectAndDontRequeueException 異常的時(shí)候,則消息會被拒絕,且該消息不會重回隊(duì)列。yEI28資訊網(wǎng)——每日最新資訊28at.com

當(dāng)拋出 ImmediateAcknowledgeAmqpException 異常,消息會被確認(rèn)。yEI28資訊網(wǎng)——每日最新資訊28at.com

如果拋出其他的異常,則消息會被拒絕,但是與前兩個(gè)不同的是,該消息會重回隊(duì)列,如果此時(shí)只有一個(gè)消費(fèi)者監(jiān)聽該隊(duì)列,那么該消息重回隊(duì)列后又會推送給該消費(fèi)者,會造成死循環(huán)的情況。yEI28資訊網(wǎng)——每日最新資訊28at.com

prefetch 屬性

消費(fèi)者配置中,prefetch 屬性用于指定消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量。yEI28資訊網(wǎng)——每日最新資訊28at.com

每個(gè) customer 會在 MQ 預(yù)取一些消息放入內(nèi)存的 LinkedBlockingQueue 中進(jìn)行消費(fèi),這個(gè)值越高,消息傳遞的越快,但非順序處理消息的風(fēng)險(xiǎn)更高。如果 ack 模式為 none,則忽略。yEI28資訊網(wǎng)——每日最新資訊28at.com

prefetch 默認(rèn)值以前是 1,這可能會導(dǎo)致高效使用者的利用率不足。從 spring-amqp 2.0 版開始,默認(rèn)的 prefetch 值是 250,這將使消費(fèi)者在大多數(shù)常見場景中保持忙碌,從而提高吞吐量。yEI28資訊網(wǎng)——每日最新資訊28at.com

不過在有些情況下,尤其是處理速度比較慢的大消息,消息可能在內(nèi)存中大量堆積,消耗大量內(nèi)存;以及對于一些嚴(yán)格要求順序的消息,prefetch 的值應(yīng)當(dāng)設(shè)置為 1。yEI28資訊網(wǎng)——每日最新資訊28at.com

對于低容量消息和多個(gè)消費(fèi)者的情況(也包括單 listener 容器的 concurrency 配置)希望在多個(gè)使用者之間實(shí)現(xiàn)更均勻的消息分布,建議在手動 ack 下并設(shè)置 prefetch=1。yEI28資訊網(wǎng)——每日最新資訊28at.com

如果要保證消息的可靠不丟失,當(dāng) prefetch 大于 1 時(shí),可能會出現(xiàn)因?yàn)榉?wù)宕機(jī)引起的數(shù)據(jù)丟失,故建議將 prefetch=1。yEI28資訊網(wǎng)——每日最新資訊28at.com

concurrency 屬性

消費(fèi)者配置中,concurrency 屬性設(shè)置的是對每個(gè) listener 在初始化的時(shí)候設(shè)置的并發(fā)消費(fèi)者的個(gè)數(shù)。在上面的 yml 配置中,cnotallow=1,即每個(gè) Listener 容器將開啟一個(gè)線程去處理消息。在 2.0 以后的版本中,可以在注解中配置該參數(shù),實(shí)例代碼如下,yEI28資訊網(wǎng)——每日最新資訊28at.com

@RabbitListener(queues = MQConstants.ORDER_DIRECT_QUEUE, concurrency = "2")public void process(Channel channel, Message message) throws IOException {    String body = new String(message.getBody());    log.info("OrderPayConsumer 消費(fèi)者收到消息: {}", body);    ...}

2. 使用 RabbitListener 注解消費(fèi)消息

在 waynboot-mall 項(xiàng)目中,消費(fèi)者監(jiān)聽隊(duì)列代碼如下,yEI28資訊網(wǎng)——每日最新資訊28at.com

@Slf4j@Componentpublic class OrderPayConsumer {    @Resource    private RedisCache redisCache;    @Resource    private MobileApi mobileApi;    @RabbitListener(queues = MQConstants.ORDER_DIRECT_QUEUE)    public void process(Channel channel, Message message) throws IOException {        // 1. 轉(zhuǎn)換訂單消息        String body = new String(message.getBody());        log.info("OrderPayConsumer 消費(fèi)者收到消息: {}", body);        // 2. 獲取消息ID        String msgId = message.getMessageProperties().getHeader("spring_returned_message_correlation");        // 3. 獲取發(fā)送tag        long deliveryTag = message.getMessageProperties().getDeliveryTag();        // 4. 消費(fèi)消息冪等性處理        if (redisCache.getCacheObject(ORDER_CONSUMER_MAP.getKey()) != null) {            // redis中包含該 key,說明該消息已經(jīng)被消費(fèi)過            log.error("msgId: {},消息已經(jīng)被消費(fèi)", msgId);            channel.basicAck(deliveryTag, false);// 確認(rèn)消息已消費(fèi)            return;        }        try {            // 5. 下單處理            mobileApi.submitOrder(body);            // 6. 手動ack,消息成功確認(rèn)            channel.basicAck(deliveryTag, false);            // 7. 設(shè)置消息已被消費(fèi)標(biāo)識            redisCache.setCacheObject(ORDER_CONSUMER_MAP.getKey(), msgId, ORDER_CONSUMER_MAP.getExpireSecond());        } catch (Exception e) {            channel.basicNack(deliveryTag, false, false);            log.error(e.getMessage(), e);        }    }}

waynboot-mall 項(xiàng)目中在使用 RabbitListener 注解消費(fèi)消息時(shí),按照如下步驟,大家可以參考yEI28資訊網(wǎng)——每日最新資訊28at.com

  1. 將 message 參數(shù)轉(zhuǎn)換成訂單消息。
  2. 從 message 參數(shù)中獲取消息唯一 msgId。
  3. 從 message 參數(shù)中獲取消息發(fā)送 tag。
  4. 冪等性處理,根據(jù)第二步獲取的 msgId ,消費(fèi)消息時(shí)需要先判斷 msgId 是否已經(jīng)被處理。
  5. 調(diào)用 mobile-api 服務(wù),進(jìn)行下單邏輯處理,在 mobileApi.submitOrder(body) 方法中使用 Spring-Retry 的 @Retryable 注解,進(jìn)行自動重試。
  6. 手動 ack,basicAck(long deliveryTag, boolean multiple)。basicAck 方法表示成功確認(rèn),使用此方法后,消息就會被 rabbitmq 服務(wù)器刪除。

其中參數(shù) long deliveryTag 為消息的唯一序號也就是第三步獲取的發(fā)送 tag,第二個(gè) boolean multiple 參數(shù)表示是否一次消費(fèi)多條消息,false 表示只確認(rèn)該序列號對應(yīng)的消息,true 則表示確認(rèn)該序列號對應(yīng)的消息以及比該序列號小的所有消息,比如我先發(fā)送 2 條消息,他們的序列號分別為 2,3,并且他們都沒有被確認(rèn),還留在隊(duì)列中,那么如果當(dāng)前消息序列號為 4,那么當(dāng) multiple 為 true,則序列號為 2、3 的消息也會被一同確認(rèn)。yEI28資訊網(wǎng)——每日最新資訊28at.com

  1. 冪等性處理,消息已經(jīng)被成功消費(fèi)后,根據(jù)第二步獲取的 msgId 設(shè)置冪等標(biāo)識。

總結(jié)一下

這篇文章給大家講解了在 Spring Boot 項(xiàng)目中如何集成消息隊(duì)列 RabbitMQ 用于業(yè)務(wù)邏輯解耦,有架構(gòu)介紹、應(yīng)用場景、坑點(diǎn)解析、代碼實(shí)戰(zhàn) 4 個(gè)部分,能帶領(lǐng)大家比較全面的了解一波 RabbitMQ。大家在自己的項(xiàng)目中如果需要引入 RabbitMQ 時(shí),都可以參考本文的代碼實(shí)戰(zhàn)配置,幫助大家快速集成、避免踩坑。yEI28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-70467-0.htmlSpring Boot項(xiàng)目集成RabbitMQ實(shí)戰(zhàn)以及坑點(diǎn)講解

聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com

上一篇: 利用Nacos實(shí)現(xiàn)Seata事務(wù)模式(XA與AT)的快速配置與靈活切換

下一篇: 字節(jié)碼增強(qiáng)技術(shù),不止有 Java Proxy、 Cglib 和 Javassist 還有 Byte Buddy

標(biāo)簽:
  • 熱門焦點(diǎn)
Top 主站蜘蛛池模板: 探索| 德兴市| 武城县| 昌吉市| 包头市| 鄂托克前旗| 庆元县| 萝北县| 江北区| 南昌县| 黔西县| 梨树县| 平顺县| 北碚区| 巴东县| 淮北市| 威信县| 玉屏| 项城市| 锦屏县| 搜索| 洪泽县| 凤台县| 山东省| 襄樊市| 玉门市| 富平县| 民和| 大石桥市| 宁海县| 民县| 柘荣县| 香河县| 壶关县| 浦北县| 合肥市| 陆河县| 康平县| 庄浪县| 泰来县| 石门县|