package com.fzzy.conf; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; import java.util.Map; //@Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.properties.security.protocol}") private String securityProtocol; @Value("${spring.kafka.properties.sasl.mechanism}") private String saslMechanism; @Value("${spring.kafka.properties.sasl.jaas.config}") private String saslJaasConfig; @Value("${spring.kafka.properties.sasl.username}") private String saslUsername; @Value("${spring.kafka.properties.sasl.password}") private String saslPassword; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); props.put("sasl.username", saslUsername); props.put("sasl.password", saslPassword); // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); props.put("sasl.jaas.config", saslJaasConfig); return props; } // @Bean // public ConsumerFactory consumerFactory() { // //配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要 // try (JsonDeserializer deserializer = new JsonDeserializer<>()) { // // deserializer.trustedPackages("*"); // return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer); // } // } // // @Bean // public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { // ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); // factory.setConsumerFactory(consumerFactory()); // //在侦听器容器中运行的线程数,一般设置为 机器数*分区数 // // factory.setConcurrency(concurrency); // //消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 // // factory.setMissingTopicsFatal(missingTopicsFatal); // //自动提交关闭,需要设置手动消息确认 // // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // // factory.getContainerProperties().setPollTimeout(pollTimeout); // //设置为批量监听,需要用List接收 // //factory.setBatchListener(true); // return factory; // } }