0


关于kafka安装使用qqf

kafka的一些命令在不同版本已经不一样了 至少是在2.12-3.0.1这个版本创建一些东西已经不依赖zookeeper参数 所以要慢慢自己学习

zookeeper is not a recognized option 其中这个报错就是提示的这个问题

下载kafka

wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz

我自己放在了目录

/usr/local/kafka_2.12-2.5.0

修改配置文件 配置文件路径 /usr/local/kafka_2.12-2.5.0/config/server.properties

修改的配置是,192.168.145.135 修改成你自己机器的ip地址

advertised.listeners=PLAINTEXT://192.168.145.135:9092

&符号表示后台运行

启动zookeeper 进入/usr/local/kafka_2.12-2.5.0/bin

./zookeeper-server-start.sh ../config/zookeeper.properties &

启动kafka 进入/usr/local/kafka_2.12-2.5.0/bin

./kafka-server-start.sh ../config/server.properties &

创建一个topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic qqftest

查看topic创建的结果

./kafka-topics.sh --list --zookeeper localhost:2181

这里创建生产者和消费者记得开两个shell窗口,这样才能很好的看到生产和消费信息

生产者

./kafka-console-producer.sh --broker-list localhost:9092 --topic qqftest

消费者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic qqftest

下面这句是指明的从开始读取消息

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic qqftest --from-beginning

下面是使用springboot链接kafka做的测试,这个是使用了手动提交的模式,你可以百度一下自动提交模式需要修改的配置

首先在pom.xml中添加依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

然后再yml文件中添加配置

kafka:
  bootstrap-servers: 192.168.145.135:9092 #Kafka集群
  listener:
    ack-mode: manual
  producer:    #生产者配置
    retries: 0
    acks: 1    # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
    batch-size: 16384 # 批量大小
    properties:
      linger:
        ms: 0   # 提交延时 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka   linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
    buffer-memory: 33554432 # 生产端缓冲区大小
    key-serializer: org.apache.kafka.common.serialization.StringSerializer #Kafka提供的序列化
    value-serializer: org.apache.kafka.common.serialization.StringSerializer #Kafka提供的反序列化类
  consumer:
    properties:
      group:
        id: defaultConsumerGroup # 默认的消费组ID
      session:
        timeout:
          ms: 120000 # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
      request:
        timeout:
          ms: 180000 # 消费请求超时时间
    enable-auto-commit: false # 是否自动提交offset
    auto-offset-reset: latest # 当kafka中没有初始offset或offset超出范围时将自动重置offset 参数值earliest:重置为分区中最小的offset; latest:重置为分区中最新的offset(消费分区中新产生的数据); none:只要有一个分区不存在已提交的offset,就抛出异常;
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #Kafka提供的序列化
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #Kafka提供的反序列化类
    auto-commit-interval: 1000ms # 提交offset延时(接收到消息后多久提交offset)

最后是java代码

引入的包根据你的实际情况删除,我自己直接粘贴过来了

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.sql.DataSource;
import java.util.concurrent.TimeUnit;

下面的代码就是再一个controller文件中直接粘贴复制就可以使用了,有些瑕疵,我想有基础的同学应该可以自己解决了

/*自定义topic
 *
 * 使用./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic qqftest 创建topic
 * */
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate; //spring framework封装的kafka的包
public static final String qqftest = "qqftest";

@RequestMapping("/KafkaProduct")
@ResponseBody
public String KafkaProduct(Model model) {
    log.info("准备发送消息为:{}", "obj2String");
    //发送消息,监听返回对象包装 实现成功跟失败后的接口

    for (int i = 0; i < 10; i++) {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(qqftest, "obj2String" + i);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //失败的处理
                log.info(qqftest + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                log.info(qqftest + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }

    return "addKafkaProductSuc";

}

@KafkaListener(topics = qqftest)
public void onMessage1(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // 消费的哪个topic、partition的消息,打印出消息内容
    log.info("简单消费onMessage1:" + topic + record.topic() + "-" + record.partition() + "-" + record.value());
}

@KafkaListener(topics = qqftest)
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

    ack.acknowledge();
    log.info("消费者 消费了: Topic:" + topic + ",Message:" + record.value());
}
标签: 大数据 java kafka

本文转载自: https://blog.csdn.net/weixin_43835474/article/details/125471979
版权归原作者 liuchunwei001 所有, 如有侵权,请联系我们删除。

“关于kafka安装使用qqf”的评论:

还没有评论