0


大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 消费者的基本流程
  • 消费者的参数、参数补充

在这里插入图片描述

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。
序列化器作用就是用于序列化要发送的消息的。

在这里插入图片描述
Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。

publicinterfaceSerializer<T>extendsCloseable{/**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */defaultvoidconfigure(Map<String,?> configs,boolean isKey){// intentionally left blank}/**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */byte[]serialize(String topic,T data);/**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param headers headers associated with the record
     * @param data typed data
     * @return serialized bytes
     */defaultbyte[]serialize(String topic,Headers headers,T data){returnserialize(topic, data);}/**
     * Close this serializer.
     * <p>
     * This method must be idempotent as it may be called multiple times.
     */@Overridedefaultvoidclose(){// intentionally left blank}}

其中Kafka也内置了一些实现好的序列化器:

  • ByteArraySerializer
  • StringSerializer
  • DoubleSerializer
  • 等等… 具体可以自行查看

自定义序列化器

自定义实体类

实现一个简单的类:

publicclassUser{privateString username;privateString password;privateInteger age;publicStringgetUsername(){return username;}publicvoidsetUsername(String username){this.username = username;}publicStringgetPassword(){return password;}publicvoidsetPassword(String password){this.password = password;}publicIntegergetAge(){return age;}publicvoidsetAge(Integer age){this.age = age;}}

实现序列化

注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!

publicclassUserSerilazerimplementsSerializer<User>{@Overridepublicvoidconfigure(Map<String,?> configs,boolean isKey){Serializer.super.configure(configs, isKey);}@Overridepublicbyte[]serialize(String topic,User data){if(null== data){returnnull;}int userId = data.getUserId();String username = data.getUsername();String password = data.getPassword();int age = data.getAge();int usernameLen =0;byte[] usernameBytes;if(null!= username){
            usernameBytes = username.getBytes(StandardCharsets.UTF_8);
            usernameLen = usernameBytes.length;}else{
            usernameBytes =newbyte[0];}int passwordLen =0;byte[] passwordBytes;if(null!= password){
            passwordBytes = password.getBytes(StandardCharsets.UTF_8);
            passwordLen = passwordBytes.length;}else{
            passwordBytes =newbyte[0];}ByteBuffer byteBuffer =ByteBuffer.allocate(4+4+ usernameLen +4+ passwordLen +4);
        byteBuffer.putInt(userId);
        byteBuffer.putInt(usernameLen);
        byteBuffer.put(usernameBytes);
        byteBuffer.putInt(passwordLen);
        byteBuffer.put(passwordBytes);
        byteBuffer.putInt(age);return byteBuffer.array();}@Overridepublicbyte[]serialize(String topic,Headers headers,User data){returnSerializer.super.serialize(topic, headers, data);}@Overridepublicvoidclose(){Serializer.super.close();}}

分区器

在这里插入图片描述

默认情况下的分区计算:

  • 如果Record提供了分区号,则使用Record提供的分区号
  • 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
  • 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号

我们在这里可以看到对应的内容:

org.apache.kafka.clients.producer

在这里插入图片描述
可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:
在这里插入图片描述

自定义分区器

如果要自定义分区器, 需要:

  • 首先开发Partitioner接口中的实现类
  • 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
publicclassMyPartitionerimplementsPartitioner{@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){return0;}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> configs){}}

本文转载自: https://blog.csdn.net/w776341482/article/details/140814104
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。

“大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器”的评论:

还没有评论