生产者测试:
bin/kafka-console-producer.sh --broker-list 1.2.3.4:9092 --topic test-topic
消费者测试:
bin/kafka-console-consumer.sh --bootstrap-server 1.2.3.4:9092 --topic test-topic --from-beginning
const { Kafka }= require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['1.2.3.4:9092'] // 使用服务器的 IP 和端口
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group'})
const run = async ()=>{
// Producing
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [{ value: 'Hello KafkaJS user!1111'},
],
})
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true})
await consumer.run({
eachMessage: async ({ topic, partition, message })=>{
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})},
})}
run().catch(console.error)
本文转载自: https://blog.csdn.net/github_35631540/article/details/141565882
版权归原作者 拿我格子衫来 所有, 如有侵权,请联系我们删除。
版权归原作者 拿我格子衫来 所有, 如有侵权,请联系我们删除。