package com.fzzy.gateway.hx2023.kafka;
|
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.springframework.kafka.annotation.KafkaListener;
|
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
|
|
/**
|
* 使用KAFKA推动返回信息实现,针对当前网关
|
*/
|
@Slf4j
|
@Component
|
public class KafkaDeviceReportService {
|
|
|
@Resource
|
private KafkaTemplate<String, Object> kafkaTemplate;
|
|
|
public String publishWithTopic(String sendData, String topic) {
|
//推送信息
|
kafkaTemplate.send(topic, sendData).addCallback(success -> {
|
// 消息发送到的topic
|
// String topic = success.getRecordMetadata().topic();
|
// 消息发送到的分区
|
int partition = success.getRecordMetadata().partition();
|
// 消息在分区内的offset
|
long offset = success.getRecordMetadata().offset();
|
|
log.info("---推送至KAFKA成功--:{}-{}-{}-{}", topic, partition, offset, sendData);
|
}, failure -> {
|
log.info("---推送至KAFKA失败--:{}-{}-{}-{}", topic, sendData);
|
rePublishWithTopic(sendData, topic);
|
});
|
return "SUCCESS";
|
}
|
|
private void rePublishWithTopic(String sendData, String topic) {
|
//推送信息
|
kafkaTemplate.send(topic, sendData).addCallback(success -> {
|
// 消息发送到的topic
|
// String topic = success.getRecordMetadata().topic();
|
// 消息发送到的分区
|
int partition = success.getRecordMetadata().partition();
|
// 消息在分区内的offset
|
long offset = success.getRecordMetadata().offset();
|
|
log.info("---推送至KAFKA成功--:{}-{}-{}-{}", topic, partition, offset, sendData);
|
}, failure -> {
|
log.info("---推送至KAFKA失败--:{}-{}-{}-{}", topic, sendData);
|
});
|
}
|
|
|
@KafkaListener(topics = {"TOPIC_ZLJ_GRAIN_TEMPERATURE"})
|
public void onMessage1(ConsumerRecord<?, ?> record) {
|
// log.debug("获取消费信息:" + record.topic() + "-" + record.partition() + "-" + record.value());
|
}
|
|
}
|