0


Apache Kafka知识点表格总结

  之前的项目中用到RabbitMQ比较多,也有用到RocketMQ,,虽然项目中没有用到过Kafka,不过自己在空闲时间学习过,而且在面试中也会问到,因为还是有不少公司用到Kafka,所以做个总结,一方面是做为面试参考,还有就是以后项目如果用到,也可以作为实践参考。

 Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流式应用程序。它具有高吞吐量、可扩展性、持久性、容错性等特点,并且能够高效地处理高负载数据。
** 1.Kafka 基本概念**

概念描述主题 (Topic)Kafka中消息的分类,类似于数据库中的表。记录 (Record)Kafka中消息的基本单位,包含键、值和时间戳。偏移量 (Offset)记录在日志中的位置,唯一标识一条记录。分区 (Partition)主题的子集,可以分布在不同的Broker上,用于实现负载均衡和并行处理。

2. Kafka 架构

组件描述示例生产者 (Producer)负责发送消息到Kafka集群的客户端。一个应用服务器,将用户活动数据发送到Kafka。消费者 (Consumer)负责从Kafka集群读取消息的客户端。一个数据分析系统,从Kafka读取数据进行实时分析。代理 (Broker)Kafka集群中的一个节点,负责维护主题和日志。一个运行中的Kafka实例,存储消息数据。Zookeeper集中管理集群的元数据,包括Broker状态、主题和分区信息。一个运行中的Zookeeper服务,协调Kafka集群的运行。主题 (Topic)Kafka中消息的分类,存储消息记录。如“user-activities”主题存储用户活动数据。分区 (Partition)主题的子集,可以分布在不同的Broker上,用于实现负载均衡。“user-activities”主题可能被分为多个分区以支持高吞吐量。

3.Kafka 数据持久化

概念描述示例日志 (Log)Kafka中存储消息的地方,由一系列有序的记录组成。Kafka集群中每个主题的每个分区都有一个日志。段文件 (Segment File)日志由多个段文件组成,每个段文件包含一系列的记录和索引。一个日志可能包含多个段文件,如

log.0、log.1

等。索引 (Index)帮助快速定位消息,每个段文件都有与之对应的索引文件。索引文件允许消费者快速定位到特定的消息偏移量。持久性 (Persistence)Kafka通过日志的持久化存储,确保消息不会丢失。Kafka配置允许设置日志的持久化策略,如保留时间或大小。清理策略 (Retention Policy)控制日志的存储时间或大小,以避免无限制增长。可以设置基于时间的保留策略,如保留7天的数据。压缩 (Compression)减少日志存储大小,提高存储效率。Kafka支持对日志段进行压缩,如使用Snappy或Gzip压缩算法。

4. Kafka 消息传递保证

保证类型描述如何实现可靠性 (Reliability)确保消息从生产者成功发送到Kafka,并且不会丢失。使用

acks

参数,设置为

all

以确保所有副本都已确认消息。持久性 (Durability)确保消息一旦被确认,就不会丢失,即使发生故障。Kafka将消息持久化到磁盘,并且通过副本机制保证数据不丢失。顺序性 (Ordering)确保消息在单个分区内的顺序性。通过为每个生产者分配一个唯一的序列号来保证消息顺序。消息确认 (Message Acknowledgment)生产者收到确认,表示消息已经成功发送。消费者通过自动提交或手动提交偏移量来确认消息。偏移量管理 (Offset Management)控制消息的消费进度。消费者可以手动管理偏移量,以控制从何处开始读取消息。副本 (Replication)提高消息的可用性和容错性。主题的每个分区都可以配置多个副本,分布在不同的Broker上。故障转移 (Failover)在领导者Broker故障时,自动将请求转移到另一个副本上。使用Zookeeper进行领导者选举,实现故障转移。

** 5.Kafka 消费者API**

API 类型描述特点高层次消费者 (High-Level Consumer)提供了更简单的API,自动管理偏移量和分区分配。

  • 自动提交偏移量

  • 消费者组管理

  • 简化的错误处理

  • 易于使用,适合大多数场景
    低层次消费者 (Low-Level Consumer)提供了更底层的控制,允许手动管理偏移量和分区分配。

  • 手动提交偏移量

  • 更细粒度的控制

  • 适合需要精细控制的场景
    消费者组 (Consumer Group)一组共享同一消费者组ID的消费者,共同消费主题中的所有消息。

  • 支持消息的广播和汇聚

  • 允许消费者并行消费

  • 保证消息至少被消费一次
    偏移量提交 (Offset Committing)控制消费者偏移量的提交,可以自动或手动。- 自动提交:消费者定期自动提交偏移量- 手动提交:消费者代码中显式提交偏移量再平衡 (Rebalance)当消费者组中的成员发生变化时,重新分配分区给消费者。

  • 确保所有分区都有活跃的消费者

  • 可能影响消息的实时性
    反序列化器 (Deserializer)用于将字节消息转换为特定类型的对象。

  • 支持自定义反序列化器

  • Kafka提供默认的字符串和字节反序列化器
    轮询 (Polling)消费者从Broker拉取消息的过程。

  • 通过轮询调用获取消息

  • 可以设置超时时间以控制轮询行为

6. Kafka 生产者API

API 类型描述特点高层次生产者 (High-Level Producer)提供了简单易用的API,支持异步发送和批处理。

  • 异步发送

  • 批处理优化性能

  • 自动管理消息确认

  • 易于实现高吞吐量
    低层次生产者 (Low-Level Producer)提供了底层的控制,允许手动管理消息的发送和确认。

  • 手动控制消息确认

  • 更细粒度的控制

  • 适合需要精细控制的场景
    同步发送 (Synchronous Send)生产者在发送消息后等待服务器的响应。

  • 确保消息发送成功

  • 可能影响吞吐量
    异步发送 (Asynchronous Send)生产者发送消息后立即返回,不等待服务器响应。

  • 提高吞吐量

  • 需要额外处理消息确认
    批处理 (Batching)生产者将多个消息打包在一起发送,减少网络请求。

  • 减少网络负载

  • 提高性能
    压缩 (Compression)减少发送到Broker的消息大小。

  • 支持Gzip、Snappy、LZ4压缩算法

  • 减少网络和存储使用
    确认机制 (Acknowledgment)控制消息确认的行为。

  • acks=0
    

    :不确认

acks=1

:领导者确认

-

acks=all

:所有副本确认
重试策略 (Retry Policy)定义生产者在发送失败时的重试行为。- 设置重试次数和重试间隔

7. Kafka 连接器和流处理 (Kafka Connect & Kafka Streams)

组件描述特点Kafka Connect用于连接Kafka与外部系统(如数据库、键值存储等)的框架。

  • 提供了无服务器的数据流传输

  • 支持自定义连接器开发

  • 可以自动化数据同步任务
    Kafka Streams用于构建流处理应用程序的库,提供了流处理的各种操作。

  • 支持状态流处理

  • 易于实现复杂流处理逻辑

  • 与Kafka生态无缝集成
    连接器 (Connector)Kafka Connect使用的插件,用于特定的数据源或目的地。

  • 例如:JDBC连接器用于数据库

  • 可以处理数据的导入和导出
    流处理拓扑 (Stream Topology)Kafka Streams中定义的数据处理流程。

  • 包括数据源、处理器、数据汇聚等

  • 支持复杂的数据处理逻辑
    状态存储 (State Store)Kafka Streams中用于存储状态数据的持久化层。

  • 支持RocksDB等存储引擎

  • 保证状态的持久性和一致性
    窗口 (Windowing)Kafka Streams中用于处理基于时间或计数的窗口数据。

  • 支持滑动窗口和跳窗

  • 用于实现各种基于时间的聚合操作
    处理时间 (Processing Time)Kafka Streams中基于事件处理的时间。

  • 与事件时间相比,处理时间通常更快

  • 但可能受到系统负载的影响
    事件时间 (Event Time)Kafka Streams中基于事件自身时间戳的时间。

  • 允许更准确的时间相关处理

  • 支持时间相关的语义,如水印

8,Kafka安全性

安全特性描述如何实现身份验证 (Authentication)验证客户端身份的过程。

  • SASL/PLAIN、SASL/SCRAM、Kerberos等机制

  • SSL/TLS客户端认证
    授权 (Authorization)控制客户端对资源的访问权限。

  • ACLs (Access Control Lists)

  • Kafka授权模型
    加密 (Encryption)保护传输中的数据,防止数据在传输过程中被窃听。

  • SSL/TLS加密传输

  • 数据加密在库中存储
    传输安全 (Transport Security)确保数据在客户端和Broker之间的传输安全。- 使用SSL/TLS协议加密网络连接逻辑安全 (Logical Security)确保Kafka集群的逻辑组件安全。- 例如,确保Zookeeper的安全网络隔离 (Network Isolation)防止未授权的网络访问。- 使用防火墙、私有网络等审计 (Auditing)记录和监控系统活动,用于事后审查和问题诊断。- 记录关键操作和访问事件密钥管理 (Key Management)管理用于加密和解密的密钥。- 使用密钥管理系统,如Keycloak、KMS等端到端加密 (End-to-End Encryption)从生产者到消费者的数据全程加密。

  • 客户端加密消息

  • 消费者解密消息

** 9.Kafka 监控和运维 (Kafka Monitoring & Operations)**

监控运维特性描述工具/方法指标监控 (Metrics Monitoring)监控Kafka集群的运行指标,如吞吐量、延迟等。

  • JMX (Java Management Extensions)

  • Prometheus + Grafana
    日志管理 (Logging)收集和分析Kafka的日志信息,用于问题诊断。

  • 使用日志收集器,如Fluentd、Logstash

  • ELK (Elasticsearch, Logstash, Kibana) 栈
    性能调优 (Performance Tuning)调整Kafka配置以优化性能。

  • 调整Broker、生产者、消费者的配置参数

  • 根据实际负载进行调优
    故障排查 (Troubleshooting)诊断和解决Kafka运行中的问题。

  • 使用Kafka管理工具

  • 分析Broker、Zookeeper日志
    集群管理 (Cluster Management)管理Kafka集群的生命周期,如扩容、缩容等。

  • 使用Kafka管理脚本

  • Kafka管理界面,如Confluent Control Center
    数据备份 (Data Backup)备份Kafka中的数据,防止数据丢失。

  • 使用MirrorMaker跨集群复制

  • 定期备份日志文件
    高可用性 (High Availability)确保Kafka集群在部分节点故障时仍能正常运行。

  • 配置副本和分区

  • 使用Raft等协议
    灾难恢复 (Disaster Recovery)在发生严重故障时恢复Kafka集群。

  • 制定和测试灾难恢复计划

  • 使用备份数据恢复集群
    安全运维 (Secure Operations)在运维过程中保护Kafka集群的安全。

  • 使用安全的远程访问工具

  • 定期更新和打补丁

10. Kafka 集群管理 (Kafka Cluster Management)

管理任务描述考虑因素集群规划 (Cluster Planning)确定集群的大小、Broker数量、硬件规格等。

  • 预期负载

  • 数据量

  • 性能要求
    集群部署 (Cluster Deployment)实际部署Kafka集群到服务器或云环境。

  • 自动化部署工具,如Ansible、Terraform

  • 容器化部署,如Kubernetes
    集群配置 (Cluster Configuration)设置Broker、Zookeeper、日志保留策略等配置。

  • 根据具体需求调整配置参数

  • 考虑安全性和性能
    集群监控 (Cluster Monitoring)实施监控策略,实时了解集群状态。

  • 使用专业监控工具,如Kafka Manager、Confluent Control Center

  • 集成到现有的监控系统中
    集群维护 (Cluster Maintenance)定期进行维护工作,如升级、扩容、缩容等。- 零停机时间升级
    - 数据迁移和负载均衡集群故障转移 (Cluster Failover)实现故障转移机制,确保高可用性。

  • 使用Raft等协议

  • 配置副本和分区
    集群安全 (Cluster Security)加固集群,防止未授权访问。

  • 配置ACLs

  • 使用SSL/TLS加密

  • 定期安全审计
    集群备份与恢复 (Backup & Recovery)实施备份策略,确保数据不丢失,并能够恢复。

  • 定期备份数据

  • 测试恢复流程
    集群优化 (Cluster Optimization)根据监控数据对集群进行优化。

  • 调整JVM参数

  • 优化网络和磁盘I/O
    集群文档 (Cluster Documentation)记录集群的架构、配置、操作手册等。

  • 便于新成员快速上手

  • 便于问题排查

11. Kafka 性能优化 (Kafka Performance Optimization)

优化领域描述优化策略网络优化 (Network Optimization)提升网络传输效率,减少延迟。

  • 使用高速网络硬件

  • 优化网络配置,如MTU (Maximum Transmission Unit)
    磁盘I/O优化 (Disk I/O Optimization)提高磁盘读写性能。

  • 使用SSD代替HDD

优化文件系统和存储配置
JVM调优 (JVM Tuning)调整Java虚拟机参数,提升Kafka性能。

  • 调整堆大小

  • 垃圾回收策略优化
    批处理 (Batching)生产者将多个消息打包在一起发送,减少网络请求。

  • 调整批处理大小和延迟参数

  • 优化批处理策略
    压缩 (Compression)使用压缩减少传输和存储的数据量。

  • 选择适当的压缩算法

  • 调整压缩级别
    索引优化 (Indexing Optimization)优化日志索引以加快消息查找速度。- 调整索引参数,如索引段的大小分区优化 (Partitioning Optimization)合理划分分区,提高并行处理能力。- 根据数据量和消费者数量调整分区数量消费者优化 (Consumer Optimization)提升消费者处理消息的效率。

  • 使用多线程或消费者组

  • 优化拉取和提交偏移量的策略
    副本优化 (Replica Optimization)优化副本管理,提高数据的可靠性和可用性。- 调整副本数量和副本分配策略垃圾回收 (Garbage Collection)优化垃圾收集过程,减少GC (Garbage Collection) 暂停时间。

  • 使用G1垃圾收集器

  • 调整GC参数
    负载均衡 (Load Balancing)确保集群负载均衡,避免热点问题。- 使用分区键和消息路由优化

    12. Kafka 集成和生态系统 (Kafka Integration & Ecosystem)

    组件/工具描述用途Kafka ConnectKafka与外部系统之间数据流的连接器。

  • 数据导入导出

  • 支持各种数据源和目标系统
    Kafka Streams用于构建流处理应用程序的库。

  • 实时流处理

  • 状态管理

  • 复杂事件处理
    Apache Flink流处理框架,与Kafka集成,处理实时数据流。

  • 流批一体处理

  • 容错机制

  • 窗口操作
    Apache Spark大数据处理框架,支持Kafka作为数据源和数据汇聚点。

  • 批处理

  • 流处理

  • 机器学习
    Apache NiFi数据流自动化系统,用于数据的流动、处理和分发。

  • 数据管道

  • 数据分发

  • 数据处理
    Apache Camel集成框架,提供Kafka组件,用于构建集成路由。

  • ETL (Extract, Transform, Load) 过程

  • 企业集成模式
    Confluent Control CenterKafka管理工具,提供集群管理、监控和操作界面。

  • 集群监控

  • 配置管理

  • 用户界面
    Kafka MirrorMaker用于在Kafka集群之间复制数据的工具。

  • 数据异地冗余

  • 跨数据中心同步
    Schema Registry用于管理Kafka消息模式的中心化服务。

  • 保证消息兼容性

  • 版本控制
    Kafka REST Proxy允许HTTP请求访问Kafka集群的服务。

  • 提供RESTful接口

  • 简化客户端开发
    Kafka Security Modules提供额外的安全特性,如加密、认证和授权。

  • 增强集群安全性

  • 支持多种安全协议

13. Kafka 高级特性 (Kafka Advanced Features)

Transactional Producer支持事务性消息的生产者,确保消息的原子性。

  • 金融交易

  • 数据库变更日志
    Exactly-Once Semantics确保消息精确一次处理,避免重复或丢失。

  • 计费系统

  • 订单处理
    Idempotent Producer保证消息不重复发送,即使在重试情况下。

  • 防止重复处理

  • 幂等性保证
    Compaction Topics对于键值消息,Kafka可以进行日志压缩。

  • 存储最新状态

  • 时间序列数据
    Retention Policy控制消息在Kafka中的保留时间或大小。

  • 日志清理

  • 数据保留
    Log SegmentsKafka将日志分割成多个段,以优化存储和管理。

  • 分段存储

  • 高效索引
    Message Timestamps消息可以携带时间戳,支持时间相关的操作。

  • 事件时间处理

  • 窗口函数
    Custom Partitioner用户可以自定义分区器,以控制消息的路由。

  • 高级路由逻辑

  • 特定业务需求
    Interceptors允许在消息发送和接收路径上插入自定义逻辑。

  • 监控

  • 修改消息

  • 安全性检查
    Quarantine隔离无法处理的消息,防止影响正常消息流。

  • 错误处理

  • 消息过滤

14. Kafka 消息流处理模式 (Kafka Stream Processing Patterns)

处理模式描述示例Map对消息进行转换,生成新的键值对。将原始数据转换为所需的格式或类型。Filter根据条件过滤消息,只传递符合条件的消息。仅允许特定用户的行为数据通过。Aggregate对消息进行聚合操作,如求和、平均等。按小时聚合网站访问量。Window对在特定时间范围内的消息进行操作。计算每个5分钟窗口内的交易总量。Join将两个流基于共同的键连接起来。将用户点击流和用户信息流连接,以添加用户详细信息。Session Window基于会话的窗口,会话之间由非活动间隔分隔。根据用户的活动间隔聚合用户行为。Caching缓存消息或中间结果,提高处理速度。缓存热门查询结果以快速响应。Stateful Processing使用状态进行处理,可以记住之前的状态。根据用户的历史行为进行个性化推荐。Complex Event Processing (CEP)检测消息流中的复杂模式。检测特定股票的连续价格变动模式。Sink将处理后的消息发送到外部系统。将流处理结果写入数据库或搜索索引。

15.Kafka 消息序列化与反序列化

序列化/反序列化类型描述适用场景String Serializer使用字符串序列化器,消息内容会被转换成字符串。

  • 简单的文本消息

  • 日志数据
    String Deserializer使用字符串反序列化器,消息内容会被解析为字符串。

  • 需要以文本形式处理消息

  • 日志解析
    Avro Serializer使用Avro序列化器,一种二进制格式,支持模式。

  • 需要模式验证的数据

  • 跨语言支持
    Avro Deserializer使用Avro反序列化器,解析Avro格式的消息。- 与Avro Serializer配合使用JSON Serializer使用JSON序列化器,将消息内容转换为JSON格式。

  • 需要结构化数据

  • 易于人读和机器解析
    JSON Deserializer使用JSON反序列化器,将JSON格式的消息解析为对象。- 与JSON Serializer配合使用Protobuf Serializer使用Protocol Buffers序列化器,一种高效的二进制格式。

  • 高性能要求

  • 跨平台支持
    Protobuf Deserializer使用Protocol Buffers反序列化器,解析二进制消息。- 与Protobuf Serializer配合使用Custom Serializer自定义序列化器,根据需要实现序列化逻辑。

  • 特殊格式数据

  • 高度定制化需求
    Custom Deserializer自定义反序列化器,根据需要实现反序列化逻辑。

  • 特殊格式数据

  • 高度定制化需求

16. Kafka 消息传递语义 (Kafka Messaging Semantics)

语义类型描述特点At most once消息可能会丢失,但不会重复。

  • 最少的保证

  • 高吞吐量

  • acks=1
    

At least once消息不会丢失,但可能会重复。

  • 保证不丢失

  • 可能重复

  • acks=all
    

Exactly once消息不会丢失且不会重复。

  • 精确处理

  • 事务性

  • 复杂性较高

  • enable.idempotence=true
    

In-order delivery消息在单个分区内是有序的。

  • 保证顺序性

  • 单个分区内消息顺序
    Out-of-order delivery消息可能会乱序,特别是在多个分区中。

  • 吞吐量可能更高

  • 消息可能乱序

17. Kafka 集群扩展性 (Kafka Cluster Scalability)

扩展性方面描述考虑因素水平扩展 (Horizontal Scaling)通过增加Broker数量来扩展集群。

  • 增加更多的服务器

  • 重新分配分区
    垂直扩展 (Vertical Scaling)通过增加单个Broker的硬件资源来扩展集群。

  • 更强大的CPU

  • 更多的内存

  • 更快的磁盘
    分区扩展 (Partition Scaling)增加主题的分区数量来提高并行处理能力。

  • 更高的吞吐量

  • 更复杂的协调
    消费者扩展 (Consumer Scaling)增加消费者数量或消费者组来提高消息处理能力。

  • 消费者组的再平衡

  • 负载均衡
    流处理器扩展 (Stream Processor Scaling)增加流处理器的数量来提高流处理应用程序的吞吐量。

  • Kafka Streams应用程序

  • 增加实例
    数据保留策略 (Data Retention Policy)调整数据保留策略以优化存储资源使用。

  • 保留时间

  • 保留大小
    控制器扩展 (Controller Scaling)控制器负责管理分区和副本的状态,其性能影响集群的稳定性。

  • 控制器选举

  • 集群元数据管理
    网络优化 (Network Optimization)优化网络配置以提高数据传输效率。

  • 网络带宽

  • 网络延迟
    存储优化 (Storage Optimization)优化存储系统以提高I/O性能和降低成本。

  • 使用SSD

  • RAID配置

18. Kafka 测试和调试 (Kafka Testing & Debugging)

测试/调试工具或方法描述用途Kafka Console ProducerKafka提供的命令行生产者工具,用于发送测试消息。

  • 测试主题

  • 发送样例数据
    Kafka Console ConsumerKafka提供的命令行消费者工具,用于读取和查看消息。

  • 读取主题消息

  • 调试消费者代码
    Kafka Manager提供了一个Web界面用于管理Kafka集群。

  • 监控集群状态

  • 主题、分区管理
    Kafka Tools包括了一系列用于测试和调试的工具。

  • 检查日志

  • 验证消息
    Kafka Logs直接查看Kafka的日志文件以获取详细的错误信息。

  • 诊断问题

  • 查看Broker状态
    JMX Metrics使用JMX (Java Management Extensions) 来监控和调试Kafka。

  • 实时监控性能指标

  • 动态调整配置
    Kafka's built-in monitoringKafka自身提供了一些监控工具和指标。

  • 内置的性能监控

  • 健康检查
    Integration Tests对Kafka与其他系统集成进行测试。

  • 验证连接器功能

  • 测试流处理应用程序
    Stress Testing对Kafka进行压力测试,以评估其在高负载下的表现。

  • 确定性能瓶颈

  • 测试集群稳定性
    Debugging with IDEs使用集成开发环境进行代码级别的调试。

  • 开发者调试

  • 查找代码缺陷
    Profiling使用性能分析工具来识别性能瓶颈。

  • 优化代码

  • 提升效率

19.Spring Boot整合Kafka

Spring Boot整合Kafka主要涉及以下几个步骤:

  1. 添加依赖:在Spring Boot项目的pom.xml文件中添加Spring Kafka的相关依赖。
  2. 配置Kafka:在application.propertiesapplication.yml中配置Kafka的连接信息,包括Bootstrap Servers。
  3. 创建Producer配置:配置Kafka Producer并创建Kafka Template,用于发送消息。
  4. 创建Consumer配置:配置Kafka Consumer,创建用于接收消息的监听器。
  5. 发送和接收消息:使用Kafka Template发送消息,使用@KafkaListener注解或消息监听器容器来接收消息。
  6. 管理监听器:可以控制监听器的启动、停止和恢复。
  7. 高级特性:使用Spring Kafka提供的高级特性,如事务性消息、自定义分区器、消息转发等

具体步骤:

   首先需要安装Kafka服务,网上教程挺多,确保有对应的服务。

  19.1 添加依赖:

在
pom.xml

中添加如下依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>
19.2 配置Kafka

application.properties

中添加:

spring:
  kafka:
    consumer:
      key-deserializer:  org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer:  org.apache.kafka.common.serialization.StringDeserializer
      group-id: myGroup
      enable-auto-commit: false
      auto-commit-interval: 1000
      max-poll-records: 1
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        acks: all
        retries: 3
        batch.size: 16384
        linger.ms: 1
        buffer.memory: 33554432
        max.block.ms: 6000
        compression.type: none
        max.request.size: 1048576
    bootstrap-servers: 127.0.0.1:9092

19.3 创建生产者:发送消息到Kafka

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
 
@Service
public class KafkaProducer {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

19.4 创建消费者:从Kafka消费消息

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaConsumer {
 
    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void listen(String message) {
        System.out.println("Received message in group myGroup: " + message);
    }
}

19.5 启动类:确保@EnableKafka注解已经添加

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
 
@SpringBootApplication
@EnableKafka
public class KafkaApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

本文转载自: https://blog.csdn.net/u011174699/article/details/138415993
版权归原作者 xiaomifeng1010 所有, 如有侵权,请联系我们删除。

“Apache Kafka知识点表格总结”的评论:

还没有评论