mqtt通讯修改 增加模板 计算属性

This commit is contained in:
LEED
2025-03-09 12:39:52 +08:00
parent 20c17b772d
commit 6a8cbff440
41 changed files with 2359 additions and 31 deletions

View File

@@ -105,6 +105,15 @@ public class AuthService {
log.warn("=>设备端认证失败");
throw new ServiceException("设备端认证失败:" + response.getBody());
}
}else if (clientId.startsWith("8")) {
// 设备端认证
ResponseEntity response = toolService.clientAuthNoClient(clientId, username, password);
if (response.getStatusCodeValue() == HttpStatus.OK.value()) {
return true;
} else {
log.warn("=>设备端认证失败");
throw new ServiceException("设备端认证失败:" + response.getBody());
}
} else {
Product product = productService.selectProductByProductId(147L);
if (!username.equals(product.getMqttAccount()) && !password.equals(product.getMqttPassword())) {

View File

@@ -9,6 +9,7 @@ import com.xinda.common.core.mq.DeviceStatusBo;
import com.xinda.common.core.redis.RedisCache;
import com.xinda.common.enums.DeviceStatus;
import com.xinda.common.enums.ServerType;
import com.xinda.common.enums.TopicType;
import com.xinda.common.utils.DateUtils;
import com.xinda.common.utils.StringUtils;
import com.xinda.common.utils.gateway.mq.TopicsUtils;
@@ -59,9 +60,9 @@ public class MqttPublish implements MqttHandler {
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
JSONObject jsonObject = JSON.parseObject((String) message.payload());
System.out.println(jsonObject);
// JSONObject jsonObject = JSON.parseObject((String) message.payload());
//
// System.out.println(jsonObject);
// 查看消息
MqttPublishMessage publishMessage = (MqttPublishMessage) message;
@@ -114,9 +115,18 @@ public class MqttPublish implements MqttHandler {
/*获取topic*/
String topicName = message.variableHeader().topicName();
byte[] source = ByteBufUtil.getBytes(message.content());
DeviceReportBo reportBo = DeviceReportBo.builder()
DeviceReportBo reportBo;
if (topicName.startsWith(TopicType.PROPERTY_XINDA_DEV.getTopicSuffix())) {
reportBo = DeviceReportBo.builder()
.serialNumber(topicsUtils.parseNewSerialNumber(topicName)).topicName(topicName).packetId((long) message.variableHeader().packetId())
.platformDate(DateUtils.getNowDate()).data(source).serverType(ServerType.MQTT).build();
}else{
reportBo = DeviceReportBo.builder()
.serialNumber(topicsUtils.parseSerialNumber(topicName)).topicName(topicName).packetId((long) message.variableHeader().packetId())
.platformDate(DateUtils.getNowDate()).data(source).serverType(ServerType.MQTT).build();
}
if (topicName.endsWith(XinDaConstant.TOPIC.MSG_REPLY) ||
topicName.endsWith(XinDaConstant.TOPIC.UPGRADE_REPLY)) {
/*设备应答服务器回调数据*/
@@ -126,14 +136,20 @@ public class MqttPublish implements MqttHandler {
reportBo.setReportType(1);
}
// 规则引擎脚本处理,完成后返回结果
MsgContext context = ruleProcess.processRuleScript(topicsUtils.parseSerialNumber(topicName), 1, topicName, new String(source));
MsgContext context;
if (topicName.startsWith(TopicType.PROPERTY_XINDA_DEV.getTopicSuffix())) {
context = ruleProcess.processNoRuleScript(topicsUtils.parseNewSerialNumber(topicName), 1, topicName, new String(source));
}else{
context = ruleProcess.processRuleScript(topicsUtils.parseSerialNumber(topicName), 1, topicName, new String(source));
}
if (!Objects.isNull(context) && StringUtils.isNotEmpty(context.getPayload())
&& StringUtils.isNotEmpty(context.getTopic())) {
reportBo.setTopicName(context.getTopic());
reportBo.setData(context.getPayload().getBytes(StandardCharsets.UTF_8));
}
if (reportBo.getTopicName().contains("property")) {
if (reportBo.getTopicName().contains("property")||reportBo.getTopicName().contains("/json/dev/")) {
MessageProducer.sendPublishMsg(reportBo);
} else if (reportBo.getTopicName().contains("status")) {
String jsonString = new String(reportBo.getData(), StandardCharsets.UTF_8);

View File

@@ -54,6 +54,9 @@ public class MqttRemoteManager {
public void pushDeviceStatus(Long productId, String serialNumber, DeviceStatus status){
//兼容emqx推送TCP客户端上线
Device device = deviceService.selectDeviceNoModel(serialNumber);
if (device == null) {//设备不存在,添加设备
return;
}
String message = "{\"status\":" + status.getType() + ",\"isShadow\":" + device.getIsShadow() + ",\"rssi\":" + device.getRssi() + "}";
String topic = topicsUtils.buildTopic(device.getProductId(), serialNumber, TopicType.STATUS_POST);
if (enabled){

View File

@@ -6,6 +6,8 @@ import com.xinda.common.enums.DeviceStatus;
import com.xinda.common.exception.ServiceException;
import com.xinda.common.utils.StringUtils;
import com.xinda.common.utils.spring.SpringUtils;
import com.xinda.iot.domain.Device;
import com.xinda.iot.service.IDeviceService;
import com.xinda.mq.redischannel.consumer.DeviceStatusConsumer;
import com.xinda.mq.service.IMessagePublishService;
import com.xinda.base.service.ISessionStore;
@@ -34,6 +36,7 @@ public class SessionManger {
private static ISessionStore sessionStore = SpringUtils.getBean(ISessionStore.class);
private static MqttRemoteManager remoteManager = SpringUtils.getBean(MqttRemoteManager.class);
private static DeviceStatusConsumer statusConsumer = SpringUtils.getBean(DeviceStatusConsumer.class);
private static IDeviceService deviceService = SpringUtils.getBean(IDeviceService.class);
/**
* 获取当前的在线客户Map
@@ -116,6 +119,11 @@ public class SessionManger {
sessionStore.cleanSession(clientId);
session.setMqttMessageType(MqttMessageType.DISCONNECT);
//发送至MQ,设备下线
Device device = deviceService.selectDeviceBySerialNumber(session.getClientId());
//设备不存在,不做处理
if (device == null) {
return;
}
DeviceStatusBo statusBo = MqttMessageUtils.buildStatusMsg(session.getHandlerContext(), session.getClientId(), DeviceStatus.OFFLINE, session.getIp());
if (!statusBo.getSerialNumber().startsWith(XinDaConstant.SERVER.WM_PREFIX) &&
!statusBo.getSerialNumber().startsWith(XinDaConstant.SERVER.WS_PREFIX)) {

View File

@@ -251,7 +251,7 @@ public class DeviceReportMessageServiceImpl implements IDeviceReportMessageServi
if (message.getServerType().equals(ServerType.MQTT)) {
//处理topic以prop结尾上报的数据 (属性)
if (message.getServerType().equals(ServerType.MQTT)) {
if (!topicName.endsWith(TopicType.PROPERTY_POST.getTopicSuffix())) {
if (!topicName.endsWith(TopicType.PROPERTY_POST.getTopicSuffix())&&!topicName.startsWith(TopicType.PROPERTY_XINDA_DEV.getTopicSuffix())) {
return;
}
}
@@ -266,7 +266,12 @@ public class DeviceReportMessageServiceImpl implements IDeviceReportMessageServi
report.setDeviceName(message.getDeviceName());
report.setSources(message.getSources());
//属性上报执行规则引擎
report.setRuleEngine(true);
if(topicName.startsWith(TopicType.PROPERTY_XINDA_DEV.getTopicSuffix())){
report.setRuleEngine(false);
}else{
report.setRuleEngine(true);
}
dataHandler.reportData(report);
}

View File

@@ -131,17 +131,23 @@ public class MqttMessagePublishImpl implements IMqttMessagePublish {
FunctionLog funcLog = this.handleLog(bo, thingModels.getName());
ServerType serverType = ServerType.explain(transport);
//组建下发服务指令
FunctionCallBackBo backBo = buildMessage(bo);
//FunctionCallBackBo backBo = buildMessage(bo);
FunctionCallBackBo backBo = buildNewMessage(bo);
switch (serverType) {
case MQTT:
// 规则引擎脚本处理,完成后返回结果
MsgContext context = ruleProcess.processRuleScript(serialNumber, 2, backBo.getTopicName(), new String(backBo.getMessage()));
MsgContext context;
if (backBo.getTopicName().startsWith(TopicType.PROPERTY_XINDA_DEV.getTopicSuffix())) {
context = ruleProcess.processNoRuleScript(serialNumber, 2, backBo.getTopicName(), new String(backBo.getMessage()));
}else{
context = ruleProcess.processRuleScript(serialNumber, 2, backBo.getTopicName(), new String(backBo.getMessage()));
}
// MsgContext context = ruleProcess.processRuleScript(serialNumber, 2, backBo.getTopicName(), new String(backBo.getMessage()));
if (!Objects.isNull(context) && StringUtils.isNotEmpty(context.getPayload())
&& StringUtils.isNotEmpty(context.getTopic())) {
backBo.setTopicName(context.getTopic());
backBo.setMessage(context.getPayload().getBytes());
}
publishWithLog(backBo.getTopicName(), backBo.getMessage(), funcLog);
log.debug("=>服务下发,topic=[{}],指令=[{}]", backBo.getTopicName(), new String(backBo.getMessage()));
break;
@@ -290,6 +296,21 @@ public class MqttMessagePublishImpl implements IMqttMessagePublish {
return callBackBo;
}
public FunctionCallBackBo buildNewMessage(MQSendMessageBo bo) {
String protocolCode = bo.getDp().getProtocolCode();
String serialNumber = bo.getSerialNumber();
/*组建Topic*/
String topic = topicsUtils.buildNewTopic(serialNumber, TopicType.FUNCTION_XINDA_DOWN);
bo.setTopicName(topic);
/*获取编码协议*/
IProtocol protocolInstance = protocolManagerService.getProtocolByProtocolCode(protocolCode);
//根据协议编码后数据
FunctionCallBackBo callBackBo = protocolInstance.encode(bo);
callBackBo.setSerialNumber(serialNumber);
callBackBo.setTopicName(topic);
return callBackBo;
}
/**
* 1.发布设备状态
*/