0


Kafka调试

Kafka安装配置

安装

kafka官网https://kafka.apache.org/downloads

wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz

tar xf kafka_2.12-2.7.2.tgz -C /usr/local/ #将kafka安装到了**/usr/local**目录下

mv /usr/local/kafka_2.12-2.7.2 /usr/local/kafka #新建存放日志和数据的文件夹

mkdir /usr/local/kafka/logs

配置

afka的主配置文件为/usr/local/kafka/config/server.properties,这里以节点kafkazk1为例,重点介绍一些常用配置项的含义:

broker.id=1 
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=10.0.0.6 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
listeners=PLAINTEXT://10.0.0.6:9092 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
log.dirs=/usr/local/kafka/logs 
#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
num.partitions=6 #默认的分区数,一个topic默认1个分区数
num.recovery.threads.per.data.dir=1 
offsets.topic.replication.factor=1 
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
message.max.byte=5242880  #消息保存的最大值5M
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
zookeeper.connect=localhost:2181 #不是集群,所以可以写成localhost 
#zookeeper.connect=10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181  #集群
zookeeper.connection.timeout.ms=18000 
group.initial.rebalance.delay.ms=0 auto.create.topics.enable=true 
delete.topic.enable=true

每个配置项含义如下:

  • broker.id:每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况。
  • listeners:设置kafka的监听地址与端口,可以将监听地址设置为主机名或IP地址,这里将监听地址设置为IP地址。
  • log.dirs:这个参数用于配置kafka保存数据的位置,kafka中所有的消息都会存在这个目录下。可以通过逗号来指定多个路径, kafka会根据最少被使用的原则选择目录分配新的parition。需要注意的是,kafka在分配parition的时候选择的规则不是按照磁盘的空间大小来定的,而是根据分配的 parition的个数多小而定。
  • num.partitions:这个参数用于设置新创建的topic有多少个分区,可以根据消费者实际情况配置,配置过小会影响消费性能。这里配置6个。
  • log.retention.hours:这个参数用于配置kafka中消息保存的时间,还支持log.retention.minutes和 log.retention.ms配置项。这三个参数都会控制删除过期数据的时间,推荐使用log.retention.ms。如果多个同时设置,那么会选择最小的那个。
  • log.segment.bytes:配置partition中每个segment数据文件的大小,默认是1GB,超过这个大小会自动创建一个新的segment file。
  • zookeeper.connect:这个参数用于指定zookeeper所在的地址,它存储了broker的元信息。 这个值可以通过逗号设置多个值,每个值的格式均为:hostname:port/path,每个部分的含义如下: - hostname:表示zookeeper服务器的主机名或者IP地址,这里设置为IP地址。- port: 表示是zookeeper服务器监听连接的端口号。- /path:表示kafka在zookeeper上的根目录。如果不设置,会使用根目录。
  • auto.create.topics.enable:这个参数用于设置是否自动创建topic,如果请求一个topic时发现还没有创建, kafka会在broker上自动创建一个topic,如果需要严格的控制topic的创建,那么可以设置auto.create.topics.enable为false,禁止自动创建topic。
  • delete.topic.enable:在0.8.2版本之后,Kafka提供了删除topic的功能,但是默认并不会直接将topic数据物理删除。如果要从物理上删除(即删除topic后,数据文件也会一同删除),就需要设置此配置项为true。

设置环境变量

$ vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
#生效
$ . /etc/profile

启动脚本

$ vim /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target  zookeeper.service

[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

$ systemctl daemon-reload

启动kafka

在启动kafka集群前,需要确保ZooKeeper集群已经正常启动。接着,依次在kafka各个节点上执行如下命令即可。这里将kafka放到后台运行,启动后,会在启动kafka的当前目录下生成一个nohup.out文件,可通过此文件查看kafka的启动和运行状态。通过jps指令,可以看到有个Kafka标识,这是kafka进程成功启动的标志。

$ cd /usr/local/kafka
$ nohup bin/kafka-server-start.sh config/server.properties &
# 或者
$ systemctl start kafka
$ jps
21840 Kafka
15593 Jps
15789 QuorumPeerMain

Kafka相关知识

下图展示了Kafka的相关术语以及之间的关系:

    上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。
     如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。

broker

    Kafka 集群包含一个或多个服务器,服务器节点称为broker。
     broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
     如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
     如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
类似于数据库的表名

Partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

Producer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

相关参考链接:Kafka详解(包括kafka集群搭建)-CSDN博客

https://www.cnblogs.com/duanxz/p/4492870.html

Java kakfa配置application.yml

spring:
  kafka:
    bootstrap-servers: 10.0.0.6:9092
    producer:
      acks: all
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        linger.ms: 1

pom.xml依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>${spring-integration.version}</version><!--$NO-MVN-MAN-VER$ -->
</dependency>

**发送kafka消息工具类实现 **

package com.test.util;
/*发送kafka消息工具类*/

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.test.common.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@EnableKafka
public class KafkaSender {

    @Autowired(required = false)
    private KafkaTemplate<String, Object> template;

    @Value("${kafka.debug:true}")
    private boolean kafka_send_debug;

    public void sendMsgAsyncAndLog(String topic, Object msg) {
        if (kafka_send_debug) {
            log.info("sendMsgAsyncAndLog to {} msg: {}", topic, JsonUtil.objToStr(msg));
        }
        sendMsgAsync(topic, msg, new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("sendMsgAysnc suc! msg:{}", msg);
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error(ex.getMessage(), ex);
                log.error("sendMsgAysnc error! msg:{}", msg);
            }
        });
    }

    public void sendMsgAsync(String topic, Object msg, ListenableFutureCallback<SendResult<String, Object>> callback) {
        if (this.template != null) {
            ListenableFuture<SendResult<String, Object>> future = this.template.send(topic, msg);
            future.addCallback(callback);
        }
    }
}

监听topic并消费示例

package com.receive.test.listener;
/*监听kafka topic消息并消费*/
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.test.route.RouteDeal;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class TestKafkaListener {
    @Autowired
    private RouteDeal routeDeal;

    @KafkaListener(topics = { "topic.test_topic" })
    public void kakfaListenerDeal(ConsumerRecord<?, String> record) {
        try {
            Optional<String> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                String message = kafkaMessage.get();
                if (Boolean.getBoolean("debug_log")) {
                    log.info("receive log: {}", message);
                }
                routeDeal.putEle(message);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}

Kafka调试

kefka提供了多个命令用于查看、创建、修改、删除topic信息,也可以通过命令测试如何生产消息、消费消息等,这些命令位于kafka安装目录的bin目录下,这里是**/usr/local/kafka/bin,**包括createKafkaOnly.sh createKafka.sh deleteKafka.sh 创建与删除topic。

登录任意一台kafka集群节点,切换到此目录下,即可进行命令操作。

下面列举kafka的一些常用命令的使用方法。
(1)显示topic列表

#kafka-topics.sh  --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --list
$ kafka-topics.sh  --zookeeper 10.0.0.6:2181 --list
topic123

(2)创建一个topic,并指定topic属性(副本数、分区数等)

#kafka-topics.sh --create --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --replication-factor 1 --partitions 3 --topic topic123 
$ kafka-topics.sh --create --zookeeper 10.0.0.6:2181 --replication-factor 1 --partitions 3 --topic topic123
Created topic topic123.
#--replication-factor表示指定副本的个数

(3)查看某个topic的状态

#kafka-topics.sh --describe --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123
$ kafka-topics.sh --describe --zookeeper 10.0.0.6:2181 --topic topic123
Topic: topic123    PartitionCount: 3    ReplicationFactor: 1    Configs: 
    Topic: topic123    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
    Topic: topic123    Partition: 1    Leader: 1    Replicas: 1    Isr: 1
    Topic: topic123    Partition: 2    Leader: 1    Replicas: 1    Isr: 1

(4)生产消息 阻塞状态

#kafka-console-producer.sh --broker-list 10.0.0.6:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123
$ kafka-console-producer.sh --broker-list 10.0.0.6:9092 --topic topic123

(5)消费消息 阻塞状态

#kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123
$ kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092 --topic topic123
#从头开始消费消息
#kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092 --topic topic123 --from-beginning
$ kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 --from-beginning

(6)删除topic

#kafka-topics.sh --delete --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123
$ kafka-topics.sh --delete --zookeeper 10.0.0.6:2181 --topic topic123

(7)其他

#kafka查看topic里n条消息
sh kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic topic_name  --from-beginning --max-messages n

#模拟发送数据
/home/kafka/software/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topic_name

#消费数据
/home/kafka/software/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic_name  --from-beginning 

#查看topic列表 
/home/kafka/software/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
 kafka启动常见问题

    1.端口被占用Socket server failed to bind to 0.0.0.0:9092: Address already in use. 
     解决办法:netstat -nltp | grep 9092 找到并kill掉对应的进程,重新启动kafka进程。

    2.配置文件错误,如示例将log.retention.hours的值配置成了字母,实际应该是数值类型,导致kafka无法启动

解决办法:修改错误的配置,重新启动kafka进程

    3.节点已注册

    从kafka的server.log日志中发现如下信息,这种方式一般是因为磁盘问题,回想下是否进行过磁盘的重分配,将原来其它节点的磁盘,分配给了现在的节点,这种现象一般发生在单机版kafka多broker节点的场景

    此时如果将磁盘分配还原成最开始的分配方式并启动kafka进程后,会出现下面第二张图的问题,出现了同一个kafka主题分区有两个目录的情况

解决办法:如果是初次部署,还没有流量进来,先卸载kafka节点,再重新部署;

              其它情况,先停kafka服务,删除两个相同的目录中的一个,只保留其中一个目录即可,然后重启kafka进程

    4. 当日志出现类似于这样的ERROR信息时,可能是出现了脏副本

Exiting because log truncation is not allowed for partition test01-1, current leader's latest offset 28025 is less than replica's latest offset 28402

出现这样的错误信息后,kafka启动会失败,这时,server.properties文件中添加参数unclean.leader.election.enable=true,启动kafka服务即可以启动

        5. 当出现下面的报错时,需要检查目录的权限
     FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.io.IOException: Permission denied

    6. 当出现下面的报错时,需要检查磁盘问题
     org.apache.kafka.common.KafkaException: java.io.IOException: Input/output error
标签: kafka

本文转载自: https://blog.csdn.net/weixin_42145766/article/details/135220416
版权归原作者 逝水流痕Summer 所有, 如有侵权,请联系我们删除。

“Kafka调试”的评论:

还没有评论