// 对比新文件 -- diff-viewer label ("compare with new file"); not part of the Java source.

package com.fzzy.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

| | | @Configuration |
| | | @Slf4j |
| | | public class MqttConsumerConfig { |
| | | |
| | | @Autowired |
| | | private MqttProperties mqttProperties; |
| | | @Autowired |
| | | private MqttConsumerCallBack mqttConsumerCallBack; |
| | | |
| | | /** |
| | | * 客æ·ç«¯å¯¹è±¡ |
| | | */ |
| | | private MqttClient client; |
| | | |
| | | /** |
| | | * å¨beanåå§ååè¿æ¥å°æå¡å¨ |
| | | */ |
| | | @PostConstruct |
| | | public void init() { |
| | | connect(); |
| | | } |
| | | |
| | | /** |
| | | * 客æ·ç«¯è¿æ¥æå¡ç«¯ |
| | | */ |
| | | public void connect() { |
| | | try { |
| | | //å建MQTT客æ·ç«¯å¯¹è±¡ |
| | | client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence()); |
| | | //è¿æ¥è®¾ç½® |
| | | MqttConnectOptions options = new MqttConnectOptions(); |
| | | //æ¯å¦æ¸
空sessionï¼è®¾ç½®ä¸ºfalse表示æå¡å¨ä¼ä¿ç客æ·ç«¯çè¿æ¥è®°å½ï¼å®¢æ·ç«¯éè¿ä¹åè½è·åå°æå¡å¨å¨å®¢æ·ç«¯æå¼è¿æ¥æé´æ¨éçæ¶æ¯ |
| | | //设置为trueè¡¨ç¤ºæ¯æ¬¡è¿æ¥å°æå¡ç«¯é½æ¯ä»¥æ°ç身份 |
| | | options.setCleanSession(true); |
| | | //è®¾ç½®è¿æ¥ç¨æ·å |
| | | options.setUserName(mqttProperties.getUsername()); |
| | | //è®¾ç½®è¿æ¥å¯ç |
| | | options.setPassword(mqttProperties.getPassword().toCharArray()); |
| | | //设置è¶
æ¶æ¶é´ï¼åä½ä¸ºç§ |
| | | options.setConnectionTimeout(10); |
| | | //设置å¿è·³æ¶é´ åä½ä¸ºç§ï¼è¡¨ç¤ºæå¡å¨æ¯é1.5*20ç§çæ¶é´å客æ·ç«¯åéå¿è·³å¤æå®¢æ·ç«¯æ¯å¦å¨çº¿ |
| | | options.setKeepAliveInterval(20); |
| | | //设置é屿¶æ¯çè¯é¢ï¼è¥å®¢æ·ç«¯åæå¡å¨ä¹é´çè¿æ¥æå¤æå¼ï¼æå¡å¨å°åå¸å®¢æ·ç«¯çéå±ä¿¡æ¯ |
| | | options.setWill("willTopic", (mqttProperties.getClientInId() + "䏿å¡å¨æå¼è¿æ¥").getBytes(), 0, false); |
| | | //设置åè° |
| | | // client.setCallback(new MqttConsumerCallBack()); |
| | | client.setCallback(mqttConsumerCallBack); |
| | | client.connect(options); |
| | | //订é
ä¸»é¢ |
| | | //æ¶æ¯ç级ï¼å䏻颿°ç»ä¸ä¸å¯¹åºï¼æå¡ç«¯å°æç
§æå®ç级ç»è®¢é
äºä¸»é¢ç客æ·ç«¯æ¨éæ¶æ¯ |
| | | int[] qos = {1, 1}; |
| | | //ä¸»é¢ |
| | | String[] topics = mqttProperties.getTopics().split(","); |
| | | //订é
ä¸»é¢ |
| | | client.subscribe(topics, qos); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * æå¼è¿æ¥ |
| | | */ |
| | | public void disConnect() { |
| | | try { |
| | | client.disconnect(); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 订é
ä¸»é¢ |
| | | */ |
| | | public void subscribe(String topic, int qos) { |
| | | try { |
| | | client.subscribe(topic, qos); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | } |