package com.fzzy.mqtt;
|
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.*;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
import org.springframework.integration.annotation.ServiceActivator;
|
import org.springframework.integration.channel.DirectChannel;
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageHandler;
|
|
import javax.annotation.PostConstruct;
|
import java.util.Arrays;
|
import java.util.List;
|
|
@Configuration
|
@Slf4j
|
@IntegrationComponentScan
|
public class MqttProviderConfig {
|
|
@Autowired
|
private MqttProperties mqttProperties;
|
|
@Bean
|
public MqttConnectOptions getReceiverMqttConnectOptionsForSend() {
|
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
|
mqttConnectOptions.setUserName(mqttProperties.getUsername());
|
mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
|
List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(","));
|
String[] serverURIs = new String[hostList.size()];
|
hostList.toArray(serverURIs);
|
mqttConnectOptions.setServerURIs(serverURIs);
|
mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
|
return mqttConnectOptions;
|
}
|
|
@Bean
|
public MqttPahoClientFactory receiverMqttClientFactoryForSend() {
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
factory.setConnectionOptions(getReceiverMqttConnectOptionsForSend());
|
return factory;
|
}
|
|
@Bean
|
@ServiceActivator(inputChannel = "mqttOutboundChannel")
|
public MessageHandler mqttOutbound() {
|
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientOutId(), receiverMqttClientFactoryForSend());
|
messageHandler.setAsync(false);
|
messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
|
return messageHandler;
|
}
|
|
@Bean
|
public MessageChannel mqttOutboundChannel() {
|
return new DirectChannel();
|
}
|
}
|