package com.fzzy.conf;

import lombok.AllArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that wires a SASL-authenticated Kafka producer.
 *
 * <p>Connection and security settings are injected from the
 * {@code spring.kafka.*} properties; the remaining producer tuning values are
 * fixed in {@link #producerConfigs()}. Keys are serialized with
 * {@link IntegerSerializer} and values with {@link StringSerializer}.
 *
 * <p>Credentials must only ever come from external configuration. Never place
 * a literal username or password in this class, not even in a comment.
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /**
     * JAAS template for the SCRAM login module with {@code {username}} and
     * {@code {password}} placeholders.
     *
     * <p>NOTE(review): nothing in this class references the template; the
     * effective JAAS line is taken verbatim from
     * {@code spring.kafka.properties.sasl.jaas.config}. Presumably kept as a
     * reference for the expected format - confirm whether it can be removed.
     */
    private static final String DEFAULT_SASL_JAAS_CONFIG =
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{username}\" password=\"{password}\";";

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String saslJaasConfig;

    @Value("${spring.kafka.properties.sasl.username}")
    private String saslUsername;

    @Value("${spring.kafka.properties.sasl.password}")
    private String saslPassword;

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * <p>NOTE(review): the return type is intentionally left raw so that
     * existing injection points keep resolving regardless of the generic
     * arguments they declare; consider narrowing it to
     * {@code ProducerFactory<Integer, String>} once all callers are confirmed.
     *
     * @return a {@link DefaultKafkaProducerFactory} built from the producer settings
     */
    @SuppressWarnings("rawtypes")
    @Bean
    public ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Builds the producer configuration map.
     *
     * @return a new mutable map holding the connection, tuning, serializer and
     *         SASL settings handed to the producer factory
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();

        // Connection and delivery tuning.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32 MiB
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);

        // Serializers: Integer keys, String values.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Security settings, all sourced from external configuration.
        props.put("security.protocol", securityProtocol);
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);

        // NOTE(review): "sasl.username" / "sasl.password" are kept to preserve
        // the existing behaviour; they do not look like standard Kafka client
        // settings - confirm whether anything reads them before removing.
        props.put("sasl.username", saslUsername);
        props.put("sasl.password", saslPassword);

        return props;
    }

    /**
     * Creates the template used to publish messages.
     *
     * <p>NOTE(review): parameter and return types are intentionally left raw,
     * for the same compatibility reason as {@link #producerFactory()}.
     *
     * @param producerFactory the factory that supplies producers to the template
     * @return a {@link KafkaTemplate} backed by {@code producerFactory}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}