点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(正在更新…)
章节内容
上节我们完成了如下的内容:
- 消费者的基本流程
- 消费者的参数、参数补充
序列化器
由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。
序列化器作用就是用于序列化要发送的消息的。
Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。
publicinterfaceSerializer<T>extendsCloseable{/**
* Configure this class.
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/defaultvoidconfigure(Map<String,?> configs,boolean isKey){// intentionally left blank}/**
* Convert {@code data} into a byte array.
*
* @param topic topic associated with data
* @param data typed data
* @return serialized bytes
*/byte[]serialize(String topic,T data);/**
* Convert {@code data} into a byte array.
*
* @param topic topic associated with data
* @param headers headers associated with the record
* @param data typed data
* @return serialized bytes
*/defaultbyte[]serialize(String topic,Headers headers,T data){returnserialize(topic, data);}/**
* Close this serializer.
* <p>
* This method must be idempotent as it may be called multiple times.
*/@Overridedefaultvoidclose(){// intentionally left blank}}
其中Kafka也内置了一些实现好的序列化器:
- ByteArraySerializer
- StringSerializer
- DoubleSerializer
- 等等… 具体可以自行查看
自定义序列化器
自定义实体类
实现一个简单的类:
publicclassUser{privateString username;privateString password;privateInteger age;publicStringgetUsername(){return username;}publicvoidsetUsername(String username){this.username = username;}publicStringgetPassword(){return password;}publicvoidsetPassword(String password){this.password = password;}publicIntegergetAge(){return age;}publicvoidsetAge(Integer age){this.age = age;}}
实现序列化
注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!
publicclassUserSerilazerimplementsSerializer<User>{@Overridepublicvoidconfigure(Map<String,?> configs,boolean isKey){Serializer.super.configure(configs, isKey);}@Overridepublicbyte[]serialize(String topic,User data){if(null== data){returnnull;}int userId = data.getUserId();String username = data.getUsername();String password = data.getPassword();int age = data.getAge();int usernameLen =0;byte[] usernameBytes;if(null!= username){
usernameBytes = username.getBytes(StandardCharsets.UTF_8);
usernameLen = usernameBytes.length;}else{
usernameBytes =newbyte[0];}int passwordLen =0;byte[] passwordBytes;if(null!= password){
passwordBytes = password.getBytes(StandardCharsets.UTF_8);
passwordLen = passwordBytes.length;}else{
passwordBytes =newbyte[0];}ByteBuffer byteBuffer =ByteBuffer.allocate(4+4+ usernameLen +4+ passwordLen +4);
byteBuffer.putInt(userId);
byteBuffer.putInt(usernameLen);
byteBuffer.put(usernameBytes);
byteBuffer.putInt(passwordLen);
byteBuffer.put(passwordBytes);
byteBuffer.putInt(age);return byteBuffer.array();}@Overridepublicbyte[]serialize(String topic,Headers headers,User data){returnSerializer.super.serialize(topic, headers, data);}@Overridepublicvoidclose(){Serializer.super.close();}}
分区器
默认情况下的分区计算:
- 如果Record提供了分区号,则使用Record提供的分区号
- 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
- 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号
我们在这里可以看到对应的内容:
org.apache.kafka.clients.producer
可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:
自定义分区器
如果要自定义分区器, 需要:
- 首先开发Partitioner接口中的实现类
- 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
publicclassMyPartitionerimplementsPartitioner{@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){return0;}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> configs){}}
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。