package com.fzzy.conf;
|
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang.StringUtils;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
import javax.annotation.PostConstruct;
|
import java.util.HashMap;
|
import java.util.Map;
|
|
/**
|
* 自定义配置类
|
*/
|
@Slf4j
|
@Configuration
|
@ConfigurationProperties(prefix = "spring.kafka")
|
public class MyPartitionTemplate {
|
|
private String bootstrapServers;
|
|
|
private String securityProtocol;
|
|
|
private String saslMechanism;
|
|
|
private String saslJaasConfig;
|
|
|
private String saslUsername;
|
|
|
private String saslPassword;
|
|
KafkaTemplate<String, String> kafkaTemplate;
|
|
@PostConstruct
|
public void setKafkaTemplate() {
|
Map<String, Object> props = new HashMap<>();
|
|
props.put("bootstrap.servers", bootstrapServers);
|
props.put("security.protocol", securityProtocol);
|
props.put("sasl.mechanism", saslMechanism);
|
|
if (StringUtils.isNotEmpty(saslUsername) && StringUtils.isNotEmpty(saslPassword)) {
|
saslJaasConfig = saslJaasConfig.replace("{username}", saslUsername)
|
.replace("{password}", saslPassword);
|
props.put("sasl.jaas.config", saslJaasConfig);
|
|
log.debug("----sasl.jaas.config---{}", saslJaasConfig);
|
}
|
|
this.kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
|
}
|
|
public KafkaTemplate<String, String> getKafkaTemplate() {
|
return kafkaTemplate;
|
}
|
|
}
|