使用java项目搭建一个netty服务

 更新时间:2024年10月30日 11:50:04   作者:傀儡师  
这篇文章主要为大家详细介绍了如何使用java项目搭建一个netty服务,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

映入依赖,只要保证有这个依赖,就不需要单独引入依赖,支持多个端口直连,支持多个实现层解析数据,

  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
  <version>3.3.4</version>

yml配置

# TCP设备对接
iot:
  device:
    port1: 1883
    port2: 1885
package com.cqcloud.platform.handler;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.cqcloud.platform.service.IotNbIotMqttService;
import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.service.impl.IotNbIotServiceImpl;
import com.cqcloud.platform.service.impl.IotPushServiceImpl;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import jakarta.annotation.PostConstruct;
/**
 * @author weimeilayer@gmail.com ✨
 * @date 💓💕 2022年3月8日🐬🐇 💓💕
 */
@Component
public class NettyTcpServer {
	/**
	 * 用于自设备1协议端口
	 */
    private static int PORT1;
    /**
     * 来自设备2协议端口
     */
    private static int PORT2;

    @Value("${iot.device.port1}")
    public int port1Value;

    @Value("${iot.device.port2}")
    public int port2Value;
    
    @PostConstruct
    public void init() {
        PORT1 = port1Value;
        PORT2 = port2Value;
    }

	public void start() throws Exception {
		final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
	    final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
	    try {
	        ServerBootstrap bootstrap = new ServerBootstrap();
	        // 创建 MqttService 和 MqttPushService 实例
	        IotNbIotMqttService iotNbIotMqttService = new IotNbIotServiceImpl();
	        IotPushService iotPushService = new IotPushServiceImpl();

	        bootstrap.group(bossGroup, workerGroup)
	            .channel(NioServerSocketChannel.class)
	            .childHandler(new ChannelInitializer<SocketChannel>() {
	                @Override
	                protected void initChannel(SocketChannel ch) {
	                    ChannelPipeline pipeline = ch.pipeline();
	                    // 直接使用 ByteBuf,无需编码器和解码器
	                    // 根据端口注入不同的服务
	                    if (ch.localAddress().getPort() == PORT1) {
	                        pipeline.addLast(new TcpIotNbServerHandler(iotNbIotMqttService)); // 业务逻辑处理器
	                    } else if (ch.localAddress().getPort() == PORT2) {
	                        pipeline.addLast(new TcpIotServerHandler(iotPushService)); // 新处理器
	                    }
	                }
	            });

	        // 绑定第一个端口并启动
	        ChannelFuture future1 = bootstrap.bind(PORT1).sync();
	        // 绑定第二个端口并启动
	        ChannelFuture future2 = bootstrap.bind(PORT2).sync();

	        // 等待服务器关闭
	        future1.channel().closeFuture().sync();
	        future2.channel().closeFuture().sync();
	    } finally {
	        // 优雅地关闭线程池
	        workerGroup.shutdownGracefully();
	        bossGroup.shutdownGracefully();
	    }
	}
}

启动类需要

public static void main(String[] args) throws IOException {
		ConfigurableEnvironment env = new SpringApplication(DynamicYearningApplication.class).run(args).getEnvironment();
		String envPort = env.getProperty("server.port");
		String port = Objects.isNull(envPort) ? "8000" : envPort;
		String envContext = env.getProperty("server.servlet.context-path");
		String contextPath = Objects.isNull(envContext) ? "" : envContext;
		String path = port + contextPath + "/doc.html";
		String externalAPI = InetAddress.getLocalHost().getHostAddress();
		Console.log("Access URLs:\n\t-------------------------------------------------------------------------\n\tLocal-swagger: \t\thttp://127.0.0.1:{}\n\tExternal-swagger: \thttp://{}:{}\n\t-------------------------------------------------------------------------",path, externalAPI, path);
		// 加上以下代码
		NettyTcpServer server = new NettyTcpServer();
		try {
			server.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

创建TcpIotServerHandler

package com.cqcloud.platform.handler;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.cqcloud.platform.entity.IotCommandRecords;
import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.utils.DeviceActionParser;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * 设备协议
 * @author weimeilayer@gmail.com ✨
 * @date 💓💕 2022年3月8日 🐬🐇 💓💕
 */
@Slf4j
public class TcpIotServerHandler extends SimpleChannelInboundHandler<ByteBuf>  {

	// 接口注入
	private final IotPushService iotPushService;

	public TcpIotServerHandler(IotPushService iotPushService) {
		this.iotPushService = iotPushService;
	}
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
		byte[] byteArray;
		if (in.readableBytes() <= 0) {
			in.release();
			return;
		}
		byteArray = new byte[in.readableBytes()];
		in.readBytes(byteArray);
		if (byteArray.length <= 0) {
			in.release();
			return;
		}
		// 将消息传递给 iotPushService
		iotPushService.pushMessageArrived(byteArray);
	}
	// 发送响应的统一辅助方法
	private void sendResponse(ChannelHandlerContext ctx, String hexResponse) {
		byte[] responseBytes = hexStringToByteArray(hexResponse);
		ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
		ctx.writeAndFlush(responseBuffer);
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		// 打印异常堆栈跟踪,便于调试和错误排查
		cause.printStackTrace();
		// 关闭当前的通道,释放相关资源
		ctx.close();
	}
}

创建 TcpIotNbServerHandler

package com.cqcloud.platform.handler;

import com.cqcloud.platform.service.IotNbIotMqttService;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * NB-IOT CAT1数据格协议
 * 
 * @author weimeilayer@gmail.com
 * @date 💓💕2022年3月8日🐬🐇💓💕
 */
public class TcpIotNbServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
	private final IotNbIotMqttService iotNbIotMqttService;

	// 构造函数注入 MqttService
	public TcpIotNbServerHandler(IotNbIotMqttService iotNbIotMqttService) {
		this.iotNbIotMqttService = iotNbIotMqttService;
	}

	@Override
	public void channelRead0(ChannelHandlerContext ctx,ByteBuf in) {
		byte[] byteArray;
		if (in.readableBytes() <= 0) {
			in.release();
			return;
		}
		byteArray = new byte[in.readableBytes()];
		in.readBytes(byteArray);
		if (byteArray.length <= 0) {
			in.release();
			return;
		}
	    // 将 byte[] 数据传递给 iotNbIotMqttService
	    iotNbIotMqttService.messageArrived(byteArray); 
		//发送固定事件默认回复
        sendResponse(ctx);
	}
	
	// 发送响应的统一辅助方法
    private void sendResponse(ChannelHandlerContext ctx) {
    	// 回复客户端--向设备回复AAAA8001(设备将保持20秒不休眠),平台尽量在10秒
    	byte[] responseBytes = new byte[] { (byte) 0xAA, (byte) 0xAA, (byte) 0x80, (byte) 0x01 };
        ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
        ctx.writeAndFlush(responseBuffer);
    }
    
    //将响应消息转换为字节数组
    public static byte[] hexStringToByteArray(String s) {
        int len = s.length();
        byte[] data = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
                    + Character.digit(s.charAt(i + 1), 16));
        }
        return data;
    }
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}

创建接口类IotPushService

package com.cqcloud.platform.service;

/**
 * @author weimeilayer@gmail.com
 * @date 💓💕2022年3月8日🐬🐇💓💕
 */
public interface IotPushService {
	public void pushMessageArrived(byte[] message);
}

创建IotNbIotMqttService 类

package com.cqcloud.platform.service;

/**
 * @author weimeilayer@gmail.com
 * @date 💓💕2022年3月8日🐬🐇💓💕
 */
public interface IotNbIotMqttService {

	public void messageArrived(byte[] message);
}

创建实现类IotNbIotServiceImpl

package com.cqcloud.platform.service.impl;

import org.springframework.stereotype.Service;

import com.cqcloud.platform.service.IotNbIotMqttService;
import com.cqcloud.platform.utils.DataParser;

import lombok.AllArgsConstructor;

/**
 * @author weimeilayer@gmail.com
 * @date 💓💕2022年3月8日🐬🐇💓💕
 */
@Service
@AllArgsConstructor
public class IotNbIotServiceImpl implements IotNbIotMqttService {

	@Override
	public void messageArrived(byte[] message) {
		// 将 byte 数组转换为十六进制字符串  
		String convertData = printByteArray(message);
		// 打印字节数组内容
		System.out.println("来自于xxx数据格式协议的1883端口的数据字节数组内容:"+ convertData);
        //调用解析方法
        dispatchMessage(convertData);
	}
	
	// 将 byte[] 转换为十六进制字符串的辅助方法  
    public static String bytesToHex(byte[] bytes) {  
        StringBuilder hex = new StringBuilder();  
        for (byte b : bytes) {  
            // 将每个字节转换为两位的十六进制表示  
            hex.append(String.format("%02X", b));  
        }  
        return hex.toString();  
    }  

	public static String printByteArray(byte[] byteArray) {
		StringBuilder hexString = new StringBuilder();
		for (byte b : byteArray) {
			// 将字节转换为无符号的十六进制字符串,去掉空格
			hexString.append(String.format("%02X", b & 0xFF));
		}
		System.out.println("Byte Array (Hex): " + hexString.toString());
		return hexString.toString();
	}
	
	public void dispatchMessage(String byteArray) {
        String prefix = byteArray.substring(0, 2);  
        // 根据 messageID 进行判断
          System.out.println("来自于数据格式协议来自于1883端口的数据处理消息:" +byteArray);
	}
}

创建 IotPushServiceImpl

package com.cqcloud.platform.service.impl;

import org.springframework.stereotype.Service;

import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.utils.DeviceActionParser;

import lombok.AllArgsConstructor;

/**
 * 发送指令实现类
 * @author weimeilayer@gmail.com
 * @date 💓💕2022年3月8日🐬🐇💓💕
 */
@Service
@AllArgsConstructor
public class IotPushServiceImpl implements IotPushService {

	@Override
	public void pushMessageArrived(byte[] message) {
		// 解析字节数组
		System.out.println("来自物联网平台的设备协议于1885端口的数据设备返回的的内容处理");
		//打印数据
		printByteArray(message);
		//调用解析方法
		dispatchMessage(message);
	}

	//设备回复的接受内容
	public static void dispatchMessage(byte[] byteArray) {
       
	}
	
	public static void printByteArray(byte[] byteArray) {
		StringBuilder hexString = new StringBuilder();
		for (byte b : byteArray) {
			// 将字节转换为无符号的十六进制字符串,去掉空格
			hexString.append(String.format("%02X", b & 0xFF));
		}
		System.out.println("Byte Array (Hex): " + hexString.toString());
	}

	 // 将十六进制字符串转换为字节数组的实用方法
    public static byte[] stringToBytes(String s) {
        int len = s.length();
        byte[] data = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
                                  + Character.digit(s.charAt(i+1), 16));
        }
        return data;
    }
	// 提取设备类型的十六进制字符串
    private static String extractDeviceTypeHex(byte[] byteArray) {
        // 转换为十六进制字符串
        String hexString = bytesToHex(byteArray);
        // 提取设备类型
        return hexString.substring(10, 12); // 设备类型的位数
    }

    // 辅助方法:将字节数组转换为十六进制字符串
    private static String bytesToHex(byte[] bytes) {
        StringBuilder hexString = new StringBuilder();
        for (byte b : bytes) {
            String hex = Integer.toHexString(0xFF & b);
            if (hex.length() == 1) {
                hexString.append('0'); // 确保每个字节都为两位
            }
            hexString.append(hex);
        }
        return hexString.toString().toUpperCase(); // 返回大写格式
    }

    // 将十六进制字符串转换为 byte
    private static byte hexStringToByte(String hex) {
        return (byte) Integer.parseInt(hex, 16);
    }
}

然后使用网络根据助手请求。

以上就是使用java项目搭建一个netty服务的详细内容,更多关于java搭建netty服务的资料请关注脚本之家其它相关文章!

相关文章

  • 基于Elasticsearch5.4的常见问题总结

    基于Elasticsearch5.4的常见问题总结

    下面小编就为大家分享一篇基于Elasticsearch5.4的常见问题总结,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-01-01
  • Java多线程死锁与资源限制操作

    Java多线程死锁与资源限制操作

    这篇文章主要介绍了Java多线程死锁与资源限制操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • 详解Spring依赖注入的三种方式以及优缺点

    详解Spring依赖注入的三种方式以及优缺点

    IoC 和 DI 是 Spring 中最重要的两个概念,其中 IoC(Inversion of Control)为控制反转的思想,而 DI(Dependency Injection)依赖注入为其(IoC)具体实现。那么 DI 实现依赖注入的方式有几种?这些注入方式又有什么不同?本文就来和大家一起详细聊聊
    2022-08-08
  • Java实现控制台输出两点间距离

    Java实现控制台输出两点间距离

    这篇文章主要介绍了Java实现控制台输出两点间距离,涉及了部分编程坐标的问题,具有一定参考价值,需要的朋友可以了解下
    2017-09-09
  • Java中的接口以及常见的Cloneable接口用法

    Java中的接口以及常见的Cloneable接口用法

    这篇文章主要介绍了Java中的接口以及常见的Cloneable接口用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-02-02
  • java.util.NoSuchElementException原因及两种解决方法

    java.util.NoSuchElementException原因及两种解决方法

    本文主要介绍了java.util.NoSuchElementException原因及两种解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-06-06
  • Event Sourcing事件溯源模式优化业务系统

    Event Sourcing事件溯源模式优化业务系统

    这篇文章主要为大家介绍了Event Sourcing事件溯源模式优化业务系统示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07
  • java-servlet-转发AND路径(详解)

    java-servlet-转发AND路径(详解)

    下面小编就为大家带来一篇java-servlet-转发AND路径(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-10-10
  • SpringBoot集成slf4j日志配置的方法

    SpringBoot集成slf4j日志配置的方法

    本文主要介绍了SpringBoot集成slf4j日志配置的方法,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-09-09
  • 解决IDEA光标变成白色粗条的问题

    解决IDEA光标变成白色粗条的问题

    这篇文章主要介绍了解决IDEA光标变成白色粗条的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02

最新评论