前言
在内卷严重的程序员圈子中,原地踏步就是退步,所以不能再躺平啦,赶紧爬起来学习,接下来博主将推出《Kafka入门到精通》系列文章,让你可以在企业中玩起Kafka来得心应手,此乃升职加薪必备呀。
Kafka认识
Kafka 是由Linkedin公司开发的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,是一款基于发布订阅模式的开源消息引擎系统。相对于其他的消息组件来说Kafka拥有更好的吞吐量、内置分区、具有复制和容错的功能,这使它成为一个非常理想的大型消息处理应用。使用场景如:网页浏览记录,日志收集,监控数据等等。
Kafka 的标准定位是分布式流式处理平台,早期的定位是以消息引擎的身份出现的,随着 Kafka 的不断演进, Kafka 开发团队日益发现经 Kafka 交由下游数据处理平台做的事情 Kafka 自己也可以做,因此在 Kafka 0.10.0.0 版本正式推出了 Kafka Streams ,即流式处理组件 。自 Kafka 正式成为了 个流式处理框架,而不仅仅是消息引擎了。
如图所示,Kafka的工作流程为
- 生产者发送消息到Kafka集群
- 消费者从Kafka拉取消息
- Kafka依赖于Zookeeper处理服务的协调
Kafka快速安装(windows)
第一步:下载kafka ,https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
下载后解压,由于Kafka 使用 ZooKeeper作为服务协调工具, 如果你还没有ZooKeeper服务器,你需要先启动一个ZooKeeper服务器,Kafka 内置提供了 ZooKeeper 服务器以及 组相关的管理脚本,我们直接使用这个内置ZooKeeper 即可,进入到bin\windows 目录,执行如下列命令(windows):
zookeeper-server-start.bat ../../config/zookeeper.properties
zookeeper.properties是作为zookeeper的配置文件,比如你想修改zookeeper的默认端口通过配置文件修 clientPort=2181项即可 ,如下:
如果是在linux启动,cd到bin目录,执行下面命令
zookeeper-server-start.sh ../config/zookeeper.properties
启动效果如下
对于运行 Kafka 言,至少要求安装Java 7版本 ,Zookeeper的端口是2181接下来我们启动 Kafka 务器,进入到bin\windows 目录,执行如下列命令(windows):
kafka-server-start.bat ../../config/server.properties
server.properties作为kafka的配置文件,我们关注三个配置,你也可以根据情况进行修改
broker.id=0 #如果要集群部署,broker.id不能重复
advertised.listeners=PLAINTEXT://127.0.0.1:9092 #kafka的地址和端口
zookeeper.connect=127.0.0.1:2181 #zookeeper的地址和端口
启动效果如下
控制台输出结尾处的“ Kafka Server O], started ,标志 Kafka 服务器启动成功,默认的服务端口是 9092
Kafka快速入门
下面我们基于SpringBoot来快速入门Kafka,做一个发送消息和接收消息的案例
第一步:搭建SpringBoot工程,导入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.28</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies>
第二步,创建启动类,yml配置如下
spring:application:name: application-kafka
kafka:bootstrap-servers: localhost:9092#这个是kafka的地址,对应你server.properties中配置的producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: test-consumer-group #消费者的ID,这个对应 config/consumer.properties中的group.id
第三步:创建测试类,发送消息, 通过 注入 KafkaTemplate 发送
@SpringBootTest(classes =KafkaApplication.class)@RunWith(SpringRunner.class)publicclassProducerTest{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;@Testpublicvoidsend()throwsInterruptedException{//发送消息:第一个参数是topic,第二个参数是消息的内容。Topic我们可以理解为对消息的分类
kafkaTemplate.send("topic-hello","你好kafka");System.out.println("发送完成");Thread.sleep(2000);}}
第四步:创建消费者,接收消息, 通过 :@KafkaListener(topics = “topic”) 监听topic中的消息
@ComponentpublicclassHelloConsumer{@KafkaListener(topics ="topic-hello")publicvoidhandler(String message){System.out.println("收到消息:"+message);}}
第五步:使用命令 创建Topic,否则会找不到Topic
创建名为“topic-hello”的TOPIC , 进入 bin/windows目录,cmd执行:
kafka-topics.bat --create --zookeeper localhost:2181 --topic topic-hello --partitions 1 --replication-factor 1
执行测试方法,查看控制台效果
文章结束,希望对你有所帮助,喜欢的话点赞收藏加评论哦
版权归原作者 墨家巨子@俏如来 所有, 如有侵权,请联系我们删除。