在本文中,我們將深入研究使用Rust構(gòu)建實時消息代理服務器,展示其強大的并發(fā)特性。我們將使用Warp作為web服務器,并使用Tokio來管理異步任務。此外,我們將創(chuàng)建一個WebSocket客戶端來測試代理服務器的功能。
設計圖如下:
圖片
消息代理服務器允許客戶端為主題生成事件并訂閱它們。它使用Warp作為HTTP和WebSocket服務器,使用Tokio作為異步運行時。
使用以下命令創(chuàng)建一個Rust項目:
cargo new real-ime-message
在Cargo.toml文件中加入以下依賴項:
[dependencies]futures-util = "0.3.30"tokio = {version = "1.35.1", features = ["full"]}tokio-tungstenite = "0.21.0"url = "2.5.0"warp = "0.3.6"
在src/main.rs文件中定義一個Broker結(jié)構(gòu)體:
use std::{ collections::{HashMap, VecDeque}, sync::Arc,};use futures_util::{SinkExt, StreamExt};use tokio::sync::{ mpsc::{self, UnboundedSender}, RwLock,};use warp::{filters::ws::Message, Filter};type Topic = String;type Event = String;type WsSender = UnboundedSender<warp::ws::Message>;struct Broker { events: Arc<RwLock<HashMap<Topic, VecDeque<Event>>>>, subscribers: Arc<RwLock<HashMap<Topic, Vec<WsSender>>>>,}
創(chuàng)建一個新的Broker實例:
impl Broker { fn new() -> Self { Broker { events: Arc::new(RwLock::new(HashMap::new())), subscribers: Arc::new(RwLock::new(HashMap::new())), } }}
定義發(fā)布事件的方法produce:
impl Broker { ...... async fn produce(&self, topic: Topic, event: Event) { let mut events = self.events.write().await; events .entry(topic.clone()) .or_default() .push_back(event.clone()); // 異步通知所有訂閱者 let subscribers_list; { let subscribers = self.subscribers.read().await; subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default(); } for ws_sender in subscribers_list { // 將事件發(fā)送到WebSocket客戶端 let _ = ws_sender.send(warp::ws::Message::text(event.clone())); } }}
這個方法主要是將事件添加到相應的主題,然后將新事件通知所有訂閱者。
定義subscribe方法,來管理新的訂閱:
impl Broker { ...... pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) { let (ws_sender, mut ws_receiver) = socket.split(); let (tx, mut rx) = mpsc::unbounded_channel::<Message>(); { let mut subs = self.subscribers.write().await; subs.entry(topic).or_default().push(tx); } tokio::task::spawn(async move { while let Some(result) = ws_receiver.next().await { match result { Ok(message) => { // 處理有效的消息 if message.is_text() { println!( "Received message from client: {}", message.to_str().unwrap() ); } } Err(e) => { // 處理錯誤 eprintln!("WebSocket error: {:?}", e); break; } } } println!("WebSocket connection closed"); }); tokio::task::spawn(async move { let mut sender = ws_sender; while let Some(msg) = rx.recv().await { let _ = sender.send(msg).await; } }); }}
這個方法主要是將WebSocket拆分為發(fā)送方和接收方,將訂閱者添加到訂閱者列表中,處理傳入的WebSocket消息。
main函數(shù)代碼如下:
#[tokio::main]async fn main() { let broker = Arc::new(Broker::new()); let broker_clone1 = Arc::clone(&broker); let broker_clone2 = Arc::clone(&broker); let produce = warp::path!("produce" / String) .and(warp::post()) .and(warp::body::json()) .and(warp::any().map(move || Arc::clone(&broker_clone1))) .and_then( move |topic: String, event: Event, broker_clone2: Arc<Broker>| async move { broker_clone2.produce(topic, event).await; Ok::<_, warp::Rejection>(warp::reply()) }, ); let subscribe = warp::path!("subscribe" / String).and(warp::ws()).map( move |topic: String, ws: warp::ws::Ws| { let broker_clone3 = Arc::clone(&broker_clone2); ws.on_upgrade(move |socket| async move { broker_clone3.subscribe(topic.clone(), socket).await; }) }, ); let routes = produce.or(subscribe); println!("Broker server running at http://127.0.0.1:3030"); warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;}
WebSocket客戶端將模擬一個訂閱主題和接收消息的真實用戶。
在src/bin目錄下,創(chuàng)建一個ws_cli.rs文件。在文件中定義websocket_client函數(shù),建立WebSocket連接并管理消息:
use futures_util::{sink::SinkExt, stream::StreamExt};use std::sync::Arc;use tokio::sync::RwLock;use tokio::time::{sleep, Duration};use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};use url::Url;async fn websocket_client(topic_url: &str) { // 解析要連接WebSocket服務器的URL let url = Url::parse(topic_url).expect("Invalid URL"); // 連接到WebSocket服務器 let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); println!("WebSocket client connected"); let (mut write, mut read) = ws_stream.split(); let message = Arc::new(RwLock::new(String::new())); let message_1 = message.clone(); // 生成一個任務來處理傳入的消息 tokio::spawn(async move { let msg_lock = message_1.clone(); while let Some(message) = read.next().await { match message { Ok(msg) => { let mut ms = msg_lock.write().await; *ms = msg.to_text().unwrap().to_string(); println!("Received message: {}", msg.to_text().unwrap()); } Err(e) => { eprintln!("Error receiving message: {:?}", e); break; } } } }); // 發(fā)送消息 loop { let msg_lock = message.clone(); let ms = msg_lock.read().await; if let Err(e) = write.send(Message::Text(ms.to_string())).await { eprintln!("Error sending message: {:?}", e); break; } sleep(Duration::from_secs(5)).await; }}
main函數(shù)代碼如下:
#[tokio::main]async fn main() { websocket_client("ws://127.0.0.1:3030/subscribe/newtopic").await;}
執(zhí)行如下命令運行消息代理服務器:
cargo run --bin real-ime-message
執(zhí)行結(jié)果:
Broker server running at http://127.0.0.1:3030
然后打開一個新的命令行,執(zhí)行如下命令運行WebSocket客戶端:
cargo run --bin ws_cli
執(zhí)行結(jié)果:
WebSocket client connected
向http://127.0.0.1:3030/produce/newtopic接口發(fā)送post請求,如圖:
圖片
客戶端接收到消息:
WebSocket client connectedReceived message: This is a new event
我們已經(jīng)探索了在Rust中創(chuàng)建一個簡單的消息代理,并使用WebSocket客戶端對其進行測試。這個例子突出了Rust在構(gòu)建高效、并發(fā)的網(wǎng)絡應用程序方面的能力。
本文鏈接:http://www.www897cc.com/showinfo-26-70392-0.html異步Rust:構(gòu)建實時消息代理服務器
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com