0


kafka简单使用

kafka简单使用

前言:通常开发过程中,比如告警数据,云网系统需要订阅告警子中心订阅告警数据,云管系统也要订阅告警子中心的告警数据…
解决方案:
1,告警子中心提供openapi接口,需要订阅的一方调用openapi接口,获取数据进行解析
2,告警子中心提供定时任务往订阅方推送数据
3,告警子中心告警数据往消息中间件发送数据,需要订阅告警数据的一方订阅中间件获取消息进行消费

以上是常见的方案:
方式一和方式二
优点: 逻辑相对简单点,确保相关的数据结构书写相关接口文档,进行推送数据和接收数据…
缺点: 耦合性高,排查问题不方便,依赖性强
方式三:
优点: 解耦、异步
缺点:

下面讲解kafka简单使用

1,kafka环境搭建

zookeeper集群搭建:
搭建kafka集群

1 # The number of milliseconds of each tick
  2 tickTime=20003 # The number of ticks that the initial 
  4 # synchronization phase can take
  5 initLimit=106 # The number of ticks that can pass between 
  7 # sending a request and getting an acknowledgement
  8 syncLimit=59 # the directory where the snapshot is stored.10 # do not use /tmp for storage,/tmp here is just 
 11 # example sakes.12 dataDir=/opt/module/zookeeper/zkData
 13 # the port at which the clients will connect
 14 clientPort=218115 # the maximum number of client connections.16 # increase this if you need to handle more clients
 17 #maxClientCnxns=6018 #
 19 # Be sure to read the maintenance section of the 
 20 # administrator guide before turning on autopurge.21 #
 22 # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance23 #
 24 # The number of snapshots to retain in dataDir
 25 #autopurge.snapRetainCount=326 # Purge task interval in hours
 27 # Set to "0" to disable auto purge feature
 28 #autopurge.purgeInterval=1293031323334353637 #######################cluster##########################
 38 server.1=192.168.124.106:2888:388839 server.2=192.168.124.107:2888:388840 server.3=192.168.124.108:2888:3888

2,kafka常见的命令

2.1 kakfa相关命令

启动kafka

-- 在kafka的目录下面 启动kafka
[root@java-0326 kafka]# bin/kafka-server-start.sh -daemon config/server.properties 
-- jps 查看是否启动kafka 
[root@java-0326 kafka]# jps
130434 Kafka
473 Jps
38908 QuorumPeerMain
-- 停止kafka
[root@java-0326 kafka]# bin/kafka-server-stop.sh stop

查看kafka topic 创建、查看、删除

-- 查看topic集合
[root@java-0326 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.124.106:9092--list
__consumer_offsets
first
-- 创建topic second 副本3--topic 定义 topic 名
--replication-factor 定义副本数
--partitions 定义分区数
[root@java-0326 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.124.106:9092--create --replication-factor 3--partitions 3--topic second
Created topic second.[root@java-0326 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.124.106:9092--list
__consumer_offsets
first
second
-- 删除topic
[root@java-0326 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.124.106:9092--delete --topic first

发送消息消费消息:

-- 发送消息
[root@java-0326 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.124.106:9092--topic first
>hello
>javja

-- 消费消息
[root@java-0326 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.124.107:9092--topic first

javja
xioa

-- from-beginning 会
[root@java-0326 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.124.107:9092--topic first --from-beginning

topic的详情

[root@java-0326 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.124.108:9092--describe --topic first
Topic: first    TopicId: A29r8gNURHyL8KEz6Drvew PartitionCount:3       ReplicationFactor:3    Configs: segment.bytes=1073741824
        Topic: first    Partition:0    Leader:0       Replicas:0,1,2 Isr:0,1,2
        Topic: first    Partition:1    Leader:2       Replicas:2,0,1 Isr:2,0,1
        Topic: first    Partition:2    Leader:1       Replicas:1,2,0 Isr:1,2,0-- 修改主题的分区数
[root@java-0326 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.124.108:9092--alter --topic three --partitions 3[root@java-0326 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.124.108:9092--describe --topic three

2.2 相关术语讲解

ISR
leader 维护动态的ISR【in-sync replica set】意为何leader保持同步的follower 集合。当follower 完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader 同步数据,则该follower 将被提出ISR,该时间阙值由replica.lag.time.max.ms参数设定。leader发生故障之后,就会从ISR中选取新的leader。

acks 参数配置
0:producer 不等待broker的ack的ack,这一操作提供了一个最低的延迟,broker一接收到还没写入磁盘已经返回,当broker故障时有可能 丢失数据
1:producer 等待broker的ack ,partition 的leader落盘成功后返回ack,如果在follower同步成功之前 leader故障,那么将会 丢失数据
-1(all):producer 等待broker 的ack,partition的leader和follower全部落盘成功后才返回ack,但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成 数据重复

LEO和HW
LEO:指的是每个副本最大的 offset;
HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。
在这里插入图片描述

3,idea配置kafka

3.1 添加maven依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency></dependencies>

3.2 生产者

KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象

不带回调函数的API

package com.atguigu.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.swing.*;
import java.util.Properties;/**
 * @author psd 生产者
 */

public class CustomProducer {

    public staticvoidmain(String[] args){
        Properties pro = new Properties();// kafka集群地址
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.106:9092");// key 序列化
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// value 序列化
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 所有的同步完成 
        pro.put(ProducerConfig.ACKS_CONFIG,"all");// 重试的次数
        pro.put(ProducerConfig.RETRIES_CONFIG,1);//批次大小
        pro.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// 等待时间
        pro.put(ProducerConfig.LINGER_MS_CONFIG,1);// RecordAccumulator 缓冲区大小
        pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        Producer<String,String> producer = new KafkaProducer<>(pro);for(int i =0; i <10; i++){
            producer.send(new ProducerRecord<String,String>("three",Integer.toString(i),Integer.toString(i)));}
        producer.close();}}

带回调函数API
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是
RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果
Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试

package com.atguigu.kafka;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;/**
 * @author psd 带回调函数的API
 */
public class CustomProducerCall {
    public staticvoidmain(String[] args){
        Properties pro = new Properties();// kafka集群地址
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.106:9092");// key 序列化
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// value 序列化
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 所有的同步完成 
        pro.put(ProducerConfig.ACKS_CONFIG,"all");// 重试的次数
        pro.put(ProducerConfig.RETRIES_CONFIG,1);//批次大小
        pro.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// 等待时间
        pro.put(ProducerConfig.LINGER_MS_CONFIG,1);// RecordAccumulator 缓冲区大小
        pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        Producer<String,String> producer = new KafkaProducer<>(pro);for(int i =0; i <10; i++){
            producer.send(new ProducerRecord<String, String>("three", Integer.toString(i), Integer.toString(i)), new Callback(){// 回调函数,该方法会在producer收到ack 时调用,为异步调用
                @Override
                public voidonCompletion(RecordMetadata metadata, Exception exception){if(exception == null){
                        System.out.println(metadata.partition()+"--"+ metadata.offset());}}});}
        producer.close();}}

同步发送API
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。
由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同
步发送的效果,只需在调用 Future 对象的 get 方发即可

package com.atguigu.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.swing.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/**
 * @author psd 生产者
 */

public class CustomProducer {

    public staticvoidmain(String[] args) throws ExecutionException, InterruptedException {
        Properties pro = new Properties();// kafka集群地址
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.106:9092");// key 序列化
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// value 序列化
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 所有的同步完成
        pro.put(ProducerConfig.ACKS_CONFIG,"all");// 重试的次数
        pro.put(ProducerConfig.RETRIES_CONFIG,1);//批次大小
        pro.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// 等待时间
        pro.put(ProducerConfig.LINGER_MS_CONFIG,1);// RecordAccumulator 缓冲区大小
        pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        Producer<String,String> producer = new KafkaProducer<>(pro);for(int i =0; i <10; i++){
            producer.send(new ProducerRecord<String,String>("three",Integer.toString(i),Integer.toString(i))).get();}
        producer.close();}}

3.3 消费者

Consumer 消息数据时的可靠性是容易保证的,因为数据在Kafka是持久化,故不用担心数据丢失问题。

导入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency>

编写代码:
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象
为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。
自动提交 offset 的相关参数:
enable.auto.commit:是否开启自动提交 offset 功能
auto.commit.interval.ms:自动提交 offset 的时间间隔

手动提交 offset
虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发难以把握 offset 提交的时机,因此 Kafka还提供手动提交 offset 的API。
手动提交 offset 的方法有两种:分别是 commitSync(同步提交) 和 commitAsync (异步提交)。
两者相同点是:都会将本次 poll的一批数据最高的偏移量提交
不同的是:
commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由于不可控的因素导致,也会出现提交失败)
而 commitAsync 则没有失败重试机制,故有可能提交失败。
同步提交:
由于同步提交 offset 有失败重试机制,故更加可靠,以下为同步提交offset 的式例。

同步提交:
由于同步提交 offset 有失败重试机制,故更加可靠,以下为同步提交offset 的式例。

package com.atguigu.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;/**
 * @author psd 手动提交offset
 */
public class CustomConsumerSync {

    public staticvoidmain(String[] args){
        Properties properties = new Properties();// kafka 集群
        properties.put("bootstrap.servers","192.168.124.108:9092");// 消费者组
        properties.put("group.id","test2");// 关闭自动提交
        properties.put("enable.auto.commit","false");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("three"));while(true){// 消费者组拉取数据
            ConsumerRecords<String, String> records = consumer.poll(100);for(ConsumerRecord<String, String> record : records){
                System.out.println("record==>"+ record);}// 同步提交,当前线程会阻塞直到offset提交成功
            consumer.commitSync();}}}

异步提交 offset
虽然同步提交offset更可靠一些,但是由于其会阻挡当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多情况下,会选择异步提价 offset 的方式。

package com.atguigu.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;/**
 * @author psd 手动提交offset
 */
public class CustomConsumerSync {

    public staticvoidmain(String[] args){
        Properties properties = new Properties();// kafka 集群
        properties.put("bootstrap.servers","192.168.124.108:9092");// 消费者组  只要group.id 相同,就属于同一个消费组
        properties.put("group.id","test2");// 关闭自动提交
        properties.put("enable.auto.commit","false");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("three"));while(true){// 消费者组拉取数据
            ConsumerRecords<String, String> records = consumer.poll(100);for(ConsumerRecord<String, String> record : records){
                System.out.println("record==>"+ record);}// 同步提交,当前线程会阻塞直到offset提交成功
            consumer.commitSync();}}}

异步提交 offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多情况下,会选用异步提交 offset 的方式。
以下为异步提交 offset 的式例:

package com.atguigu.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;/**
 * @author psd
 */
public class CustomConsumerASync {
    public staticvoidmain(String[] args){
        Properties properties = new Properties();// kafka 集群
        properties.put("bootstrap.servers","192.168.124.108:9092");// 消费者组
        properties.put("group.id","test2");// 关闭自动提交
        properties.put("enable.auto.commit","false");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("three"));while(true){
            ConsumerRecords<String, String> records = consumer.poll(10);for(ConsumerRecord<String, String> record : records){
                System.out.println("异步提交Async:"+ record);}// 异步提交
            consumer.commitAsync(new OffsetCommitCallback(){
                @Override
                public voidonComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception){if(exception != null){
                        System.out.println("commit failed for"+ offsets);}}});}}}

小计:

常见的开发步骤:
1,生产者和消费者约束好消费的数据类型,梳理相关的接口文档
2,一般消费者订阅相关的主题,对数据进行相关解析,需要判空以及数据类型转换,kafka的数据量一般很大,个人推荐用批处理、加锁、数据类型转换、多线程异步处理数据… 防止数据丢失

喜欢我的文章记得点个在看,或者点赞,持续更新中ing…

标签: kafka 分布式

本文转载自: https://blog.csdn.net/qq_45938780/article/details/141888304
版权归原作者 on the way 123 所有, 如有侵权,请联系我们删除。

“kafka简单使用”的评论:

还没有评论