package com.fzzy.conf;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that builds the Kafka producer beans: the producer
 * property map, the {@link ProducerFactory} created from it, and a
 * {@link KafkaTemplate} on top of that factory.
 *
 * <p>Connection and SASL settings are read from the {@code spring.kafka.*}
 * properties; the remaining producer settings are fixed in
 * {@link #producerConfigs()}.
 */
@Configuration
public class KafkaProviderConfig {

    /** Value of {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Value of {@code spring.kafka.properties.security.protocol}. */
    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;

    /** Value of {@code spring.kafka.properties.sasl.mechanism}. */
    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;

    /** Value of {@code spring.kafka.properties.sasl.jaas.config}. */
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String saslJaasConfig;

    /** Value of {@code spring.kafka.properties.sasl.username}. */
    @Value("${spring.kafka.properties.sasl.username}")
    private String saslUsername;

    /** Value of {@code spring.kafka.properties.sasl.password}. */
    @Value("${spring.kafka.properties.sasl.password}")
    private String saslPassword;

    /**
     * Builds the property map handed to the Kafka producer.
     *
     * @return a new mutable map of producer configuration keys to values
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();

        // Connection and delivery behaviour.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);

        // Batching and buffering (buffer size is 32 MiB = 33554432 bytes).
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);

        // Both keys and values are serialized as strings.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Security / SASL settings.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        // NOTE(review): "sasl.username" / "sasl.password" do not appear to be
        // standard Kafka client keys; presumably something outside this file
        // (e.g. a custom login handler) reads them - confirm before removing.
        props.put("sasl.username", saslUsername);
        props.put("sasl.password", saslPassword);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);

        return props;
    }

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * <p>NOTE(review): the return type is intentionally left raw so that
     * injection points declared elsewhere with any generic parameters keep
     * resolving; given the serializers configured above,
     * {@code ProducerFactory<String, String>} looks like the accurate type -
     * confirm against the injection points before narrowing.
     *
     * @return a {@link DefaultKafkaProducerFactory} using the producer configs
     */
    @Bean
    public ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Creates the {@link KafkaTemplate} used to send messages.
     *
     * @param producerFactory the factory the template obtains producers from
     * @return a template backed by {@code producerFactory}
     */
    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}