package com.fzzy.gateway.hx2023.kafka;
|
|
import com.alibaba.fastjson2.JSONObject;
|
import com.fzzy.gateway.hx2023.data.GrainData;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.springframework.kafka.annotation.KafkaListener;
|
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
|
|
/**
|
* 使用KAFKA推动返回信息实现,针对当前网关
|
*/
|
@Slf4j
|
@Component
|
public class KafkaDeviceReport {
|
|
|
@Resource
|
private KafkaTemplate<String, Object> kafkaTemplate;
|
|
/**
|
* 推动粮情到云端系统
|
*
|
* @param data
|
* @return
|
*/
|
public String sendGrainData2Cloud(GrainData data) {
|
|
|
log.debug("----推送粮情信息到云端---{}",data);
|
|
//推送信息
|
kafkaTemplate.send("TOPIC_ZLJ_GRAIN_TEMPERATURE",JSONObject.toJSONString(data)).addCallback(success -> {
|
// 消息发送到的topic
|
String topic = success.getRecordMetadata().topic();
|
// 消息发送到的分区
|
int partition = success.getRecordMetadata().partition();
|
// 消息在分区内的offset
|
long offset = success.getRecordMetadata().offset();
|
|
log.info("粮情推送成功:" + topic + "-" + partition + "-" + offset);
|
}, failure -> {
|
log.error("粮情推送失败:" + failure.getMessage());
|
});
|
|
|
return "SUCCESS";
|
|
}
|
|
@KafkaListener(topics = {"TOPIC_ZLJ_GRAIN_TEMPERATURE"})
|
public void onMessage1(ConsumerRecord<?, ?> record) {
|
// log.debug("获取消费信息:" + record.topic() + "-" + record.partition() + "-" + record.value());
|
}
|
|
}
|