| | |
| | | String[] topics = mqttProperties.getTopics().split(","); |
| | | //topicList.toArray(topics); |
| | | |
| | | MqttPahoMessageDrivenChannelAdapter adapter = |
| | | new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics); |
| | | MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics); |
| | | adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout()); |
| | | adapter.setConverter(new DefaultPahoMessageConverter()); |
| | | adapter.setQos(mqttProperties.getQos()); |
| | |
| | | public void handleMessage(Message<?> message) throws MessagingException { |
| | | String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); |
| | | String msg = message.getPayload().toString(); |
| | | |
| | | // 这里可以处理接收的数据 |
| | | log.info("----------------------------收到订阅内容---------------------------"); |
| | | log.info("-----TOPIC-----{}", topic); |
| | | log.info("-----Message-----{}", msg); |
| | | |
| | | onReceiveMqttService.onReceiveMessage(topic, msg); |
| | | } |
| | | }; |