生产者的同步发送消息
生产者发送消息到我们的topic分区上,需要等待我们kafka返回的ack,如果没有返回就会进入3s的阻塞,retry3次——>抛出异常(这里面我们可以将信息记录到文件日志中)
(43条消息) NIO学习_Fairy要carry的博客-CSDN博客
package com.wyh.kafka_demo.kafka;
import com.alibaba.fastjson.JSON;
import com.wyh.kafka_demo.pojo.Order;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
public class SimpleProducer {
/**
* 1.创建主题
*/
private final static String TOPIC_NAME = "jqTopic5";
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 2.kafka的broker
*/
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "82.157.198.247:9092,82.157.198.247:9093");
/**
* 2.1序列化:Key+value
*/
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
/**
* 封装到发消息的客户端中
*/
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
/**
* 3.模拟发送五条消息
*/
int maxNum = 5;
/**
* 3.1创建消息
* hash(key)%partition
* ProducerRecord:消息记录
* key:消息标识key,帮助告诉发送到哪个分许通过hash计算
* value:json数据,消息内容
*/
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,"TestKey", "hello,kafka");
/**
* 3.2通过异步的形式发送消息
*/
RecordMetadata recordMetadata = producer.send(producerRecord).get();
System.out.println("同步方式得到的消息:" + "topic-" + recordMetadata.topic()
+ "|partition-" + recordMetadata.partition()
+ "|offset-" + recordMetadata.offset());
}
}
异步发送
我们可以利用异步回调,生产者不管你消费者有没有消费发送完就往下执行自己的了;当我们拿到数据后通过回调机制异步返回(比如zk的watch机制就是这样)
(43条消息) Zookeeper_Fairy要carry的博客-CSDN博客
优点:速度较快,像银行转账业务异步就毕竟好,反馈较快
缺点:可能会出现消息丢失的情况,例如我们rabbitmq可以利用spring的retry机制解决,另外分布式事务也是需要解决的
(43条消息) RabbitMQ的高级特性(消息可靠性)_Fairy要carry的博客-CSDN博客
(43条消息) MQ-消息延迟_Fairy要carry的博客-CSDN博客_mq 延迟消息
像消息堆积我们kafka不需要考虑这方面,消息放在硬盘中
生产端ack的配置
1.两个broker上创建2个分区配上两个副本
bin/kafka-topics.sh --bootstrap-server xxxx:9092,xxxx:9093 --create --partitions 2 --replication-factor 2 --topic jqTopic5
2.生产者配置
bin/kafka-console-producer.sh --broker-list xxxx:9092,xxxx:9093 --topic jqTopic5
同步发送的前提,生产者在获取ack前会一直阻塞:三种ack
ack=0:生产者只需要把消息给到broker,而不需要到partition中kafka就会把ack返回给生产者,速度较快容易丢消息
ack=1:多副本都接收到消息(leader进行同步),并且将消息传到log中kafka就返回ack
ack=all:leader和follower同步完后才会将ack返回
发送消息缓冲机制
生产者并不是直接把消息推送给kafka的——>利用了消息缓冲的机制,kafka本地线程默认会创建一个缓冲区 ,用来存放发送的数据(先放到缓冲区,当到达16k就进行发送)
如果没有满足16k,就等待10ms再发送消息
代码实现
/**
* 2.kafka的broker
*/
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"82.157.198.247:9092,82.157.198.247:9093");
/**
* 2.1发送消息持久化机制参数ack
* 生产者会等3s,然后重试3次
*/
properties.put(ProducerConfig.ACKS_CONFIG,"1");
/**
* 2.12重试间隔的设置
* 默认间隔为100ms,次数为3次
*/
properties.put(ProducerConfig.RETRIES_CONFIG,3);
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);
/**
* 2.2设置消息的缓冲区
* 消息先放到缓冲区,如果到16k就发送,未满足则进行等待10ms再发送
*/
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
/**
* 2.3序列化:Key+value
*/
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
/**
* 封装到发消息的客户端中
*/
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
生产者消费者实例
生产者代码
package com.wyh.kafka_demo.kafka;
import com.alibaba.fastjson.JSON;
import com.wyh.kafka_demo.pojo.Order;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class MyProducer {
/**
* 1.创建主题
*/
private final static String TOPIC_NAME="jqTopic5";
public static void main(String[] args) throws InterruptedException {
/**
* 2.kafka的broker
*/
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"82.157.198.247:9092,82.157.198.247:9093");
/**
* 2.1发送消息持久化机制参数ack
* 生产者会等3s,然后重试3次
*/
properties.put(ProducerConfig.ACKS_CONFIG,"1");
/**
* 2.12重试间隔的设置
* 默认间隔为100ms,次数为3次
*/
properties.put(ProducerConfig.RETRIES_CONFIG,3);
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);
/**
* 2.2设置消息的缓冲区
* 消息先放到缓冲区,如果到16k就发送,未满足则进行等待10ms再发送
*/
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
/**
* 2.3序列化:Key+value
*/
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
/**
* 封装到发消息的客户端中
*/
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
/**
* 3.模拟发送五条消息
*/
int maxNum=5;
final CountDownLatch countDownLatch = new CountDownLatch(maxNum);
for (int i = 1; i <= 5; i++) {
Order order = new Order((long) i, i);
/**
* 3.1消息记录发送给指定的分区
* hash(key)%partition
* ProducerRecord:消息记录
* key:消息标识key,帮助告诉发送到哪个分许通过hash计算
* value:json数据,消息内容
*/
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));
/**
* 3.2通过异步的形式发送消息
*/
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e!=null){
System.err.println("发送消息失败:"+e.getStackTrace());
}
if(recordMetadata!=null){
System.out.println("异步发送的消息为:"+"topic-"+recordMetadata.topic()
+"|partition-"+recordMetadata.partition()
+"|offset-"+recordMetadata.offset());
}
countDownLatch.countDown();
}
});
}
/**
* 4.主线程等待countDownlatch
*/
countDownLatch.await(5, TimeUnit.SECONDS);
producer.close();
}
}
消费者代码:
package com.wyh.kafka_demo.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {
private final static String TOPIC_NAME="jqTopic5";
private final static String CONSUMER_GROUP_NAME="testGroup";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"82.157.198.247:9092,82.157.198.247:9093");
/**
* 1.消费分组名
*/
properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"0");
/**
* 1.2设置序列化
*/
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
/**
* 2.创建一个消费者的客户端
*/
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
/**
* 3.消费者订阅主题列表
*/
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到的消息:partition= %d,offset= %d,key= %s,value=%s %n",record.partition(),
record.offset(),record.key(),record.value());
}
}
}
}
版权归原作者 Fairy要carry 所有, 如有侵权,请联系我们删除。