0


Kafka内容分享(一):Kafka 基础知识

一、什么是Kafka

1.1 Kafka的定义

Kafka是一种分布式的消息系统,用于实现高可靠性、高吞吐量、低延迟的数据传输。可以把Kafka想象成一个邮局,生产者(相当于寄信人)把消息(信件)发给Kafka,消费者(相当于收信人)从Kafka中获取消息(信件)。这个过程可以实现多生产者、多消费者、多主题的消息传递。

例如,有一个电商网站,用户在线下单后,需要将订单信息通知到多个模块,如库存模块、物流模块、财务模块等。如果通过直接调用这些模块的接口进行订单通知,则每个接口都需要等待通知完成后才能继续执行,严重影响处理效率。而使用Kafka,则可以将订单信息发送到Kafka中,各个模块作为消费者从Kafka中获取订单信息,不会因为等待而阻塞。这样可以提高整个系统的处理能力和响应速度。

1.2 Kafka的特点

Kafka具有以下特点:

  • 高性能:Kafka能够提供每秒数百万级别的消息传输,可适应高吞吐量的数据处理场景。
  • 可扩展性:Kafka支持水平扩展,用户可以通过增加broker节点来提高Kafka的吞吐量和容错性能。
  • 可靠性:Kafka提供多副本备份机制,当某些节点故障时,可以自动进行副本切换,确保消息不会丢失。
  • 持久化:Kafka采用磁盘存储,可以长期保存消息,也可根据需要设置消息的保留时间或删除策略。
  • 灵活性和可定制性:Kafka提供各种配置选项,可以根据需要进行灵活定制。
  • 大数据生态系统集成:Kafka可以很好地集成到Hadoop、Spark、Storm等大数据处理系统中,提供数据源或目标的功能。

总之,Kafka是一种强大的、高效的、可扩展的、可靠的分布式消息系统,具有广泛的应用场景。

二、Kafka的架构

2.1 Kafka的组成部分

Kafka的主要组成部分包括生产者(producer)、消费者(consumer)、Broker、Topic、Partition和Offset等。

  • 生产者:用于产生消息并发送到Kafka的指定Topic,将消息写入Topic的一个或多个Partition中。
  • 消费者:用于从Kafka的指定Topic消费消息,并且可以控制对消息的读取速度。
  • Broker:Kafka集群中的一个节点,存储了Topic的分区数据副本。
  • Topic:是消息的逻辑分类,一个Topic可以被划分为多个Partition,每个Partition存储了一部分数据。
  • Partition:是物理存储单元,每个Partition都保存了一个有序的消息序列。
  • Offset:是消息在Partition中的偏移量,消费者可以通过指定Offset来读取指定位置的消息。

2.2 Kafka的工作流程

afka的工作流程分为两部分:发布和订阅。

  • 发布:生产者将消息发送到Kafka的指定Topic,Kafka根据Partition数选择合适的Partition存储消息。如果指定了Key,则消息会基于Key进行Hash,并且被写入到特定Partition中;如果没有指定Key,则消息会随机分布到所有Partition中。
  • 订阅:消费者从Kafka的指定Topic订阅消息,Kafka会将消息按照Partition顺序进行读取,并且确保每个Partition内的消息顺序不变。消费者可以在每个Partition内指定一个Offset来读取消息,也可以从最新的消息开始读取。

三、Kafka的使用场景

3.1 Kafka的典型应用场景

Kafka具有以下典型应用场景:

  • 日志收集:Kafka可用于日志文件的收集和处理,生产者将日志写入Kafka的Topic中,而消费者可以消费这些日志并将它们存储到Hadoop、Elasticsearch等系统中。
  • 消息队列:Kafka可作为异步消息传递的消息队列,生产者可以将消息放入Kafka的Topic中,而消费者则可以从该Topic中获取消息进行处理。
  • 流处理:Kafka可用于实时数据流处理,例如实时预测、实时监控、实时计算等。
  • 事件溯源:Kafka可用于事件溯源,将事件记录到Kafka Topic中,并创建事件流以跟踪数据的变化历史。

3.2 Kafka的优缺点

3.2.1 优点

  • 高性能:Kafka具有高吞吐量和低延迟,能够处理大量数据并保证高效的消息传输。
  • 可扩展性:Kafka可水平扩展,支持增加Broker节点来提高吞吐量和容错性能。
  • 可靠性:Kafka支持多副本备份机制,可以避免故障导致的数据丢失和服务不可用。
  • 持久化:Kafka采用磁盘存储,可以长期保存消息,也可根据需要设置消息的保留时间或删除策略。
  • 灵活性和可定制性:Kafka提供各种配置选项,可以根据需要进行灵活定制。
  • 大数据生态系统集成:Kafka可以很好地集成到Hadoop、Spark、Storm等大数据处理系统中,提供数据源或目标的功能。

3.2.2 缺点

  • 部署和维护成本高:Kafka需要部署在一个分布式环境中,对于初学者来说,安装、配置和维护是一个挑战。
  • 系统复杂度高:Kafka的系统设计和架构比较复杂,需要理解其概念和原理才能正确使用。
  • 数据一致性问题:由于存在多个副本,可能会存在数据一致性的问题,需要进行合理的配置和管理。

四、Kafka的安装与配置

4.1 Kafka的下载与准备

Windows平台

  1. 访问Kafka官网:Apache Kafka
  2. 选择合适的版本进行下载
  3. 解压文件到指定目录,如D:/kafka/
  4. 下载并安装Java运行环境

Linux/Mac平台

在终端中执行以下命令下载Kafka最新版本:

Codewget https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz

解压文件到指定目录,如/opt/kafka/:

Codetar -xzf kafka_2.13-3.0.0.tgz
mv kafka_2.13-3.0.0 /opt/kafka

安装Java运行环境

4.2 Kafka的安装与启动

Windows平台

  1. 打开cmd窗口,进入Kafka解压目录的bin文件夹
  2. 执行以下命令启动Kafka服务器:
 Code.\windows\bin\windows\kafka-server-start.bat .\config\server.properties

Linux/Mac平台

启动Zookeeper服务

Codecd /opt/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

打开新的终端窗口,进入Kafka解压目录的bin文件夹

执行以下命令启动Kafka服务器:

 Codecd /opt/kafka
bin/kafka-server-start.sh config/server.properties

4.3 Kafka的配置文件解析

Kafka的配置文件主要包含以下参数:

  • broker.id:Broker节点的唯一标识。
  • listeners:监听器列表,用于指定Broker节点所提供的服务和端口号。
  • log.dirs:日志存储路径,用于存放Kafka的日志数据。
  • zookeeper.connect:Zookeeper服务器的地址和端口号。
  • advertised.listeners:外部访问Broker节点的地址和端口号,用于跨机器的通信。

示例:

 Codebroker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
advertised.listeners=PLAINTEXT://your_hostname:9092

五、Kafka的基本操作

Kafka是一种高性能、分布式的消息中间件。它能够处理大量的数据流,并将其发送给订阅者。以下是Kafka的基本操作步骤:

1.安装和启动Kafka

首先,需要从Kafka官网下载Kafka二进制文件,然后解压到指定目录。使用命令行进入Kafka目录,输入以下命令启动Kafka:

 Codebin/kafka-server-start.sh config/server.properties

2.创建主题

在Kafka中,消息是通过主题来进行分类的。所以在开始发送和接收消息之前,需要先创建主题。下面是创建主题的示例命令:

 Codebin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic

其中,--replication-factor指定主题的复制因子,--partitions指定主题的分区数。

3.发送消息

使用Kafka提供的命令行工具kafka-console-producer.sh来发送消息。示例命令如下:

 Codebin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

在该命令中,--broker-list指定Kafka服务器的地址,--topic指定消息的主题。在控制台输入消息内容,按Enter键即可发送消息。

4.接收消息

使用Kafka提供的命令行工具kafka-console-consumer.sh来接收消息。示例命令如下:

Codebin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning

在该命令中,--bootstrap-server指定Kafka服务器的地址,--topic指定要接收的消息主题。--from-beginning表示从最初开始接收消息。

5.删除主题

使用命令行工具kafka-topics.sh来删除主题。示例命令如下:

Codebin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic my_topic

在该命令中,--delete指定要删除主题,--zookeeper指定Zookeeper的地址和端口。

6.列出所有主题

使用命令行工具kafka-topics.sh来列出所有主题。示例命令如下:

Codebin/kafka-topics.sh --list --zookeeper localhost:2181

六、Kafka的高级操作

6.1 消息的生产与消费保障

在生产和消费消息时,可以通过指定acks参数和retry机制来保证消息的可靠性。

  • acks:用于指定需要等待多少个副本成功写入数据后,生产者才会认为消息发送成功。
  • retries:用于指定生产者在发送消息时失败后的重试次数。

以下是Java代码示例:

CodeProperties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
Producer<String, String> producer = new KafkaProducer<>(props);
​
String topic = "test";
String key = "key1";
String value = "hello world";
​
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
​
try {
RecordMetadata metadata = producer.send(record).get();
System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
​
producer.close();

6.2 消息的持久化

Kafka的消息是持久化到磁盘上的,可以设置消息的过期时间来自动清理过期消息。在配置文件server.properties中,可以设置以下参数:

  • log.retention.hours:消息保存的时间,默认为168小时(7天)。
  • log.segment.bytes:每个日志段的大小,默认为1GB。
  • log.cleanup.policy:指定日志清理策略。

以下是Java代码示例:

CodeProperties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
Producer<String, String> producer = new KafkaProducer<>(props);
​
String topic = "test";
String key = "key1";
String value = "hello world";
​
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
​
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition());
        } else {
            exception.printStackTrace();
        }
    }
});
​
producer.close();

6.3 Kafka的集群模式

Kafka支持分布式的集群模式,可以通过添加Broker节点来提高集群的吞吐量和容错性能。在配置文件server.properties中,可以设置以下参数:

  • broker.id:每个Kafka节点都需要唯一的Broker ID,这个ID需要在集群中是唯一的。
  • listeners:Kafka的监听器,用于处理客户端和Broker之间的网络通信。默认为PLAINTEXT协议,也可以配置其他安全协议如SSL。
  • log.dirs:Kafka的日志目录,用于存储Topic的消息数据。
  • zookeeper.connect:Zookeeper的连接参数,用于集群管理。

以下是Java代码示例:

CodeProperties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
Producer<String, String> producer = new KafkaProducer<>(props);
​
String topic = "test";
String key = "key1";
String value = "hello world";
​
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
​
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition());
        } else {
            exception.printStackTrace();
        }
    }
});
​
producer.close();

6.4 数据迁移及主题管理

Kafka支持对现有的Topic进行数据迁移和拆分,以及删除和创建Topic。在Kafka集群中,可以使用命令行工具kafka-topics来执行这些操作。

以下是Java代码示例:

CodeProperties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
​
AdminClient client = AdminClient.create(props);
​
String topic = "test";
​
NewTopic newTopic = new NewTopic(topic, 3, (short)2);
client.createTopics(Collections.singleton(newTopic));
​
DescribeTopicsResult result = client.describeTopics(Collections.singleton(topic));
Map<String, TopicDescription> topicDescriptions = result.all().get();
​
for (TopicDescription topicDescription : topicDescriptions.values()) {
    System.out.println("Topic: " + topicDescription.name() + ", Partitions: " + topicDescription.partitions().size());
}
​
client.close();

6.5 Broker间数据同步

Kafka支持集群环境下的Broker间数据同步。当一个Broker接收到消息后,它会将消息保存到本地磁盘上,并将消息的副本发送给其他Broker。这样,即使某个Broker出现故障,其他Broker仍然可以提供数据服务。

Kafka中的数据同步是通过Zookeeper来协调的。当Broker加入或退出Kafka集群时,Zookeeper会负责通知集群中的其他Broker,从而实现数据同步。

为了确保数据同步效果,Kafka在默认情况下会将消息的副本发送到多个Broker上。这里的复制因子可以通过配置文件来修改,默认值为1。

6.6 消息过期时间

在实际应用中,我们可能需要对消息进行过期处理,以避免消息堆积导致系统资源浪费。Kafka提供了消息过期时间的功能,在消息发送时可以指定消息过期时间,Kafka会自动删除已经过期的消息。

Kafka中消息过期时间是通过在Producer端设置消息的timestamp属性实现的。具体来说,可以通过以下两种方式设置消息过期时间:

  • 在Producer端设置消息过期时间 javaCopy Codelong currentTime = System.currentTimeMillis(); // 获取当前时间 long expireTime = currentTime + expirationTime; // 计算消息过期时间 ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", messageKey, messageValue); record.timestamp(expireTime); // 设置消息的过期时间 producer.send(record);
  • 在Kafka Broker端设置全局的消息过期时间 在Kafka配置文件中添加如下配置: Copy Codelog.retention.hours=24 上述配置表示Kafka Broker会自动删除24小时之前的消息。

6.7 消费者组

在实际应用中,我们通常需要将同一组消费者同时消费同一个主题下的消息。这时就需要使用消费者组来协调消息的消费情况。

Kafka消费者组是由多个消费者实例组成的,每个消费者实例都负责消费主题下的一部分消息。当一个新消费者加入消费者组时,它会接收到尚未被消费的消息。当某个消费者出现故障时,Kafka会自动将该消费者负责消费的消息分配给其他消费者。

Kafka中的消费者组是通过使用相同的group.id来关联一组消费者实现的。例如,以下代码创建了一个名为my_consumer_group的消费者组:

 CodeProperties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my_consumer_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

6.8 消息过滤

在实际应用中,我们可能需要对消息进行过滤,只选择部分消息进行消费。Kafka提供了多种方式来实现消息过滤,包括按主题、按消息头、按消息内容等。

以下是按主题过滤消息的示例代码:

 CodeProperties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my_consumer_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my_topic"));

上述代码中,使用subscribe方法订阅名为my_topic的主题,该主题下的所有消息都会被消费。

6.9 自定义分区器

在Kafka中,每个主题可以由多个分区组成。当Producer发送消息时,需要指定消息所属的分区。Kafka默认提供了一些分区策略,但是在实际应用中,我们可能需要自定义分区器来满足特定的业务需求。

自定义分区器需要实现org.apache.kafka.clients.producer.Partitioner接口。以下是一个自定义的分区器示例:

Codepublic class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int partition = 0;
        if (key == null) {
            partition = ThreadLocalRandom.current().nextInt(numPartitions);
        } else {
            partition = Math.abs(key.hashCode() % numPartitions);
        }
        return partition;
    }
​
    @Override
    public void close() {
        // do nothing
    }
​
    @Override
    public void configure(Map<String, ?> configs) {
        // do nothing
    }
}

上述代码中,MyPartitioner类实现了partition方法,根据消息的key值计算出要发送到哪个分区。如果消息的key值为null,那么就随机选择一个分区。该分区器还可以通过configure方法进行配置,这里并没有进行任何配置。

七、Kafka的应用实践

7.1 实现高可用性、高并发度的日志系统

Kafka可以作为高可用性和高并发度的日志系统,采用以下方案:

  • 使用多个Broker组成Kafka集群,提供高并发度和高可用性。
  • 使用Zookeeper来进行集群管理和节点选举,防止单点故障。
  • 使用Producer将日志信息写入Kafka的Topic中,消费者可以从Topic中读取消息进行处理。
  • 使用Flume或Logstash等工具来实现日志的采集和预处理。

示例:

Java代码示例:使用Kafka Java客户端向名为“log”的主题发送一条日志消息;

 CodeProperties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
Producer<String, String> producer = new KafkaProducer<>(props);
​
String topic = "log";
String key = "key1";
String value = "hello world";
​
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
​
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition());
        } else {
            exception.printStackTrace();
        }
    }
});
​
producer.close();

7.2 基于Kafka的数据传输与处理

Kafka可以作为数据传输和处理的框架,采用以下方案:

  • 使用Producer将数据写入Kafka的Topic中,消费者可以从Topic中读取数据并进行处理。
  • 使用Kafka Connect来将数据源连接到Kafka集群。
  • 使用Kafka Streams API来进行流处理。

示例:

Java代码示例:使用Kafka Java客户端向名为“data”的主题发送一条数据消息;

 CodeProperties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
Producer<String, String> producer = new KafkaProducer<>(props);
​
String topic = "data";
String key = "key1";
String value = "{ \"id\":1, \"name\":\"Alice\", \"age\":30 }";
​
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
​
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition());
        } else {
            exception.printStackTrace();
        }
    }
});
​
producer.close();

7.3 将Kafka集成到大数据处理系统中

将Kafka集成到大数据处理系统中可以采用以下方案:

  • 使用Hadoop MapReduce来读取和写入Kafka的数据。
  • 使用Apache Spark Streaming来进行实时流数据处理。
  • 使用Apache Flink来进行实时流数据处理。

示例:

Java代码示例:使用Apache Spark Streaming从名为“data”的主题读取数据,并进行词频统计;

CodeSparkConf conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
​
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
​
Collection<String> topics = Arrays.asList("data");
​
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
​
JavaDStream<String> lines = stream.map(ConsumerRecord::value);
​
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+")).iterator());
​
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);
​
wordCounts.print();
​
jssc.start();
jssc.awaitTermination();

7.4 将Kafka与SpringBoot集成

将Kafka与Spring Boot集成可以通过Spring Boot提供的spring-kafka模块来实现。以下是实现步骤:

7.4.1 在pom.xml文件中添加spring-kafka依赖:

xmlCopy Code<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
</dependency>

7.4.2创建Kafka配置类,配置Kafka属性:

Code@Configuration
@EnableKafka
public class KafkaConfig {
​
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;
​
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
​
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
​
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

7.4.3 创建Kafka消息生产者类:

@Component
public class KafkaProducer {
​
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
​
    @Value("${kafka.topic}")
    private String topic;
​
    public void sendMessage(String message) {
        kafkaTemplate.send(topic, message);
    }
}

7.4.4 在application.properties文件中配置Kafka属性:

Codekafka.bootstrap-servers=localhost:9092
kafka.topic=my_topic

7.4.5 创建RESTful API控制器,通过KafkaProducer发送消息:

Code@RestController
@RequestMapping("/api")
public class MessageController {
​
    @Autowired
    private KafkaProducer kafkaProducer;
​
    @PostMapping("/message")
    public ResponseEntity<?> sendMessage(@RequestBody String message) {
        kafkaProducer.sendMessage(message);
        return new ResponseEntity<>(HttpStatus.OK);
    }
}

在Spring Boot应用程序中使用@EventListener注解监听Kafka消息:

 Code@Service
public class KafkaConsumer {
​
    @EventListener
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("Received message: " + record.value());
    }
}

7.4.6 示例中的代码仅仅是最基本的实现,可以在此基础上进一步拓展和优化。

将Kafka与Spring Boot集成可以通过Spring Boot提供的spring-kafka模块来实现。以下是实现步骤:

1.在pom.xml文件中添加spring-kafka依赖:

Code<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
</dependency>

2.创建Kafka配置类,配置Kafka属性:

 Code@Configuration
@EnableKafka
public class KafkaConfig {
​
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;
​
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
​
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
​
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

3.创建Kafka消息生产者类:

Code@Component
public class KafkaProducer {
​
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
​
    @Value("${kafka.topic}")
    private String topic;
​
    public void sendMessage(String message) {
        kafkaTemplate.send(topic, message);
    }
}

4.在application.properties文件中配置Kafka属性:

Codekafka.bootstrap-servers=localhost:9092
kafka.topic=my_topic

5.创建RESTful API控制器,通过KafkaProducer发送消息:

 Code@RestController
@RequestMapping("/api")
public class MessageController {
​
    @Autowired
    private KafkaProducer kafkaProducer;
​
    @PostMapping("/message")
    public ResponseEntity<?> sendMessage(@RequestBody String message) {
        kafkaProducer.sendMessage(message);
        return new ResponseEntity<>(HttpStatus.OK);
    }
}

6.在Spring Boot应用程序中使用@EventListener注解监听Kafka消息:

Code@Service
public class KafkaConsumer {
​
    @EventListener
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("Received message: " + record.value());
    }
}

示例中的代码仅仅是最基本的实现,可以在此基础上进一步拓展和优化。

当将Kafka与Spring Boot集成时,需要特别注意以下几点:

  1. Kafka和Zookeeper的版本需要兼容,否则可能会导致无法连接Zookeeper或者其他的问题。
  2. 在Kafka生产者中使用KafkaTemplate发送消息时,需要注意设置Kafka主题(topic)的名称。
  3. 在Kafka消费者中使用@KafkaListener注解监听主题时,需要注意设置主题名称。
  4. 在Kafka消费者中使用@EventListener注解监听主题时,需要在配置类中添加KafkaListenerContainerFactory来实现对应的KafkaListenerEndpointRegistry。
  5. 在Kafka消费者中使用@EventListener注解监听主题时,需要让KafkaConsumer实现ApplicationListener接口,并指定泛型为KafkaEvent,监听KafkaEvent事件。

本文转载自: https://blog.csdn.net/qq_45038038/article/details/134867448
版权归原作者 之乎者也· 所有, 如有侵权,请联系我们删除。

“Kafka内容分享(一):Kafka 基础知识”的评论:

还没有评论