Nett中的心跳机制与断线重连详解
心跳机制
我们以客户端发送心跳为例,平时我们的心跳实现方式可能是搞个定时器,定时发送是吧,但是在Netty中却不一样,心跳被称为空闲检测,因为心跳的最主要作用就是判断是否存活、是否假死等情况,所以Netty中并不是定时发送,而是空闲的情况下才发送,空闲指的是一段时间内无读写事件的发生,也就是没有信息接收和信息发送啦,这时候我就要检测一下了
所以Netty的机制就是通过监测一段时间内是否有信息的读、写发生,然后产生一个事件,我们可以处理这个事件来达到我们的目的
IdleStateHandler
心跳与其他的数据处理Handler一样,都是建立连接后的数据处理,所以同样都是处理管道中的,IdleStateHandler就是Netty帮我们封装好了的处理类,我们可以直接使用并将它放入管道中,就可以自动检测一段时间内是否有读、写发生了,常用的构造有4个参数:
- readerIdleTime:读事件检测,该时间内没有信息读取就会产生一个读信息的事件
- writerIdleTime:写事件检测,该时间内没有信息发送就会产生一个写信息的事件
- allIdleTime:读写事件检测,该时间内没有信息发送或者读取就会产生一个读写信息的事件
- unit:时间单位
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); }
使用:
// 管道中直接添加 // 时间配置为0则代表不检测对应事件 channel.pipeline().addLast(new IdleStateHandler(5,0,0, TimeUnit.SECONDS))
触发的事件类型:
- IdleState.READER_IDLE:读信息的事件
- IdleState.WRITER_IDLE:写信息的事件
- IdleState.ALL_IDLE:读写信息的事件
举个例子:假设我们是客户端主动发送心跳,那客户端就要检测一段时间内是否有写信息的操作,如果没有就会触发IdleState.WRITER_IDLE事件,那我们监听这个事件,就做出自己的处理,那就是主动发送心跳包嘛
下面我们就以客户端发送心跳包为例!
客户端
像上面那样直接使用IdleStateHandler,那还需要单独写一个事件处理的类,所以这里我们直接继承IdleStateHandler
ClientHeartbeatHandler 如下:
我们设置5秒,5秒内没有写过信息,我们就发送心跳包(这里消息很随意,图方便)
@Slf4j public class ClientHeartbeatHandler extends IdleStateHandler { // 设置写事件为5s // 如果5s没有写事件发生 就会触发下面的IdleStateEvent private static final int WRITE_IDLE_TIME = 5; public ClientHeartbeatHandler() { super(0, WRITE_IDLE_TIME, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { // 指定时间内没有写事件发送 就会触发 IdleState.WRITER_IDLE 类型事件 // 我们就可以对该连接进行处理 主动发送心跳 if(evt.state()== IdleState.WRITER_IDLE){ log.info("{} 秒内没有发送数据,再不发送小心和服务端断开连接", WRITE_IDLE_TIME); ctx.writeAndFlush(new NettyMsg(ServiceCodeEnum.TEST_TYPE,"我还活着,不要断开连接")); } } }
NettyClient
管道中加入这个即可,注意要放在编解码之后
服务端
基本跟客户端一样,但是由于是客户端给我发心跳,所以服务端要监测读事件
ServerHeartbeatHandler
这里我们设置10s,一般会更多因为要给客户端一些容忍度,适量就好,一旦没满足就和客户端断开连接
@Slf4j public class ServerHeartbeatHandler extends IdleStateHandler { // 设置读取事件为10s // 如果10s没有读事件发生 就会触发下面的IdleStateEvent private static final int READER_IDLE_TIME = 10; public ServerHeartbeatHandler() { super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { // 指定时间内没有读事件发送 就会触发 IdleState.READER_IDLE 类型事件 // 我们就可以对该连接进行处理 这里是直接关闭 if(evt.state()== IdleState.READER_IDLE){ log.info("{} 秒内没有读取到数据,关闭连接", READER_IDLE_TIME); ctx.channel().close(); } } }
NettyServer
测试
正常情况
服务端
客户端
异常情况
我们将客户端心跳拉长
可以发现时间一到自动断连
总结
这里是以客户端发送心跳为例,如果是以服务端发送心跳也是一样,主要就是对读写事件的监听,然后做出相应处理,上面其实是很随意的哈,正常情况下心跳包是会用专门的消息类型的,而且服务端也需要做出应答的,心跳应答也会有对应的消息类型
断线重连
这个就很简单了,肯定是断线后,客户端发起重连嘛,就是在客户端检测到连接断开的时候,重新连接就好了
ClientHeartbeatHandler加上一段:
看看效果,客户端心跳时间还是12s哈
能这么随意吗?当然不行啊,别学我,正常会搞多次重连,而且谨慎起见,每次重连还可以加一个时间的间隔,可以学一下nacos里面的高级玩法,随着重连次数的增加,间隔重试的时间也随之增加,比如:
第一次重连是4s,第二次重连是8s,第三次是16s,以此类推
也就是设置重连间隔时间基数为4s,每尝试一次,左移一位(4<<1),连接成功后基数复位为4s
需要设置最大重试次数,最长间隔时间(一直左移后面会很长时间,所以要设置一个上限,超过了就以上限为准)
伪代码如下:
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info(ctx.channel().remoteAddress() + " 已断开连接,开始重连"); int maxRetryNum=6; //最大重连次数 int maxRetryTime=60; // 最大重连时间 int baseRetryTime=4; // 间隔时间基数 // 开始重连 for (int i = 1; i <= maxRetryNum; i++) { log.info("第{}次重连,间隔{}s后开始",i,baseRetryTime > maxRetryTime ? maxRetryTime:baseRetryTime); ctx.channel().eventLoop().schedule(()->{ // 重连操作 // ....... // 判断是否超过隔间上限 },baseRetryTime > maxRetryTime ? maxRetryTime:baseRetryTime,TimeUnit.SECONDS); // 左移扩大一倍 baseRetryTime = baseRetryTime << 1; } log.info("这么多次重连都不行寄了吧"); }
伪代码哈,伪代码!
结果如下:
建立连接后,手动关闭服务端,可以看到重连了6次,每次间隔时间翻倍,达到间隔时间上限后,就以上限的时间为准
到此这篇关于Nett中的心跳机制与断线重连详解的文章就介绍到这了,更多相关Nett心跳机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
最新评论