package com.fzzy.mqtt;
|
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
|
/**
|
* @Description : MQTT接受服务的客户端
|
*/
|
@Slf4j
|
@Component
|
public class MqttAcceptClient {
|
|
@Autowired
|
private MqttAcceptCallback mqttAcceptCallback;
|
|
@Autowired
|
private MqttProperties mqttProperties;
|
|
public static MqttClient client;
|
|
private static MqttClient getClient() {
|
return client;
|
}
|
|
private static void setClient(MqttClient client) {
|
MqttAcceptClient.client = client;
|
}
|
|
|
/**
|
* 客户端连接
|
*/
|
public void connect() {
|
MqttClient client;
|
try {
|
client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(),
|
new MemoryPersistence());
|
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setUserName(mqttProperties.getClientUsername());
|
options.setPassword(mqttProperties.getClientPassword().toCharArray());
|
options.setConnectionTimeout(mqttProperties.getClientTimeout());
|
options.setKeepAliveInterval(mqttProperties.getClientAliveTime());
|
options.setAutomaticReconnect(mqttProperties.getReconnect());
|
options.setCleanSession(mqttProperties.getCleanSession());
|
MqttAcceptClient.setClient(client);
|
// 设置回调
|
client.setCallback(mqttAcceptCallback);
|
client.connect(options);
|
} catch (Exception e) {
|
log.error("MqttAcceptClient connect error,message:{}", e.getMessage());
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 重新连接
|
*/
|
public void reconnection() {
|
try {
|
client.connect();
|
} catch (MqttException e) {
|
log.error("MqttAcceptClient reconnection error,message:{}", e.getMessage());
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 订阅某个主题
|
*
|
* @param topic 主题
|
* @param qos 连接方式
|
*/
|
public void subscribe(String topic, int qos) {
|
log.info("========================【开始订阅主题:" + topic + "】========================");
|
try {
|
client.subscribe(topic, qos);
|
} catch (MqttException e) {
|
log.error("MqttAcceptClient subscribe error,message:{}", e.getMessage());
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 取消订阅某个主题
|
*
|
* @param topic
|
*/
|
public void unsubscribe(String topic) {
|
log.info("========================【取消订阅主题:" + topic + "】========================");
|
try {
|
client.unsubscribe(topic);
|
} catch (MqttException e) {
|
log.error("MqttAcceptClient unsubscribe error,message:{}", e.getMessage());
|
e.printStackTrace();
|
}
|
}
|
|
}
|