package com.fzzy.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; /** * MQTT生产端 */ @Slf4j @Configuration @IntegrationComponentScan public class MqttOutboundConfiguration { @Autowired private MqttProperties mqttProperties; @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean public MqttPahoClientFactory getReceiverMqttConnectOptionsForSend() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); String[] array = mqttProperties.getHost().split(","); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(array); if(null == mqttProperties.getClientUsername()) mqttProperties.setClientUsername(" "); if(null == mqttProperties.getClientPassword()) mqttProperties.setClientPassword(" "); options.setUserName(mqttProperties.getClientUsername()); options.setPassword(mqttProperties.getClientPassword().toCharArray()); // 接受离线消息 options.setCleanSession(false); //告诉代理客户端是否要建立持久会话 false为建立持久会话 options.setMqttVersion(4); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "outbound", getReceiverMqttConnectOptionsForSend()); messageHandler.setAsync(true); return messageHandler; } }