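First, running this command with no arguments prints a description of every parameter:
./kafka-topics.sh
Basic parameters for working with a topic
Connect to Kafka: --zookeeper
Select the topic to operate on: --topic
Choose the operation: create with --create, delete with --delete, modify with --alter, inspect with --describe
Set the number of partitions: --partitions
Set the number of replicas: --replication-factor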
1. Create a topic named test0 with 1 partition and 1 replica
./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --create --replication-factor 1 --partitions 1
2. List all topics
./kafka-topics.sh --zookeeper 192.168.124.8:2181 --list
3. Show the details of topic test0
./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --describe
4. Change the partition count to 3. The partition count can only be increased, never decreased!
./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --alter --partitions 3
5. The replication factor, by contrast, cannot be changed this way from the command line; the following command is rejected:
./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --alter --replication-factor 3
6. Send messages to the topic
./kafka-console-producer.sh --broker-list 192.168.124.8:9092 --topic test0
7. Read the messages with a consumer
# Incremental consumption: only messages sent after the consumer starts are read
./kafka-console-consumer.sh --bootstrap-server 192.168.124.8:9092 --topic test0
# --from-beginning also reads the messages that were sent earlier
./kafka-console-consumer.sh --bootstrap-server 192.168.124.8:9092 --topic test0 --from-beginning
Creating the topic used by the Java examples
./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic message --create --replication-factor 1 --partitions 1
Producer
Sending messages with a Kafka producer
Add the dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
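// Imports used by the producer tests in this section (JUnit 5 is assumed for @Test)
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

// Simple send
@Test
void SimpleSendData() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    // Serializer types for the key and the value
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer. The record below carries no key (the key is null) and the value
    // "hello world ", so the type parameters K and V are both String.
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // Simple send
    kafkaProducer.send(new ProducerRecord<>("message", "hello world "));
    // close() also flushes records that are still buffered
    kafkaProducer.close();
}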
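Enter the container and start a consumer to check that the message arrived
docker exec -it kafka /bin/bash
cd /opt/kafka_2.13-2.8.1/bin
# Consumer: read the messages
./kafka-console-consumer.sh --bootstrap-server 192.168.124.8:9092 --topic message --from-beginning
The message is consumed as expected.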
Sending a message with a callback
@Test
void testProducerCallback() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // A separate class that implements the Callback interface would work as well
    kafkaProducer.send(new ProducerRecord<>("message", "hello world exec callback"), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                // No exception: the send succeeded
                System.out.println("topic :" + metadata.topic());
                System.out.println("partition :" + metadata.partition());
                /*
                 topic :message
                 partition :0
                 */
            } else {
                // Print the failure
                exception.printStackTrace();
            }
        }
    });
    // close
    kafkaProducer.close();
}
The same callback written as a lambda
@Test
void testProducerCallbackLambda() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // Callback has a single method, so a lambda can replace the anonymous class
    kafkaProducer.send(new ProducerRecord<>("message", "hello world exec callback2"), (metadata, exception) -> {
        if (exception == null) {
            // No exception: the send succeeded
            System.out.println("topic :" + metadata.topic());
            System.out.println("partition :" + metadata.partition());
            /*
             topic :message
             partition :0
             */
        } else {
            // Print the failure
            exception.printStackTrace();
        }
    });
    // close
    kafkaProducer.close();
}
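All of the examples above send messages asynchronously.
Synchronous send (sync)
Call send() and then call get() on the Future it returns to wait for Kafka's response. If the broker returns an error, get() throws an exception;
if no error occurred, get() returns a RecordMetadata object that describes where the record was stored (topic, partition, offset).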
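The blocking step can be sketched without a broker. The helper below is a minimal illustration, not part of the original article: `await` is a hypothetical name, and a `CompletableFuture<String>` stands in for the `Future<RecordMetadata>` that `KafkaProducer.send` returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncSendSketch {
    /** Blocks until the pending send completes; throws if it failed or timed out. */
    static <T> T await(Future<T> pending, long timeoutMs) {
        try {
            return pending.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // get() wraps the real failure; surface its cause
            throw new IllegalStateException("send failed: " + e.getCause().getMessage(), e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("no response within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // ASSUMPTION: stand-in for kafkaProducer.send(record)
        Future<String> pending = CompletableFuture.completedFuture("stored");
        System.out.println(await(pending, 1000));
    }
}
```

With a real producer the same helper would be called as `await(kafkaProducer.send(record), 1000)`. The timeout is a design choice of this sketch; a plain `get()` waits without its own limit.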
Sending to a specific partition
@Test
void userPartitionsSend() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    /*
     The default partitioning rule is DefaultPartitioner.
     Here the partition is given explicitly (2); the key argument that follows may be empty.
     The topic must already have at least 3 partitions for partition 2 to exist.
     */
    kafkaProducer.send(new ProducerRecord<>("message", 2, "", "hello world exec callback3"), (metadata, exception) -> {
        if (exception == null) {
            // No exception: the send succeeded
            System.out.println("topic :" + metadata.topic());
            System.out.println("partition :" + metadata.partition());
            /*
             topic :message
             partition :2
             */
        } else {
            // Print the failure
            exception.printStackTrace();
        }
    });
    kafkaProducer.close();
}
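Specifying a key: the hash of the key modulo the number of partitions selects the partition
kafkaProducer.send(new ProducerRecord<>("message", "a", "hello world exec callback"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            // No exception: the send succeeded
            System.out.println("topic :" + metadata.topic());
            System.out.println("partition :" + metadata.partition());
            /*
             topic :message
             partition :0
             */
        }
    }
});
Want every row of an orders table to go to the same Kafka partition? Use the table name as the key of every record: as long as the partition count does not change, records with the same key always land on the same partition.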
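The hash-then-modulo mapping can be sketched in a few lines. This is an illustration under assumptions rather than Kafka's actual code: `partitionFor` is a hypothetical helper, and `Arrays.hashCode` stands in for the murmur2 hash that the default partitioner applies to the serialized key, so the concrete partition numbers will differ from a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {
    /** Maps a key to a partition index in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // ASSUMPTION: stand-in hash; Kafka uses murmur2 on the key bytes
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        for (String key : new String[] {"orders", "orders", "users"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Because the result depends on `numPartitions`, adding partitions to a topic can move a key to a different partition, which is worth keeping in mind before altering the partition count of a keyed topic.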
Custom partitioner
1. Requirement: implement a partitioner that sends a record to partition 0 if its value contains "zero", and to partition 1 otherwise.
2. Define a class that implements the Partitioner interface
MyPartitioner.java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Read the record value
        String msgValue = value.toString();
        int partition;
        if (msgValue.contains("zero")) {
            partition = 0;
        } else {
            partition = 1;
        }
        return partition;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
@Test
void customPartitionSend() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Register the custom partitioner
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    kafkaProducer.send(new ProducerRecord<>("message", "hello world exec callback"), (metadata, exception) -> {
        if (exception == null) {
            // No exception: the send succeeded
            System.out.println("topic :" + metadata.topic());
            System.out.println("partition :" + metadata.partition());
            /*
             The value does not contain "zero", so MyPartitioner picks partition 1:
             topic :message
             partition :1
             */
        } else {
            // Print the failure
            exception.printStackTrace();
        }
    });
    // close
    kafkaProducer.close();
}
This is how a custom partitioner is implemented and plugged in.
Improving producer throughput
@Test
void testproducer() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // buffer.memory: total size of the send buffer, 64 MB
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);
    // batch.size and linger.ms work together: a batch is sent once it reaches 32 KB
    // or once it has waited 5 ms, whichever happens first
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // batch size: 32 KB
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5); // linger.ms: 5 ms
    // compression.type: snappy
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("message", "hello world " + i));
    }
    // close
    kafkaProducer.close();
}
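How batch.size and linger.ms interact can be sketched as a single predicate. This is a deliberately simplified model (the real producer also sends batches on flush, on close, and under memory pressure), and `readyToSend` is a hypothetical name used only for illustration.

```java
class BatchingSketch {
    /** Simplified rule: a batch goes out once it is full or has waited long enough. */
    static boolean readyToSend(int batchBytes, int batchSizeBytes, long waitedMs, long lingerMs) {
        boolean full = batchBytes >= batchSizeBytes;
        boolean lingerExpired = waitedMs >= lingerMs;
        return full || lingerExpired;
    }

    public static void main(String[] args) {
        int batchSize = 32 * 1024; // 32 KB batch.size
        long lingerMs = 5;         // 5 ms linger.ms
        System.out.println(readyToSend(1_000, batchSize, 2, lingerMs));  // false: small and still waiting
        System.out.println(readyToSend(1_000, batchSize, 5, lingerMs));  // true: linger time is up
        System.out.println(readyToSend(33_000, batchSize, 0, lingerMs)); // true: batch is full
    }
}
```

A larger batch size with a small linger time trades a few milliseconds of latency for fewer, larger requests, which is where the throughput gain comes from.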
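Data reliability
acks=0: the producer sends the data and does not wait for any response. If the leader crashes, the data is simply lost. Low reliability, high throughput.
acks=1: the leader acknowledges once it has written the data. If the leader crashes after acknowledging but before the followers have copied the data, a new leader is elected that never received it, and the producer does not resend because it already saw a success. Medium reliability, medium throughput.
acks=-1 (all): the leader acknowledges only after it and every replica in the ISR (in-sync replica) set have received the data. -1 and all are equivalent. Highest reliability, lowest throughput.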
@Test
void testProducerAcks() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // buffer.memory: total size of the send buffer, 64 MB
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);
    // batch.size and linger.ms work together: a batch is sent once it reaches 32 KB
    // or once it has waited 5 ms, whichever happens first
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // batch size: 32 KB
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5); // linger.ms: 5 ms
    // compression.type: snappy
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    // ----
    // acks: reliability level (the default is all since Kafka 3.0, 1 in earlier clients)
    properties.put(ProducerConfig.ACKS_CONFIG, "1");
    // retries: number of resend attempts (the default is Integer.MAX_VALUE)
    properties.put(ProducerConfig.RETRIES_CONFIG, 3);
    // ----
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("message", "hello world " + i));
    }
    // close
    kafkaProducer.close();
}
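The difference between the three acks levels shows up most clearly in the window where the leader has written a record but the followers have not copied it yet. The following is a toy model of that decision, with hypothetical names (`Acks`, `producerSeesSuccess`); it does not talk to Kafka.

```java
class AcksSketch {
    enum Acks { NONE, LEADER, ALL } // acks=0, acks=1, acks=all (-1)

    /** Simplified model: would the producer be told "success" in this state? */
    static boolean producerSeesSuccess(Acks acks, boolean leaderWrote, boolean allIsrReplicated) {
        switch (acks) {
            case NONE:
                return true; // never waits for the broker
            case LEADER:
                return leaderWrote; // the leader's own write is enough
            case ALL:
                return leaderWrote && allIsrReplicated; // every in-sync replica has the record
            default:
                throw new IllegalArgumentException("unknown acks: " + acks);
        }
    }

    public static void main(String[] args) {
        // Risky window: the leader has the record, the followers do not
        for (Acks acks : Acks.values()) {
            System.out.println(acks + " -> " + producerSeesSuccess(acks, true, false));
        }
        // prints NONE -> true, LEADER -> true, ALL -> false
    }
}
```

If the leader crashes inside that window, the first two levels have already reported success for a record that the new leader does not hold, while acks=all has not, so the producer can still retry.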
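Idempotence
No matter how many times the producer sends the same record to the broker, the broker persists it only once, so retries do not create duplicates. (Idempotence is enabled by default since Kafka 3.0. It only guarantees no duplicates within a single partition and a single producer session; after a crash and restart, which starts a new session, duplicates can still occur.)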
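The "single partition, single session" limit follows from how duplicates are detected: the broker remembers, per producer id, the sequence numbers it has already appended to a partition. The sketch below models one partition with that bookkeeping; the class and method names are hypothetical and the logic is far simpler than the broker's.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IdempotentLogSketch {
    private final List<String> log = new ArrayList<>();
    // Highest sequence number appended so far, per producer id
    private final Map<Long, Integer> lastSeq = new HashMap<>();

    /** Appends the record unless this producer id already delivered this sequence number. */
    boolean append(long producerId, int sequence, String value) {
        Integer last = lastSeq.get(producerId);
        if (last != null && sequence <= last) {
            return false; // duplicate of an earlier send, for example a retry: drop it
        }
        lastSeq.put(producerId, sequence);
        log.add(value);
        return true;
    }

    int size() {
        return log.size();
    }

    public static void main(String[] args) {
        IdempotentLogSketch partition = new IdempotentLogSketch();
        partition.append(1L, 0, "hello"); // first delivery
        partition.append(1L, 0, "hello"); // retry in the same session: dropped
        partition.append(2L, 0, "hello"); // same data from a new session (new producer id): kept
        System.out.println(partition.size()); // prints 2
    }
}
```

A producer that restarts is assigned a new producer id, so the broker has no way to recognize its resent data as a duplicate. Transactions, which tie the producer to a stable transactional id, address that gap.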
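Producer transactions
Transactions require idempotence to be enabled. (A transactional id must be set, and acks must be all.) In the example below the fifth message fails, so the transaction is aborted and none of the five messages is committed.
@Test
void test() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // buffer.memory: total size of the send buffer, 64 MB
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);
    // batch.size and linger.ms work together: a batch is sent once it reaches 32 KB
    // or once it has waited 5 ms, whichever happens first
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // batch size: 32 KB
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5); // linger.ms: 5 ms
    // compression.type: snappy
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    // ----
    properties.put(ProducerConfig.ACKS_CONFIG, "all"); // transactions require acks=all
    properties.put(ProducerConfig.RETRIES_CONFIG, 3); // retries (the default is Integer.MAX_VALUE)
    // ----
    // A transactional id is mandatory; any value works as long as it is globally unique
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // Initialize the transaction machinery
    kafkaProducer.initTransactions();
    // Begin the transaction
    kafkaProducer.beginTransaction();
    try {
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("message", "hello world " + i));
            if (i == 4) {
                int j = 1 / 0; // simulate a failure on the fifth message
            }
        }
        kafkaProducer.commitTransaction();
    } catch (ProducerFencedException e) {
        // Another producer with the same transactional id has taken over;
        // this instance cannot abort and can only be closed
    } catch (Exception e) {
        // Any other failure, including the ArithmeticException above: abort the transaction
        kafkaProducer.abortTransaction();
    } finally {
        // close
        kafkaProducer.close();
    }
}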
Consumer
A single consumer reading from a topic
docker exec -it kafka /bin/bash
cd /opt/kafka_2.13-2.8.1/bin
# Producer: produce messages
./kafka-console-producer.sh --broker-list 192.168.124.8:9092 --topic message
Type a few messages into the producer.
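// Imports used by the consumer examples in this section
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Important: a group id must be configured
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "message");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList("message"));
    while (true) {
        // Pull records, waiting up to 1 s for data on each call
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        consumerRecords.forEach(data -> {
            System.out.println(data);
        });
    }
}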
Consuming a single partition
First use a producer to write data to a specific partition
@Test
void userPartitionsSend() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    /*
     The default partitioning rule is DefaultPartitioner.
     Here the partition is given explicitly (2); the key argument that follows may be empty.
     The topic must already have at least 3 partitions for partition 2 to exist.
     */
    kafkaProducer.send(new ProducerRecord<>("message", 2, "", "hello world exec callback3"), (metadata, exception) -> {
        if (exception == null) {
            // No exception: the send succeeded
            System.out.println("topic :" + metadata.topic());
            System.out.println("partition :" + metadata.partition());
            /*
             topic :message
             partition :2
             */
        } else {
            // Print the failure
            exception.printStackTrace();
        }
    });
    kafkaProducer.close();
}
Then consume from that specific partition
@Test
void consumerOnePartition() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Important: a group id must be configured
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "message");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
    topicPartitions.add(new TopicPartition("message", 2));
    // Assign the topic's partition 2 to this consumer directly
    consumer.assign(topicPartitions);
    while (true) {
        // Pull records, waiting up to 1 s for data on each call
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        consumerRecords.forEach(data -> {
            System.out.println(data);
        });
    }
}
Offset
By default Kafka commits offsets automatically, once every 5 s.
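The two settings behind that default can be written out explicitly. The sketch below uses plain string keys so that it runs without the Kafka client on the classpath; `enable.auto.commit` and `auto.commit.interval.ms` are the consumer's real configuration keys, while the class and method names are hypothetical.

```java
import java.util.Properties;

class OffsetCommitConfigSketch {
    /** Consumer settings that spell out the default auto-commit behaviour. */
    static Properties autoCommit() {
        Properties p = new Properties();
        p.put("enable.auto.commit", "true");      // same key as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
        p.put("auto.commit.interval.ms", "5000"); // commit every 5 s
        return p;
    }

    /** Settings for taking over offset commits in application code. */
    static Properties manualCommit() {
        Properties p = new Properties();
        p.put("enable.auto.commit", "false");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(autoCommit().getProperty("auto.commit.interval.ms")); // prints 5000
        System.out.println(manualCommit().getProperty("enable.auto.commit"));    // prints false
    }
}
```

These entries would be merged into the same Properties object that carries the bootstrap servers, deserializers, and group id before the consumer is constructed.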
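Committing offsets manually
1. Synchronous commit (commitSync): the consumer waits until the offset commit has completed before it consumes the next batch.
2. Asynchronous commit (commitAsync): the consumer sends the offset commit request and starts consuming the next batch right away, without waiting for the result.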
Manual commit
@Test
void commitCustom() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Important: a group id must be configured
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "message");
    // Turn off auto-commit so that offsets are committed manually
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
    topicPartitions.add(new TopicPartition("message", 2));
    // Assign the topic's partition 2 to this consumer directly
    consumer.assign(topicPartitions);
    while (true) {
        // Pull records, waiting up to 1 s for data on each call
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        consumerRecords.forEach(data -> {
            System.out.println(data);
        });
        // Manual commit, synchronous
        consumer.commitSync();
        // Asynchronous alternative:
        // consumer.commitAsync();
    }
}
Copyright belongs to the original author, Nuyoahll-_-ll. In case of infringement, please contact us for removal.