package com.fzzy.mqtt;
|
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
import org.springframework.integration.annotation.ServiceActivator;
|
import org.springframework.integration.channel.DirectChannel;
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageHandler;
|
|
/**
|
* MQTT生产端
|
*/
|
@Slf4j
|
@Configuration
|
@IntegrationComponentScan
|
public class MqttOutboundConfiguration {
|
@Autowired
|
private MqttProperties mqttProperties;
|
|
@Bean
|
public MessageChannel mqttOutboundChannel() {
|
return new DirectChannel();
|
}
|
|
@Bean
|
public MqttPahoClientFactory getReceiverMqttConnectOptionsForSend() {
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
String[] array = mqttProperties.getHost().split(",");
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setServerURIs(array);
|
|
if(null == mqttProperties.getClientUsername()) mqttProperties.setClientUsername(" ");
|
if(null == mqttProperties.getClientPassword()) mqttProperties.setClientPassword(" ");
|
options.setUserName(mqttProperties.getClientUsername());
|
options.setPassword(mqttProperties.getClientPassword().toCharArray());
|
// 接受离线消息
|
options.setCleanSession(false); //告诉代理客户端是否要建立持久会话 false为建立持久会话
|
options.setMqttVersion(4);
|
factory.setConnectionOptions(options);
|
return factory;
|
}
|
|
@Bean
|
@ServiceActivator(inputChannel = "mqttOutboundChannel")
|
public MessageHandler mqttOutbound() {
|
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "outbound", getReceiverMqttConnectOptionsForSend());
|
messageHandler.setAsync(true);
|
return messageHandler;
|
}
|
|
}
|