深入理解Netty核心类及其作用
MessageToByteEncoder
MessageToByteEncoder是一个抽象编码器,子类可重写encode方法把对象编码为ByteBuf输出。
MessageToByteEncoder继承自ChannelOutboundHandlerAdapter,encode在出站是被调用。
public class MyMessageEncoder extends MessageToByteEncoder<MessagePO> { @Override protected void encode(ChannelHandlerContext ctx, MessagePO msg, ByteBuf out) throws Exception { System.out.println("MyMessageEncoder.encode,被调用"); String json = JSONObject.toJSONString(msg); out.writeInt(json.getBytes(StandardCharsets.UTF_8).length); out.writeBytes(json.getBytes(StandardCharsets.UTF_8)); } }
ByteToMessageDecoder
ByteToMessageDecoder是一种ChannelInboundHandler,可以称为解码器,负责将byte字节流(ByteBuf)转换成一种Message,Message是应用可以自己定义的一种Java对象。
ByteToMessageDecoder:用于将字节转为消息,需要检测缓冲区是否有足够的字节。
public class MyMessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyMessageDecoder.decode,被调用"); while (in.readableBytes() >= 4){ int num = in.readInt(); System.out.println("解码出一个整数:"+num); out.add(num); } } }
ReplayingDecoder
ReplayingDecoder:继承自ByteToMessageDecoder,不需要检测缓冲区是否有足够的字节,但是ReplayingDecoder的速度略慢于ByteToMessageDecoder,而且并不是所有的ByteBuf都支持。
项目复杂度高用ReplayingDecoder,否则使用ByteToMessageDecoder。
public class MyMessageDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyMessageDecoder.decode,被调用"); int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); String json = new String(content,StandardCharsets.UTF_8); MessagePO po = JSONObject.parseObject(json,MessagePO.class); out.add(po); } }
MessageToMessageEncoder
用于从一种消息编码为另外一种消息,例如从POJO到POJO,是一种ChannelOutboundHandler
MessageToMessageDecoder
从一种消息解码为另一种消息,例如POJO到POJO,是一种ChannelInboundHandler
MessageToMessageCodec
整合了MessageToMessageEncoder 和 MessageToMessageDecoder
public class RequestMessageCodec extends MessageToMessageCodec<String, RequestData> { @Override protected void encode(ChannelHandlerContext ctx, RequestData msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.encode 被调用 " + msg); String json = JSONObject.toJSONString(msg); out.add(json); } @Override protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.decode 被调用 " + msg); RequestData po = JSONObject.parseObject(msg, RequestData.class); out.add(po); } }
ChannelInitializer
ChannelInitializer是一种特殊的ChannelInboundHandler,可以通过一种简单的方式(调用initChannel方法)来初始化Channel。
通常在Bootstrap.handler(ChannelHandler)
, ServerBootstrap.handler(ChannelHandler)
和 ServerBootstrap.childHandler(ChannelHandler)
中给Channel设置ChannelPipeline。
注意:当initChannel被执行完后,会将当前的handler从Pipeline中移除。
Bootstrap bootstrap = new Bootstrap().group(group)//设置线程组 .channel(NioSocketChannel.class)//设置客户端通道的实现类 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler());//加入自己的处理器 } });
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现 .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列等待连接的个数 .childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态 // .handler(null)//该Handler对应bossGroup .childHandler(new ChannelInitializer<SocketChannel>() {//给workerGroup的EventLoop对应的管道设置处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyServerHandler()); } });
SimpleChannelInboundHandler
SimpleChannelInboundHandler继承自ChannelInboundHandlerAdapter,可以通过泛型来规定消息类型。
处理入站的数据我们只需要实现channelRead0方法。
SimpleChannelInboundHandler在接收到数据后会自动release掉数据占用的Bytebuffer资源,ChannelInboundHandlerAdapter不会自动释放。
public class MyClientHandler extends SimpleChannelInboundHandler<MessagePO> { @Override protected void channelRead0(ChannelHandlerContext ctx, MessagePO msg) throws Exception { System.out.println("收到服务端消息:" + msg); } }
DefaultEventLoopGroup
在向pipline中添加ChannelHandler时,可以提供一个新的线程组,Handler业务会在该线程中执行。
当加ChannelHandler需要执行多线程并发业务时,DefaultEventLoopGroup可以派上大用处。
如果没有设置DefaultEventLoopGroup,默认使用的是EventLoopGroup workerGroup = new NioEventLoopGroup();
DefaultEventLoopGroup businessGroup = new DefaultEventLoopGroup(100); ... addLast(businessGroup, new MyNettyServerHandler())
/** * 读取客户端发送过来的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("收到客户信息:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:" + ctx.channel().remoteAddress()); System.out.println("处理线程:" + Thread.currentThread().getName()); ctx.executor().parent().execute(()->{ try { System.out.println("parent().execute Thread = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("parent任务执行完成1"); } catch (InterruptedException e) { e.printStackTrace(); } }); ctx.executor().parent().execute(()->{ try { System.out.println("parent().execute Thread = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("parent任务执行完成2"); } catch (InterruptedException e) { e.printStackTrace(); } }); ctx.executor().parent().execute(()->{ try { System.out.println("parent().execute Thread = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("parent任务执行完成3"); } catch (InterruptedException e) { e.printStackTrace(); } }); }
以上代码执行日志如下:
收到客户信息:Hello 服务端
客户端地址:/127.0.0.1:60345
处理线程:defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-2
parent().execute Thread = defaultEventLoopGroup-4-3
程序继续~~ defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-4
parent任务执行完成1
parent任务执行完成3
parent任务执行完成2
EventLoop定时任务
可以在Handler中通过方法ctx.channel().eventLoop().schedule()
添加定时任务
ctx.channel().eventLoop().schedule(()->{ try { System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("定时任务执行完成"); } catch (InterruptedException e) { e.printStackTrace(); } },10L,TimeUnit.SECONDS);
到此这篇关于深入理解Netty核心类及其作用的文章就介绍到这了,更多相关Netty核心类内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
在IDEA中配置tomcat并创建tomcat项目的图文教程
这篇文章主要介绍了在IDEA中配置tomcat并创建tomcat项目的图文教程,需要的朋友可以参考下2020-07-07SpringBoot如何设置404、500返回统一格式json
这篇文章主要介绍了SpringBoot如何设置404、500返回统一格式json问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2024-08-08springboot @Configuration和@Componment的区别及说明
这篇文章主要介绍了springboot @Configuration和@Componment的区别及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2024-06-06
最新评论