package com.fzzy.conf; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import javax.annotation.PostConstruct; import java.util.HashMap; import java.util.Map; /** * 自定义配置类 */ @Slf4j @Configuration @ConfigurationProperties(prefix = "spring.kafka") public class MyPartitionTemplate { private String bootstrapServers; private String securityProtocol; private String saslMechanism; private String saslJaasConfig; private String saslUsername; private String saslPassword; KafkaTemplate kafkaTemplate; @PostConstruct public void setKafkaTemplate() { Map props = new HashMap<>(); props.put("bootstrap.servers", bootstrapServers); props.put("security.protocol", securityProtocol); props.put("sasl.mechanism", saslMechanism); if (StringUtils.isNotEmpty(saslUsername) && StringUtils.isNotEmpty(saslPassword)) { saslJaasConfig = saslJaasConfig.replace("{username}", saslUsername) .replace("{password}", saslPassword); props.put("sasl.jaas.config", saslJaasConfig); log.debug("----sasl.jaas.config---{}", saslJaasConfig); } this.kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props)); } public KafkaTemplate getKafkaTemplate() { return kafkaTemplate; } }