0


Kafka

一、简介

  • Kafka是最初由Linkedin公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目,也是一个开源【分布式流处理平台】,由Scala和Java编写,(也当做MQ系统,但不是纯粹的消息系统),一种高吞吐量的分布式流处理平台,它可以处理消费者在网站中的所有动作流数据。比如 网页浏览,搜索和其他用户的行为等,应用于大数据实时处理领域
  • 特点- 高吞吐量、低延迟: 每秒可以处理几十万条消息- 高并发:几千个客户端同时读写- 容错性:多副本、多分区,允许集群中节点失败,如果副本数据量为n,则可以n-1个节点失败- 扩展性强:支持热扩展
  • 核心概念- Broker- Kafka的服务端程序,可以认为一个mq节点就是一个broker- broker存储topic的数据- Producer生产者- 创建消息Message,然后发布到MQ中- 该角色将消息发布到Kafka的topic中- Consumer消费者:- 消费队列里面的消息- ConsumerGroup消费者组- 同个topic, 广播发送给不同的group,一个group中只有一个consumer可以消费此消息- Topic- 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思- Partition分区- kafka数据存储的基本单元,topic中的数据分割为一个或多个partition,每个topic至少有一个partition,是有序的- 一个Topic的多个partitions, 被分布在kafka集群中的多个server上- 消费者数量 <=小于或者等于Partition数量- Replication 副本(备胎)- 同个Partition会有多个副本replication ,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务- 默认每个topic的副本都是1(默认是没有副本,节省资源),也可以在创建topic的时候指定- 如果当前kafka集群只有3个broker节点,则replication-factor最大就是3了,如果创建副本为4,则会报错- ReplicationLeader、ReplicationFollower- Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互- ReplicationFollower只是做一个备份,从replicationLeader进行同步- ReplicationManager- 负责Broker所有分区副本信息,Replication 副本状态切换- offset- 每个consumer实例需要为他消费的partition维护一个记录自己消费到哪里的偏移offset- kafka把offset保存在消费端的消费者组里

二、安装部署

  • 需要的软件和环境版本说明- kafka-xx-yy- xx 是scala版本,yy是kafka版本(scala是基于jdk开发,需要安装jdk环境)- 下载地址:Apache Kafka- zookeeper- Apache 软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册- 下载地址:Apache ZooKeeper- jdk1.8
  • 安装步骤- 安装jdk#解压并重命名tar -zxvf jdk-8u181-linux-x64.tar.gz#编辑配置vim /etc/profile#配置以下信息JAVA_HOME=/usr/local/software/jdk1.8CLASSPATH=$JAVA_HOME/lib/PATH=$PATH:$JAVA_HOME/binexport PATH JAVA_HOME CLASSPATH#配置完毕立刻生效source /etc/profile#查看jdk安装成功与否java -version- 安装zookeeper 默认配置文件 zoo.cfg启动zkbin/zkServer.sh start- 安装kafka#修改配置 config目录下 server.properties#标识broker编号,集群中有多个broker,则每个broker的编号需要设置不同broker.id=0#修改下面两个配置 ( listeners 配置的ip和advertised.listeners相同时启动kafka会报错)listeners(内网Ip)advertised.listeners(公网ip)#修改zk地址,默认地址zookeeper.connection=localhost:2181#启动./kafka-server-start.sh ../config/server.properties &#停止kafka-server-stop.sh#守护进程启动./kafka-server-start.sh -daemon ../config/server.properties &

三、linux 命令行执行发送和消费

创建topic

#kafka bin目录下 副本1 分片2 主题test-topic 
./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 2 --topic test-topic

查看topic

#kafka bin目录下
./kafka-topics.sh --list --zookeeper 112.74.55.160:2181

生产者发送消息

#kafka bin目录下
./kafka-console-producer.sh --broker-list 112.74.55.160:9092  --topic test-topic

消费者消费消息

#kafka bin目录下  --from-beginning从头看是消费  --consumer.config指定配置文件(配置文件内可设置分组)
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic test-topic --consumer.config ../config/consumer.properties

删除topic

#kafka bin目录下
./kafka-topics.sh --zookeeper 112.74.55.160:2181 --delete --topic test-topic

查看broker节点topic状态信息

#kafka bin目录下
./kafka-topics.sh --describe --zookeeper 112.74.55.160:2181  --topic test-topic

四、kafka数据存储流程以及原理和LEO+HW

  • Partition- topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列- 是以文件夹的形式存储在具体Broker本机上

  • LEO(LogEndOffset)- 表示每个partition的log最后一条Message的位置。

  • HW(HighWatermark)- 表示partition各个replicas数据间同步且一致的offset位置,即表示allreplicas已经commit的位置- HW之前的数据才是Commit后的,对消费者才可见- ISR集合里面最小leo

  • offset:- 每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中- partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息- 可以认为offset是partition中Message的id

  • Segment:每个partition又由多个segment file组成;- segment file 由2部分组成,分别为index file和data file(log file),- 两个文件是一一对应的,后缀”.index”和”.log”分别表示索引文件和数据文件- 命名规则:partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset+1-

  • Kafka高效文件存储设计特点:- Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。- 通过索引信息可以快速定位message- producer生产数据,要写入到log文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明。同样的磁盘,顺序写能到600M/S,而随机写只有100K/S

五、简单生产和消费以及常见策略

  • 生产者- 生产者到broker发送流程- Kafka的客户端发送数据到服务器,不是来一条就发一条,会经过内存缓冲区(默认是16KB),通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集到的Batch里面,再一次性发送到Broker上去的,这样性能才可能题高- broker到具体分片策略- 如果指定Partition ID,则PR(ProducerRecord)被发送至指定Partition - 如果未指定Partition ID,但指定了Key, PR会按照hash(key)发送至对应Partition- 如果未指定Partition ID也没指定Key,PR会按照默认 round-robin轮训模式发送到每个Partition- 如果同时指定了Partition ID和Key, PR只会发送到指定的Partition (Key不起作用,代码逻辑决定)- 注意:Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互
  • 消费者- 消费者根据什么模式从broker获取数据的- 消费者采用 pull 拉取方式,从broker的partition获取数据- pull 模式则可以根据 consumer 的消费能力进行自己调整,不同的消费者性能不一样,如果broker没有数据,consumer可以配置 timeout 时间,阻塞等待一段时间之后再返回- 如果是broker主动push,优点是可以快速处理消息,但是容易造成消费者处理不过来,消息堆积和延迟。- 消费者对应主题分区关系- 一个 topic 有多个 partition,所以有多个partition leader,一个partition leader可以由一个消费者组中的一个消费者进行消费- 消费者消费分区拉取策略- round-robin (RoundRobinAssignor非默认策略)轮训- 【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来, 所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者- topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6- c-1: topic-p0/topic-p2/topic-p4/topic-p6- c-2:topic-p1/topic-p3/topic-p5- 弊端- 如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀- 有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2- t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))- 消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2- range (RangeAssignor默认策略)范围- 【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者- topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6- c-1: topic-p0/topic-p1/topic-p2/topic-p3- c-2:topic-p4/topic-p5/topic-p6- 弊端- 只是针对 1 个 topic 而言,c-1多消费一个分区影响不大- 如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic越多则消费的分区也越多,则性能有所下降- 消费者重新分配策略- Rebalance- kafka 怎么均匀地分配某个 topic 下的所有 partition 到各个消费者,从而使得消息的消费速度达到最快,这就是平衡(balance),而 rebalance(重平衡)其实就是重新进行 partition 的分配,从而使得 partition 的分配重新达到平衡状态- 70个分区,10个消费者,但是先启动一个消费者,后续再启动一个消费者,Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作 ,分区和消费者发生变化时都会发生rebalance操- offset维护机制- 消费者会记录offset,consumer故障恢复后会从当前offset继续消费,而这个offset会记录在zk里面和本地,新版默认将offset保证在kafka的内置topic中,名称是__consumer_offsets-(0-49)- 该Topic默认有50个Partition,每个Partition有3个副本,分区数量由参数offset.topic.num.partition配置- 通过消费者组groupId的哈希值和该参数取模的方式来确定某个消费者组已消费的offset保存到__consumer_offsets主题的哪个分区中- 由消费者组名+主题+分区,确定唯一的offset的key,从而获取对应的值,三元组:group.id+topic+分区号,而 value 就是 offset 的值
  • 简单调用#依赖<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>2.6.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.6.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>2.6.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.0</version> </dependency> </dependencies>``````package com.wsl.kafka;import org.apache.kafka.clients.admin.*;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.TopicPartition;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.time.Duration;import java.time.LocalDateTime;import java.util.*;import java.util.concurrent.ExecutionException;/** * description:kafka 简单实操 * * @author wsl * @since: 2023-12-24 * @version: 1.0 */@SpringBootTest(classes = KafkaApplication.class)@RunWith(SpringRunner.class)public class KafkaTest { public static final String TOPIC_NAME = "topic1"; public static final String brokerIpPort = "114.132.79.236:9092"; /** * 设置admin 客户端 * @return */ public static AdminClient initAdminClient() { Properties properties = new Properties(); properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIpPort); AdminClient adminClient = AdminClient.create(properties); return adminClient; } /** * 创建主题 */ @Test public void createTopic() { AdminClient adminClient = initAdminClient(); // 2个分区,1个副本 NewTopic newTopic = new NewTopic(TOPIC_NAME, 2, (short) 1); CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic)); //future等待创建,成功不会有任何报错,如果创建失败和超时会报错。 try { createTopicsResult.all().get(); } catch (Exception e) { e.printStackTrace(); } System.out.println("创建新的topic"); } //列表 @Test public void listTopic() throws ExecutionException, InterruptedException { AdminClient adminClient = initAdminClient(); //是否查看内部的topic,可以不用 ListTopicsOptions options = new ListTopicsOptions(); options.listInternal(true); ListTopicsResult listTopics = adminClient.listTopics(options); Set<String> topics = listTopics.names().get(); for (String topic : topics) { System.err.println(topic); } } //删除 @Test public void delTopicTest() { AdminClient adminClient = initAdminClient(); DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("topic1")); try { deleteTopicsResult.all().get(); } catch (Exception e) { e.printStackTrace(); } } //获取指定topic的详细信息 @Test public void getTopicInfo() throws Exception { AdminClient adminClient = initAdminClient(); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME)); Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get(); Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet(); entries.stream().forEach((entry)-> System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue())); } /** * 增加分区数量 * * 如果当主题中的消息包含有key时(即key不为null),根据key来计算分区的行为就会有所影响消息顺序性 * * 注意:Kafka中的分区数只能增加不能减少,减少的话数据不知怎么处理 * * @throws Exception */ @Test public void incrPartitionsTest() throws Exception{ Map<String, NewPartitions> infoMap = new HashMap<>(); NewPartitions newPartitions = NewPartitions.increaseTo(5); AdminClient adminClient = initAdminClient(); infoMap.put(TOPIC_NAME, newPartitions); CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap); createPartitionsResult.all().get(); } /** * =======================生产者================================ */ //生产者配置 public static Properties getProducerProperties(){ Properties props = new Properties(); //指定自定义分区策略 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.wsl.kafka.CustomPartitioner"); props.put("bootstrap.servers", brokerIpPort); //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIpPort); //当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。 props.put("acks", "all"); //props.put(ProducerConfig.ACKS_CONFIG, "all"); //请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性 props.put("retries", 0); //props.put(ProducerConfig.RETRIES_CONFIG, 0); //生产者缓存每个分区未发送的消息,缓存的大小是通过 batch.size 配置指定的,默认值是16KB props.put("batch.size", 16384); /** * 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满 * 如果想减少请求的数量,可以设置 linger.ms 大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端 * 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送减少请求 * 如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送 */ props.put("linger.ms", 10); /** * buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。 * 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器 * 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了 * buffer.memory要大于batch.size,否则会报申请内存不#足的错误,不要超过物理内存,根据实际情况调整 * 需要结合实际业务情况压测进行配置 */ props.put("buffer.memory", 33554432); /** * key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置, * 即使消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类, * 将key序列化成字节数组。 */ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); return props; } /** * send()方法是异步的,添加消息到缓冲区等待发送,并立即返回 * 生产者将单个的消息批量在一起发送来提高效率,即 batch.size和linger.ms结合 * 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回 ack * 发送消息后返回的一个 Future 对象,调用get即可 * 消息发送主要是两个线程:一个是Main用户主线程,一个是Sender线程 * 1)main线程发送消息到RecordAccumulator即返回 * 2)sender线程从RecordAccumulator拉取信息发送到broker * 3) batch.size和linger.ms两个参数可以影响 sender 线程发送次数 */ @Test public void testSend(){ Properties props = getProducerProperties(); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 1; i < 3; i++){ //1.这种是同步阻塞返回 future.get() //该ProducerRecord构造方法可以指定分区等多种传参方式// Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "key"+i, "value"+i));// try {// RecordMetadata recordMetadata = future.get();//不关心是否发送成功,则不需要这行// System.out.println("发送状态:"+recordMetadata.toString());// } catch (InterruptedException e) {// e.printStackTrace();// } catch (ExecutionException e) {// e.printStackTrace();// } //2.这种是带回调确认是否发送成功 producer.send(new ProducerRecord<>(TOPIC_NAME, "xxx", "value" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //exception为空则是发送成功 if (exception == null) { System.out.println("发送状态:"+metadata.toString()); } else { exception.printStackTrace(); } } }); System.out.println(i+"发送:"+ LocalDateTime.now().toString()); } producer.close(); } /** * =====================================消费者============================= */ //消费者配置 public static Properties getConsumerProperties() { Properties props = new Properties(); //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("auto.offset.reset","earliest"); //broker地址 props.put("bootstrap.servers", brokerIpPort); //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息 props.put("group.id", "consumer-0"); //开启自动提交offset true开始 false关闭 //一旦开启就很难确认是否真的被消费成功后提交,也有可能消费失败也提交了,所以一般关闭,都是手动提交// props.put("enable.auto.commit", "true"); //自动提交offset延迟时间 自动提交关闭了这个也不用配// props.put("auto.commit.interval.ms", "1000"); //反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } /** * 消费 */ @Test public void simpleConsumerTest(){ Properties props = getConsumerProperties(); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //订阅topic主题 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { //拉取时间控制,阻塞超时时间 一次拉取多条 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value()); } if(!records.isEmpty()){ //同步阻塞提交offset 失败了会自动重试 一般不使用// consumer.commitSync(); //异步提交offset consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if(exception == null){ //异常为空说明手动提交成功了 System.err.println("手动提交offset成功:" + offsets.toString()); } else { //提交失败 System.err.println("手动提交offset失败:" + offsets.toString()); } } }); } } }}

六、数据的投递+存储可靠性和副本数据一致性

  • ACK 确保消息投递可靠和多副本确保数据存储可靠- 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,ack有3个可选值,分别是0, 1,all。- ack=0- producer发送一次就不再发送了,不管是否发送成功,发送出去的消息还在半路,或者还没写入磁盘, Partition Leader所在Broker就直接挂了,客户端认为消息发送成功了,此时就会导致这条消息就丢失- ack=1(默认)- 问题:万一Partition Leader刚刚接收到消息,Follower还没来得及同步过去,结果Leader所在的broker宕机了就消息丢失- 只要Partition Leader接收到消息而且写入【本地磁盘】,就认为成功了,不管他其他的Follower有没有同步过去这条消息了- ack= all(即-1)- producer只有收到分区内所有副本的成功写入全部落盘的通知才认为推送消息成功- 备注:leader会维持一个与其保持同步的replica集合,该集合就是ISR,leader副本也在isr里面- 但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复,也就是数据发送到leader后 ,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复,幂等消费得处理好- acks=all 也不代表数据一定不会丢失,当Partition只有一个副本,也就是一个Leader,任何Follower都没有,接收完消息后宕机,也会导致数据丢失,acks=all,必须跟ISR列表里至少有2个以上的副本配合使用- 在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数设定 ISR中的最小副本数是多少,默认值为1,改为 >=2,如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常- ISR (in-sync replica set )- Partition Leader 发生故障之后,就会从 ISR 中选举新的 Partition Leader。- 如果Partition follower长时间(replica.lag.time.max.ms) 未向leader同步数据,则该Partition Follower将被踢出ISR移入OSR- Partition leader 保持同步的 Partition Follower 集合, 当 ISR 中的Partition Follower 完成数据的同步之后,就会给 leader 发送 ack- leader会维持一个与其保持同步的replica集合,该集合就是ISR,每一个leader partition都有一个ISR,leader动态维护, 要保证kafka不丢失message,就要保证ISR这组集合存活(至少有一个存活),并且消息commit成功- OSR (out-of-sync-replica set)- 与leader副本分区同步滞后过多的副本集合- AR(Assign Replicas)- 分区中所有副本统称为AR
  • HighWatermark 最小LEO确保多副本间数据一致性- Follower故障- Follower发生故障后会被临时踢出ISR(动态变化),待该follower恢复后,follower会读取本地的磁盘记录的上次的HW,并将该log文件高于HW的部分截取掉,从HW开始向leader进行同步,等该follower的LEO大于等于该Partition的hw,即follower追上leader后,就可以重新加入ISR- Leader故障- Leader发生故障后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于hw的部分截掉(新leader自己不会截掉),然后从新的leader同步数据

七、kafka日志清理

  • Kafka将数据持久化到了硬盘上,为了控制磁盘容量,需要对过去的消息进行清理,清理策略大致分两种,删除策略和压缩策略
  • 内部有个定时任务检测删除日志,默认是5分钟 log.retention.check.interval.ms,可在配置文件中配置- 启用cleaner- log.cleaner.enable=true- log.cleaner.threads = 2 (清理线程数配置)
  • 删除策略#指定删除策略log.cleanup.policy=delete#清理超过指定时间的消息,默认是168小时,7天,#还有log.retention.ms, log.retention.minutes, log.retention.hours,优先级高到低log.retention.hours=168​#超过指定大小后,删除旧的消息,下面是1G的字节数,-1就是没限制log.retention.bytes=1073741824#log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除- 时间删除 - 配置了7天后删除,那7天如何确定呢?​每个日志段文件都维护一个最大时间戳字段,每次日志段写入新的消息时,都会更新该字段,一个日志段segment写满了被切分之后,就不再接收任何新的消息,最大时间戳字段的值也将保持不变,​kafka通过将当前时间与该最大时间戳字段进行比较,从而来判定是否过期- 大小阈值 - 假设日志段大小是500MB,当前分区共有4个日志段文件,大小分别是500MB,500MB,500MB和10MB,10MB那个文件就是active日志段。此时该分区总的日志大小是3*500MB+10MB=1500MB+10MB,如果阈值设置为1500MB,那么超出阈值的部分就是10MB,小于日志段大小500MB,故Kafka不会执行任何删除操作,即使总大小已经超过了阈值;如果阈值设置为1000MB,那么超过阈值的部分就是500MB+10MB > 500MB,此时Kafka会删除最老的那个日志段文件 注意:超过阈值的部分必须要大于一个日志段的大小
  • 压缩策略- log.cleanup.policy=compact 启用压缩策略- 按照消息key进行整理,有相同key不同value值,只保留最后一个

八、高性能原理之零拷贝

  • 零拷贝ZeroCopy(SendFile)- 例子:将一个File读取并发送出去(Linux有两个上下文,内核态,用户态)- File文件的经历了4次copy- 调用read,将文件拷贝到了kernel内核态- CPU控制 kernel态的数据copy到用户态- 调用write时,user态下的内容会copy到内核态的socket的buffer中- 最后将内核态socket buffer的数据copy到网卡设备中传送- 缺点:增加了上下文切换、浪费了2次无效拷贝(即步骤2和3)- - ZeroCopy:- 请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。Zero copy大大提高了应用程序的性能,减少不必要的内核缓冲区跟用户缓冲区间的拷贝,从而减少CPU的开销和减少了kernel和user模式的上下文切换,达到性能的提升- 对应零拷贝技术有mmap及sendfile- mmap:小文件传输快- sendfile:大文件传输比mmap快- 应用:Kafka、Netty、RocketMQ等都采用了零拷贝技术
  • 高性能主要包含以下几点- 存储模型,topic多分区,每个分区多segment段- index索引文件查找,利用分段和稀疏索引- 磁盘顺序写入- 异步操作少阻塞sender和main线程,批量操作(batch缓存)- 页缓存Page cache,没利用JVM内存,因为容易GC影响性能- 零拷贝ZeroCopy(SendFile)

九、spring boot+kafka

依赖

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.0-RC1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.wsl</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>

        <!--<dependency>-->
        <!--<groupId>org.apache.kafka</groupId>-->
        <!--<artifactId>kafka-clients</artifactId>-->
        <!--<version>2.4.0</version>-->
        <!--</dependency>-->

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

配置

server:
  port: 8080
spring:
  application:
    name: kafka-service
  kafka:
    bootstrap-servers: 114.132.79.236:9092
    producer:
      # 消息重发的次数。配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
      retries: 1
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      #事务id
      transaction-id-prefix: prefix-

    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 4
      #listner负责ack,手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
      #避免出现主题未创建报错
      missing-topics-fatal: false

代码

package com.wsl.kafka;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;

/**
 * description:kafka 简单实操
 *
 * @author wsl
 * @since: 2023-12-24
 * @version: 1.0
 */
@SpringBootTest(classes = KafkaApplication.class)
@RunWith(SpringRunner.class)
public class SpringKafkaTest {
    public static final String TOPIC_NAME = "topic1";
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Test
    public void test(){
        kafkaTemplate.send(TOPIC_NAME, "测试数据").addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("发送消息失败:" + failure.getMessage());
        });
    }

    /**
     * 0.11版本开始加入事务
     * 注解方式的事务
     */

    @Test
    @Transactional(rollbackFor = Exception.class)
    public void sendMessage1() {
        kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:1  i="+1);
        kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:2  i="+1);
    }
    /**
     * 声明式事务支持
     */
    @Test
    public void sendMessage2() {
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
            @Override
            public Object doInOperations(KafkaOperations kafkaOperations) {
                kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:1  i="+1);
//                int i = 1/0;
                kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:2  i="+1);
                return true;
            }
        });
    }
}

/**
* 监听
*/
package com.wsl.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MQListener {
    /**
     *  消费监听
     * @param record
     */
    @KafkaListener(topics = {"topic1"},groupId = "consumer-1")
    public void onMessage1(ConsumerRecord<?, ?> record,
                           Acknowledgment ack,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        // 打印出消息内容
        System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        ack.acknowledge();
    }
}
标签: kafka 分布式

本文转载自: https://blog.csdn.net/Stitch12/article/details/135170359
版权归原作者 sl_w 所有, 如有侵权,请联系我们删除。

“Kafka”的评论:

还没有评论