使用Netty快速实现一个群聊功能的示例详解
前言
通过之前的文章介绍,我们可以深刻认识到Netty在网络编程领域的卓越表现和强大实力。这篇文章将介绍如何利用 Netty 框架开发一个 WebSocket 服务端,从而实现一个简单的在线聊天功能。
声明
文章中所提供的代码仅供参考,旨在帮助无 Netty 经验的开发人员快速上手。请注意,这些代码并不适用于实际应用中。
功能说明
聊天页面:
- 用户进入页面后,会看到一个简单的文本框,可以用来发送消息。
- 页面下方会显示聊天的消息内容。
服务端主要有以下三个功能:
- 响应聊天页面:用来接收和响应聊天页面的请求。
- 处理消息:对接收到的消息进行处理。
- 实现群聊功能:提供群聊的功能,使多个用户能够在同一个聊天室中进行交流。
功能很简单,但是可以通过这个示例实现更多复杂的场景。
实现步骤
创建一个简单的 Maven 项目,直接引入 netty-all 包即可编码。
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.28.Final</version> </dependency>
实现该功能共有五个类,如下:
├── MakeIndexPage.java
├── ProcessWsIndexPageHandler.java
├── ProcesssWsFrameHandler.java
├── WebSocketServer.java
└── WebSocketServerInitializer.java
下面对实现该功能所涉及的五个类的代码进行详细说明
WebSocket 服务启动
这个类是一个基于 Netty 启动的常规服务端。它包含了一些配置项,包括 Reactor 模式、IO 类型以及消息处理配置,大部分都是这样。代码如下:
/** * 类说明: */ public final class WebSocketServer { /*创建 DefaultChannelGroup,用来保存所 有已经连接的 WebSocket Channel,群发和一对一功能可以用上*/ private final static ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); static final boolean SSL = false;//是否启用ssl /*通过ssl访问端口为8443,否则为8080*/ static final int PORT = Integer.parseInt( System.getProperty("port", SSL? "8443" : "80")); public static void main(String[] args) throws Exception { /*SSL配置*/ final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebSocketServerInitializer(sslCtx,channelGroup)); Channel ch = b.bind(PORT).sync().channel(); System.out.println("打开浏览器访问: " + (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/'); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
Channel 初始化
这个类的主要功能是创建了一个 ChannelInitializer,用于初始化 ChannelPipeline,并添加了一些通道处理器。这些处理器包括由Netty提供的处理SSL协议、处理HTTP协议和支持WebSocket协议的功能,还有一些由业务自定义的处理器,用于处理页面展示和处理WebSocket数据。代码如下:
/** * 类说明:增加handler */ public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { private final ChannelGroup group; /*websocket访问路径*/ private static final String WEBSOCKET_PATH = "/chat"; private final SslContext sslCtx; public WebSocketServerInitializer(SslContext sslCtx,ChannelGroup group) { this.sslCtx = sslCtx; this.group = group; } @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } /*增加对http的支持*/ pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); /*Netty提供,支持WebSocket应答数据压缩传输*/ pipeline.addLast(new WebSocketServerCompressionHandler()); /*Netty提供,对整个websocket的通信进行了初始化(发现http报文中有升级为websocket的请求) ,包括握手,以及以后的一些通信控制*/ pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true)); /*浏览器访问时展示index页面*/ pipeline.addLast(new ProcessWsIndexPageHandler(WEBSOCKET_PATH)); /*对websocket的数据进行处理*/ pipeline.addLast(new ProcesssWsFrameHandler(group)); } }
HTTP 请求处理
这个类的主要功能是在收到 HTTP 请求时,当 URI 为“/”或“/index.html”时,会返回一个聊天界面作为响应。代码如下:
/** * 类说明:对http请求,将index的页面返回给前端 */ public class ProcessWsIndexPageHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String websocketPath; public ProcessWsIndexPageHandler(String websocketPath) { this.websocketPath = websocketPath; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { // 处理错误或者无法解析的http请求 if (!req.decoderResult().isSuccess()) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); return; } //只允许Get请求 if (req.method() != GET) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)); return; } // 发送index页面的内容 if ("/".equals(req.uri()) || "/index.html".equals(req.uri())) { //生成WebSocket的访问地址,写入index页面中 String webSocketLocation = getWebSocketLocation(ctx.pipeline(), req, websocketPath); System.out.println("WebSocketLocation:["+webSocketLocation+"]"); //生成index页面的具体内容,并送往浏览器 ByteBuf content = MakeIndexPage.getContent( webSocketLocation); FullHttpResponse res = new DefaultFullHttpResponse( HTTP_1_1, OK, content); res.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8"); HttpUtil.setContentLength(res, content.readableBytes()); sendHttpResponse(ctx, req, res); } else { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND)); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } /*发送应答*/ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { // 错误的请求进行处理 (code<>200). if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); HttpUtil.setContentLength(res, res.content().readableBytes()); } // 发送应答. ChannelFuture f = ctx.channel().writeAndFlush(res); //对于不是长连接或者错误的请求直接关闭连接 if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } /*根据用户的访问,告诉用户的浏览器,WebSocket的访问地址*/ private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) { String protocol = "ws"; if (cp.get(SslHandler.class) != null) { protocol = "wss"; } return protocol + "://" + req.headers().get(HttpHeaderNames.HOST) + path; } }
HTTP 页面内容
这个类的主要目的是生成一个包含消息发送框和内容展示功能的HTML页面,并实现WebSocket的相关功能,包括建立连接、向服务端发送消息以及接收服务端的响应。当然,也可以单独写一个HTML文件。代码如下:
/** * 类说明:生成index页面的内容 */ public final class MakeIndexPage { private static final String NEWLINE = "\r\n"; public static ByteBuf getContent(String webSocketLocation) { return Unpooled.copiedBuffer( "<html><head><title>Web Socket Test</title><meta charset=\"utf-8\" /></head>" + NEWLINE + "<body>" + NEWLINE + "<script type=\"text/javascript\">" + NEWLINE + "var socket;" + NEWLINE + "if (!window.WebSocket) {" + NEWLINE + " window.WebSocket = window.MozWebSocket;" + NEWLINE + '}' + NEWLINE + "if (window.WebSocket) {" + NEWLINE + " socket = new WebSocket(\"" + webSocketLocation + "\");" + NEWLINE + " socket.onmessage = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = ta.value + '\\n' + event.data" + NEWLINE + " };" + NEWLINE + " socket.onopen = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = \"Web Socket opened!\";" + NEWLINE + " };" + NEWLINE + " socket.onclose = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = ta.value + \"Web Socket closed\"; " + NEWLINE + " };" + NEWLINE + "} else {" + NEWLINE + " alert(\"Your browser does not support Web Socket.\");" + NEWLINE + '}' + NEWLINE + NEWLINE + "function send(message) {" + NEWLINE + " if (!window.WebSocket) { return; }" + NEWLINE + " if (socket.readyState == WebSocket.OPEN) {" + NEWLINE + " socket.send(message);" + NEWLINE + " } else {" + NEWLINE + " alert(\"The socket is not open.\");" + NEWLINE + " }" + NEWLINE + '}' + NEWLINE + "</script>" + NEWLINE + "<form onsubmit=\"return false;\">" + NEWLINE + "<input type=\"text\" name=\"message\" " + "value=\"Hi, 你好啊\"/>" + "<input type=\"button\" value=\"发送\"" + NEWLINE + " onclick=\"send(this.form.message.value)\" />" + NEWLINE + "<h3>消息内容</h3>" + NEWLINE + "<textarea id=\"responseText\" " + "style=\"width:500px;height:300px;\"></textarea>" + NEWLINE + "</form>" + NEWLINE + "</body>" + NEWLINE + "</html>" + NEWLINE, CharsetUtil.UTF_8); } }
WebSocket 请求处理
这个类的主要功能是处理与 Channel 相关的事件。例如,当一个 Channel 连接成功时,会将该 Channel 添加到一个 ChannelGroup 中。当接收到该 Channel 的数据时,可以通过向 ChannelGroup 写入数据来实现群聊效果。代码如下
/** * 类说明:对websocket的数据进行处理 */ public class ProcesssWsFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private final ChannelGroup group; public ProcesssWsFrameHandler(ChannelGroup group) { this.group = group; } private static final Logger logger = LoggerFactory.getLogger(ProcesssWsFrameHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { //判断是否为文本帧,目前只处理文本帧 if (frame instanceof TextWebSocketFrame) { // Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); logger.info("{} received {}", ctx.channel(), request); // ctx.channel().writeAndFlush( // new TextWebSocketFrame(request.toUpperCase(Locale.CHINA))); /*群发实现:一对一道理一样*/ group.writeAndFlush(new TextWebSocketFrame( ctx.channel().remoteAddress() + " :" + request.toUpperCase(Locale.CHINA))); } else { String message = "unsupported frame type: " + frame.getClass().getName(); throw new UnsupportedOperationException(message); } } /*重写 userEventTriggered()方法以处理自定义事件*/ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { /*检测事件,如果是握手成功事件,做点业务处理*/ if (evt == WebSocketServerProtocolHandler .ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { //通知所有已经连接的 WebSocket 客户端新的客户端已经连接上了 group.writeAndFlush(new TextWebSocketFrame( "Client " + ctx.channel().remoteAddress() + " joined")); //将新的 WebSocket Channel 添加到 ChannelGroup 中, // 以便它可以接收到所有的消息 group.add(ctx.channel()); } else { super.userEventTriggered(ctx, evt); } } }
效果展示
服务端启动
聊天页面1
聊天页面2
总结
总的来说,基于 Netty 实现一个 WebSocket 功能是非常方便且高效的,但是我们需要知其所以然,要理解 Websocket 协议,也要懂的在 Netty 中,通过添加 ChannelHandler 来处理各种异常情况,例如握手失败、连接关闭等,当然,还要考虑安全性问题,例如处理跨站脚本攻击(XSS)、防止恶意数据传输等。
以上就是使用Netty快速实现一个群聊功能的示例详解的详细内容,更多关于Netty实现群聊的资料请关注脚本之家其它相关文章!
相关文章
关于SpringMVC中数据绑定@ModelAttribute注解的使用
这篇文章主要介绍了关于SpringMVC中数据绑定@ModelAttribute注解的使用,SpringMVC是一个基于Spring框架的Web框架,它提供了一种简单、灵活的方式来开发Web应用程序,在开发Web应用程序时,我们需要将用户提交的数据绑定到我们的Java对象上,需要的朋友可以参考下2023-07-07Java 中的 BufferedReader 介绍_动力节点Java学院整理
BufferedReader 是缓冲字符输入流。它继承于Reader。接下来通过本文给大家介绍BufferedReader的相关知识,需要的朋友参考下吧2017-05-05
最新评论