哈嘍,大家好,我是了不起。
在實(shí)際的企業(yè)開(kāi)發(fā)中,消息中間件是至關(guān)重要的組件之一。如常見(jiàn)的RabbitMQ和Kafka,這些中間件的差異性導(dǎo)致我們實(shí)際項(xiàng)目開(kāi)發(fā)給我們?cè)斐闪艘欢ǖ睦_,這時(shí)候 Spring Cloud Stream 給我們提供了一種解耦合的方式。
Spring Cloud Stream 由一個(gè)中間件中立的核組成。
應(yīng)用通過(guò) Spring Cloud Stream 插入的Input(相當(dāng)于消費(fèi)者Consumer,它是從隊(duì)列中接收消息的)和Output(相當(dāng)于生產(chǎn)者Producer,它是從隊(duì)列中發(fā)送消息的。)通道與外界交流。
通道通過(guò)指定中間件的Binder實(shí)現(xiàn)與外部代理連接。
業(yè)務(wù)開(kāi)發(fā)者不再關(guān)注具體消息中間件,只需關(guān)注Binder對(duì)應(yīng)用程序提供的抽象概念來(lái)使用消息中間件實(shí)現(xiàn)業(yè)務(wù)即可。
Spring Cloud Stream 為各大消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn),引用了發(fā)布-訂閱、消費(fèi)組、分區(qū)的三個(gè)核心概念。
Spring Cloud Stream 提供了很多抽象和基礎(chǔ)組件來(lái)簡(jiǎn)化消息驅(qū)動(dòng)型微服務(wù)應(yīng)用。包含以下內(nèi)容:
Spring Cloud Stream由一個(gè)中立的中間件內(nèi)核組成。Spring Cloud Stream會(huì)注入輸入和輸出的channels,應(yīng)用程序通過(guò)這些channels與外界通信,而channels則是通過(guò)一個(gè)明確的中間件Binder與外部brokers連接。
圖片
Spring Cloud Stream 提供對(duì)Kafka、Rabbit MQ、Redis、Gemfire的Binder實(shí)現(xiàn)。Spring Cloud Stream還包括了一個(gè)TestSupportBinder、TestSupportBinder預(yù)留一個(gè)未更改的channel以便于直接地、可靠地和channels通信。
分區(qū)在有狀態(tài)處理中是一個(gè)很重要的概念,其重要性體現(xiàn)在性能和一致性上,要確保所有相關(guān)數(shù)據(jù)被一并處理,例如,在時(shí)間窗平均計(jì)算的例子中,給定傳感器測(cè)量結(jié)果應(yīng)該都由同一應(yīng)用實(shí)例進(jìn)行計(jì)算。
Spring Cloud Stream支持在一個(gè)應(yīng)用程序的多個(gè)實(shí)例之間數(shù)據(jù)分區(qū),在分區(qū)的情況下,物理通信介質(zhì)(例如,topic代理)被視為多分區(qū)結(jié)構(gòu)。一個(gè)或多個(gè)生產(chǎn)者應(yīng)用程序?qū)嵗龑?shù)據(jù)發(fā)送給多個(gè)消費(fèi)應(yīng)用實(shí)例,并保證共同的特性的數(shù)據(jù)由相同的消費(fèi)者實(shí)例處理。
Spring Cloud Stream 提供了一個(gè)通用的抽象,用于統(tǒng)一方式進(jìn)行分區(qū)處理,因此分區(qū)可以用于自帶分區(qū)的代理(如Kafka)或者不帶分區(qū)的代理(如RabbieMQ)
Spring Cloud Stream 提供了一些預(yù)定義的注解,用于綁定輸入和輸出channels,以及如何監(jiān)聽(tīng)channels。
將@EnableBinding注解添加到應(yīng)用的配置類,就可以把一個(gè)spring應(yīng)用轉(zhuǎn)換成Spring Cloud Stream應(yīng)用,@EnableBinding注解本身就包含@Configuration注解,會(huì)觸發(fā)Spring Cloud Stream 基本配置。
@Import(...)@Configuration@EnableIntegrationpublic @interface EnableBinding { ... Class<?>[] value() default {};}
一個(gè)Spring Cloud Stream應(yīng)用可以有任意數(shù)目的input和output通道,后者通過(guò)@Input和@Output注解在接口中定義。
定義在方法中,被修飾的方法注冊(cè)為消息中間件上數(shù)據(jù)流的事件監(jiān)聽(tīng)器,注解中屬性值對(duì)應(yīng)了監(jiān)聽(tīng)的消息通道名。
Spring Cloud Stream提供了三個(gè)開(kāi)箱即用的預(yù)定義接口。
public interface Source { String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output();}
public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input();}
public interface Processor extends Source, Sink {}
下面是一個(gè)非常簡(jiǎn)單的 SpringBootApplication應(yīng)用,通過(guò)依賴Spring Cloud Stream,從Input通道監(jiān)聽(tīng)消息然后返回應(yīng)答到Output通道,只要添加配置文件就可以應(yīng)用。
@SpringBootApplication@EnableBinding(Processor.class)public class ServiceApplication { public static void main(String[] args) { SpringApplication.run(MyLoggerServiceApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); }}
下面解釋下這個(gè)示例中相關(guān)注解的應(yīng)用:
消息發(fā)送失敗后悔發(fā)送到默認(rèn)的一個(gè)“topic.errors"的channel中(topic是配置的destination)。要配置消息發(fā)送失敗的處理,需要將錯(cuò)誤消息的channel打開(kāi)。
消費(fèi)者配置如下
spring: application: name: spring-cloud-stream-producer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: group: test sync: true bindings: output: destination: stream-test-topic content-type: text/plain # 內(nèi)容格式。這里使用 JSON producer: errorChannelEnabled: true
在啟動(dòng)類中配置錯(cuò)誤消息的Channel信息
@Bean("stream-test-topic.errors")MessageChannel testoutPutErrorChannel(){ return new PublishSubscribeChannel();}
新建異常處理service
import org.springframework.integration.annotation.ServiceActivator;import org.springframework.messaging.Message;import org.springframework.stereotype.Service;@Servicepublic class ErrorProducerService { @ServiceActivator(inputChannel = "stream-test-topic.errors") public void receiveProducerError(Message message){ System.out.println("receive error msg :"+message); }}
當(dāng)發(fā)生異常時(shí),由于測(cè)試類中已經(jīng)將異常捕獲,處理發(fā)送異常主要是在這里進(jìn)行。
這篇文章根據(jù) Spring Cloud Stream 的官方文檔,對(duì)Stream做了一個(gè)整體的介紹,包括設(shè)計(jì)目標(biāo),應(yīng)用場(chǎng)景,業(yè)務(wù)模型以及對(duì)外開(kāi)放的注解,希望大家能夠?qū)W以致用。
本文鏈接:http://www.www897cc.com/showinfo-26-66963-0.htmlStream幫你無(wú)感知切換消息中間件
聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com