0


Kafka学习-Java使用Kafka

文章目录


前言

Kafka消息中间件

一、Kafka

1、什么是消息队列

假设我们有两个服务:生产者A每秒能生产200个消息,消费者B每秒能消费100个消息。

在这里插入图片描述

那么B服务是处理不了A这么多消息的,那么怎么使B不被压垮的同时还能处理A的消息呢,我们引入一个中间件,即

Kafka

。(当然着并不能使消费者的处理速度上升)

在这里插入图片描述

offset

那么我们可以在B服务中加入一个队列,也就是一个链表,链表的每个节点相当于一条消息,每个节点有一个序号即

offset

,记录消息的位置。

在这里插入图片描述

在这里插入图片描述

但是这样也会有个问题,还没有处理的消息是存储在内存中的,如果B服务挂掉,那么消息也就丢失了。
所以我们可以把队列移出,变成一个单独的进程,即使B服务挂掉,消息也不会丢失。

在这里插入图片描述

2、高性能

B服务由于性能差,队列中未处理的消息会越来越多,我们可以增加更多的消费者来处理消息,相对的也可以增加更多的生产者来生成消息。

在这里插入图片描述

topic

但是,生产者与消费者会争抢同一个队列,没有抢到就要等待,那么怎么解决呢?
我们可以将消息进行分类,每一类消息是一个

topic

,生产者按消息的类型投递到不同的

topic

中,消费者也按照不同的topic进行消费。

在这里插入图片描述

partition

但是单个topic的消息还是有可能过多,我们可以将单个队列拆分,每段是一个

partition

分区,每个消费者负责一个

partition

在这里插入图片描述

3、高扩展

broker

随着

partition

过多,所有的

partition

都在同一个机器上,就可能会导致单机的cpu和内存过高,影响性能,那么我们可以使用多台机器,将

partition

分散部署在不同的机器上。每台机器就代表一个

broker


我们可以增加

broker

来缓解服务器的cpu过高的性能问题。

在这里插入图片描述

4、高可用

replicas、leader、follower

假如某个

broker

挂了, 那么其中

partition

中的消息也就都丢失了,那么这个问题怎么解决呢?
我们可以给

partition

多加几个副本,统称

replicas

,并将它们分为

leader

follower

leader

负责生产者和消费者的读写,

follower

只负责同步

leader

的数据。假如

leader

挂了,也不会影响

follower

,随后在

follower

中选出一个

leader

,保证消息队列的高可用。

在这里插入图片描述

5、持久化和过期策略

在上面讲述了

leader

挂掉的情况,如果所有的

broker

都挂了,消息不就都丢失了?
为了解决这个问题,就不能只把数据存在内存中,也要存在磁盘中。
但是如果所有消息一直保存在磁盘中,那磁盘也会被占满,所以引入保留策略。

6、消费者组

如果我想在原有的基础上增加一个消费者,那么它只能跟着最新的

offset

接着消费,如果我想从某个

offset

开始消费呢?
我们引入消费者组,实现不同消费者维护自己的消费进度。

在这里插入图片描述

7、Zookeeper

上面介绍了很多的组件,每个组件都有自己的状态信息,那么就需要有一个组件去统一维护这些组件的状态信息,于是引入了

Zookeeper

组件,它会定期与

broker

通信,获取

Kafka

集群的状态,判断哪些

broker

挂了,消费者组消费到哪了等等。

8、架构图

在这里插入图片描述

二、安装Zookeeper

1、官网地址

https://zookeeper.apache.org/

2、下载

在这里插入图片描述

选择稳定版本下载

在这里插入图片描述

3、解压,修改配置文件

解压后,复制 zoo_sample.cfg,重命名为 zoo.cfg

在这里插入图片描述

修改数据文件目录位置

在这里插入图片描述

4、启动

我们是在windows系统下安装的,运行 bin 目录下的 zkServer.cmd

在这里插入图片描述

三、安装Kafka

1、官网地址

https://kafka.apache.org/

2、下载

在这里插入图片描述

3、解压,修改配置文件

修改 config 目录下 server.properties 文件
修改日志文件位置,其他参数(如zookeeper端口,根据需要修改)

在这里插入图片描述

4、启动

bin\windows\kafka-server-start.bat config\server.properties

在这里插入图片描述

四、Java中使用Kafka

1、引入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

2、生产者

publicstaticvoidmain(String[] args)throwsInterruptedException{Properties prop =newProperties();

    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    prop.put(ProducerConfig.ACKS_CONFIG,"all");
    prop.put(ProducerConfig.RETRIES_CONFIG,0);
    prop.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
    prop.put(ProducerConfig.LINGER_MS_CONFIG,1);
    prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);String topic ="hello";KafkaProducer<String,String> producer =newKafkaProducer<>(prop);for(int i =0; i <100; i++){
        producer.send(newProducerRecord<String,String>(topic,Integer.toString(2),"hello kafka"+ i));System.out.println("生产消息:"+ i);Thread.sleep(1000);}
    producer.close();}

3、消费者

publicstaticvoidmain(String[] args){Properties prop =newProperties();

    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    prop.put(ConsumerConfig.GROUP_ID_CONFIG,"con-1");// 消费者组
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
    prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//自动提交偏移量
    prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);//自动提交时间KafkaConsumer<String,String> consumer =newKafkaConsumer<>(prop);ArrayList<String> topics =newArrayList<>();//可以订阅多个消息
    topics.add("hello");
    consumer.subscribe(topics);try{while(true){ConsumerRecords<String,String> poll = consumer.poll(Duration.ofSeconds(10));for(TopicPartition topicPartition : poll.partitions()){//    通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息List<ConsumerRecord<String,String>> partitionRecords = poll.records(topicPartition);//    获取TopicPartition对应的主题名称String topic = topicPartition.topic();//    获取TopicPartition对应的分区位置int partition = topicPartition.partition();//    获取当前TopicPartition下的消息条数int size = partitionRecords.size();System.out.printf("--- 获取topic: %s, 分区位置:%s, 消息总数: %s%n",
                        topic,
                        partition,
                        size);for(int i =0; i < size; i++){ConsumerRecord<String,String> consumerRecord = partitionRecords.get(i);//    实际的数据内容String key = consumerRecord.key();//    实际的数据内容String value = consumerRecord.value();//    当前获取的消息偏移量long offset = consumerRecord.offset();//    表示下一次从什么位置(offset)拉取消息long commitOffser = offset +1;System.out.printf("消费消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s%n",
                            key, value, offset, commitOffser);Thread.sleep(1500);}}}}catch(Exception e){
        e.printStackTrace();}finally{
        consumer.close();}}

4、运行效果

生产消息

在这里插入图片描述

消费消息

在这里插入图片描述
在这里插入图片描述

标签: kafka 学习 java

本文转载自: https://blog.csdn.net/weixin_49832841/article/details/138672643
版权归原作者 李子木、 所有, 如有侵权,请联系我们删除。

“Kafka学习-Java使用Kafka”的评论:

还没有评论