0


Kafka优雅源码设计2-KafkaProducer如何实现线程安全

1. 背景知识

1.1 线程安全的定义

我们可以看下《Java并发编程实战》在2.1章节中的定义:

当多个线程访问某个类时,这个类始终都能表现出正确的行为,那么就称这个类是线程安全的。

进一步定义:
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替进行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

我们常用

synchronized

或者

Lock

结合来实现线程安全的代码,但并不代表我们的类就是线程安全的。除非类本身封装了所有必要的正确性保障手段,令调用者无需关心多线程下的调用问题,更无须自己实现任何措施来保证多线程环境下的正确调用。

1.2 线程安全的级别

按照线程安全的“安全强度”由强至弱来排序:不可变、绝对线程安全、相对线程安全、线程兼容和线程对立。

  1. 不可变 在Java语言里,不可变的对象一定是线程安全的,无论是对象方法的实现还是方法调用者,都不需要再进行任何线程安全保障措施。Java中把对象里面带有状态的变量都声明为final,这样在构造函数结束后它就是不可变的。 比如StringLongInteger等,还有我们的KafkaProducer
  2. 绝对线程安全 对象自身做了足够的内部同步,也不需要外部同步。如RandomConcurrentHashMap和atomic等。
  3. 相对线程安全 我们通常意义上所讲的线程安全,它需要保证对这个对象单次的操作是线程安全的;但是对于一些特定顺序的连续调用,仍需要调用端使用额外的同步手段来保证正确性。
  4. 线程兼容 指对象本身不是线程安全的,需要调用端正确地使用同步手段在并发环境下使用。说一个类不是线程安全的,通常就是这种情况。Java类库中大部分都属于线程兼容级别,如ArrayListHashMap等。
  5. 线程对立 没救了。不管调用端是否采取同步措施,都无法在多线程环境下并发使用。尽量避免使用这种代码。如Thread类的suspend()resume()方法等。

1.3 Java语言中如何实现线程安全

根据上面的线程安全级别,我们可以知道几种实现方式:

  1. 编写不可变类,最简单的方式就是所有私有变量设为final
  2. 在调用方,使用互斥同步的手段,属于阻塞同步。最常见的就是synchronized关键字,还有java.util.concurrent.locks.Lock
  3. 无锁编程,属于非阻塞同步。JDK类库中的CAS操作就是这种,如J.U.C包里面的整数原子类,其中的compareAndSet()getAndIncrement()等方法都是用CAS操作来实现。
  4. 可重入代码,简单理解就是无状态代码。只要输入相同数据,就能返回相同结果的代码,当然也是线程安全的。
  5. 线程本地存储:共享数据的可见范围就限制在线程内,这样无须同步也能保证线程间不会出现数据争用的情况。

2. KafkaProducer部分源码

publicclassKafkaProducer<K,V>implementsProducer<K,V>{privatefinalLogger log;privatestaticfinalString JMX_PREFIX ="kafka.producer";publicstaticfinalString NETWORK_THREAD_PREFIX ="kafka-producer-network-thread";publicstaticfinalString PRODUCER_METRIC_GROUP_NAME ="producer-metrics";privatefinalString clientId;// Visible for testingfinalMetrics metrics;privatefinalPartitioner partitioner;privatefinalint maxRequestSize;privatefinallong totalMemorySize;privatefinalProducerMetadata metadata;privatefinalRecordAccumulator accumulator;privatefinalSender sender;privatefinalThread ioThread;privatefinalCompressionType compressionType;privatefinalSensor errors;privatefinalTime time;privatefinalSerializer<K> keySerializer;privatefinalSerializer<V> valueSerializer;privatefinalProducerConfig producerConfig;privatefinallong maxBlockTimeMs;privatefinalProducerInterceptors<K,V> interceptors;privatefinalApiVersions apiVersions;privatefinalTransactionManager transactionManager;...}

KafkaProducer.java Github地址
如上所述, 不可变类型在并发编程中能带来很多好处,避免数据竞争带来的风险,提供更好的性能。
一些语言(如Scala)有明确的不可变类型声明,Java目前只能在定义类时将全部字段声明为final来间接实现。
KafkaProducer就是一个不可变类。

线程安全

的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

3. 总结&思考

本篇主要是为了水 介绍了线程安全的定义、几种级别,以及常用的实现方式。
可以看到

KafkaProducer

就是很简单的生成不可变对象来保证线程安全。其实常见的设计有很多,你可以举例并在讨论区一起讨论吗。
另外,想一下为什么

Spring

官方推荐构造器注入,而非接口注入和方法参数注入呢?如下:

@RestController@RequestMapping("/api")publicclassAccountResource{privatefinalUserService userService;privatefinalMailService mailService;publicAccountResource(UserService userService,MailService mailService){this.userService = userService;this.mailService = mailService;}......}

4. 引用

  1. 《深入理解Java虚拟机(第3版)》
  2. 《Java并发编程实战》
标签: kafka

本文转载自: https://blog.csdn.net/a9181454/article/details/123891744
版权归原作者 百里长庭 所有, 如有侵权,请联系我们删除。

“Kafka优雅源码设计2-KafkaProducer如何实现线程安全”的评论:

还没有评论