消息中间件对比:
1、吞吐、可靠性、性能
Kafka安装
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
- Docker安装zookeeper
下载镜像:
docker pull zookeeper:3.4.14
创建容器
docker run -d--name zookeeper -p2181:2181 zookeeper:3.4.14
- Docker安装kafka
下载镜像:
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d--name kafka \--envKAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \--envKAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \--envKAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \--envKAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \--envKAFKA_HEAP_OPTS="-Xmx256M -Xms256M"\--net=host wurstmeister/kafka:2.12-2.3.1
kafka入门
- 生产者发送消息,多个消费者只能有一个消费者接收到消息
- 生产者发送消息,多个消费者都可以接收到消息
(1)创建kafka-demo项目,导入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>
(2)生产者发送消息
packagecom.heima.kafka.sample;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importjava.util.Properties;/**
* 生产者
*/publicclassProducerQuickStart{publicstaticvoidmain(String[] args){//1.kafka的配置信息Properties properties =newProperties();//kafka的连接地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer =newKafkaProducer<String,String>(properties);//封装发送的消息ProducerRecord<String,String> record =newProducerRecord<String,String>("itheima-topic","100001","hello kafka");//3.发送消息
producer.send(record);//4.关闭消息通道,必须关闭,否则消息发送不成功
producer.close();}}
(3)消费者接收消息
packagecom.heima.kafka.sample;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;/**
* 消费者
*/publicclassConsumerQuickStart{publicstaticvoidmain(String[] args){//1.添加kafka的配置信息Properties properties =newProperties();//kafka的连接地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");//消息的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String,String> consumer =newKafkaConsumer<String,String>(properties);//3.订阅主题
consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程一直处于监听状态while(true){//4.获取消息ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}
kafka高可用设计
1、设计集群模式:
Kafka的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个Broker 组成。当一个机器宕机了,另外一个机器就会替补山
2、备份机制:
Kafka定义了两类副本
- 领导者副本(Leader Replica)
- 追随者副本 (Follower Replica) 追随者副本分为两类: 1、一种是ISR副本,同步保存 2、普通的副本,异步保存 出现主节点宕机,会先选ISR副本中的一个成为新的主节点,保证数据一致性,没有ISR节点,再从普通节点中挑选 针对全部节点宕机的情况,有两种策略: 1、等待第一个ISR副本,保证了数据的尽可能一致 2、等待一个复活的追随者,无论是ISR还是普通,提高系统的高可用性。
kafka生产者详解
1发送类型
- 同步发送使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();System.out.println(recordMetadata.offset());
- 异步发送调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
//异步消息发送
producer.send(kvProducerRecord,newCallback(){@OverridepublicvoidonCompletion(RecordMetadata recordMetadata,Exception e){if(e !=null){System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}});
2参数详解
- ack
代码的配置方式:
//ack配置 消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
参数的选择说明
确认机制****说明acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
- retries
生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
代码中配置方式:
//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);
- 消息压缩
默认情况下, 消息发送时不会被压缩。
代码中配置方式:
//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
压缩算法****说明snappy占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用lz4占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观gzip占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法
使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
kafka消费者
消息的有序性
方法:一个topic分区能保证自己的数据是按照先后消费的,但是不能保证跨分区消息处理的先后顺序。我么只能使用一个分区,在单分区种,消息可以保证严格顺序消费
提交和偏移量
自动提交:
当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll0方法接收的最大偏移量提交上去,这样只是记录了规定时间内的最大偏移量,其实与数据提交的偏移量存在偏差,因此可能会出现数据的重复提交或者丢失
手动提交
当enableauto.commit被设置为false可以有以下三种提交方式
- 提交当前偏移量(同步提交)
- 异步提交
- 同步和异步组合提交
同步提交:commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。
while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.value());System.out.println(record.key());try{
consumer.commitSync();//同步提交当前最新的偏移量}catch(CommitFailedException e){System.out.println("记录提交失败的异常:"+e);}}}
异步提交:手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。消息没有重试机制
while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.value());System.out.println(record.key());}
consumer.commitAsync(newOffsetCommitCallback(){@OverridepublicvoidonComplete(Map<TopicPartition,OffsetAndMetadata> map,Exception e){if(e!=null){System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);}}});}
同步和异步组合提交
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。
举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
try{while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.value());System.out.println(record.key());}
consumer.commitAsync();}}catch(Exception e){+
e.printStackTrace();System.out.println("记录错误信息:"+e);}finally{try{
consumer.commitSync();}finally{
consumer.close();}}
springboot整合kafka
1、在父类中的pop文件中导入依赖包
```xml
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
2、在需要用到kafka的微服务的naco中分别配置生产者和消费者配置
spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries:10key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
spring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
传递消息为对象
目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式
方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍
方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式
- 发送消息
@GetMapping("/hello")publicStringhello(){User user =newUser();
user.setUsername("xiaowang");
user.setAge(18);
kafkaTemplate.send("user-topic",JSON.toJSONString(user));return"ok";}
- 接收消息
packagecom.heima.kafka.listener;importcom.alibaba.fastjson.JSON;importcom.heima.kafka.pojo.User;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importorg.springframework.util.StringUtils;@ComponentpublicclassHelloListener{@KafkaListener(topics ="user-topic")publicvoidonMessage(String message){if(!StringUtils.isEmpty(message)){User user =JSON.parseObject(message,User.class);System.out.println(user);}}}
版权归原作者 不减到100斤不吃锅包肉 所有, 如有侵权,请联系我们删除。