package com.fzzy.mqtt; import com.fzzy.gateway.hx2023.service.OnReceiveMqttService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Arrays; import java.util.List; @Configuration @Slf4j @IntegrationComponentScan public class MqttConsumerConfig { @Autowired private OnReceiveMqttService onReceiveMqttService; @Autowired private MqttProperties mqttProperties; @Bean public MqttConnectOptions getReceiverMqttConnectOptionsForSub() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); List hostList = Arrays.asList(mqttProperties.getHost().trim().split(",")); String[] serverURIs = new String[hostList.size()]; hostList.toArray(serverURIs); mqttConnectOptions.setServerURIs(serverURIs); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); return mqttConnectOptions; } @Bean public MqttPahoClientFactory receiverMqttClientFactoryForSub() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub()); return factory; } //接收通道 @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } //配置client,监听的topic @Bean public MessageProducer inbound() { // List topicList = Arrays.asList(defaultTopic.trim().split(",")); String[] topics = mqttProperties.getTopics().split(","); //topicList.toArray(topics); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics); adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout()); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(mqttProperties.getQos()); adapter.setOutputChannel(mqttInputChannel()); return adapter; } //通过通道获取数据 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String msg = message.getPayload().toString(); // 这里可以处理接收的数据 log.info("----------------------------收到订阅内容---------------------------"); log.info("-----TOPIC-----{}", topic); log.info("-----Message-----{}", msg); onReceiveMqttService.onReceiveMessage(topic, msg); } }; } }