From de3d8ad9d76a6130fa3751897ea3dcc6429cd7ed Mon Sep 17 00:00:00 2001
From: jiazx0107@163.com <jiazx0107@163.com>
Date: 星期二, 31 十月 2023 19:02:02 +0800
Subject: [PATCH] 四川省网关接口相关13

---
 src/main/resources/application-devGateway.yml                      |   10 +
 src/main/java/com/fzzy/conf/KafkaConfig.java                       |   73 ----------
 src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java |    1 
 src/main/java/com/fzzy/conf/KafkaConsumerConfig.java               |   98 ++++++++++++++
 src/main/java/com/fzzy/conf/KafkaConfig2.java                      |  104 ++++++++++++++
 src/main/java/com/fzzy/conf/KafkaProviderConfig.java               |   95 +++++++++++++
 6 files changed, 307 insertions(+), 74 deletions(-)

diff --git a/src/main/java/com/fzzy/conf/KafkaConfig.java b/src/main/java/com/fzzy/conf/KafkaConfig.java
index 7f4c881..34b3a25 100644
--- a/src/main/java/com/fzzy/conf/KafkaConfig.java
+++ b/src/main/java/com/fzzy/conf/KafkaConfig.java
@@ -1,82 +1,9 @@
 package com.fzzy.conf;
 
-import lombok.AllArgsConstructor;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
 
 @Configuration
 @EnableKafka
 public class KafkaConfig {
-
-    private String defaultSaslJaasConfig="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{username}\" password=\"{password}\";";
-
-
-    @Value("${spring.kafka.bootstrap-servers}")
-    private String bootstrapServers;
-
-    @Value("${spring.kafka.properties.security.protocol}")
-    private String securityProtocol;
-
-    @Value("${spring.kafka.properties.sasl.mechanism}")
-    private String saslMechanism;
-
-    @Value("${spring.kafka.properties.sasl.jaas.config}")
-    private String saslJaasConfig;
-
-    @Value("${spring.kafka.properties.sasl.username}")
-    private String saslUsername;
-
-    @Value("${spring.kafka.properties.sasl.password}")
-    private String saslPassword;
-
-
-    @Bean
-    public ProducerFactory<String, String> producerFactory() {
-        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
-        return producerFactory;
-    }
-    @Bean
-    public Map<String, Object> producerConfigs() {
-        Map<String, Object> props = new HashMap<>();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.RETRIES_CONFIG, 0);
-        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
-        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
-        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
-
-
-        props.put("security.protocol", securityProtocol);
-        props.put("sasl.mechanism", saslMechanism);
-        props.put("sasl.username", saslUsername);
-        props.put("sasl.password", saslPassword);
-
-
-      //  props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';");
-
-
-        props.put("sasl.jaas.config",saslJaasConfig);
-
-        return props;
-    }
-
-    @Bean
-    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
-        return new KafkaTemplate<>(producerFactory);
-    }
 }
diff --git a/src/main/java/com/fzzy/conf/KafkaConfig2.java b/src/main/java/com/fzzy/conf/KafkaConfig2.java
new file mode 100644
index 0000000..4539ca1
--- /dev/null
+++ b/src/main/java/com/fzzy/conf/KafkaConfig2.java
@@ -0,0 +1,104 @@
+package com.fzzy.conf;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.transaction.KafkaTransactionManager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+//@Configuration
+//@EnableKafka
+//@SpringBootConfiguration
+public class KafkaConfig2 {
+
+    //  private String defaultSaslJaasConfig="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{username}\" password=\"{password}\";";
+
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Value("${spring.kafka.properties.security.protocol}")
+    private String securityProtocol;
+
+    @Value("${spring.kafka.properties.sasl.mechanism}")
+    private String saslMechanism;
+
+    @Value("${spring.kafka.properties.sasl.jaas.config}")
+    private String saslJaasConfig;
+
+    @Value("${spring.kafka.properties.sasl.username}")
+    private String saslUsername;
+
+    @Value("${spring.kafka.properties.sasl.password}")
+    private String saslPassword;
+
+
+    @Bean
+    public Map<String, Object> producerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
+        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
+        props.put("sasl.username", saslUsername);
+        props.put("sasl.password", saslPassword);
+
+
+        //  props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';");
+
+
+        props.put("sasl.jaas.config", saslJaasConfig);
+
+        return props;
+    }
+
+//    @Bean
+//    public Map<String, Object> consumerConfigs() {
+//        Map<String, Object> props = new HashMap<>();
+//        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+//
+////        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+////        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+//
+//
+//        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
+//        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
+//        props.put("sasl.username", saslUsername);
+//        props.put("sasl.password", saslPassword);
+//
+//
+//        //  props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';");
+//
+//
+//        props.put("sasl.jaas.config", saslJaasConfig);
+//
+//        return props;
+//    }
+
+//    @Bean
+//    public ProducerFactory<Object, Object> producerFactory() {
+//        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
+//        return factory;
+//    }
+//
+//
+//    @Bean
+//    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
+//        return new KafkaTransactionManager<>(producerFactory);
+//    }
+//
+//    @Bean
+//    public KafkaTemplate<Object, Object> kafkaTemplate() {
+//        return new KafkaTemplate<>(producerFactory());
+//    }
+
+
+}
diff --git a/src/main/java/com/fzzy/conf/KafkaConsumerConfig.java b/src/main/java/com/fzzy/conf/KafkaConsumerConfig.java
new file mode 100644
index 0000000..bff1cd6
--- /dev/null
+++ b/src/main/java/com/fzzy/conf/KafkaConsumerConfig.java
@@ -0,0 +1,98 @@
+package com.fzzy.conf;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+//@Configuration
+public class KafkaConsumerConfig {
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Value("${spring.kafka.properties.security.protocol}")
+    private String securityProtocol;
+
+    @Value("${spring.kafka.properties.sasl.mechanism}")
+    private String saslMechanism;
+
+    @Value("${spring.kafka.properties.sasl.jaas.config}")
+    private String saslJaasConfig;
+
+    @Value("${spring.kafka.properties.sasl.username}")
+    private String saslUsername;
+
+    @Value("${spring.kafka.properties.sasl.password}")
+    private String saslPassword;
+
+    @Value("${spring.kafka.consumer.group-id}")
+    private String groupId;
+
+    @Bean
+    public Map<String, Object> consumerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
+
+
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+
+
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
+        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
+        props.put("sasl.username", saslUsername);
+        props.put("sasl.password", saslPassword);
+
+
+        //  props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';");
+
+
+        props.put("sasl.jaas.config", saslJaasConfig);
+
+
+        return props;
+    }
+
+
+//    @Bean
+//    public ConsumerFactory<Object, Object> consumerFactory() {
+//        //閰嶇疆娑堣垂鑰呯殑 Json 鍙嶅簭鍒楀寲鐨勫彲淇¤禆鍖咃紝鍙嶅簭鍒楀寲瀹炰綋绫婚渶瑕�
+//        try (JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {
+//            // deserializer.trustedPackages("*");
+//            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);
+//        }
+//    }
+//
+//    @Bean
+//    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
+//        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
+//        factory.setConsumerFactory(consumerFactory());
+//        //鍦ㄤ睛鍚櫒瀹瑰櫒涓繍琛岀殑绾跨▼鏁帮紝涓�鑸缃负 鏈哄櫒鏁�*鍒嗗尯鏁�
+//        // factory.setConcurrency(concurrency);
+//        //娑堣垂鐩戝惉鎺ュ彛鐩戝惉鐨勪富棰樹笉瀛樺湪鏃讹紝榛樿浼氭姤閿欙紝鎵�浠ヨ缃负false蹇界暐閿欒
+//        // factory.setMissingTopicsFatal(missingTopicsFatal);
+//        //鑷姩鎻愪氦鍏抽棴锛岄渶瑕佽缃墜鍔ㄦ秷鎭‘璁�
+//       // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+//        //  factory.getContainerProperties().setPollTimeout(pollTimeout);
+//        //璁剧疆涓烘壒閲忕洃鍚紝闇�瑕佺敤List鎺ユ敹
+//        //factory.setBatchListener(true);
+//        return factory;
+//    }
+
+}
diff --git a/src/main/java/com/fzzy/conf/KafkaProviderConfig.java b/src/main/java/com/fzzy/conf/KafkaProviderConfig.java
new file mode 100644
index 0000000..d7a3c40
--- /dev/null
+++ b/src/main/java/com/fzzy/conf/KafkaProviderConfig.java
@@ -0,0 +1,95 @@
+package com.fzzy.conf;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.transaction.KafkaTransactionManager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class KafkaProviderConfig {
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Value("${spring.kafka.properties.security.protocol}")
+    private String securityProtocol;
+
+    @Value("${spring.kafka.properties.sasl.mechanism}")
+    private String saslMechanism;
+
+    @Value("${spring.kafka.properties.sasl.jaas.config}")
+    private String saslJaasConfig;
+
+    @Value("${spring.kafka.properties.sasl.username}")
+    private String saslUsername;
+
+    @Value("${spring.kafka.properties.sasl.password}")
+    private String saslPassword;
+
+//    @Value("${spring.kafka.producer.transaction-id-prefix}")
+//    private String transactionIdPrefix;
+
+    @Bean
+    public Map<String, Object> producerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.RETRIES_CONFIG, 0);
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
+        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
+        props.put("sasl.username", saslUsername);
+        props.put("sasl.password", saslPassword);
+
+
+        //  props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';");
+
+
+        props.put("sasl.jaas.config", saslJaasConfig);
+
+        return props;
+    }
+
+    @Bean
+    public ProducerFactory<String, String> producerFactory() {
+        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
+        return producerFactory;
+    }
+
+//    @Bean
+//    public ProducerFactory<Object, Object> producerFactory() {
+//        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
+//        //寮�鍚簨鍔★紝浼氬鑷� LINGER_MS_CONFIG 閰嶇疆澶辨晥
+//       // factory.setTransactionIdPrefix(transactionIdPrefix);
+//        return factory;
+//    }
+
+//    @Bean
+//    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
+//        return new KafkaTransactionManager<>(producerFactory);
+//    }
+
+    @Bean
+    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
+        return new KafkaTemplate<>(producerFactory);
+    }
+
+}
diff --git a/src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java b/src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java
index 684bdd8..e2ba899 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java
@@ -6,6 +6,7 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml
index e6029e2..cc2c8df 100644
--- a/src/main/resources/application-devGateway.yml
+++ b/src/main/resources/application-devGateway.yml
@@ -48,8 +48,15 @@
       timeout: 6000
   kafka:
     bootstrap-servers: 103.203.217.16:9092
+    producer:
+      retries: 0
+      acks: 1
+      batch-size: 16384
+      buffer-memory: 33554432
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
     properties:
-      security.protocol: sasl_plaintext
+      security.protocol: SASL_PLAINTEXT
       sasl.mechanism: PLAIN
       sasl.username: sc001
       sasl.password: wCV0ISwmoKwbx1lpBKMW
@@ -62,6 +69,7 @@
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
 
+
 mqtt:
   host: tcp://10.13.4.84:11883
   client-id:

--
Gitblit v1.9.3