package com.fzzy.gateway.hx2023.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 使用KAFKA推动返回信息实现,针对当前网关 */ @Slf4j @Component public class KafkaDeviceReportService { @Resource private KafkaTemplate kafkaTemplate; public String publishWithTopic(String sendData, String topic) { //推送信息 kafkaTemplate.send(topic, sendData).addCallback(success -> { // 消息发送到的topic // String topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); log.info("---推送至KAFKA成功--:{}-{}-{}-{}", topic, partition, offset, sendData); }, failure -> { log.info("---推送至KAFKA失败--:{}-{}-{}-{}", topic, sendData); rePublishWithTopic(sendData, topic); }); return "SUCCESS"; } private void rePublishWithTopic(String sendData, String topic) { //推送信息 kafkaTemplate.send(topic, sendData).addCallback(success -> { // 消息发送到的topic // String topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); log.info("---推送至KAFKA成功--:{}-{}-{}-{}", topic, partition, offset, sendData); }, failure -> { log.info("---推送至KAFKA失败--:{}-{}-{}-{}", topic, sendData); }); } @KafkaListener(topics = {"TOPIC_ZLJ_GRAIN_TEMPERATURE"}) public void onMessage1(ConsumerRecord record) { // log.debug("获取消费信息:" + record.topic() + "-" + record.partition() + "-" + record.value()); } }