package com.fzzy.conf;
|
|
import org.apache.kafka.clients.CommonClientConfigs;
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
import org.apache.kafka.common.config.SaslConfigs;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.boot.SpringBootConfiguration;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
import org.springframework.kafka.config.KafkaListenerContainerFactory;
|
import org.springframework.kafka.core.ConsumerFactory;
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
import org.springframework.kafka.listener.ContainerProperties;
|
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
|
import java.util.HashMap;
|
import java.util.Map;
|
|
//@Configuration
|
public class KafkaConsumerConfig {
|
@Value("${spring.kafka.bootstrap-servers}")
|
private String bootstrapServers;
|
|
@Value("${spring.kafka.properties.security.protocol}")
|
private String securityProtocol;
|
|
@Value("${spring.kafka.properties.sasl.mechanism}")
|
private String saslMechanism;
|
|
@Value("${spring.kafka.properties.sasl.jaas.config}")
|
private String saslJaasConfig;
|
|
@Value("${spring.kafka.properties.sasl.username}")
|
private String saslUsername;
|
|
@Value("${spring.kafka.properties.sasl.password}")
|
private String saslPassword;
|
|
@Value("${spring.kafka.consumer.group-id}")
|
private String groupId;
|
|
@Bean
|
public Map<String, Object> consumerConfigs() {
|
Map<String, Object> props = new HashMap<>();
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
|
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
|
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
|
|
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
props.put("sasl.username", saslUsername);
|
props.put("sasl.password", saslPassword);
|
|
|
// props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';");
|
|
|
props.put("sasl.jaas.config", saslJaasConfig);
|
|
|
return props;
|
}
|
|
|
// @Bean
|
// public ConsumerFactory<Object, Object> consumerFactory() {
|
// //配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
|
// try (JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {
|
// // deserializer.trustedPackages("*");
|
// return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);
|
// }
|
// }
|
//
|
// @Bean
|
// public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
|
// ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
// factory.setConsumerFactory(consumerFactory());
|
// //在侦听器容器中运行的线程数,一般设置为 机器数*分区数
|
// // factory.setConcurrency(concurrency);
|
// //消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
|
// // factory.setMissingTopicsFatal(missingTopicsFatal);
|
// //自动提交关闭,需要设置手动消息确认
|
// // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
|
// // factory.getContainerProperties().setPollTimeout(pollTimeout);
|
// //设置为批量监听,需要用List接收
|
// //factory.setBatchListener(true);
|
// return factory;
|
// }
|
|
}
|