Netty 拆包沾包问题解决方案详解

 更新时间:2022年11月25日 10:13:09   作者:鳄鱼儿  
这篇文章主要为大家介绍了Netty 拆包沾包问题解决方案示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

上一篇说到Springboot整合Netty,自定义协议实现,本文聊一些拆包/沾包问题。

拆包/沾包问题

TCP是面向字节流的协议,在发送方发送的若干包数据到接收方接收时,这些数据包可能会被粘成一个数据包,而从接收缓冲区看,后一包数据的头紧接着前一包数据的尾,这就形成沾包问题

但如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包问题,也就是将一个大的包拆分为多个小包进行发送,接收端接收到多个包才能组成一个完整数据。

为什么UDP没有粘包

粘包/拆包问题在数据链路层、网络层以及传输层都有可能发生。日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生粘包/拆包问题。

而TCP是面向字节流,没有边界,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,通过这个缓冲区来进行优化,例如缓冲区为1024个字节大小,如果一次发送数据量小于1024,则会合并多个数据作为一个数据包发送;如果一次发送数据量大于1024,则会将这个包拆分成多个数据包进行发送。上述两种情况也是沾包和拆包问题。

上图出现的四种情况包括:

  • 正常发送,两个包恰好满足TCP缓冲区的大小或达到TCP等待时长,分别发送两个包。
  • 沾包:D1、D2都过小,两者进行了沾包处理。
  • 拆包沾包:D2过大,进行了拆包处理,而拆出去的一部分D2_1又与D1进行粘包处理。
  • 沾包拆包:D1过大,进行了拆包处理,而拆出去的一部分D1_2又与D2进行粘包处理。

解决方案

对于粘包和拆包问题,通常可以使用这四种解决方案:

  • 使用固定数据长度进行发送,发送端将每个包都封装成固定的长度,比如100字节大小。如果不足100字节可通过补0等填充到指定长度再发送。
  • 发送端在每个包的末尾使用固定的分隔符,例如##@##。如果发生拆包需等待多个包发送过来之后再找到其中的##@##进行合并。如果发送沾包则找到其中的##@##进行拆分。
  • 将消息分为头部和消息体,头部中保存整个消息的长度,这种情况下接收端只有在读取到足够长度的消息之后,才算是接收到一个完整的消息。
  • 通过自定义协议进行粘包和拆包的处理。

Netty拆包沾包处理

Netty对解决粘包和拆包的方案做了抽象,提供了一些解码器(Decoder)来解决粘包和拆包的问题。如:

LineBasedFrameDecoder:以行为单位进行数据包的解码,使用换行符\n或者\r\n作为依据,遇到\n或者\r\n都认为是一条完整的消息。

DelimiterBasedFrameDecoder:以特殊的符号作为分隔来进行数据包的解码。 FixedLengthFrameDecoder:以固定长度进行数据包的解码。

LenghtFieldBasedFrameDecode:适用于消息头包含消息长度的协议(最常用)。

基于Netty进行网络读写的程序,可以直接使用这些Decoder来完成数据包的解码。对于高并发、大流量的系统来说,每个数据包都不应该传输多余的数据(所以补齐的方式不可取),LenghtFieldBasedFrameDecode更适合这样的场景。

LineBasedFrameDecoder

使用LineBasedFrameDecoder解决粘包问题,其会根据"\n"或"\r\n"对二进制数据进行拆分,封装到不同的ByteBuf实例中

    /**
     * 服务启动器
     *
     * @return
     */
    @Bean
    public ServerBootstrap serverBootstrap() {
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                // 指定使用的线程组
                .group(boosGroup(), workerGroup())
                // 指定使用的通道
                .channel(NioServerSocketChannel.class)
                // 指定连接超时时间
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())          
                // 通过换行符处理沾包/拆包
                .childHandler(new NettyServerLineBasedHandler());
        return serverBootstrap;
    }
public class NettyServerLineBasedHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 使用LineBasedFrameDecoder解决粘包问题,其会根据"\n"或"\r\n"对二进制数据进行拆分,封装到不同的ByteBuf实例中,并且每次查找的最大长度为1024字节
        pipeline.addLast(new LineBasedFrameDecoder(1024, true, true));
        // 将上一步解码后的数据转码为Message实例
        pipeline.addLast(new MessageDecodeHandler());
        // 对发送客户端的数据进行编码
        pipeline.addLast(new MessageEncodeHandler());
        // 对数据进行最终处理
        pipeline.addLast(new ServerListenerHandler());
    }
}

DelimiterBasedFrameDecoder

以特殊的符号作为分隔来进行数据包的解码,上文中就是以##@##作为分割符作为示例展开讲解的。这里再粘贴一下关键代码: 使用DelimiterBasedFrameDecoder处理拆包/沾包,并且每次查找的最大长度为1024字节。

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 数据分割符
        String delimiterStr = "##@##";
        ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 使用自定义分隔符处理拆包/沾包,并且每次查找的最大长度为1024字节
        pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
        // 将上一步解码后的数据转码为Message实例
        pipeline.addLast(new MessageDecodeHandler());
        // 对发送客户端的数据进行编码,并添加数据分隔符
        pipeline.addLast(new MessageEncodeHandler(delimiterStr));
        // 对数据进行最终处理
        pipeline.addLast(new ServerListenerHandler());
    }

MessageEncodeHandler对发送数据进行添加分割符并编码操作

public class MessageEncodeHandler extends MessageToByteEncoder<Message> {
    // 数据分割符
    String delimiter;
    public MessageEncodeHandler(String delimiter) {
        this.delimiter = delimiter;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
        out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8));
    }
}

FixedLengthFrameDecoder

服务端代码设置,在NettyConfig配置中将worker处理器改为NettyServerFixedLengthHandler,使用固定100字节长度处理消息。

    /**
     * 服务启动器
     *
     * @return
     */
    @Bean
    public ServerBootstrap serverBootstrap() {
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                // 指定使用的线程组
                .group(boosGroup(), workerGroup())
                // 指定使用的通道
                .channel(NioServerSocketChannel.class)
                // 指定连接超时时间
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
                // 指定为固定长度字节的处理器
                .childHandler(new NettyServerFixedLengthHandler());
        return serverBootstrap;
    }

NettyServerFixedLengthHandler类代码,使用FixedLengthFrameDecoder设置按固定100字节数去拆分接收到的ByteBuf。并自定义一个消息编码器,对字节长度不足100字节的消息进行补0操作。

public class NettyServerFixedLengthHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 固定字节长度
        Integer length = 100;
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 按固定100字节数拆分接收到的ByteBuf的解码器
        pipeline.addLast(new FixedLengthFrameDecoder(length));
        // 将上一步解码后的数据转码为Message实例
        pipeline.addLast(new MessageDecodeHandler());
        // 对发送客户端的数据进行自定义编码,并设置字节长度不足补0
        pipeline.addLast(new MessageEncodeFixedLengthHandler(length));
        // 对数据进行最终处理
        pipeline.addLast(new ServerListenerHandler());
    }
}

自定义MessageEncodeFixedLengthHandler编码类,使用固定字节长度编码消息,字节长度不足时补0。

public class MessageEncodeFixedLengthHandler extends MessageToByteEncoder<Message> {
    private int length;
    public MessageEncodeFixedLengthHandler(int length) {
        this.length = length;
    }
    /**
     * 使用固定字节长度编码消息,字节长度不足时补0
     *
     * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to
     * @param msg the message to encode
     * @param out the {@link ByteBuf} into which the encoded message will be written
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        String jsonStr = msg.toJsonString();
        // 如果长度不足,则进行补0
        if (jsonStr.length() < length) {
            jsonStr = addSpace(jsonStr);
        }
        // 使用Unpooled.wrappedBuffer实现零拷贝,将字符串转为ByteBuf
        ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes()));
    }
    /**
     * 如果没有达到指定长度进行补0
     *
     * @param msg
     * @return
     */
    private String addSpace(String msg) {
        StringBuilder builder = new StringBuilder(msg);
        for (int i = 0; i < length - msg.length(); i++) {
            builder.append(0);
        }
        return builder.toString();
    }
}

LenghtFieldBasedFrameDecode

LenghtFieldBasedFrameDecode适用于消息头包含消息长度的协议,根据消息长度判断是否读取完一个数据包。

    /**
     * 服务启动器
     *
     * @return
     */
    @Bean
    public ServerBootstrap serverBootstrap() {
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                // 指定使用的线程组
                .group(boosGroup(), workerGroup())
                // 指定使用的通道
                .channel(NioServerSocketChannel.class)
                // 指定连接超时时间
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
                // 请求头包含数据长度
                .childHandler(new NettyServerLenghtFieldBasedHandler());
        return serverBootstrap;
    }
public class NettyServerLenghtFieldBasedHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 请求头包含数据长度,根据长度进行沾包拆包处理
        /**
         * maxFrameLength:指定了每个包所能传递的最大数据包大小;
         * lengthFieldOffset:指定了长度字段在字节码中的偏移量;
         * lengthFieldLength:指定了长度字段所占用的字节长度;
         * lengthAdjustment:对一些不仅包含有消息头和消息体的数据进行消息头的长度的调整,这样就可以只得到消息体的数据,这里的lengthAdjustment指定的就是消息头的长度;
         * initialBytesToStrip:对于长度字段在消息头中间的情况,可以通过initialBytesToStrip忽略掉消息头以及长度字段占用的字节。
         */
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
        // 在请求头添加字节长度字段
        pipeline.addLast(new LengthFieldPrepender(2));
        // 将上一步解码后的数据转码为Message实例
        pipeline.addLast(new MessageDecodeHandler());
        // 对发送客户端的数据进行编码,字节长度不足补0
        pipeline.addLast(new MessageEncodeHandler());
        // 对数据进行最终处理
        pipeline.addLast(new ServerListenerHandler());
    }
}

总结

造成TCP协议粘包/拆包问题的原因是TCP协议数据传输是基于字节流的,它不包含消息、数据包等概念,是无界的,需要应用层协议自己设计消息的边界,即消息帧(Message Framing)。如果应用层协议没有使用基于长度或者基于分隔符(终结符)划分边界等方式进行处理,则会导致多个消息的粘包和拆包。

以上就是Netty 拆包沾包问题解决方案示例的详细内容,更多关于Netty 拆包沾包解决方案的资料请关注脚本之家其它相关文章!

相关文章

  • Java用文件流下载网络文件示例代码

    Java用文件流下载网络文件示例代码

    这篇文章主要介绍了Java用文件流的方式下载网络文件,大家参考使用吧
    2013-11-11
  • SpringBoot2整合activiti6环境搭建过程解析

    SpringBoot2整合activiti6环境搭建过程解析

    这篇文章主要介绍了SpringBoot2整合activiti6环境搭建过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • spring boot入门开始你的第一个应用

    spring boot入门开始你的第一个应用

    这篇文章主要介绍了spring boot入门开始你的第一个应用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,,需要的朋友可以参考下
    2019-06-06
  • 简单工厂模式_动力节点Java学院整理

    简单工厂模式_动力节点Java学院整理

    这篇文章主要介绍了简单工厂模式的相关资料,和大家一起学习静态工厂方法模式,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-08-08
  • 详解Java线程池和Executor原理的分析

    详解Java线程池和Executor原理的分析

    这篇文章主要介绍了详解Java线程池和Executor原理的分析的相关资料,这里提供实例及分析原理帮助大家理解这部分知识,需要的朋友可以参考下
    2017-07-07
  • java学生信息管理系统MVC架构详解

    java学生信息管理系统MVC架构详解

    这篇文章主要为大家详细介绍了java学生信息管理系统MVC架构的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-11-11
  • Spring boot从安装到交互功能实现零基础全程详解

    Spring boot从安装到交互功能实现零基础全程详解

    这篇文章主要介绍了Spring boot从安装到交互功能得实现全程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • Java状态机的一种优雅写法分享

    Java状态机的一种优雅写法分享

    状态机是一种数学模型,对于我们业务实现有很大的帮助。我们可以用非常多的方法实现状态机,这篇文章就来介绍一个状态机优雅的实现方法,希望对大家有所帮助
    2023-04-04
  • Java实现删除PDF中指定页面

    Java实现删除PDF中指定页面

    这篇文章主要为大家详细介绍了如何使用一个免费的国产Java库来删除PDF中的指定页面或者删除PDF中的空白页,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-11-11
  • java随机抽取指定范围内不重复的n个数

    java随机抽取指定范围内不重复的n个数

    这篇文章主要为大家详细介绍了java随机抽取指定范围内不重复的n个数,感兴趣的小伙伴们可以参考一下
    2016-02-02

最新评论