| | |
| | | package com.fzzy.conf; |
| | | |
| | | import lombok.AllArgsConstructor; |
| | | import org.apache.kafka.clients.producer.ProducerConfig; |
| | | import org.apache.kafka.common.config.SaslConfigs; |
| | | import org.apache.kafka.common.serialization.IntegerSerializer; |
| | | import org.apache.kafka.common.serialization.StringSerializer; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.boot.context.properties.EnableConfigurationProperties; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.context.annotation.Configuration; |
| | | import org.springframework.kafka.annotation.EnableKafka; |
| | | import org.springframework.kafka.core.DefaultKafkaProducerFactory; |
| | | import org.springframework.kafka.core.KafkaTemplate; |
| | | import org.springframework.kafka.core.ProducerFactory; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | @Configuration |
| | | @EnableKafka |
| | | public class KafkaConfig { |
| | | |
| | | private String defaultSaslJaasConfig="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{username}\" password=\"{password}\";"; |
| | | |
| | | |
| | | @Value("${spring.kafka.bootstrap-servers}") |
| | | private String bootstrapServers; |
| | | |
| | | @Value("${spring.kafka.properties.security.protocol}") |
| | | private String securityProtocol; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.mechanism}") |
| | | private String saslMechanism; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.jaas.config}") |
| | | private String saslJaasConfig; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.username}") |
| | | private String saslUsername; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.password}") |
| | | private String saslPassword; |
| | | |
| | | |
| | | @Bean |
| | | public ProducerFactory<String, String> producerFactory() { |
| | | DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs()); |
| | | return producerFactory; |
| | | } |
| | | @Bean |
| | | public Map<String, Object> producerConfigs() { |
| | | Map<String, Object> props = new HashMap<>(); |
| | | props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
| | | props.put(ProducerConfig.RETRIES_CONFIG, 0); |
| | | props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); |
| | | props.put(ProducerConfig.LINGER_MS_CONFIG, 10); |
| | | props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); |
| | | props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); |
| | | props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); |
| | | props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); |
| | | |
| | | |
| | | props.put("security.protocol", securityProtocol); |
| | | props.put("sasl.mechanism", saslMechanism); |
| | | props.put("sasl.username", saslUsername); |
| | | props.put("sasl.password", saslPassword); |
| | | |
| | | |
| | | // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); |
| | | |
| | | |
| | | props.put("sasl.jaas.config",saslJaasConfig); |
| | | |
| | | return props; |
| | | } |
| | | |
| | | @Bean |
| | | public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) { |
| | | return new KafkaTemplate<>(producerFactory); |
| | | } |
| | | } |
¶Ô±ÈÐÂÎļþ |
| | |
| | | package com.fzzy.conf; |
| | | |
| | | import org.apache.kafka.clients.CommonClientConfigs; |
| | | import org.apache.kafka.clients.producer.ProducerConfig; |
| | | import org.apache.kafka.common.config.SaslConfigs; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.kafka.core.DefaultKafkaProducerFactory; |
| | | import org.springframework.kafka.core.KafkaTemplate; |
| | | import org.springframework.kafka.core.ProducerFactory; |
| | | import org.springframework.kafka.transaction.KafkaTransactionManager; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | //@Configuration |
| | | //@EnableKafka |
| | | //@SpringBootConfiguration |
| | | public class KafkaConfig2 { |
| | | |
| | | // private String defaultSaslJaasConfig="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{username}\" password=\"{password}\";"; |
| | | |
| | | |
| | | @Value("${spring.kafka.bootstrap-servers}") |
| | | private String bootstrapServers; |
| | | |
| | | @Value("${spring.kafka.properties.security.protocol}") |
| | | private String securityProtocol; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.mechanism}") |
| | | private String saslMechanism; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.jaas.config}") |
| | | private String saslJaasConfig; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.username}") |
| | | private String saslUsername; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.password}") |
| | | private String saslPassword; |
| | | |
| | | |
| | | @Bean |
| | | public Map<String, Object> producerConfigs() { |
| | | Map<String, Object> props = new HashMap<>(); |
| | | props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
| | | |
| | | |
| | | props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); |
| | | props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); |
| | | props.put("sasl.username", saslUsername); |
| | | props.put("sasl.password", saslPassword); |
| | | |
| | | |
| | | // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); |
| | | |
| | | |
| | | props.put("sasl.jaas.config", saslJaasConfig); |
| | | |
| | | return props; |
| | | } |
| | | |
| | | // @Bean |
| | | // public Map<String, Object> consumerConfigs() { |
| | | // Map<String, Object> props = new HashMap<>(); |
| | | // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
| | | // |
| | | //// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| | | //// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| | | // |
| | | // |
| | | // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); |
| | | // props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); |
| | | // props.put("sasl.username", saslUsername); |
| | | // props.put("sasl.password", saslPassword); |
| | | // |
| | | // |
| | | // // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); |
| | | // |
| | | // |
| | | // props.put("sasl.jaas.config", saslJaasConfig); |
| | | // |
| | | // return props; |
| | | // } |
| | | |
| | | // @Bean |
| | | // public ProducerFactory<Object, Object> producerFactory() { |
| | | // DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs()); |
| | | // return factory; |
| | | // } |
| | | // |
| | | // |
| | | // @Bean |
| | | // public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) { |
| | | // return new KafkaTransactionManager<>(producerFactory); |
| | | // } |
| | | // |
| | | // @Bean |
| | | // public KafkaTemplate<Object, Object> kafkaTemplate() { |
| | | // return new KafkaTemplate<>(producerFactory()); |
| | | // } |
| | | |
| | | |
| | | } |
¶Ô±ÈÐÂÎļþ |
| | |
| | | package com.fzzy.conf; |
| | | |
| | | import org.apache.kafka.clients.CommonClientConfigs; |
| | | import org.apache.kafka.clients.consumer.ConsumerConfig; |
| | | import org.apache.kafka.common.config.SaslConfigs; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.boot.SpringBootConfiguration; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.context.annotation.Configuration; |
| | | import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; |
| | | import org.springframework.kafka.config.KafkaListenerContainerFactory; |
| | | import org.springframework.kafka.core.ConsumerFactory; |
| | | import org.springframework.kafka.core.DefaultKafkaConsumerFactory; |
| | | import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; |
| | | import org.springframework.kafka.listener.ContainerProperties; |
| | | import org.springframework.kafka.support.serializer.JsonDeserializer; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | //@Configuration |
| | | public class KafkaConsumerConfig { |
| | | @Value("${spring.kafka.bootstrap-servers}") |
| | | private String bootstrapServers; |
| | | |
| | | @Value("${spring.kafka.properties.security.protocol}") |
| | | private String securityProtocol; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.mechanism}") |
| | | private String saslMechanism; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.jaas.config}") |
| | | private String saslJaasConfig; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.username}") |
| | | private String saslUsername; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.password}") |
| | | private String saslPassword; |
| | | |
| | | @Value("${spring.kafka.consumer.group-id}") |
| | | private String groupId; |
| | | |
| | | @Bean |
| | | public Map<String, Object> consumerConfigs() { |
| | | Map<String, Object> props = new HashMap<>(); |
| | | props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
| | | props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); |
| | | |
| | | props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); |
| | | |
| | | |
| | | props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); |
| | | props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); |
| | | |
| | | |
| | | props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); |
| | | props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); |
| | | props.put("sasl.username", saslUsername); |
| | | props.put("sasl.password", saslPassword); |
| | | |
| | | |
| | | // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); |
| | | |
| | | |
| | | props.put("sasl.jaas.config", saslJaasConfig); |
| | | |
| | | |
| | | return props; |
| | | } |
| | | |
| | | |
| | | // @Bean |
| | | // public ConsumerFactory<Object, Object> consumerFactory() { |
| | | // //é
ç½®æ¶è´¹è
ç Json ååºååçå¯ä¿¡èµå
ï¼ååºååå®ä½ç±»éè¦ |
| | | // try (JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) { |
| | | // // deserializer.trustedPackages("*"); |
| | | // return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer); |
| | | // } |
| | | // } |
| | | // |
| | | // @Bean |
| | | // public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() { |
| | | // ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); |
| | | // factory.setConsumerFactory(consumerFactory()); |
| | | // //å¨ä¾¦å¬å¨å®¹å¨ä¸è¿è¡ççº¿ç¨æ°ï¼ä¸è¬è®¾ç½®ä¸º æºå¨æ°*ååºæ° |
| | | // // factory.setConcurrency(concurrency); |
| | | // //æ¶è´¹ç嬿¥å£çå¬ç主é¢ä¸å卿¶ï¼é»è®¤ä¼æ¥éï¼æä»¥è®¾ç½®ä¸ºfalse忽ç¥é误 |
| | | // // factory.setMissingTopicsFatal(missingTopicsFatal); |
| | | // //èªå¨æäº¤å
³éï¼éè¦è®¾ç½®æå¨æ¶æ¯ç¡®è®¤ |
| | | // // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); |
| | | // // factory.getContainerProperties().setPollTimeout(pollTimeout); |
| | | // //设置为æ¹éçå¬ï¼éè¦ç¨Listæ¥æ¶ |
| | | // //factory.setBatchListener(true); |
| | | // return factory; |
| | | // } |
| | | |
| | | } |
¶Ô±ÈÐÂÎļþ |
| | |
| | | package com.fzzy.conf; |
| | | |
| | | import org.apache.kafka.clients.CommonClientConfigs; |
| | | import org.apache.kafka.clients.producer.ProducerConfig; |
| | | import org.apache.kafka.common.config.SaslConfigs; |
| | | import org.apache.kafka.common.serialization.IntegerSerializer; |
| | | import org.apache.kafka.common.serialization.StringSerializer; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.boot.SpringBootConfiguration; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.context.annotation.Configuration; |
| | | import org.springframework.kafka.annotation.EnableKafka; |
| | | import org.springframework.kafka.core.DefaultKafkaProducerFactory; |
| | | import org.springframework.kafka.core.KafkaTemplate; |
| | | import org.springframework.kafka.core.ProducerFactory; |
| | | import org.springframework.kafka.transaction.KafkaTransactionManager; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | @Configuration |
| | | public class KafkaProviderConfig { |
| | | |
| | | @Value("${spring.kafka.bootstrap-servers}") |
| | | private String bootstrapServers; |
| | | |
| | | @Value("${spring.kafka.properties.security.protocol}") |
| | | private String securityProtocol; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.mechanism}") |
| | | private String saslMechanism; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.jaas.config}") |
| | | private String saslJaasConfig; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.username}") |
| | | private String saslUsername; |
| | | |
| | | @Value("${spring.kafka.properties.sasl.password}") |
| | | private String saslPassword; |
| | | |
| | | // @Value("${spring.kafka.producer.transaction-id-prefix}") |
| | | // private String transactionIdPrefix; |
| | | |
| | | @Bean |
| | | public Map<String, Object> producerConfigs() { |
| | | Map<String, Object> props = new HashMap<>(); |
| | | props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
| | | props.put(ProducerConfig.RETRIES_CONFIG, 0); |
| | | props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); |
| | | props.put(ProducerConfig.LINGER_MS_CONFIG, 10); |
| | | props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); |
| | | props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); |
| | | props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); |
| | | props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); |
| | | |
| | | props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); |
| | | props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); |
| | | props.put("sasl.username", saslUsername); |
| | | props.put("sasl.password", saslPassword); |
| | | |
| | | |
| | | // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); |
| | | |
| | | |
| | | props.put("sasl.jaas.config", saslJaasConfig); |
| | | |
| | | return props; |
| | | } |
| | | |
| | | @Bean |
| | | public ProducerFactory<String, String> producerFactory() { |
| | | DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs()); |
| | | return producerFactory; |
| | | } |
| | | |
| | | // @Bean |
| | | // public ProducerFactory<Object, Object> producerFactory() { |
| | | // DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs()); |
| | | // //å¼å¯äºå¡ï¼ä¼å¯¼è´ LINGER_MS_CONFIG é
置失æ |
| | | // // factory.setTransactionIdPrefix(transactionIdPrefix); |
| | | // return factory; |
| | | // } |
| | | |
| | | // @Bean |
| | | // public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) { |
| | | // return new KafkaTransactionManager<>(producerFactory); |
| | | // } |
| | | |
| | | @Bean |
| | | public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) { |
| | | return new KafkaTemplate<>(producerFactory); |
| | | } |
| | | |
| | | } |
| | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | |
| | | import org.apache.kafka.clients.consumer.ConsumerRecord; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.kafka.annotation.KafkaListener; |
| | | import org.springframework.kafka.core.KafkaTemplate; |
| | | import org.springframework.stereotype.Component; |
| | |
| | | timeout: 6000 |
| | | kafka: |
| | | bootstrap-servers: 103.203.217.16:9092 |
| | | producer: |
| | | retries: 0 |
| | | acks: 1 |
| | | batch-size: 16384 |
| | | buffer-memory: 33554432 |
| | | key-serializer: org.apache.kafka.common.serialization.StringSerializer |
| | | value-serializer: org.apache.kafka.common.serialization.StringSerializer |
| | | properties: |
| | | security.protocol: sasl_plaintext |
| | | security.protocol: SASL_PLAINTEXT |
| | | sasl.mechanism: PLAIN |
| | | sasl.username: sc001 |
| | | sasl.password: wCV0ISwmoKwbx1lpBKMW |
| | |
| | | key-deserializer: org.apache.kafka.common.serialization.StringDeserializer |
| | | value-deserializer: org.apache.kafka.common.serialization.StringDeserializer |
| | | |
| | | |
| | | mqtt: |
| | | host: tcp://10.13.4.84:11883 |
| | | client-id: |