kafka的一些命令在不同版本已经不一样了 至少是在2.12-3.0.1这个版本创建一些东西已经不依赖zookeeper参数 所以要慢慢自己学习
zookeeper is not a recognized option 其中这个报错就是提示的这个问题
下载kafka
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
我自己放在了目录
/usr/local/kafka_2.12-2.5.0
修改配置文件 配置文件路径 /usr/local/kafka_2.12-2.5.0/config/server.properties
修改的配置是,192.168.145.135 修改成你自己机器的ip地址
advertised.listeners=PLAINTEXT://192.168.145.135:9092
&符号表示后台运行
启动zookeeper 进入/usr/local/kafka_2.12-2.5.0/bin
./zookeeper-server-start.sh ../config/zookeeper.properties &
启动kafka 进入/usr/local/kafka_2.12-2.5.0/bin
./kafka-server-start.sh ../config/server.properties &
创建一个topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic qqftest
查看topic创建的结果
./kafka-topics.sh --list --zookeeper localhost:2181
这里创建生产者和消费者记得开两个shell窗口,这样才能很好的看到生产和消费信息
生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic qqftest
消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic qqftest
下面这句是指明的从开始读取消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic qqftest --from-beginning
下面是使用springboot链接kafka做的测试,这个是使用了手动提交的模式,你可以百度一下自动提交模式需要修改的配置
首先在pom.xml中添加依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
然后再yml文件中添加配置
kafka:
bootstrap-servers: 192.168.145.135:9092 #Kafka集群
listener:
ack-mode: manual
producer: #生产者配置
retries: 0
acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
batch-size: 16384 # 批量大小
properties:
linger:
ms: 0 # 提交延时 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
buffer-memory: 33554432 # 生产端缓冲区大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer #Kafka提供的序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer #Kafka提供的反序列化类
consumer:
properties:
group:
id: defaultConsumerGroup # 默认的消费组ID
session:
timeout:
ms: 120000 # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
request:
timeout:
ms: 180000 # 消费请求超时时间
enable-auto-commit: false # 是否自动提交offset
auto-offset-reset: latest # 当kafka中没有初始offset或offset超出范围时将自动重置offset 参数值earliest:重置为分区中最小的offset; latest:重置为分区中最新的offset(消费分区中新产生的数据); none:只要有一个分区不存在已提交的offset,就抛出异常;
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #Kafka提供的序列化
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #Kafka提供的反序列化类
auto-commit-interval: 1000ms # 提交offset延时(接收到消息后多久提交offset)
最后是java代码
引入的包根据你的实际情况删除,我自己直接粘贴过来了
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.sql.DataSource;
import java.util.concurrent.TimeUnit;
下面的代码就是再一个controller文件中直接粘贴复制就可以使用了,有些瑕疵,我想有基础的同学应该可以自己解决了
/*自定义topic
*
* 使用./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic qqftest 创建topic
* */
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate; //spring framework封装的kafka的包
public static final String qqftest = "qqftest";
@RequestMapping("/KafkaProduct")
@ResponseBody
public String KafkaProduct(Model model) {
log.info("准备发送消息为:{}", "obj2String");
//发送消息,监听返回对象包装 实现成功跟失败后的接口
for (int i = 0; i < 10; i++) {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(qqftest, "obj2String" + i);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
//失败的处理
log.info(qqftest + " - 生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
//成功的处理
log.info(qqftest + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
}
});
}
return "addKafkaProductSuc";
}
@KafkaListener(topics = qqftest)
public void onMessage1(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
// 消费的哪个topic、partition的消息,打印出消息内容
log.info("简单消费onMessage1:" + topic + record.topic() + "-" + record.partition() + "-" + record.value());
}
@KafkaListener(topics = qqftest)
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
ack.acknowledge();
log.info("消费者 消费了: Topic:" + topic + ",Message:" + record.value());
}
版权归原作者 liuchunwei001 所有, 如有侵权,请联系我们删除。