li-apache-kafka-clients 使用教程
li-apache-kafka-clientsli-apache-kafka-clients is a wrapper library for the Apache Kafka vanilla clients. It provides additional features such as large message support and auditing to the Java producer and consumer in the open source Apache Kafka.项目地址:https://gitcode.com/gh_mirrors/li/li-apache-kafka-clients
1、项目介绍
li-apache-kafka-clients
是一个基于 Apache Kafka 原生客户端的封装库,由 LinkedIn 开发并开源。该库提供了额外的功能,如大消息支持、审计等,适用于 Java 的生产者和消费者。它旨在与 Apache Kafka 原生客户端完全兼容,并且高度可定制,允许用户插入自己的大消息段和审计器实现。
2、项目快速启动
环境准备
- Java 8 或更高版本
- Apache Kafka 2.0 或更高版本
- Gradle 构建工具
快速启动代码
- 克隆项目仓库:
git clone https://github.com/linkedin/li-apache-kafka-clients.gitcd li-apache-kafka-clients
- 构建项目:
./gradlew build
- 配置 Kafka 生产者:
import com.linkedin.kafka.clients.producer.LiKafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); LiKafkaProducer<String, String> producer = new LiKafkaProducer<>(props); String topic = "test-topic"; String key = "key"; String value = "Hello, Kafka!"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); producer.send(record); producer.close(); }}
- 配置 Kafka 消费者:
import com.linkedin.kafka.clients.consumer.LiKafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); LiKafkaConsumer<String, String> consumer = new LiKafkaConsumer<>(props); String topic = "test-topic"; consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }}
3、应用案例和最佳实践
应用案例
- 大消息处理:在某些场景下,用户需要发送超过 Kafka 默认消息大小限制的消息。
li-apache-kafka-clients
通过将大消息分割成段并在消费者端重新组装来处理这种情况。 - 审计功能:LinkedIn 使用基于计数的
li-apache-kafka-clientsli-apache-kafka-clients is a wrapper library for the Apache Kafka vanilla clients. It provides additional features such as large message support and auditing to the Java producer and consumer in the open source Apache Kafka.项目地址:https://gitcode.com/gh_mirrors/li/li-apache-kafka-clients
版权归原作者 章炎滔 所有, 如有侵权,请联系我们删除。