1.SSL证书
Kafka 开启 SSL 后,客户端必须持有指定的 SSL 证书才能访问。该方式适用于多个项目之间通过 Kafka 中转数据,或项目自身直接使用 Kafka 的场景。
所需证书文件:client.keystore.jks、client.truststore.jks(放在项目的 resources 目录下)。
2.全局配置
demo:
  kafka:
    address: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
    password: xxxxxxxx
    group-id: xxxx
3.Producer配置
KafkaTemplate配置
@Configuration
@EnableKafka
@Slf4j
public class ProducerConfig {
@Value("${demo.kafka.address}")
private String addressStr;
@Value("${demo.kafka.password}")
private String password;
//注入kafkaTemplate
@Bean
KafkaTemplate<String,String> myKafkaTemplate() throws FileNotFoundException {
DefaultKafkaProducerFactory<String, String> producerFactory =
new DefaultKafkaProducerFactory<>(producerProperties(addressStr));
return new KafkaTemplate<>(producerFactory);
}
//kafka的配置
private Map<String,Object> producerProperties(String addressStr) throws FileNotFoundException {
// kafka的相关参数 比如ip地址和分组这些参数
Map<String,Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, addressStr);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//ssl加密和认证配置
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"JKS");
properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"JKS");
//获取Resources配置文件中client.keystore.jks
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ResourceUtils.getFile("classpath:client.keystore.jks").getPath());
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
//设置为空字符串来禁用服务器主机名验证
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG ,"");
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
//获取Resources配置文件中client.truststore.jks
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ResourceUtils.getFile("classpath:client.truststore.jks").getPath());
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
return properties;
}
}
发送至对应Kafka服务器
@Service
public class ProductService {
//配置的KafkaTemplate
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
//发送至对于Kafka
public void sendXxxxx() throws Exception {
ListenableFuture<SendResult<String, String>> demo_topic = kafkaTemplate.send("demo_topic", "message");
demo_topic.addCallback(success -> {
log.info("推送成功!!!");
},error -> {
log.info("推送失败!!!");
});
}
}
4.Consumer配置
KafkaListenerContainerFactory配置
/**
 * Spring configuration that builds the listener container factory
 * ({@code kafkaFactory}) used by {@code @KafkaListener} methods to consume
 * String messages from Kafka over SSL with manual offset acknowledgment.
 *
 * <p>NOTE(review): this class configures the consumer side but carries the same
 * name as the producer configuration class. If both are placed in the same
 * package, or are component-scanned under the same default bean name, they will
 * clash - renaming this class (for example to a consumer-specific name) is
 * recommended. The name is kept here to leave the declared interface untouched.
 */
@Configuration
@EnableKafka
@Slf4j
public class ProducerConfig {

    /** Comma-separated broker list, injected from {@code demo.kafka.address}. */
    @Value("${demo.kafka.address}")
    private String addressStr;

    /**
     * Password injected from {@code demo.kafka.password}; used for the key store,
     * the private key and the trust store alike.
     */
    @Value("${demo.kafka.password}")
    private String password;

    /** Consumer group id, injected from {@code demo.kafka.group-id}. */
    @Value("${demo.kafka.group-id}")
    private String groupId;

    /**
     * Creates the listener container factory referenced by
     * {@code containerFactory = "kafkaFactory"}.
     *
     * <p>The value type is {@code String} so that it agrees with the configured
     * {@code StringDeserializer}.
     *
     * @return a concurrent listener container factory with manual-immediate acks
     * @throws FileNotFoundException if a key store or trust store cannot be resolved to a file
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaFactory()
            throws FileNotFoundException {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Use the injected broker list (the original referenced an undefined variable).
        ConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProperties(addressStr, groupId));
        factory.setConsumerFactory(consumerFactory);
        // Offsets are committed only when the listener calls Acknowledgment.acknowledge().
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /**
     * Builds the Kafka consumer properties: broker addresses, group id, timeouts,
     * deserializers and SSL settings.
     *
     * @param addressStr comma-separated {@code host:port} broker list
     * @param groupId consumer group id
     * @return the property map handed to the consumer factory
     * @throws FileNotFoundException if a key store or trust store cannot be resolved to a file
     */
    private Map<String, Object> consumerProperties(String addressStr, String groupId)
            throws FileNotFoundException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, addressStr);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Auto commit is off because the container acknowledges manually; the auto-commit
        // interval is therefore not configured.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // SSL encryption and authentication settings.
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");

        // Key store is looked up on the classpath (resources/client.keystore.jks).
        // NOTE(review): ResourceUtils.getFile needs the resource to be a real file on disk;
        // presumably this fails when the application runs from a packaged jar - confirm.
        properties.put(
                SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                ResourceUtils.getFile("classpath:client.keystore.jks").getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);

        // An empty string disables server hostname verification.
        // NOTE(review): this weakens TLS server authentication; keep only if required.
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // Trust store is looked up on the classpath (resources/client.truststore.jks).
        properties.put(
                SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                ResourceUtils.getFile("classpath:client.truststore.jks").getPath());
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
        return properties;
    }
}
监听Topic
/**
 * Kafka listener for topic {@code demo_topic}, wired to the {@code kafkaFactory}
 * container factory and acknowledging each record manually.
 */
@Component
@Slf4j
public class DemoListener {

    /**
     * Logs the record value, runs the business logic and then acknowledges the record.
     *
     * @param record the consumed record
     * @param ack handle used to acknowledge the record after successful processing
     * @throws Exception wrapping any failure raised while processing; the record is
     *     not acknowledged in that case
     */
    @KafkaListener(topics = "demo_topic", containerFactory = "kafkaFactory")
    public void demo(ConsumerRecord<String, String> record, Acknowledgment ack) throws Exception {
        try {
            log.info(record.value());
            // Business logic goes here.
            ack.acknowledge();
        } catch (Exception e) {
            // Chain the original exception so its message and stack trace are preserved.
            throw new Exception("程序异常", e);
        }
    }
}
5.运行异常汇总
Error connecting to node host:9092 (id: 1003 rack: null)
需在服务器的 /etc/hosts 中配置 IP 与主机名(host)的对应关系,格式为 `ip host`(以空格分隔,而不是冒号)。
使用 docker、docker-compose 等方式部署时,须在容器内部完成同样的配置。
其他问题还有待发现,不太记得了😂
版权归原作者 学而不思则忘. 所有, 如有侵权,请联系我们删除。