package com.fzzy.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @Description : MQTT接受服务的客户端 */ @Slf4j @Component public class MqttAcceptClient { @Autowired private MqttAcceptCallback mqttAcceptCallback; @Autowired private MqttProperties mqttProperties; public static MqttClient client; private static MqttClient getClient() { return client; } private static void setClient(MqttClient client) { MqttAcceptClient.client = client; } /** * 客户端连接 */ public void connect() { MqttClient client; try { client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getClientUsername()); options.setPassword(mqttProperties.getClientPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getClientTimeout()); options.setKeepAliveInterval(mqttProperties.getClientAliveTime()); options.setAutomaticReconnect(mqttProperties.getReconnect()); options.setCleanSession(mqttProperties.getCleanSession()); MqttAcceptClient.setClient(client); // 设置回调 client.setCallback(mqttAcceptCallback); client.connect(options); } catch (Exception e) { log.error("MqttAcceptClient connect error,message:{}", e.getMessage()); e.printStackTrace(); } } /** * 重新连接 */ public void reconnection() { try { client.connect(); } catch (MqttException e) { log.error("MqttAcceptClient reconnection error,message:{}", e.getMessage()); e.printStackTrace(); } } /** * 订阅某个主题 * * @param topic 主题 * @param qos 连接方式 */ public void subscribe(String topic, int qos) { log.info("========================【开始订阅主题:" + topic + "】========================"); try { client.subscribe(topic, qos); } catch (MqttException e) { log.error("MqttAcceptClient subscribe error,message:{}", e.getMessage()); e.printStackTrace(); } } /** * 取消订阅某个主题 * * @param topic */ public void unsubscribe(String topic) { log.info("========================【取消订阅主题:" + topic + "】========================"); try { client.unsubscribe(topic); } catch (MqttException e) { log.error("MqttAcceptClient unsubscribe error,message:{}", e.getMessage()); e.printStackTrace(); } } }