jiazx0107@163.com
2023-10-31 e5e40869b53a0e8c9c4e86d3f488dceb08b3b845
src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java
@@ -31,12 +31,13 @@
     * @return
     */
    public String sendGrainData2Cloud(GrainData data) {
       log.debug("----推送粮情信息到云端---{}",data);
        String strData = JSONObject.toJSONString(data);
        log.debug("----推送粮情信息到云端---{}", strData);
        //推送信息
        kafkaTemplate.send("TOPIC_ZLJ_GRAIN_TEMPERATURE",JSONObject.toJSONString(data)).addCallback(success -> {
        kafkaTemplate.send("TOPIC_ZLJ_GRAIN_TEMPERATURE", strData).addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
@@ -44,11 +45,10 @@
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            log.info("粮情推送成功:" + topic + "-" + partition + "-" + offset);
            log.info("粮情推送成功:{}-{}-{}-{}", topic, partition, offset, data.getDeviceID());
        }, failure -> {
            log.error("粮情推送失败:" + failure.getMessage());
            log.info("粮情推送失败:{}-{}", data.getDeviceID(), failure.getMessage());
        });
        return "SUCCESS";
@@ -56,7 +56,7 @@
    @KafkaListener(topics = {"TOPIC_ZLJ_GRAIN_TEMPERATURE"})
    public void onMessage1(ConsumerRecord<?, ?> record) {
        // Listener for the TOPIC_ZLJ_GRAIN_TEMPERATURE topic. The body has no
        // executable statements, so the received record is not processed here.
        // The only content is the disabled debug log below, which would print
        // the record's topic, partition and value if re-enabled.
        //  log.debug("获取消费信息:" + record.topic() + "-" + record.partition() + "-" + record.value());
    }
}