package com.fzzy.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @Description : MQTT接受服务的回调类 */ @Slf4j @Component public class MqttAcceptCallback implements MqttCallbackExtended { @Autowired private MqttAcceptClient mqttAcceptClient; @Autowired private MqttProperties mqttProperties; /** * 客户端断开后触发 * * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info("连接断开,可以重连"); if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) { log.info("【emqx重新连接】...................................................."); mqttAcceptClient.reconnection(); } } /** * 客户端收到消息触发 * * @param topic 主题 * @param mqttMessage 消息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("【接收消息主题】:" + topic); log.info("【接收消息Qos】:" + mqttMessage.getQos()); log.info("【接收消息内容】:" + new String(mqttMessage.getPayload())); // int i = 1/0; } /** * 发布消息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info("向主题【" + topic + "】发送消息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); log.info("【消息内容】:" + s); } catch (Exception e) { log.error("MqttAcceptCallback deliveryComplete error,message:{}", e.getMessage()); e.printStackTrace(); } } /** * 连接emq服务器后触发 * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!============================="); // 以/#结尾表示订阅所有以test开头的主题 // 订阅所有机构主题 mqttAcceptClient.subscribe(mqttProperties.getDefaultTopic(), 0); } }