package com.fzzy.mqtt;

import com.fzzy.gateway.hx2023.websocket.SocketService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import java.util.ArrayList;
import java.util.List;

/**
 * MQTT consumer side.
 *
 * <p>Wires an inbound MQTT channel adapter to {@code mqttInputChannel} and registers a
 * handler on that channel which forwards every received payload to {@link SocketService}.
 */
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {

    @Autowired
    private MqttProperties mqttProperties;

    /**
     * Channel that carries messages from the MQTT inbound adapter to {@link #handler()}.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Builds the Paho client factory used by the subscribing (inbound) client.
     *
     * <p>The broker list is read from {@code mqttProperties.getHost()} as a comma-separated
     * string; entries are trimmed and blank entries are ignored.
     *
     * @return the configured client factory
     * @throws IllegalStateException if no broker URI is configured
     */
    @Bean
    public MqttPahoClientFactory receiverMqttClientFactoryForSub() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(splitCommaSeparated(mqttProperties.getHost(), "MQTT host"));
        options.setUserName(mqttProperties.getClientUsername());

        // A broker may allow connections without a password; only set it when configured
        // instead of failing with a NullPointerException.
        String password = mqttProperties.getClientPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        options.setKeepAliveInterval(mqttProperties.getClientAliveTime());
        // Keep the session so that messages published while offline are delivered later.
        options.setCleanSession(false);
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Configures the inbound client and the topics it listens to.
     *
     * <p>Topics are read from {@code mqttProperties.getClientTopics()} as a comma-separated
     * string; entries are trimmed and blank entries are ignored. The client id is the
     * configured client id with the suffix {@code "_inbound"}.
     *
     * @return the message-driven channel adapter publishing to {@link #mqttInputChannel()}
     * @throws IllegalStateException if no topic is configured
     */
    @Bean
    public MessageProducer inbound() {
        String[] inboundTopics =
                splitCommaSeparated(mqttProperties.getClientTopics(), "MQTT client topics");

        // Subscribe to every topic in inboundTopics.
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getClientId() + "_inbound",
                receiverMqttClientFactoryForSub(),
                inboundTopics);
        adapter.setCompletionTimeout(mqttProperties.getClientTimeout());
        adapter.setQos(1);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Handler that receives the data arriving on {@code mqttInputChannel}.
     *
     * <p>NOTE: {@code mqttInputChannel} is a {@link DirectChannel}, so this handler is invoked
     * on the thread that sends the message to the channel, not asynchronously.
     *
     * @return the handler that forwards payloads to {@link SocketService} and logs them
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                log.info("----------------------");

                // Topic the MQTT message was received on.
                String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);

                // Push the payload to the front end through WebSocket.
                // NOTE(review): a new SocketService is created for every message and the
                // second argument is null; confirm against SocketService that this is intended.
                SocketService socketService = new SocketService();
                socketService.onMessage(message.getPayload().toString(), null, topic);

                log.info("message:{}", message.getPayload());
                // getHeaders().getId() is the id stored in the Spring message headers.
                log.info("PacketId:{}", message.getHeaders().getId());
                log.info("Qos:{}", message.getHeaders().get(MqttHeaders.QOS));
                log.info("topic:{}", topic);
            }
        };
    }

    /**
     * Splits a comma-separated configuration value into its trimmed, non-blank entries.
     *
     * @param value       the raw configuration value, may be {@code null}
     * @param description human-readable name of the setting, used in the error message
     * @return the entries in their original order, never empty
     * @throws IllegalStateException if {@code value} is {@code null} or has no non-blank entry
     */
    private static String[] splitCommaSeparated(String value, String description) {
        List<String> entries = new ArrayList<>();
        if (value != null) {
            for (String part : value.split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    entries.add(trimmed);
                }
            }
        }
        if (entries.isEmpty()) {
            throw new IllegalStateException(
                    description + " must contain at least one non-blank, comma-separated entry");
        }
        return entries.toArray(new String[0]);
    }
}