package com.fzzy.mqtt;
|
|
import com.fzzy.gateway.hx2023.websocket.SocketService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
import org.springframework.integration.annotation.ServiceActivator;
|
import org.springframework.integration.channel.DirectChannel;
|
import org.springframework.integration.core.MessageProducer;
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
import org.springframework.messaging.Message;
|
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageHandler;
|
import org.springframework.messaging.MessagingException;
|
|
/**
|
* MQTT消费端
|
*/
|
@Slf4j
|
@Configuration
|
@IntegrationComponentScan
|
public class MqttInboundConfiguration {
|
|
@Autowired
|
private MqttProperties mqttProperties;
|
|
@Bean
|
public MessageChannel mqttInputChannel() {
|
return new DirectChannel();
|
}
|
|
@Bean
|
public MqttPahoClientFactory receiverMqttClientFactoryForSub() {
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
String[] array = mqttProperties.getHost().split(",");
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setServerURIs(array);
|
options.setUserName(mqttProperties.getClientUsername());
|
options.setPassword(mqttProperties.getClientPassword().toCharArray());
|
options.setKeepAliveInterval(mqttProperties.getClientAliveTime());
|
//接受离线消息
|
options.setCleanSession(false);
|
options.setMqttVersion(4);
|
|
factory.setConnectionOptions(options);
|
|
return factory;
|
}
|
|
//配置client,监听的topic
|
@Bean
|
public MessageProducer inbound() {
|
String[] inboundTopics = mqttProperties.getClientTopics().split(",");
|
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
|
mqttProperties.getClientId() + "_inbound", receiverMqttClientFactoryForSub(), inboundTopics); //对inboundTopics主题进行监听
|
adapter.setCompletionTimeout(mqttProperties.getClientTimeout());
|
adapter.setQos(1);
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
adapter.setOutputChannel(mqttInputChannel());
|
return adapter;
|
}
|
|
//通过通道获取数据
|
@Bean
|
@ServiceActivator(inputChannel = "mqttInputChannel") //异步处理
|
public MessageHandler handler() {
|
return new MessageHandler() {
|
@Override
|
public void handleMessage(Message<?> message) throws MessagingException {
|
log.info("----------------------");
|
//获取mqtt的topic
|
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
|
//使用webSocket返回给前端
|
SocketService socketService = new SocketService();
|
socketService.onMessage(message.getPayload().toString(), null, topic);
|
|
log.info("message:" + message.getPayload());
|
log.info("PacketId:" + message.getHeaders().getId());
|
log.info("Qos:" + message.getHeaders().get(MqttHeaders.QOS));
|
log.info("topic:" + topic);
|
}
|
};
|
}
|
}
|