修改 前端菜单 管理菜单,角色前后端分配 及通道创建失败的问题

This commit is contained in:
LEED
2025-07-07 22:26:26 +08:00
parent b2cee8aeb7
commit ac40c5e5ec
29 changed files with 899 additions and 81 deletions

View File

@@ -0,0 +1,137 @@
//package com.xinda.mqtt.server;
//
//
//import com.xinda.common.constant.XinDaConstant;
//import com.xinda.mqtt.handler.adapter.MqttMessageAdapter;
//import com.xinda.server.Server;
//import groovy.util.logging.Slf4j;
//import io.netty.bootstrap.AbstractBootstrap;
//import io.netty.bootstrap.ServerBootstrap;
//import io.netty.buffer.PooledByteBufAllocator;
//import io.netty.channel.ChannelInitializer;
//import io.netty.channel.ChannelOption;
//import io.netty.channel.ServerChannel;
//import io.netty.channel.epoll.Epoll;
//import io.netty.channel.epoll.EpollEventLoopGroup;
//import io.netty.channel.epoll.EpollServerSocketChannel;
//import io.netty.channel.nio.NioEventLoopGroup;
//import io.netty.channel.socket.nio.NioServerSocketChannel;
//import io.netty.channel.socket.nio.NioSocketChannel;
//import io.netty.handler.codec.mqtt.MqttDecoder;
//import io.netty.handler.codec.mqtt.MqttEncoder;
//import io.netty.handler.logging.LogLevel;
//import io.netty.handler.logging.LoggingHandler;
//import io.netty.handler.timeout.IdleStateHandler;
//import io.netty.util.concurrent.DefaultThreadFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//
//import java.util.concurrent.LinkedBlockingQueue;
//import java.util.concurrent.ThreadPoolExecutor;
//import java.util.concurrent.TimeUnit;
//
///**
// * MQTT Broker 服务端,基于 Netty 实现。
// * 负责接收客户端连接、处理 MQTT 协议相关的消息。
// * 通过优化 Netty 配置提升高并发连接和消息处理性能。
// *
// * 主要优化点:
// * 1. 增加 worker 线程数,提升并发处理能力。
// * 2. 增大 backlog防止高并发连接时丢失请求。
// * 3. 启用 TCP 参数(如 TCP_NODELAY、SO_REUSEADDR降低延迟。
// * 4. 增大收发缓冲区,提升吞吐量。
// * 5. 使用 PooledByteBufAllocator 提升内存分配效率。
// * 6. 业务线程池支持突发流量。
// */
//@Component
//@Slf4j
//public class MqttServer2 extends Server {
// /**
// * MQTT 消息适配器,负责处理所有 MQTT 协议相关的消息。
// */
// @Autowired
// private MqttMessageAdapter messageAdapter;
//
// /**
// * 初始化 Netty 服务端,设置各项性能参数。
// *
// * @return Netty ServerBootstrap 实例
// */
// @Override
// protected AbstractBootstrap initialize() {
// // boss 线程负责接收连接,通常设置为 1 即可
// int bossThreads = 1;
// // worker 线程负责处理 IO根据 CPU 核心数设置,提升并发能力
// int workerThreads = Math.max(4, Runtime.getRuntime().availableProcessors() * 4);
// Class<? extends ServerChannel> channelClass;
// long startInit = System.currentTimeMillis();
// // Epoll 优先(仅限 Linux否则 NIO
// if (Epoll.isAvailable()) {
//// log.info("[性能优化] 使用 EpollEventLoopGroup");
// bossGroup = new EpollEventLoopGroup(bossThreads, new DefaultThreadFactory(config.name + "-Boss", Thread.MAX_PRIORITY));
// workerGroup = new EpollEventLoopGroup(workerThreads, new DefaultThreadFactory(config.name + "-Worker", Thread.MAX_PRIORITY));
// channelClass = EpollServerSocketChannel.class;
// } else {
//// log.info("[性能优化] 使用 NioEventLoopGroup");
// bossGroup = new NioEventLoopGroup(bossThreads, new DefaultThreadFactory(config.name + "-Boss", Thread.MAX_PRIORITY));
// workerGroup = new NioEventLoopGroup(workerThreads, new DefaultThreadFactory(config.name + "-Worker", Thread.MAX_PRIORITY));
// channelClass = NioServerSocketChannel.class;
// }
//
// // 业务线程池,处理耗时操作,避免阻塞 Netty IO 线程
// if (config.businessCore > 0) {
// int core = Math.max(8, config.businessCore * 2);
// int max = Math.max(16, config.businessCore * 4);
// businessService = new ThreadPoolExecutor(
// core,
// max, // 允许突发时扩展
// 120L, TimeUnit.SECONDS, // 空闲线程存活时间
// new LinkedBlockingQueue<>(4096), // 增加队列长度,防止高并发丢任务
// new DefaultThreadFactory(config.name + "-Business", true, Thread.NORM_PRIORITY),
// new ThreadPoolExecutor.CallerRunsPolicy()); // 队列满时由调用者处理,防止任务丢失
//// log.info("[性能优化] 业务线程池 core={} max={} queue=4096", core, max);
// }
// long endInit = System.currentTimeMillis();
//// log.info("[性能日志] Netty 服务端初始化耗时: {} ms", (endInit - startInit));
// return new ServerBootstrap()
// // 绑定 boss 和 worker 线程组
// .group(bossGroup, workerGroup)
// // 指定服务端通道类型
// .channel(channelClass)
// // 设置 Netty 日志级别
// .handler(new LoggingHandler(LogLevel.INFO))
// // 设置服务端 accept 队列长度,提升高并发连接能力
// .option(ChannelOption.SO_BACKLOG, 4096)
// // 启用端口复用,提升端口重启和高并发下的可用性
// .option(ChannelOption.SO_REUSEADDR, true)
// // 启用 TCP 保活
// .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
// // 禁用 Nagle 算法,降低延迟
// .childOption(ChannelOption.TCP_NODELAY, true)
// // 增大发送缓冲区
// .childOption(ChannelOption.SO_SNDBUF, 4 * 1024 * 1024)
// // 增大接收缓冲区
// .childOption(ChannelOption.SO_RCVBUF, 4 * 1024 * 1024)
// // 使用池化内存分配器,提升内存分配效率
// .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// // 初始化每个新连接的处理 pipeline
// .childHandler(new ChannelInitializer<NioSocketChannel>() {
// @Override
// protected void initChannel(NioSocketChannel channel) {
// long startConn = System.nanoTime();
// // 客户端心跳检测机制,防止空闲连接占用资源
// channel.pipeline()
// .addFirst(XinDaConstant.SERVER.IDLE
// , new IdleStateHandler(config.readerIdleTime, config.writerIdleTime, config.allIdleTime, TimeUnit.SECONDS))
// // MQTT 协议解码器,支持最大 2MB 报文
// .addLast(XinDaConstant.SERVER.DECODER, new MqttDecoder(1024 * 1024 * 2))
// // MQTT 协议编码器
// .addLast(XinDaConstant.SERVER.ENCODER, MqttEncoder.INSTANCE)
// // 业务消息适配器,处理所有 MQTT 消息
// .addLast(messageAdapter);
// long endConn = System.nanoTime();
//// log.debug("[性能日志] 新连接 pipeline 初始化耗时: {} μs", (endConn - startConn) / 1000);
// }
// });
// }
//}

View File

@@ -0,0 +1,71 @@
package com.xinda.mqtt.server;
import com.xinda.common.constant.XinDaConstant;
import com.xinda.mqtt.handler.adapter.MqttMessageAdapter;
import com.xinda.server.Server;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class MqttServer3 extends Server {
@Autowired
private MqttMessageAdapter messageAdapter;
@Override
protected AbstractBootstrap initialize() {
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory(config.name, Thread.MAX_PRIORITY));
workerGroup = new NioEventLoopGroup(config.workerCore, new DefaultThreadFactory(config.name, Thread.MAX_PRIORITY));
if (config.businessCore > 0) {
businessService = new ThreadPoolExecutor(config.businessCore, config.businessCore, 1L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new DefaultThreadFactory(config.name, true, Thread.NORM_PRIORITY));
}
return new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
// 设置服务端 accept 队列长度,提升高并发连接能力
.option(ChannelOption.SO_BACKLOG, 2048)
// 启用端口复用,提升端口重启和高并发下的可用性
.option(ChannelOption.SO_REUSEADDR, true)
// 禁用 Nagle 算法,降低延迟
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) {
//客户端心跳检测机制
channel.pipeline()
.addFirst(XinDaConstant.SERVER.IDLE
, new IdleStateHandler(config.readerIdleTime, config.writerIdleTime, config.allIdleTime, TimeUnit.SECONDS))
.addLast(XinDaConstant.SERVER.DECODER, new MqttDecoder(1024 * 1024 * 2))
.addLast(XinDaConstant.SERVER.ENCODER, MqttEncoder.INSTANCE)
.addLast(messageAdapter);
}
});
}
}