0


kafka学习笔记

  1. 主题Topic

主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被订阅该topic的消费者消费。
但是有⼀个问题,如果说这个topic中的消息⾮常⾮常多,多到需要⼏T来存,因为消息是会被保存到log⽇志⽂件中的。为了解决这个⽂件过⼤的问题,kafka提出了Partition分区的概念。

2.分区Partition

每个topic都可以被划分成一个或者多个分区(至少有一个分区),它是topic物理上的分组,在创建topic的时候指定

一个Partition只对应一个Broker(即一台服务器),一个Broker可以管理多个Partition。

在一个分区内消息是顺序的,在不同的分区之间,kafka并不保证消息的顺序

同一个主题下,不同分区所包含的内容是不同的,每个消息被添加到分区当中时,会被分配一个偏移量(offset),它是消息在分区当中的唯一编号,kafka是通过offset来确保一个分区内的消息顺序的,offset的顺序并不跨越分区。

在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1

kafka常用命令

kafka-server-start.sh #进程启动
kafka-server-stop.sh #进程停止

kafka-topics.sh --create --zookeeper $zookeeper --replication-factor $replCount --partitions $partitionCount --topic $topicName

#指定副本数、pritition数创建topic

kafka-topics.sh --zookeeper $zkhost --alter --topic $topicName --partitions $partitionCount #扩容分区、只能扩,不能缩(涉及数据迁移、合并所以不支持))

kafka服务端常用配置

#唯一标识在集群中的ID,要求是正数。

broker.id=0

#服务端口,默认9092

port=9092

#监听地址,不设为所有地址

host.name=debugo01

#zookeeper连接地址

zookeeper.connect=1270.0.01:2181,127.0.0.2:2182/zookeeper

连接zk的超时时间

zookeeper.connection.timeout.ms=1000000

#日志存放目录,多个目录使用逗号分割

log.dirs=/var/log/kafka

 

kafka生产者代码示例

Map<String, Object> configs = new HashMap<>(); // 指定初始连接用到的 broker 地址 configs.put("bootstrap.servers", "127.0.0.1:9092"); // 指定 key 的序 configs.put("key.列化类serializer", IntegerSerializer.class); // 指定 value 的序列化类 configs.put("value.serializer", StringSerializer.class); configs.put("acks", "all"); configs.put("reties", "3"); KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);

详细说明:

atk:该选项控制着已发送消息的持久性。
acks=0 :生产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该消息, retries 设置也不起作用,因为客户端不关心消息是否发送失败。客户端收到的消息偏移量永远是-1。
acks=1 :leader将记录写到它本地日志,就响应客户端确认消息,而不等待follower副本的确认。如果leader确认了消息就宕机,则可能会丢失消息,因为follower副本可能还没来得及同步该消息。
acks=all :leader等待所有同步的副本确认该消息。保证了只要有一个同步副本存在,消息就不会丢失。这是最强的可用性保证。等价于acks=-1 。默认值为1,字符串。可选值:[all, -1, 0, 1]

retries:设置该属性为一个大于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常重新发送并无二至。允许重试但是不设置max.in.flight.requests.per.connection 为1,存在消息乱序的可能,因为如果两个批次发送到同一个分区,第一个失败了重试,第二个成功了,则第一个消息批在第二个消息批后。int类型的值,默认:0,可选值:[0,...,2147483647]

batch.size生产者在发送消息时,可以将即将发往同一个分区的消息放在一个批次里,然后将这个批次整体进行发送,这样可以节约网络带宽,提升性能。该参数就是用来规约一个批次的大小的。但是生产者并不是说要等到一个批次装满之后,才会发送,不是这样的,有时候半满,甚至只有一个消息的时候,也可能会发送,具体怎么选择,我们不知道,但是不是说非要等装满才发。因此,如果把该参数调的比较大的话,是不会造成消息发送延迟的,但是会占用比较大的内存。但是如果设置的太小,会造成消息发送次数增加,会有额外的IO开销
linger.ms生产者在发送一个批次之前,可以适当的等一小会,这样可以让更多的消息加入到该批次。这样会造成延时增加,但是降低了IO开销,增加了吞吐量

kafka消费者代码示例:

Properties prop = new Properties(); prop.put("bootstrap.servers", "127.0.0.1:9092") // 指定消费者组 prop.put("group.id", "group1"); // 指定消费的key的反序列化方式 prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 指定消费的value的反序列化方式 prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 得到Consumer实例 KafkaConsumer kafkaConsumer = new KafkaConsumer(prop); // 订阅topic kafkaConsumer.subscribe(Collections.singletonList("mytopic"));

group.id一个字符串用来指示一组consumer所在的组群。实现同一个topic可由不同的组群消费,必须指定

max.poll.interval.ms:拉取的最大时间间隔,如果你一次拉取的比较多,建议加大这个值,
长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了
max.poll.records**:**
默认值:
500,
Consumer每次调用poll()时取到的records的最大数。
其他说明:

kafka的幂等性是指在消息处理过程中,即使发生错误或重试操作,最终的结果也应该保持一致。Kafka通过引入ProducerID和Sequence Number来实现这一特性。

Producer ID是在每个新的Producer初始化和分配的一个唯一标识符,它对客户端使用者是不可见的。

Sequence Number则是为每个Producer ID及其对应的Topic和Partition分配的单调递增序列号。

在实际应用中,Producer在发送消息时会包含该Producer ID和当前的Sequence Number。这使得在任何异常中断或者重试之后,Kafka能够识别出哪些消息已经被处理过,从而避免重复处理。因此,无论在网络延迟或其他故障导致的重试期间,Kafka都能确保所有消息都被正确地处理且不会被重复存储或分发给消费者。

总结一下,Kafka的幂等性机制的核心在于:

Producer ID作为唯一的消息标识符。

Sequence Number用于确保每个消息被有序且唯一地处理。

这样的设计保证了Kafka能够在不同的网络条件下维持消息处理的可靠性,同时防止了因错误或重试而导致的数据不一致性

幂等性保证了数据的一致性,但还是会有可能造成数据的重复性,这时可以使用事务来解决

标签: kafka

本文转载自: https://blog.csdn.net/joke__/article/details/135231382
版权归原作者 joke–plus 所有, 如有侵权,请联系我们删除。

“kafka学习笔记”的评论:

还没有评论