This commit is contained in:
LEED
2025-07-19 11:29:07 +08:00
parent 4c1521bda0
commit a9895296f0
45 changed files with 929 additions and 70 deletions

View File

@@ -0,0 +1,180 @@
//package com.xinda.mqtt.server;
//
////import com.xinda.server.Server;
////import com.xinda.mqtt.handler.adapter.MqttMessageAdapter;
////import io.netty.bootstrap.AbstractBootstrap;
////import io.netty.bootstrap.ServerBootstrap;
////import io.netty.channel.ChannelInitializer;
////import io.netty.channel.ChannelOption;
////import io.netty.channel.EventLoopGroup;
////import io.netty.channel.nio.NioEventLoopGroup;
////import io.netty.channel.socket.nio.NioServerSocketChannel;
////import io.netty.channel.socket.nio.NioSocketChannel;
////import io.netty.handler.codec.mqtt.MqttDecoder;
////import io.netty.handler.codec.mqtt.MqttEncoder;
////import io.netty.handler.logging.LogLevel;
////import io.netty.handler.logging.LoggingHandler;
////import io.netty.handler.timeout.IdleStateHandler;
////import io.netty.util.concurrent.DefaultThreadFactory;
////import lombok.extern.slf4j.Slf4j;
////import org.springframework.beans.factory.annotation.Autowired;
////import org.springframework.stereotype.Component;
////
////import java.util.concurrent.*;
////
////@Component
////@Slf4j
////public class MqttServer extends Server {
////
//// @Autowired
//// private MqttMessageAdapter messageAdapter;
////
//// private EventLoopGroup bossGroup;
//// private EventLoopGroup workerGroup;
////
//// @Override
//// protected AbstractBootstrap initialize() {
//// int bossThreads = 1;
//// // Increase worker threads based on CPU cores and expected load
//// int workerThreads = Runtime.getRuntime().availableProcessors() * 4;
////
//// bossGroup = new NioEventLoopGroup(bossThreads, new DefaultThreadFactory("MQTT-Boss", Thread.MAX_PRIORITY));
//// workerGroup = new NioEventLoopGroup(workerThreads, new DefaultThreadFactory("MQTT-Worker", Thread.MAX_PRIORITY));
////
//// if (config.businessCore > 0) {
//// businessService = new ThreadPoolExecutor(
//// config.businessCore,
//// config.businessCore * 2, // Allow some extra threads for bursty workloads
//// 60L, TimeUnit.SECONDS, // Keep idle threads alive longer
//// new LinkedBlockingQueue<>(2048), // Increase queue length
//// new DefaultThreadFactory("MQTT-Business", true, Thread.NORM_PRIORITY),
//// new ThreadPoolExecutor.CallerRunsPolicy()); // Queue full policy
//// }
////
//// return new ServerBootstrap()
//// .group(bossGroup, workerGroup)
//// .channel(NioServerSocketChannel.class)
//// .handler(new LoggingHandler(LogLevel.INFO))
//// .option(ChannelOption.SO_BACKLOG, 2048) // Increase backlog
//// .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
//// .childOption(ChannelOption.TCP_NODELAY, true)
//// .childOption(ChannelOption.SO_SNDBUF, 2 * 1024 * 1024) // Increase send buffer size
//// .childOption(ChannelOption.SO_RCVBUF, 2 * 1024 * 1024) // Increase receive buffer size
//// .childHandler(new ChannelInitializer<NioSocketChannel>() {
//// @Override
//// protected void initChannel(NioSocketChannel channel) {
//// channel.pipeline()
//// .addFirst(new IdleStateHandler(config.readerIdleTime, config.writerIdleTime, config.allIdleTime, TimeUnit.SECONDS))
//// .addLast(new MqttDecoder(2 * 1024 * 1024 * 2)) // Increase max frame length
//// .addLast(MqttEncoder.INSTANCE)
//// .addLast(messageAdapter);
//// }
//// });
//// }
//
//import com.xinda.mqtt.handler.adapter.MqttMessageAdapter;
//import com.xinda.server.Server;
//import io.netty.bootstrap.AbstractBootstrap;
//import io.netty.bootstrap.ServerBootstrap;
//import io.netty.channel.ChannelInitializer;
//import io.netty.channel.ChannelOption;
//import io.netty.channel.EventLoopGroup;
//import io.netty.channel.nio.NioEventLoopGroup;
//import io.netty.channel.socket.nio.NioServerSocketChannel;
//import io.netty.channel.socket.nio.NioSocketChannel;
//import io.netty.handler.codec.mqtt.MqttDecoder;
//import io.netty.handler.codec.mqtt.MqttEncoder;
//import io.netty.handler.logging.LogLevel;
//import io.netty.handler.logging.LoggingHandler;
//import io.netty.handler.timeout.IdleStateHandler;
//import io.netty.util.concurrent.DefaultThreadFactory;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//
//import java.util.concurrent.LinkedBlockingQueue;
//import java.util.concurrent.ThreadPoolExecutor;
//import java.util.concurrent.TimeUnit;
//
//@Component
//@Slf4j
//public class MqttServer1 extends Server {
//
// @Autowired
// private MqttMessageAdapter messageAdapter;
//
// private EventLoopGroup bossGroup;
// private EventLoopGroup workerGroup;
//
//
// @Override
// protected AbstractBootstrap initialize() {
// int bossThreads = 1;
// int workerThreads = Runtime.getRuntime().availableProcessors() * 2;
//
// bossGroup = new NioEventLoopGroup(bossThreads, new DefaultThreadFactory("MQTT-Boss", Thread.MAX_PRIORITY));
// workerGroup = new NioEventLoopGroup(workerThreads, new DefaultThreadFactory("MQTT-Worker", Thread.MAX_PRIORITY));
//
// if (config.businessCore > 0) {
// businessService = new ThreadPoolExecutor(
// config.businessCore,
// config.businessCore,
// 1L, TimeUnit.SECONDS,
// new LinkedBlockingQueue<>(1024), // 增加队列长度
// new DefaultThreadFactory("MQTT-Business", true, Thread.NORM_PRIORITY),
// new ThreadPoolExecutor.CallerRunsPolicy()); // 队列满时由调用者处理
// }
//
// return new ServerBootstrap()
// .group(bossGroup, workerGroup)
// .channel(NioServerSocketChannel.class)
// .handler(new LoggingHandler(LogLevel.INFO))
// .option(ChannelOption.SO_BACKLOG, 1024)
// .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
// .childOption(ChannelOption.TCP_NODELAY, true)
// .childOption(ChannelOption.SO_SNDBUF, 1024 * 1024)
// .childOption(ChannelOption.SO_RCVBUF, 1024 * 1024)
// .childHandler(new ChannelInitializer<NioSocketChannel>() {
// @Override
// protected void initChannel(NioSocketChannel channel) {
// channel.pipeline()
// .addFirst(new IdleStateHandler(config.readerIdleTime, config.writerIdleTime, config.allIdleTime, TimeUnit.SECONDS))
// .addLast(new MqttDecoder(1024 * 1024 * 2))
// .addLast(MqttEncoder.INSTANCE)
// .addLast(messageAdapter); // 确保 messageAdapter 内部处理异步
// }
// });
// }
//
//
//
//// bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory(config.name, Thread.MAX_PRIORITY));
//// workerGroup = new NioEventLoopGroup(config.workerCore, new DefaultThreadFactory(config.name, Thread.MAX_PRIORITY));
////
//// if (config.businessCore > 0) {
//// businessService = new ThreadPoolExecutor(config.businessCore, config.businessCore, 1L,
//// TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new DefaultThreadFactory(config.name, true, Thread.NORM_PRIORITY));
//// }
//// return new ServerBootstrap()
//// .group(bossGroup, workerGroup)
//// .channel(NioServerSocketChannel.class)
//// .handler(new LoggingHandler(LogLevel.DEBUG))
//// .option(ChannelOption.SO_BACKLOG, 511)
//// .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
//// .childHandler(new ChannelInitializer<NioSocketChannel>() {
////
//// @Override
//// protected void initChannel(NioSocketChannel channel) {
//// //客户端心跳检测机制
//// channel.pipeline()
//// .addFirst(XinDaConstant.SERVER.IDLE
//// , new IdleStateHandler(config.readerIdleTime, config.writerIdleTime, config.allIdleTime, TimeUnit.SECONDS))
//// .addLast(XinDaConstant.SERVER.DECODER, new MqttDecoder(1024 * 1024 * 2))
//// .addLast(XinDaConstant.SERVER.ENCODER, MqttEncoder.INSTANCE)
//// .addLast(messageAdapter);
//// }
//// });
////
////
//// }
//}