From de3d8ad9d76a6130fa3751897ea3dcc6429cd7ed Mon Sep 17 00:00:00 2001 From: jiazx0107@163.com <jiazx0107@163.com> Date: 星期二, 31 十月 2023 19:02:02 +0800 Subject: [PATCH] 四川省网关接口相关13 --- src/main/resources/application-devGateway.yml | 10 + src/main/java/com/fzzy/conf/KafkaConfig.java | 73 ---------- src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java | 1 src/main/java/com/fzzy/conf/KafkaConsumerConfig.java | 98 ++++++++++++++ src/main/java/com/fzzy/conf/KafkaConfig2.java | 104 ++++++++++++++ src/main/java/com/fzzy/conf/KafkaProviderConfig.java | 95 +++++++++++++ 6 files changed, 307 insertions(+), 74 deletions(-) diff --git a/src/main/java/com/fzzy/conf/KafkaConfig.java b/src/main/java/com/fzzy/conf/KafkaConfig.java index 7f4c881..34b3a25 100644 --- a/src/main/java/com/fzzy/conf/KafkaConfig.java +++ b/src/main/java/com/fzzy/conf/KafkaConfig.java @@ -1,82 +1,9 @@ package com.fzzy.conf; -import lombok.AllArgsConstructor; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; - -import java.util.HashMap; -import java.util.Map; @Configuration @EnableKafka public class KafkaConfig { - - private String defaultSaslJaasConfig="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{username}\" password=\"{password}\";"; - - - @Value("${spring.kafka.bootstrap-servers}") - private String bootstrapServers; - - @Value("${spring.kafka.properties.security.protocol}") - private String securityProtocol; - - @Value("${spring.kafka.properties.sasl.mechanism}") - private String saslMechanism; - - @Value("${spring.kafka.properties.sasl.jaas.config}") - private String saslJaasConfig; - - @Value("${spring.kafka.properties.sasl.username}") - private String saslUsername; - - @Value("${spring.kafka.properties.sasl.password}") - private String saslPassword; - - - @Bean - public ProducerFactory<String, String> producerFactory() { - DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs()); - return producerFactory; - } - @Bean - public Map<String, Object> producerConfigs() { - Map<String, Object> props = new HashMap<>(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.RETRIES_CONFIG, 0); - props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); - props.put(ProducerConfig.LINGER_MS_CONFIG, 10); - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); - - - props.put("security.protocol", securityProtocol); - props.put("sasl.mechanism", saslMechanism); - props.put("sasl.username", saslUsername); - props.put("sasl.password", saslPassword); - - - // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); - - - props.put("sasl.jaas.config",saslJaasConfig); - - return props; - } - - @Bean - public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) { - return new KafkaTemplate<>(producerFactory); - } } diff --git a/src/main/java/com/fzzy/conf/KafkaConfig2.java b/src/main/java/com/fzzy/conf/KafkaConfig2.java new file mode 100644 index 0000000..4539ca1 --- /dev/null +++ b/src/main/java/com/fzzy/conf/KafkaConfig2.java @@ -0,0 +1,104 @@ +package com.fzzy.conf; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.transaction.KafkaTransactionManager; + +import java.util.HashMap; +import java.util.Map; + +//@Configuration +//@EnableKafka +//@SpringBootConfiguration +public class KafkaConfig2 { + + // private String defaultSaslJaasConfig="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{username}\" password=\"{password}\";"; + + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.properties.security.protocol}") + private String securityProtocol; + + @Value("${spring.kafka.properties.sasl.mechanism}") + private String saslMechanism; + + @Value("${spring.kafka.properties.sasl.jaas.config}") + private String saslJaasConfig; + + @Value("${spring.kafka.properties.sasl.username}") + private String saslUsername; + + @Value("${spring.kafka.properties.sasl.password}") + private String saslPassword; + + + @Bean + public Map<String, Object> producerConfigs() { + Map<String, Object> props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + props.put("sasl.username", saslUsername); + props.put("sasl.password", saslPassword); + + + // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); + + + props.put("sasl.jaas.config", saslJaasConfig); + + return props; + } + +// @Bean +// public Map<String, Object> consumerConfigs() { +// Map<String, Object> props = new HashMap<>(); +// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +// +//// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +//// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// +// +// props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); +// props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); +// props.put("sasl.username", saslUsername); +// props.put("sasl.password", saslPassword); +// +// +// // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); +// +// +// props.put("sasl.jaas.config", saslJaasConfig); +// +// return props; +// } + +// @Bean +// public ProducerFactory<Object, Object> producerFactory() { +// DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs()); +// return factory; +// } +// +// +// @Bean +// public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) { +// return new KafkaTransactionManager<>(producerFactory); +// } +// +// @Bean +// public KafkaTemplate<Object, Object> kafkaTemplate() { +// return new KafkaTemplate<>(producerFactory()); +// } + + +} diff --git a/src/main/java/com/fzzy/conf/KafkaConsumerConfig.java b/src/main/java/com/fzzy/conf/KafkaConsumerConfig.java new file mode 100644 index 0000000..bff1cd6 --- /dev/null +++ b/src/main/java/com/fzzy/conf/KafkaConsumerConfig.java @@ -0,0 +1,98 @@ +package com.fzzy.conf; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.HashMap; +import java.util.Map; + +//@Configuration +public class KafkaConsumerConfig { + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.properties.security.protocol}") + private String securityProtocol; + + @Value("${spring.kafka.properties.sasl.mechanism}") + private String saslMechanism; + + @Value("${spring.kafka.properties.sasl.jaas.config}") + private String saslJaasConfig; + + @Value("${spring.kafka.properties.sasl.username}") + private String saslUsername; + + @Value("${spring.kafka.properties.sasl.password}") + private String saslPassword; + + @Value("${spring.kafka.consumer.group-id}") + private String groupId; + + @Bean + public Map<String, Object> consumerConfigs() { + Map<String, Object> props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); + + + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + + + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + props.put("sasl.username", saslUsername); + props.put("sasl.password", saslPassword); + + + // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); + + + props.put("sasl.jaas.config", saslJaasConfig); + + + return props; + } + + +// @Bean +// public ConsumerFactory<Object, Object> consumerFactory() { +// //閰嶇疆娑堣垂鑰呯殑 Json 鍙嶅簭鍒楀寲鐨勫彲淇¤禆鍖咃紝鍙嶅簭鍒楀寲瀹炰綋绫婚渶瑕� +// try (JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) { +// // deserializer.trustedPackages("*"); +// return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer); +// } +// } +// +// @Bean +// public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() { +// ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// //鍦ㄤ睛鍚櫒瀹瑰櫒涓繍琛岀殑绾跨▼鏁帮紝涓�鑸缃负 鏈哄櫒鏁�*鍒嗗尯鏁� +// // factory.setConcurrency(concurrency); +// //娑堣垂鐩戝惉鎺ュ彛鐩戝惉鐨勪富棰樹笉瀛樺湪鏃讹紝榛樿浼氭姤閿欙紝鎵�浠ヨ缃负false蹇界暐閿欒 +// // factory.setMissingTopicsFatal(missingTopicsFatal); +// //鑷姩鎻愪氦鍏抽棴锛岄渶瑕佽缃墜鍔ㄦ秷鎭‘璁� +// // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); +// // factory.getContainerProperties().setPollTimeout(pollTimeout); +// //璁剧疆涓烘壒閲忕洃鍚紝闇�瑕佺敤List鎺ユ敹 +// //factory.setBatchListener(true); +// return factory; +// } + +} diff --git a/src/main/java/com/fzzy/conf/KafkaProviderConfig.java b/src/main/java/com/fzzy/conf/KafkaProviderConfig.java new file mode 100644 index 0000000..d7a3c40 --- /dev/null +++ b/src/main/java/com/fzzy/conf/KafkaProviderConfig.java @@ -0,0 +1,95 @@ +package com.fzzy.conf; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.transaction.KafkaTransactionManager; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaProviderConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.properties.security.protocol}") + private String securityProtocol; + + @Value("${spring.kafka.properties.sasl.mechanism}") + private String saslMechanism; + + @Value("${spring.kafka.properties.sasl.jaas.config}") + private String saslJaasConfig; + + @Value("${spring.kafka.properties.sasl.username}") + private String saslUsername; + + @Value("${spring.kafka.properties.sasl.password}") + private String saslPassword; + +// @Value("${spring.kafka.producer.transaction-id-prefix}") +// private String transactionIdPrefix; + + @Bean + public Map<String, Object> producerConfigs() { + Map<String, Object> props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.RETRIES_CONFIG, 0); + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + props.put(ProducerConfig.LINGER_MS_CONFIG, 10); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); + + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + props.put("sasl.username", saslUsername); + props.put("sasl.password", saslPassword); + + + // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); + + + props.put("sasl.jaas.config", saslJaasConfig); + + return props; + } + + @Bean + public ProducerFactory<String, String> producerFactory() { + DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs()); + return producerFactory; + } + +// @Bean +// public ProducerFactory<Object, Object> producerFactory() { +// DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs()); +// //寮�鍚簨鍔★紝浼氬鑷� LINGER_MS_CONFIG 閰嶇疆澶辨晥 +// // factory.setTransactionIdPrefix(transactionIdPrefix); +// return factory; +// } + +// @Bean +// public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) { +// return new KafkaTransactionManager<>(producerFactory); +// } + + @Bean + public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) { + return new KafkaTemplate<>(producerFactory); + } + +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java b/src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java index 684bdd8..e2ba899 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java +++ b/src/main/java/com/fzzy/gateway/hx2023/kafka/KafkaDeviceReport.java @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml index e6029e2..cc2c8df 100644 --- a/src/main/resources/application-devGateway.yml +++ b/src/main/resources/application-devGateway.yml @@ -48,8 +48,15 @@ timeout: 6000 kafka: bootstrap-servers: 103.203.217.16:9092 + producer: + retries: 0 + acks: 1 + batch-size: 16384 + buffer-memory: 33554432 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: - security.protocol: sasl_plaintext + security.protocol: SASL_PLAINTEXT sasl.mechanism: PLAIN sasl.username: sc001 sasl.password: wCV0ISwmoKwbx1lpBKMW @@ -62,6 +69,7 @@ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + mqtt: host: tcp://10.13.4.84:11883 client-id: -- Gitblit v1.9.3