package com.fzzy.mqtt; import com.fzzy.gateway.hx2023.service.OnReceiveMqttService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttConsumerCallBack implements MqttCallback { @Autowired private OnReceiveMqttService onReceiveMqttService; /** * 客户端断开连接的回调 */ @Override public void connectionLost(Throwable throwable) { log.info("与服务器断开连接,可重连"); } /** * 消息到达的回调 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String messageStr = new String(message.getPayload()); log.info(String.format("接收消息主题 : %s", topic)); log.info(String.format("接收消息Qos : %d", message.getQos())); log.info(String.format("接收消息内容 : %s", messageStr)); log.info(String.format("接收消息retained : %b", message.isRetained())); onReceiveMqttService.onReceiveMessage(messageStr); } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info(String.format("接收消息成功")); } }