| | |
| | | package com.fzzy.mqtt; |
| | | |
| | | import com.fzzy.gateway.hx2023.service.OnReceiveMqttService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.eclipse.paho.client.mqttv3.*; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | | import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.context.annotation.Configuration; |
| | | import org.springframework.integration.annotation.IntegrationComponentScan; |
| | | import org.springframework.integration.annotation.ServiceActivator; |
| | | import org.springframework.integration.channel.DirectChannel; |
| | | import org.springframework.integration.core.MessageProducer; |
| | | import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; |
| | | import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
| | | import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; |
| | | import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; |
| | | import org.springframework.messaging.Message; |
| | | import org.springframework.messaging.MessageChannel; |
| | | import org.springframework.messaging.MessageHandler; |
| | | import org.springframework.messaging.MessagingException; |
| | | |
| | | import javax.annotation.PostConstruct; |
| | | import java.util.Arrays; |
| | | import java.util.List; |
| | | |
| | | @Configuration |
| | | @Slf4j |
| | | @IntegrationComponentScan |
| | | public class MqttConsumerConfig { |
| | | |
| | | @Autowired |
| | | private MqttProperties mqttProperties; |
| | | private OnReceiveMqttService onReceiveMqttService; |
| | | |
| | | @Autowired |
| | | private MqttConsumerCallBack mqttConsumerCallBack; |
| | | private MqttProperties mqttProperties; |
| | | |
| | | /** |
| | | * 客户端对象 |
| | | */ |
| | | private MqttClient client; |
| | | |
| | | /** |
| | | * 在bean初始化后连接到服务器 |
| | | */ |
| | | @PostConstruct |
| | | public void init() { |
| | | connect(); |
| | | @Bean |
| | | public MqttConnectOptions getReceiverMqttConnectOptionsForSub() { |
| | | MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); |
| | | mqttConnectOptions.setUserName(mqttProperties.getUsername()); |
| | | mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); |
| | | List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(",")); |
| | | String[] serverURIs = new String[hostList.size()]; |
| | | hostList.toArray(serverURIs); |
| | | mqttConnectOptions.setServerURIs(serverURIs); |
| | | mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); |
| | | return mqttConnectOptions; |
| | | } |
| | | |
| | | /** |
| | | * 客户端连接服务端 |
| | | */ |
| | | public void connect() { |
| | | try { |
| | | //创建MQTT客户端对象 |
| | | client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence()); |
| | | //连接设置 |
| | | MqttConnectOptions options = new MqttConnectOptions(); |
| | | //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 |
| | | //设置为true表示每次连接到服务端都是以新的身份 |
| | | options.setCleanSession(true); |
| | | //设置连接用户名 |
| | | options.setUserName(mqttProperties.getUsername()); |
| | | //设置连接密码 |
| | | options.setPassword(mqttProperties.getPassword().toCharArray()); |
| | | //设置超时时间,单位为秒 |
| | | options.setConnectionTimeout(10); |
| | | //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 |
| | | options.setKeepAliveInterval(20); |
| | | //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 |
| | | options.setWill("willTopic", (mqttProperties.getClientInId() + "与服务器断开连接").getBytes(), 0, false); |
| | | //设置回调 |
| | | // client.setCallback(new MqttConsumerCallBack()); |
| | | client.setCallback(mqttConsumerCallBack); |
| | | client.connect(options); |
| | | //订阅主题 |
| | | //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息 |
| | | int[] qos = {1, 1}; |
| | | //主题 |
| | | String[] topics = mqttProperties.getTopics().split(","); |
| | | //订阅主题 |
| | | client.subscribe(topics, qos); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | @Bean |
| | | public MqttPahoClientFactory receiverMqttClientFactoryForSub() { |
| | | DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); |
| | | factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub()); |
| | | return factory; |
| | | } |
| | | |
| | | /** |
| | | * 断开连接 |
| | | */ |
| | | public void disConnect() { |
| | | try { |
| | | client.disconnect(); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | //接收通道 |
| | | @Bean |
| | | public MessageChannel mqttInputChannel() { |
| | | return new DirectChannel(); |
| | | } |
| | | |
| | | //配置client,监听的topic |
| | | @Bean |
| | | public MessageProducer inbound() { |
| | | |
| | | /** |
| | | * 订阅主题 |
| | | */ |
| | | public void subscribe(String topic, int qos) { |
| | | try { |
| | | client.subscribe(topic, qos); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | // List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); |
| | | String[] topics = mqttProperties.getTopics().split(","); |
| | | //topicList.toArray(topics); |
| | | |
| | | MqttPahoMessageDrivenChannelAdapter adapter = |
| | | new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics); |
| | | adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout()); |
| | | adapter.setConverter(new DefaultPahoMessageConverter()); |
| | | adapter.setQos(mqttProperties.getQos()); |
| | | adapter.setOutputChannel(mqttInputChannel()); |
| | | return adapter; |
| | | } |
| | | } |
| | | |
| | | //通过通道获取数据 |
| | | @Bean |
| | | @ServiceActivator(inputChannel = "mqttInputChannel") |
| | | public MessageHandler handler() { |
| | | return new MessageHandler() { |
| | | @Override |
| | | public void handleMessage(Message<?> message) throws MessagingException { |
| | | String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); |
| | | String msg = message.getPayload().toString(); |
| | | |
| | | // 这里可以处理接收的数据 |
| | | log.info("----------------------------收到订阅内容---------------------------"); |
| | | log.info("-----TOPIC-----{}", topic); |
| | | log.info("-----Message-----{}", msg); |
| | | |
| | | onReceiveMqttService.onReceiveMessage(topic, msg); |
| | | } |
| | | }; |
| | | } |
| | | } |