| | |
| | | package com.fzzy.gateway.hx2023.kafka; |
| | | |
| | | import com.alibaba.fastjson2.JSONObject; |
| | | import com.fzzy.gateway.hx2023.data.GrainData; |
| | | import com.fzzy.gateway.hx2023.data.KafaGrainData; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | |
| | |
| | | * @param data |
| | | * @return |
| | | */ |
| | | public String sendGrainData2Cloud(GrainData data) { |
| | | |
| | | |
| | | log.debug("----推送粮情信息到云端---{}",data); |
| | | public String sendGrainData2Cloud(KafaGrainData data) { |
| | | |
| | | |
| | | String strData = JSONObject.toJSONString(data); |
| | | log.debug("----推送粮情信息到云端---{}", strData); |
| | | |
| | | //推送信息 |
| | | kafkaTemplate.send("TOPIC_ZLJ_GRAIN_TEMPERATURE",JSONObject.toJSONString(data)).addCallback(success -> { |
| | | kafkaTemplate.send("TOPIC_ZLJ_GRAIN_TEMPERATURE", strData).addCallback(success -> { |
| | | // 消息发送到的topic |
| | | String topic = success.getRecordMetadata().topic(); |
| | | // 消息发送到的分区 |
| | |
| | | // 消息在分区内的offset |
| | | long offset = success.getRecordMetadata().offset(); |
| | | |
| | | log.info("粮情推送成功:" + topic + "-" + partition + "-" + offset); |
| | | log.info("粮情推送成功:{}-{}-{}-{}", topic, partition, offset, data.getDeviceID()); |
| | | }, failure -> { |
| | | log.error("粮情推送失败:" + failure.getMessage()); |
| | | log.info("粮情推送失败:{}-{}", data.getDeviceID(), failure.getMessage()); |
| | | }); |
| | | |
| | | |
| | | return "SUCCESS"; |
| | | |
| | |
| | | |
| | | @KafkaListener(topics = {"TOPIC_ZLJ_GRAIN_TEMPERATURE"}) |
| | | public void onMessage1(ConsumerRecord<?, ?> record) { |
| | | // log.debug("获取消费信息:" + record.topic() + "-" + record.partition() + "-" + record.value()); |
| | | // log.debug("获取消费信息:" + record.topic() + "-" + record.partition() + "-" + record.value()); |
| | | } |
| | | |
| | | } |