文章目录
前提条件
- 搭建Kafka环境,参考Kafka集群环境搭建及使用
- Java环境:JDK1.8
- Maven版本:apache-maven-3.6.3
- 开发工具:IntelliJ IDEA
项目环境
- 创建maven项目。
- pom.xml文件中引入kafka依赖。
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>2.1.0</version></dependency></dependencies>
创建Topic
创建topic命名为testtopic并指定2个分区。
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create--topic testtopic --partitions2
生产消息
publicclassProducer{publicstaticvoidmain(String[] args)throwsExecutionException,InterruptedException{// 生产参数配置Properties properties =newProperties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String,String> kafkaProducer =newKafkaProducer<String,String>(properties);int i=0;while(true){//生产消息Future<RecordMetadata> future = kafkaProducer.send(newProducerRecord<String,String>("testtopic","key"+i,"value"+i));//获取生产的数据信息RecordMetadata recordMetadata = future.get();System.out.println("time:"+recordMetadata.timestamp()+" key:"+i+" value:"+i+" partition:"+recordMetadata.partition()+" offset:"+recordMetadata.offset());Thread.sleep(1000);
i+=1;}}}
生产者参数配置
// ACK机制,默认为1 (0,1,-1)
properties.setProperty(ProducerConfig.ACKS_CONFIG,"");// Socket发送消息缓冲区大小,默认为128K,设置为-1代表操作系统的默认值
properties.setProperty(ProducerConfig.SEND_BUFFER_CONFIG,"");// Socket接收消息缓冲区大小,默认为32K,设置为-1代表操作系统的默认值
properties.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG,"");// 生产者客户端发送消息的最大值,默认1M
properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,"");// 发送消息异常时重试次数,默认为0
properties.setProperty(ProducerConfig.RETRIES_CONFIG,"");// 重试间隔时间,默认100
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,"");// 生产消息自定义分区策略类
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,"");// 开启幂等 ,默认true
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"");
更多配置信息查看ProducerConfig类
生产自定义分区策略
- 创建分区策略类,实现org.apache.kafka.clients.producer.Partitioner接口,编写具体策略。
publicclassPartitionPolicyimplementsPartitioner{privatefinalConcurrentMap<String,AtomicInteger> topicCounterMap =newConcurrentHashMap();@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if(keyBytes ==null){int nextValue =this.nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if(availablePartitions.size()>0){int part =Utils.toPositive(nextValue)% availablePartitions.size();return((PartitionInfo)availablePartitions.get(part)).partition();}else{returnUtils.toPositive(nextValue)% numPartitions;}}else{returnUtils.toPositive(Utils.murmur2(keyBytes))% numPartitions;}}privateintnextValue(String topic){AtomicInteger counter =(AtomicInteger)this.topicCounterMap.get(topic);if(null== counter){
counter =newAtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter =(AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);if(currentCounter !=null){
counter = currentCounter;}}return counter.getAndIncrement();}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> map){}}
- 参数配置。
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,PartitionPolicy.class.getName());
生产到指定分区
ProducerRecord有指定分区的构造方法,设置分区号
public ProducerRecord(String topic, Integer partition, K key, V value)
。
Future<RecordMetadata> future = kafkaProducer.send(newProducerRecord<String,String>("testtopic",1,"key"+i,"value"+i));
消费消息
publicclassConsumer{publicstaticvoidmain(String[] args)throwsInterruptedException{Properties properties =newProperties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//约定的编解码
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test_group");//默认为自动提交
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");//当设置为自动提交时,默认5秒自动提交//properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");////properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "5000");KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<String,String>(properties);//订阅topic
kafkaConsumer.subscribe(Arrays.asList("testtopic"));Set<TopicPartition> assignment = kafkaConsumer.assignment();ConsumerRecords<String,String> records =null;while(assignment.size()==0){
records = kafkaConsumer.poll(Duration.ofMillis(100));
assignment = kafkaConsumer.assignment();}/*//1.根据时间戳获取 offset,设置 offset
Map<TopicPartition, Long> offsetsForTimes=new HashMap<>();
for (TopicPartition topicPartition : assignment) {
offsetsForTimes.put(topicPartition,1669972273941L);
}
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(offsetsForTimes);
offsetAndTimestampMap.forEach((tp,offsettime)->{
kafkaConsumer.seek(tp,offsettime.offset());
});*//*//2.指定从头开始消费
kafkaConsumer.seekToBeginning(assignment);*//*//3.指定从某offset开始消费
kafkaConsumer.seek(tp,0);*/while(true){if(records.isEmpty()){Thread.sleep(3000);}else{System.out.printf("records count:"+ records.count());Iterator<ConsumerRecord<String,String>> iterator = records.iterator();while(iterator.hasNext()){ConsumerRecord<String,String> record = iterator.next();System.out.println(" time:"+ record.timestamp()+" key:"+ record.key()+" value:"+ record.value()+" partition:"+ record.partition()+" offset:"+ record.offset());}
kafkaConsumer.commitSync();}
records = kafkaConsumer.poll(Duration.ofMillis(0));}}}
消费参数配置
// 消费者必须指定一个消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"");// 消费者每次最多POLL的数量
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"");// 消费者POLL的时间间隔
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_DOC,"");// 设置是否自动提交,默认为true
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"");// 如果是自动提交,默认5s后提交,会发生丢失消息和重复消费情况
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"");// 当一个新的消费组或者消费信息丢失后,在哪里开始进行消费。earliest:消费最早的消息。latest(默认):消费最近可用的消息。none:没有找到消费组消费数据时报异常。
更多配置信息查看ConsumerConfig类
offset设置方式
如代码所示,设置offset的几种方式:
- 指定 offset,需要自己维护 offset,方便重试。
- 指定从头开始消费。
- 指定 offset 为最近可用的 offset (默认)。
- 根据时间戳获取 offset,设置 offset。
代码仓库
本文转载自: https://blog.csdn.net/qq_28314431/article/details/128135288
版权归原作者 叫我二蛋 所有, 如有侵权,请联系我们删除。
版权归原作者 叫我二蛋 所有, 如有侵权,请联系我们删除。