| | |
| | | package com.fzzy.conf; |
| | | |
| | | import lombok.AllArgsConstructor; |
| | | import org.apache.kafka.clients.producer.ProducerConfig; |
| | | import org.apache.kafka.common.config.SaslConfigs; |
| | | import org.apache.kafka.common.serialization.IntegerSerializer; |
| | | import org.apache.kafka.common.serialization.StringSerializer; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.boot.context.properties.EnableConfigurationProperties; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.context.annotation.Configuration; |
| | | import org.springframework.kafka.annotation.EnableKafka; |
| | | import org.springframework.kafka.core.DefaultKafkaProducerFactory; |
| | | import org.springframework.kafka.core.KafkaTemplate; |
| | | import org.springframework.kafka.core.ProducerFactory; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | @Configuration |
| | | @EnableKafka |
| | | public class KafkaConfig { |
| | | |
| | | private String defaultSaslJaasConfig="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{username}\" password=\"{password}\";"; |
| | | |
| | | |
| | | @Value("${spring.kafka.bootstrap-servers}") |
| | | private String bootstrapServers; |
| | | |
| | | @Value("${spring.kafka.properties.security.protocol}") |
| | | private String securityProtocol; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.mechanism}") |
| | | private String saslMechanism; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.jaas.config}") |
| | | private String saslJaasConfig; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.username}") |
| | | private String saslUsername; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.password}") |
| | | private String saslPassword; |
| | | |
| | | |
| | | @Bean |
| | | public ProducerFactory<String, String> producerFactory() { |
| | | DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs()); |
| | | return producerFactory; |
| | | } |
| | | @Bean |
| | | public Map<String, Object> producerConfigs() { |
| | | Map<String, Object> props = new HashMap<>(); |
| | | props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
| | | props.put(ProducerConfig.RETRIES_CONFIG, 0); |
| | | props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); |
| | | props.put(ProducerConfig.LINGER_MS_CONFIG, 10); |
| | | props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); |
| | | props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); |
| | | props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); |
| | | props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); |
| | | |
| | | |
| | | props.put("security.protocol", securityProtocol); |
| | | props.put("sasl.mechanism", saslMechanism); |
| | | props.put("sasl.username", saslUsername); |
| | | props.put("sasl.password", saslPassword); |
| | | |
| | | |
| | | // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); |
| | | |
| | | |
| | | props.put("sasl.jaas.config",saslJaasConfig); |
| | | |
| | | return props; |
| | | } |
| | | |
| | | @Bean |
| | | public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) { |
| | | return new KafkaTemplate<>(producerFactory); |
| | | } |
| | | } |