Kafka Common Commands: Introduction and Usage

First, run this command with no arguments to see detailed help for all of its options:

./kafka-topics.sh

Basic parameters for creating a topic:
Connect to Kafka: --zookeeper
Select the topic to operate on: --topic
Which operation to perform on the topic? create --create, delete --delete, alter --alter, describe --describe
Specify the number of partitions: --partitions
Specify the replication factor: --replication-factor
1. Create a topic named test0 with 1 partition and a replication factor of 1

./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --create --replication-factor 1 --partitions 1

2. List all topics

./kafka-topics.sh --zookeeper 192.168.124.8:2181 --list

3. Describe the topic test0 in detail

./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --describe

4. Increase the partition count to 3 (the partition count can only be increased, never decreased!)

./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --alter --partitions 3

5. Note that the replication factor cannot be changed from the command line this way; the following command fails:

./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --alter --replication-factor 3
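The usual route for actually changing the replication factor is a partition-reassignment plan applied with kafka-reassign-partitions.sh. A hedged sketch only: the JSON plan file name here is illustrative, and the exact flags vary by Kafka version (older ZooKeeper-based releases take --zookeeper instead of --bootstrap-server):

./kafka-reassign-partitions.sh --bootstrap-server 192.168.124.8:9092 --reassignment-json-file increase-replication.json --execute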

6. Send messages to a topic

./kafka-console-producer.sh --broker-list 192.168.124.8:9092 --topic test0

7. View messages as a consumer

# Incremental consumption: messages sent before the consumer started are not read
./kafka-console-consumer.sh --bootstrap-server 192.168.124.8:9092 --topic message
# --from-beginning reads historical messages
./kafka-console-consumer.sh --bootstrap-server 192.168.124.8:9092 --topic message --from-beginning

Topic creation

./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic message --create --replication-factor 1 --partitions 1

Producer

Sending messages with the Kafka producer

Add the dependency:

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
</dependency>
// Simple data send
@Test
void SimpleSendData() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    // Specify the serializer classes for key and value
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer. Writing "hello" without a key means key="" and value="hello",
    // so both K and V below are String.
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // Simple message send
    kafkaProducer.send(new ProducerRecord<>("message", "hello world "));
    // Close
    kafkaProducer.close();
}

Enter the container and check with a console consumer whether the message was sent successfully:

docker exec -it kafka /bin/bash
cd /opt/kafka_2.13-2.8.1/bin
# Consumer: consume messages
kafka-console-consumer.sh --bootstrap-server 192.168.124.8:9092 --topic message --from-beginning

The message is consumed as expected.

Sending messages with a callback

@Test
void testProducerCallback() {
    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // Alternatively, define a separate class that implements the Callback interface
    kafkaProducer.send(new ProducerRecord<>("message", "hello world  exec callback"), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                // No exception: the send succeeded
                System.out.println("topic :" + metadata.topic());
                System.out.println("partition :" + metadata.partition());
                /*
                    topic :message
                    partition :0
                 */
            } else {
                // Print the exception
                exception.printStackTrace();
            }
        }
    });
    // Close
    kafkaProducer.close();
}

Simplified lambda form

@Test
void testProducerCallbacklombda() {
    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // The Callback interface can also be written as a lambda
    kafkaProducer.send(new ProducerRecord<>("message", "hello world  exec callback2"), ((metadata, exception) -> {
        if (exception == null) {
            // No exception: the send succeeded
            System.out.println("topic :" + metadata.topic());
            System.out.println("partition :" + metadata.partition());
            /*
                topic :message
                partition :0
             */
        } else {
            // Print the exception
            exception.printStackTrace();
        }
    }));
    // Close
    kafkaProducer.close();
}

All of the sends above are asynchronous.

Synchronous send (sync)

Call send(), then call get() on the returned Future to wait for Kafka's response. If the server returns an error, get() throws an exception; if no error occurs, we get a RecordMetadata object that can be used to inspect the record.
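A minimal sketch of a synchronous send, reusing the same setup as the earlier examples; chaining get() onto the Future returned by send() blocks until the broker responds (get() can throw, so the test declares throws Exception):

@Test
void testSyncSend() throws Exception {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // get() blocks until Kafka acknowledges the record, or throws if the server returned an error
    RecordMetadata metadata = kafkaProducer
            .send(new ProducerRecord<>("message", "hello world sync"))
            .get();
    System.out.println("topic :" + metadata.topic() + ", partition :" + metadata.partition());
    kafkaProducer.close();
}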
Sending to a specified partition

@Test
void userPortitionsSend() {
    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    /*
     The default partitioning rule is DefaultPartitioner.
     Here we explicitly target partition 2; the key argument that follows can be empty.
    */
    kafkaProducer.send(new ProducerRecord<>("message", 2, "", "hello world  exec callback3"), ((metadata, exception) -> {
        if (exception == null) {
            // No exception: the send succeeded
            System.out.println("topic :" + metadata.topic());
            System.out.println("partition :" + metadata.partition());
            /*
                topic :message
                partition :2
             */
        } else {
            // Print the exception
            exception.printStackTrace();
        }
    }));
    kafkaProducer.close();
}

Specifying a key: the record is mapped to a partition by taking the key's hash modulo the number of partitions.

        kafkaProducer.send(newProducerRecord<>("message","a","hello world  exec callback"),newCallback(){@OverridepublicvoidonCompletion(RecordMetadata metadata,Exception exception){if(exception==null){// 没有异常发送成功System.out.println("topic :"+metadata.topic());System.out.println("分区partition :"+metadata.partition());/*
                        topic :message
                        分区partition :0
                     */}}});

Want every row of an orders table to end up in one particular Kafka partition? Just put the order table's name in the key: records with the same key are always routed to the same partition, as the sketch below shows.
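A hypothetical sketch of this idea, reusing the producer from above ("orders" as the table name and the JSON payload are illustrative):

        // "orders" is the table name used as the key; every record with this key
        // hashes to the same partition. The payload is a stand-in for one row.
        String orderRow = "{\"orderId\": 1, \"amount\": 99}";
        kafkaProducer.send(new ProducerRecord<>("message", "orders", orderRow));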

Custom partitioner

1. Requirement: implement a partitioner so that records whose value contains "zero" go to partition 0, and all other records go to partition 1.
2. Define a class that implements the Partitioner interface.
MyPartitioner.java

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Get the record value
        String msgValue = value.toString();
        int partition;
        if (msgValue.contains("zero")) {
            partition = 0;
        } else {
            partition = 1;
        }
        return partition;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
@Test
void customPartitionSend() {
    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Register the custom partitioner
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    kafkaProducer.send(new ProducerRecord<>("message", "hello world  exec callback"), ((metadata, exception) -> {
        if (exception == null) {
            // No exception: the send succeeded
            System.out.println("topic :" + metadata.topic());
            System.out.println("partition :" + metadata.partition());
            /*
                topic :message
                partition :1   (the value does not contain "zero")
             */
        } else {
            // Print the exception
            exception.printStackTrace();
        }
    }));
    // Close
    kafkaProducer.close();
}

This implements a custom partitioner.

Improving producer throughput

@Test
void testproducer() {
    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024); // 64 MB buffer
    // batch.size and linger.ms work together: accumulate up to a 32 KB batch,
    // waiting at most 5 ms before sending
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // 32 KB batches
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 5 ms linger
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // snappy compression
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("message", "hello world " + i));
    }
    // Close
    kafkaProducer.close();
}

Data reliability

acks=0: the producer sends data and does not wait for any acknowledgment; if the leader crashes, there is nothing to be done. Poor reliability, high efficiency.
acks=1: the producer waits for the leader's acknowledgment. If the leader acknowledges but dies before the followers replicate the data, the newly elected leader never received it (and the producer already considers the send successful), so the data is lost. Medium reliability, medium efficiency.
acks=-1 (all): the producer waits until the leader and every replica in the ISR have received the data before acknowledging. -1 and all are equivalent. Highest reliability.

@Test
void testproducer() {
    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024); // 64 MB buffer
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // 32 KB batches
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 5 ms linger
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // snappy compression
    // ---- reliability settings ----
    properties.put(ProducerConfig.ACKS_CONFIG, "1"); // acks, default all
    properties.put(ProducerConfig.RETRIES_CONFIG, 3); // retry count, default Integer.MAX_VALUE
    // ----
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("message", "hello world " + i));
    }
    // Close
    kafkaProducer.close();
}

Idempotence

No matter how many times the producer sends the same data to the broker, the broker persists it only once, so duplicates are not stored. (Idempotence is enabled by default, but it only guarantees no duplicates within a single partition and a single session; if Kafka crashes and restarts, duplicates can still appear.)
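If you prefer the setting to be explicit rather than implicit, a one-line sketch of the producer config (the key exists in ProducerConfig; recent clients already default it to true):

        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // default true in recent clients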
Producer transactions
Transactions require idempotence to be enabled. (A transactional id must be specified, and acks=all!) In the example below, sending the fifth message fails and the transaction is aborted.

@Test
void test() {
    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024); // 64 MB buffer
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // 32 KB batches
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 5 ms linger
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // snappy compression
    properties.put(ProducerConfig.ACKS_CONFIG, "all"); // acks, default all
    properties.put(ProducerConfig.RETRIES_CONFIG, 3); // retry count, default Integer.MAX_VALUE
    // A transactional id must be set or the transaction fails;
    // any value works as long as it is globally unique
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // Initialize, then begin the transaction
    kafkaProducer.initTransactions();
    kafkaProducer.beginTransaction();
    try {
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("message", "hello world " + i));
            if (i == 4) {
                int j = 1 / 0; // simulate a failure on the fifth message
            }
        }
        kafkaProducer.commitTransaction();
    } catch (Exception e) {
        // Catch Exception broadly so the simulated ArithmeticException also triggers the abort
        kafkaProducer.abortTransaction();
    } finally {
        // Close
        kafkaProducer.close();
    }
}

Consumer

A single consumer consuming a topic's data

docker exec -it kafka /bin/bash
cd /opt/kafka_2.13-2.8.1/bin
# Producer: produce messages
./kafka-console-producer.sh --broker-list 192.168.124.8:9092 --topic message

Produce some messages there, then run the consumer below.

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // !!!! A group id must be configured
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "message");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

    consumer.subscribe(Arrays.asList("message"));
    while (true) {
        // Pull once per second
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        consumerRecords.forEach(data -> {
            System.out.println(data);
        });
    }
}

Consuming a single partition

Use the producer to write data to a specific partition:

@Test
void userPortitionsSend() {
    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.8:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    /*
     The default partitioning rule is DefaultPartitioner.
     Here we explicitly target partition 2; the key argument that follows can be empty.
    */
    kafkaProducer.send(new ProducerRecord<>("message", 2, "", "hello world  exec callback3"), ((metadata, exception) -> {
        if (exception == null) {
            // No exception: the send succeeded
            System.out.println("topic :" + metadata.topic());
            System.out.println("partition :" + metadata.partition());
            /*
                topic :message
                partition :2
             */
        } else {
            // Print the exception
            exception.printStackTrace();
        }
    }));
    kafkaProducer.close();
}

Consume from that specific partition:

@Test
void consumerOnePartition() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // !!!! A group id must be configured
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "message");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
    topicPartitions.add(new TopicPartition("message", 2));
    // Assign the topic's specific partition(s) to this consumer
    consumer.assign(topicPartitions);
    while (true) {
        // Pull once per second
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        consumerRecords.forEach(data -> {
            System.out.println(data);
        });
    }
}

offset

Kafka auto-commits offsets by default, once every 5 s.
Manual offset commits come in two flavors:
1. Synchronous commit (commitSync): waits until the offset commit completes before consuming the next batch of data.
2. Asynchronous commit (commitAsync): sends the offset-commit request and immediately moves on to the next batch.
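As a small sketch, the auto-commit defaults can be spelled out explicitly in the consumer config (both keys exist in ConsumerConfig):

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);      // default: auto-commit on
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // default: every 5 s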
Manual commit:

@Test
void commitCustom() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // !!!! A group id must be configured
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "message");
    // Manual commit: turn off auto-commit
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
    topicPartitions.add(new TopicPartition("message", 2));
    // Assign the topic's specific partition(s) to this consumer
    consumer.assign(topicPartitions);
    while (true) {
        // Pull once per second
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        consumerRecords.forEach(data -> {
            System.out.println(data);
        });
        // Manual synchronous commit
        consumer.commitSync();
        // Or asynchronous commit:
        // consumer.commitAsync();
    }
}

Reprinted from: https://blog.csdn.net/qq_43566782/article/details/128667529. Copyright belongs to the original author, Nuyoahll-_-ll.
