0


SpringBoot 集成 Kafka (SSL证书)

1.SSL证书

Kafka 启用 SSL 后,客户端必须持有指定的 SSL 证书才能访问。该方式适用于多个项目之间通过 Kafka 中转数据,也适用于单个项目内部使用 Kafka 的场景。

所需证书文件:client.keystore.jks、client.truststore.jks(放在项目的 resources 目录下,下文配置通过 classpath 读取)。

2.全局配置

# Custom application properties consumed via @Value in the Kafka configuration classes.
demo:
    kafka:
        # Comma-separated host:port list, used as the Kafka bootstrap servers.
        address: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
        # Single password applied to the keystore, the private key and the truststore.
        password: xxxxxxxx
        # Consumer group id used by the listener container factory.
        group-id: xxxx

3.Producer配置

KafkaTemplate配置

@Configuration
@EnableKafka
@Slf4j
public class ProducerConfig {

    @Value("${demo.kafka.address}")
    private String addressStr;

    @Value("${demo.kafka.password}")
    private String password;

    //注入kafkaTemplate
    @Bean
    KafkaTemplate<String,String> myKafkaTemplate() throws FileNotFoundException {
        DefaultKafkaProducerFactory<String, String> producerFactory =
           new DefaultKafkaProducerFactory<>(producerProperties(addressStr));
        return new KafkaTemplate<>(producerFactory);
    }

    //kafka的配置
    private Map<String,Object> producerProperties(String addressStr) throws FileNotFoundException {
        // kafka的相关参数 比如ip地址和分组这些参数
        Map<String,Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, addressStr);
             properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        //ssl加密和认证配置
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"JKS");
        //获取Resources配置文件中client.keystore.jks
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ResourceUtils.getFile("classpath:client.keystore.jks").getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        //设置为空字符串来禁用服务器主机名验证
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG ,"");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
        //获取Resources配置文件中client.truststore.jks
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ResourceUtils.getFile("classpath:client.truststore.jks").getPath());
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);

        return properties;
    }

}

发送至对应Kafka服务器

@Service
public class ProductService {
    
    //配置的KafkaTemplate
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    //发送至对于Kafka
    public void sendXxxxx() throws Exception {
        ListenableFuture<SendResult<String, String>> demo_topic = kafkaTemplate.send("demo_topic", "message");
        demo_topic.addCallback(success -> {
            log.info("推送成功!!!");
        },error -> {
            log.info("推送失败!!!");
        });
    }

}

4.Consumer配置

KafkaListenerContainerFactory配置

/**
 * Consumer-side Kafka configuration: exposes the {@code kafkaFactory} listener container
 * factory, which consumes String key/value records over SSL with manual acknowledgment.
 *
 * <p>NOTE(review): this class carries the same simple name as the producer configuration
 * class; if both live in the same package or are component-scanned together they will
 * clash - consider renaming this one (e.g. to a consumer-specific name).
 */
@Configuration
@EnableKafka
@Slf4j
public class ProducerConfig {

    /** Comma-separated bootstrap servers, read from {@code demo.kafka.address}. */
    @Value("${demo.kafka.address}")
    private String addressStr;

    /** Password used for the keystore, the private key and the truststore. */
    @Value("${demo.kafka.password}")
    private String password;

    /** Consumer group id, read from {@code demo.kafka.group-id}. */
    @Value("${demo.kafka.group-id}")
    private String groupId;

    /**
     * Builds the listener container factory referenced by {@code @KafkaListener}
     * via {@code containerFactory = "kafkaFactory"}.
     *
     * <p>The value type is {@code String} to match the configured {@code StringDeserializer}.
     *
     * @return a container factory using manual, immediate acknowledgment
     * @throws FileNotFoundException if a certificate store cannot be resolved to a file
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaFactory()
            throws FileNotFoundException {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Create the consumer factory from the injected bootstrap servers and group id.
        ConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProperties(addressStr, groupId));
        factory.setConsumerFactory(consumerFactory);
        // Offsets are committed manually, immediately when the listener acknowledges.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /**
     * Assembles the Kafka consumer properties (bootstrap servers, group, timeouts,
     * deserializers, SSL settings).
     *
     * @param addressStr comma-separated list of bootstrap servers
     * @param groupId consumer group id
     * @return the consumer configuration map
     * @throws FileNotFoundException if a certificate store cannot be resolved to a file
     */
    private Map<String, Object> consumerProperties(String addressStr, String groupId)
            throws FileNotFoundException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, addressStr);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Only takes effect when auto-commit is enabled; kept as in the original configuration.
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // SSL encryption and authentication settings.
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");

        // NOTE(review): ResourceUtils.getFile only resolves resources that exist as real files;
        // this is expected to fail when the stores are packaged inside a jar - confirm the
        // deployment layout or copy the stores to the file system first.
        // Client keystore from the classpath (resources directory).
        properties.put(
                SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                ResourceUtils.getFile("classpath:client.keystore.jks").getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        // An empty string disables server hostname verification.
        // NOTE(review): this weakens protection against man-in-the-middle attacks; prefer
        // certificates whose SAN matches the broker host names where possible.
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
        // Client truststore from the classpath (resources directory).
        properties.put(
                SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                ResourceUtils.getFile("classpath:client.truststore.jks").getPath());
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);

        return properties;
    }

}

监听Topic

@Component
@Slf4j
public class DemoListener {

    @KafkaListener(topics = "demo_topic",containerFactory = "kafkaFactory")
    public void demo(ConsumerRecord<String,String> record, Acknowledgment ack) throws Exception {
        try{
        
        log.info(record.value());
        
        //业务逻辑

        ack.acknowledge();

        }catch(Exception e){
        
        throw new Exception("程序异常");

        }
    }
}

5.运行异常汇总

Error connecting to node host:9092 (id: 1003 rack: null)

需在应用所在服务器的 /etc/hosts 中配置 ip 与 host 的对应关系,格式为【ip host】(以空格分隔,例如:192.168.1.10 host)

使用 docker、docker-compose 等方式部署时,须在容器内部配置上述 hosts 对应关系

其他问题还有待发现,不太记得了😂

标签: java kafka ssl

本文转载自: https://blog.csdn.net/qq_50438358/article/details/131606656
版权归原作者 学而不思则忘. 所有, 如有侵权,请联系我们删除。

“SpringBoot 集成 Kafka (SSL证书)”的评论:

还没有评论