日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

Netty入門實踐:模擬IM聊天

來源: 責編: 時間:2023-12-08 09:14:44 482觀看
導讀我們使用的框架幾乎都有網絡通信的模塊,比如常見的Dubbo、RocketMQ、ElasticSearch等。它們的網絡通信模塊使用Netty實現,之所以選擇Netty,有兩個主要原因:Netty封裝了復雜的JDK 的 NIO操作,還封裝了各種復雜的異常場景,豐

我們使用的框架幾乎都有網絡通信的模塊,比如常見的Dubbo、RocketMQ、ElasticSearch等。它們的網絡通信模塊使用Netty實現,之所以選擇Netty,有兩個主要原因:gpB28資訊網——每日最新資訊28at.com

  • Netty封裝了復雜的JDK 的 NIO操作,還封裝了各種復雜的異常場景,豐富的API使得在使用上也非常方便,幾行代碼就可以實現高性能的網絡通信功能。
  • Netty已經經歷各種大型中間件的生產環境的驗證,高可用性和健壯性都得到了全方位驗證,用起來更放心。

本文以入門實踐為主,通過原理+代碼的方式,實現一個簡易IM聊天功能。分為兩個部分:Netty的核心概念、IM聊天簡易實現。gpB28資訊網——每日最新資訊28at.com

gpB28資訊網——每日最新資訊28at.com

一、Netty核心概念

1、通信流程

既然是網絡通信,那肯定有服務端和客戶端。在客戶端-A和客戶端-B通信的過程中,實際上是利用服務端作為消息中轉站,來實現A-B通信的。gpB28資訊網——每日最新資訊28at.com

不管是點-點通信,還是群通信,都可以認為是客戶端-服務端之間的通信,有了這一點,許多設計方案都可以輕松理解。gpB28資訊網——每日最新資訊28at.com

gpB28資訊網——每日最新資訊28at.com

2、服務端核心概念

(1) Boss線程:Boss線程負責監聽端口,接受新的連接,監聽連接的數據讀寫變化。gpB28資訊網——每日最新資訊28at.com

(2) Worker線程:Worker線程負責處理具體的業務邏輯,Boss線程接收到連接的讀寫變化后,然后交給Worker處理具體業務邏輯。gpB28資訊網——每日最新資訊28at.com

(3) 服務端的IO模型:Netty支持使用NIO和BIO進行通信,可以自行設置。一般使用NioServerSocketChannel來指定NIO模型。gpB28資訊網——每日最新資訊28at.com

(4) 服務端引導類:服務端通過引導類 ServerBootstrap來啟動一系列的工作。gpB28資訊網——每日最新資訊28at.com

3、客戶端核心概念

(1) Worker線程:客戶端只有工作線程的概念,負責連接到服務端,監聽數據讀寫變化。gpB28資訊網——每日最新資訊28at.com

(2) 客戶端的IO模型:一般使用NioSocketChannel指定客戶端的NIO模型gpB28資訊網——每日最新資訊28at.com

(3) 客戶端引導類:客戶端通過引導類Bootstrap來啟動一些列工作。gpB28資訊網——每日最新資訊28at.com

4、通用核心概念

(1) Handler:負責處理接受到的消息,大部分的業務邏輯都是放在Handler里處理。自定義的Handler一般繼承于SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter。gpB28資訊網——每日最新資訊28at.com

(2) ByteBuf和編碼、解碼:數據的載體,Java對象編碼成字節碼,存放于ByteBuf,然后發送出去。服務端接收到消息后,從ByteBuf中取出數據,解碼成Java對象。gpB28資訊網——每日最新資訊28at.com

(3) 通訊協議:許多框架都會自定義一套自己的協議,這樣比較符合業務。比如dubbo協議、hessian協議。gpB28資訊網——每日最新資訊28at.com

一般的協議包括如下部分:魔數、版本號、序列化算法、指令、數據長度、數據內容,其余的都是為了適配自身業務而定的。gpB28資訊網——每日最新資訊28at.com

gpB28資訊網——每日最新資訊28at.com

  • 魔數:一般是固定數字,用來快速判斷是否符合本協議,如果不符合本協議,則快速失敗。
  • 版本號:一般無需改動,如果早期設置的協議到了后續不適用了,在升級版本號。
  • 序列化算法:Java對象轉序列化的方式,比如JSON。
  • 指令:操作大類。比如說登錄指令、單點發送消息指令、建群指令等。這樣服務端接收到對應指令就用對應的Handler去處理業務邏輯。指令占用的字節數可以根據自身業務適當調大。
  • 數據長度:用來記錄本次數據的長度。
  • 數據內容:具體消息內容,比如聊天時的消息、登錄時的用戶名密碼等。

(4) 粘包拆包gpB28資訊網——每日最新資訊28at.com

Netty屬于上層應用,在發送消息時,還是通過底層操作系統將數據發送出去,操作系統在發送數據時,不會按照我們設想的消息長度去發送內容。這就需要我們在接收到內容時,自行做好內容的分割和等待。gpB28資訊網——每日最新資訊28at.com

比如有一條消息1024字節,如果接受的內容沒這么長就需要繼續等待,等這條消息的內容完整后,在處理。如果接受的內容包含了1條完整消息和1條不完整的消息,那么就需要拆分內容,將完整的消息先傳遞到后面處理,剩下不完整的消息則繼續等待下一個內容。gpB28資訊網——每日最新資訊28at.com

Netty自帶了幾種拆包器:固定長度的拆包器 FixedLengthFrameDecoder、行拆包器 LineBasedFrameDecoder、分隔符拆包器 DelimiterBasedFrameDecoder、長度域拆包器LengthFieldBasedFrameDecoder。gpB28資訊網——每日最新資訊28at.com

一般在使用自定義協議時,會使用:長度域拆包器 LengthFieldBasedFrameDecoder。gpB28資訊網——每日最新資訊28at.com

(5) 空閑檢測和定時心跳gpB28資訊網——每日最新資訊28at.com

在服務端和客戶端的通信過程中,有時候會出現假死連接,或者長時間沒有消息傳遞需要釋放連接。對于這些連接,我們需要及時釋放,畢竟每條連接都占用著CPU和內存資源。大量這種連接如果不及時釋放,服務器資源遲早會耗盡,最終崩潰。gpB28資訊網——每日最新資訊28at.com

應對這種問題的解決方式是:Netty提供了IdleStateHandler做空閑檢測,用來檢測連接是否活躍,如果再指定的時間內,沒有活躍,那么就關閉連接。然后就是客戶端定時發送心跳請求,服務器響應心跳請求。gpB28資訊網——每日最新資訊28at.com

二、IM聊天簡易實現

介紹完Netty的核心概念,接下來以一個簡易的點對點IM聊天,將核心概念融入到案例中。IM聊天的核心模塊大致是如下幾個:gpB28資訊網——每日最新資訊28at.com

1、通信主體流程

通信主體流程就是搭建好:服務端、客戶端、兩端正常建立連接進行通信。gpB28資訊網——每日最新資訊28at.com

服務端代碼:gpB28資訊網——每日最新資訊28at.com

public static void main(String[] args) {    ServerBootstrap serverBootstrap = new ServerBootstrap();    NioEventLoopGroup boss = new NioEventLoopGroup();    NioEventLoopGroup worker = new NioEventLoopGroup();    serverBootstrap            .group(boss, worker)            .channel(NioServerSocketChannel.class)            .childHandler(new ChannelInitializer<NioSocketChannel>() {                protected void initChannel(NioSocketChannel ch) {                    ch.pipeline().addLast(new StringDecoder());                    ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {                        @Override                        protected void channelRead0(ChannelHandlerContext ctx, String msg) {                            System.out.println("server accept: " + msg);                        }                    });                }            });    serverBootstrap.bind(9000)            .addListener(future -> {                if (future.isSuccess()) {                    System.out.println("端口9000綁定成功");                } else {                    System.err.println("端口9000綁定失敗");                }            });}

客戶端代碼:gpB28資訊網——每日最新資訊28at.com

public static void main(String[] args) throws InterruptedException {    Bootstrap bootstrap = new Bootstrap();    NioEventLoopGroup group = new NioEventLoopGroup();    bootstrap.group(group)            .channel(NioSocketChannel.class)            .handler(new ChannelInitializer<Channel>() {                @Override                protected void initChannel(Channel ch) {                    ch.pipeline().addLast(new StringEncoder());                }            });    bootstrap.connect("127.0.0.1", 9000)            .addListener(future -> {                if (future.isSuccess()) {                    System.out.println("鏈接服務端成功");                    Channel channel = ((ChannelFuture) future).channel();                    channel.writeAndFlush("我是客戶端A");                } else {                    System.err.println("連接服務端失敗");                }            });}

2、數據包—包含通訊協議

定義數據包的抽象類,后續的各種類型的數據包都繼承此類。數據包中定義通訊協議的各種字段。gpB28資訊網——每日最新資訊28at.com

@Datapublic abstract class Packet {    /**     * 協議版本     */    private Byte version = 1;    /**     * 指令,此處有多種實現:比如登錄、登出、單聊、建群等等     *     * @return     */    public abstract Byte getCommand();    /**     * 獲取算法,默認使用JSON,如果使用其余算法,子類重寫此方法     *     * @return     */    public Byte getSerializeAlgorithm() {        return SerializerAlgorithm.JSON;    }}public class LoginRequestPacket extends Packet {    private String userName;    private String password;    @Override    public Byte getCommand() {        return Command.LOGIN_REQUEST;    }}

3、序列化器

定義序列化器,功能包括:序列化、反序列化。可以定義多種序列化算法,文中以JSON為例。gpB28資訊網——每日最新資訊28at.com

public interface Serializer {    /**     * 序列化算法     *     * @return     */    byte getSerializerAlgorithm();    /**     * java 對象轉換成二進制     */    byte[] serialize(Object object);    /**     * 二進制轉換成 java 對象     */    <T> T deserialize(Class<T> clazz, byte[] bytes);}public class JSONSerializer implements Serializer {    @Override    public byte getSerializerAlgorithm() {        return SerializerAlgorithm.JSON;    }    @Override    public byte[] serialize(Object object) {        return JSON.toJSONBytes(object);    }    @Override    public <T> T deserialize(Class<T> clazz, byte[] bytes) {        return JSON.parseObject(bytes, clazz);    }}

4、編解碼器

有了通訊協議、有了序列化協議,接下來就是對數據的編碼和解碼了。gpB28資訊網——每日最新資訊28at.com

public void encode(ByteBuf byteBuf, Packet packet) {    Serializer serializer = getSerializer(packet.getSerializeAlgorithm());    // 1. 序列化 java 對象    byte[] bytes = serializer.serialize(packet);    // 2. 實際編碼過程    byteBuf.writeInt(MAGIC_NUMBER);    byteBuf.writeByte(packet.getVersion());    byteBuf.writeByte(packet.getSerializeAlgorithm());    byteBuf.writeByte(packet.getCommand());    byteBuf.writeInt(bytes.length);    byteBuf.writeBytes(bytes);}public Packet decode(ByteBuf byteBuf) {    // 跳過 magic number    byteBuf.skipBytes(4);    // 跳過版本號    byteBuf.skipBytes(1);    // 讀取序列化算法    byte serializeAlgorithm = byteBuf.readByte();    // 讀取指令    byte command = byteBuf.readByte();    // 讀取數據包長度    int length = byteBuf.readInt();    // 讀取數據    byte[] bytes = new byte[length];    byteBuf.readBytes(bytes);    Class<? extends Packet> requestType = getRequestType(command);    Serializer serializer = getSerializer(serializeAlgorithm);    if (requestType != null && serializer != null) {        return serializer.deserialize(requestType, bytes);    }    return null;}

5、消息處理器Handler

以上把通訊的基本架子和收發消息的數據包、協議、編解碼器等基礎工具已經做完,接下來就是編寫Handler實現具體的業務邏輯了。gpB28資訊網——每日最新資訊28at.com

這里以客戶端發起登錄功能為例,分3步,消息收發也是類似:gpB28資訊網——每日最新資訊28at.com

  • 先在客戶端發送登錄請求數據包。
  • 服務端接收到登錄請求數據包后,在服務端的Handler里做業務邏輯處理,然后發送響應給客戶端。
  • 客戶端接收到登錄響應數據包后,在客戶端的Handler里做業務邏輯處理。

效果如下:gpB28資訊網——每日最新資訊28at.com

gpB28資訊網——每日最新資訊28at.com

gpB28資訊網——每日最新資訊28at.com

gpB28資訊網——每日最新資訊28at.com

核心代碼如下:gpB28資訊網——每日最新資訊28at.com

  • 客戶端發送請求
bootstrap.connect("127.0.0.1", 9000)                .addListener(future -> {                    if (future.isSuccess()) {                        System.out.println("連接服務端成功");                        Channel channel = ((ChannelFuture) future).channel();                        // 連接之后,假設再這里發起各種操作指令,采用異步線程開始發送各種指令,發送數據用到的的channel是必不可少的                        sendActionCommand(channel);                    } else {                        System.err.println("連接服務端失敗");                    }                });private static void sendActionCommand(Channel channel) {        // 直接采用控制臺輸入的方式,模擬操作指令        Scanner scanner = new Scanner(System.in);        LoginActionCommand loginActionCommand = new LoginActionCommand();        new Thread(() -> {            loginActionCommand.exec(scanner, channel);        }).start();    }
  • 服務端接受請求,并且處理
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {    LoginResponsePacket loginResponsePacket = new LoginResponsePacket();    loginResponsePacket.setVersion(loginRequestPacket.getVersion());    loginResponsePacket.setUserName(loginRequestPacket.getUserName());    if (valid(loginRequestPacket)) {        loginResponsePacket.setSuccess(true);        String userId = IDUtil.randomId();        loginResponsePacket.setUserId(userId);        System.out.println("[" + loginRequestPacket.getUserName() + "]登錄成功");        SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel());    } else {        loginResponsePacket.setReason("校驗失敗");        loginResponsePacket.setSuccess(false);        System.out.println("登錄失敗!");    }    // 登錄響應    ctx.writeAndFlush(loginResponsePacket);}private boolean valid(LoginRequestPacket loginRequestPacket) {    System.out.println("服務端LoginRequestHandler,正在校驗客戶端登錄請求");    return true;}
  • 客戶端接受響應,并且處理
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {        String userId = loginResponsePacket.getUserId();        String userName = loginResponsePacket.getUserName();        if (loginResponsePacket.isSuccess()) {            System.out.println("[" + userName + "]登錄成功,userId為: " + loginResponsePacket.getUserId());            SessionUtil.bindSession(new Session(userId, userName), ctx.channel());        } else {            System.out.println("[" + userName + "]登錄失敗,原因為:" + loginResponsePacket.getReason());        }    }    @Override    public void channelInactive(ChannelHandlerContext ctx) {        System.out.println("客戶端連接被關閉!");    }}

6、空閑檢測和定時心跳

主流程和主要功能已經實現,還剩最后一個空閑檢測和定時心跳。gpB28資訊網——每日最新資訊28at.com

實現步驟:gpB28資訊網——每日最新資訊28at.com

  • 客戶端和服務端都先定義好空閑檢測。如果再規定的時間內沒有數據傳輸,則關閉通道。
  • 客戶端定時發送心跳
  • 服務端處理心跳請求,發送響應給客戶端

核心代碼:gpB28資訊網——每日最新資訊28at.com

  • 空閑檢測代碼:
/** * IM聊天空閑檢測器 * 比如:20秒內沒有數據,則關閉通道 */public class ImIdleStateHandler extends IdleStateHandler {    private static final int READER_IDLE_TIME = 20;    public ImIdleStateHandler() {        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);    }    @Override    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {        System.out.println(READER_IDLE_TIME + "秒內未讀到數據,關閉連接!");        ctx.channel().close();    }}
  • 客戶端定時心跳代碼:
public void channelActive(ChannelHandlerContext ctx) throws Exception {        scheduleSendHeartBeat(ctx);        super.channelActive(ctx);    }    private void scheduleSendHeartBeat(ChannelHandlerContext ctx) {        // 此處無需使用scheduleAtFixedRate,因為如果通道失效后,就無需在發起心跳了,按照目前的方式是最好的:成功一次安排一次        ctx.executor().schedule(() -> {            if (ctx.channel().isActive()) {                System.out.println("定時任務發送心跳!");                ctx.writeAndFlush(new HeartBeatRequestPacket());                scheduleSendHeartBeat(ctx);            }        }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);    }
  • 服務端響應心跳代碼:
public class ImIdleStateHandler extends IdleStateHandler {    private static final int READER_IDLE_TIME = 20;    public ImIdleStateHandler() {        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);    }    @Override    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {        System.out.println(READER_IDLE_TIME + "秒內未讀到數據,關閉連接!");        ctx.channel().close();    }}

三、總結

本文介紹了Netty的核心概念,以及基本使用方法,希望能夠幫到你。本文核心詞:gpB28資訊網——每日最新資訊28at.com

  • 通信流程
  • Boss線程、Worker線程
  • 處理消息的Handler
  • 通訊協議、序列化協議、編解碼器
  • 空閑檢測、定時心跳

本文完整代碼:https://github.com/yclxiao/netty-demo.gitgpB28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-39513-0.htmlNetty入門實踐:模擬IM聊天

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: Python中看似沒用的寫法,卻是老手都不一定會的原理。解決閉包延遲綁定

下一篇: Python入門必備:細講Python推導式

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 涞水县| 伊金霍洛旗| 宝坻区| 景洪市| 闻喜县| 宁德市| 和硕县| 高阳县| 宣恩县| 朝阳市| 伽师县| 旺苍县| 宜章县| 高阳县| 南康市| 达孜县| 通州市| 阳山县| 昆山市| 南平市| 甘孜| 舟山市| 长寿区| 隆尧县| 乌拉特后旗| 兴安县| 木兰县| 宣城市| 资溪县| 松溪县| 清河县| 金堂县| 安化县| 阿图什市| 安平县| 灵宝市| 广灵县| 民县| 乌恰县| 大田县| 枣庄市|