package com.fzzy.gateway.hx2023.kafka;
|
|
import com.alibaba.fastjson2.JSONObject;
|
import com.fzzy.gateway.hx2023.data.KafaGrainData;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.kafka.annotation.KafkaListener;
|
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
|
|
/**
|
* 使用KAFKA推动返回信息实现,针对当前网关
|
*/
|
@Slf4j
|
@Component
|
public class KafkaDeviceReport {
|
|
|
@Resource
|
private KafkaTemplate<String, Object> kafkaTemplate;
|
|
/**
|
* 推动粮情到云端系统
|
*
|
* @param data
|
* @return
|
*/
|
public String sendGrainData2Cloud(KafaGrainData data) {
|
|
|
String strData = JSONObject.toJSONString(data);
|
//log.debug("----推送粮情信息到云端---{}", strData);
|
|
//推送信息
|
kafkaTemplate.send("TOPIC_ZLJ_GRAIN_TEMPERATURE", strData).addCallback(success -> {
|
// 消息发送到的topic
|
String topic = success.getRecordMetadata().topic();
|
// 消息发送到的分区
|
int partition = success.getRecordMetadata().partition();
|
// 消息在分区内的offset
|
long offset = success.getRecordMetadata().offset();
|
|
log.info("---粮情-->>云端成功:{}-{}-{}-{}", topic, partition, offset, strData);
|
}, failure -> {
|
log.info("---粮情-->>云端失败:{}-{}", data.getDeviceID(), failure.getMessage());
|
});
|
|
return "SUCCESS";
|
|
}
|
|
@KafkaListener(topics = {"TOPIC_ZLJ_GRAIN_TEMPERATURE"})
|
public void onMessage1(ConsumerRecord<?, ?> record) {
|
// log.debug("获取消费信息:" + record.topic() + "-" + record.partition() + "-" + record.value());
|
}
|
|
}
|