package com.fzzy.mqtt;
|
|
import com.fzzy.gateway.hx2023.service.OnReceiveMqttService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
import org.springframework.integration.annotation.ServiceActivator;
|
import org.springframework.integration.channel.DirectChannel;
|
import org.springframework.integration.core.MessageProducer;
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
import org.springframework.messaging.Message;
|
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageHandler;
|
import org.springframework.messaging.MessagingException;
|
|
import java.util.Arrays;
|
import java.util.List;
|
|
@Configuration
|
@Slf4j
|
@IntegrationComponentScan
|
public class MqttConsumerConfig {
|
|
@Autowired
|
private OnReceiveMqttService onReceiveMqttService;
|
|
@Autowired
|
private MqttProperties mqttProperties;
|
|
@Bean
|
public MqttConnectOptions getReceiverMqttConnectOptionsForSub() {
|
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
|
mqttConnectOptions.setUserName(mqttProperties.getUsername());
|
mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
|
List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(","));
|
String[] serverURIs = new String[hostList.size()];
|
hostList.toArray(serverURIs);
|
mqttConnectOptions.setServerURIs(serverURIs);
|
mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
|
return mqttConnectOptions;
|
}
|
|
@Bean
|
public MqttPahoClientFactory receiverMqttClientFactoryForSub() {
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub());
|
return factory;
|
}
|
|
//接收通道
|
@Bean
|
public MessageChannel mqttInputChannel() {
|
return new DirectChannel();
|
}
|
|
//配置client,监听的topic
|
@Bean
|
public MessageProducer inbound() {
|
|
// List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
|
String[] topics = mqttProperties.getTopics().split(",");
|
//topicList.toArray(topics);
|
|
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics);
|
adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
adapter.setQos(mqttProperties.getQos());
|
adapter.setOutputChannel(mqttInputChannel());
|
return adapter;
|
}
|
|
//通过通道获取数据
|
@Bean
|
@ServiceActivator(inputChannel = "mqttInputChannel")
|
public MessageHandler handler() {
|
return new MessageHandler() {
|
@Override
|
public void handleMessage(Message<?> message) throws MessagingException {
|
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
|
String msg = message.getPayload().toString();
|
onReceiveMqttService.onReceiveMessage(topic, msg);
|
}
|
};
|
}
|
}
|