一、概念
Spring for Apache Kafka项目将Spring的核心概念应用于基于Kafka的消息传递解决方案的开发。我们提供了一个“模板”作为发送消息的高级抽象。
二、开发环境准备
1、Kafka客户端版本
本快速教程适用于以下版本:
- Apache Kafka 客户端 3.3.x
- Spring Framework 6.0.x
- 最低 Java 版本:17
2、引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.5</version>
</dependency>
3、配置application.yml
spring:
kafka:
# kafka连接地址
bootstrap-servers: 192.168.1.1:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
# 自定义的kafka配置项
kafka:
topic:
name: TEST_TOPIC
group:
id: TEST_GROUP
三、Spring Boot Consumer App
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
四、Spring Boot Producer App
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
五、使用 Java 配置(无 Spring boot)
以下是一个不使用Spring Boot的应用程序示例;它既有消费者也有生产者。
public class Sender {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
context.getBean(Sender.class).send("test", 42);
}
private final KafkaTemplate<Integer, String> template;
public Sender(KafkaTemplate<Integer, String> template) {
this.template = template;
}
public void send(String toSend, int key) {
this.template.send("topic1", key, toSend);
}
}
public class Listener {
@KafkaListener(id = "listen1", topics = "topic1")
public void listen1(String in) {
System.out.println(in);
}
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
return props;
}
@Bean
public Sender sender(KafkaTemplate<Integer, String> template) {
return new Sender(template);
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<Integer, String>(producerFactory);
}
}
正如您所看到的,当不使用Spring Boot时,您必须定义几个基础设施bean。
六、Springg整合Apache Kafka
1、连接Kafka
从2.5版本开始,每一个都扩展了KafkaResourceFactory。这允许在运行时更改引导服务器,方法是在其配置中添加一个Supplier<String>:setBootstrapServersSupplier(()→ …). 这将对所有新连接调用,以获取服务器列表。消费者和生产者通常寿命较长。要关闭现有的Producers,请在DefaultKafkaProducerFactory上调用reset()。要关闭现有的Consumers,请在KafkaListenerEndpointRegistry上调用stop()(然后调用start()),和/或在任何其他侦听器容器bean上调用stop()和start()。
为了方便起见,该框架还提供了一个ABSwitchCluster,它支持两组引导服务器;其中一个在任何时候都是活动的。通过调用setBootstrapServersSupplier(),配置ABSwitchCluster并将其添加到生产者和消费者工厂以及KafkaAdmin。当你想切换时,在生产者工厂上调用primary()或secondary()并调用reset()来建立新的连接;对于消费者,停止()和启动()所有侦听器容器。当使用@KafkaListener时,停止()和启动()KafkaListenerEndpointRegistry bean。
Factory Listeners
从2.5版本开始,DefaultKafkaProducerFactory和DefaultKafka ConsumerFactory可以配置一个监听器,以便在创建或关闭生产者或消费者时接收通知。
**Producer Factory Listener **
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
Consumer Factory Listener
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id都是通过将客户端id属性(创建后从metrics()获得)附加到工厂beanName属性来创建的,用..分隔。。
例如,当创建新客户端时,可以使用这些监听器来创建和绑定Micrometer KafkaClientMetrics实例(并在客户端关闭时关闭它)。
2、配置Topics
如果您在应用程序上下文中定义了一个KafkaAdminbean,它可以自动向代理添加主题。为此,您可以为应用程序上下文中的每个主题添加一个NewTopic@Bean。2.3版引入了一个新的类TopicBuilder,使创建此类bean更加方便。以下示例显示了如何执行此操作:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
从2.6版开始,您可以省略.partitions()和/或replicas(),代理默认值将应用于这些财产。代理版本必须至少为2.4.0才能支持此功能-请参阅KIP-464。
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
从2.7版本开始,您可以在一个KafkaAdmin.NewTopics bean定义中声明多个NewTopic:
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
默认情况下,如果代理不可用,则会记录一条消息,但上下文会继续加载。您可以通过编程方式调用admin的initialize()方法,稍后再试。如果您希望将此情况视为致命,请将管理员的fatalIfBrokerNotAvailable属性设置为true。然后上下文无法初始化。
从2.7版本开始,KafkaAdmin提供了在运行时创建和检查主题的方法
- 创建或修改主题
- 描述主题
对于更高级的功能,您可以直接使用AdminClient。以下示例显示了如何执行此操作:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
3、发送消息
3.1 用
KafkaTemplate
KafkaTemplate包装了一个生产者,并提供了向Kafka主题发送数据的方便方法。以下列表显示了KafkaTemplate中的相关方法:
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
sendDefault API要求为模板提供一个默认主题。
API将时间戳作为参数,并将此时间戳存储在记录中。如何存储用户提供的时间戳取决于在Kafka主题上配置的时间戳类型。如果将主题配置为使用CREATE_TIME,则会记录用户指定的时间戳(如果未指定,则会生成时间戳)。如果将主题配置为使用LOG_APPEND_TIME,则会忽略用户指定的时间戳,并且代理会添加本地代理时间。
方法的metrics和partitionsFor委托给基础Producer上的相同方法。execute方法提供对基础Producer的直接访问。
要使用该模板,您可以配置一个生产者工厂,并在模板的构造函数中提供它。以下示例显示了如何执行此操作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从2.5版开始,您现在可以覆盖工厂的ProducerConfig财产,以使用同一工厂的不同生产商配置创建模板。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
请注意,ProducerFactory<?,?>类型的bean(例如Spring Boot自动配置的)可以用不同的缩小的泛型类型引用。
您还可以使用标准的<bean/>定义来配置模板。
然后,要使用该模板,您可以调用它的一个方法。
当您将方法与消息一起使用时<?>参数、主题、分区和密钥信息在消息头中提供,该消息头包括以下项目:
- KafkaHeaders.TOPIC(卡夫卡标题主题)
- KafkaHeaders.PARTITION(Kafka标头分区)
- KafkaHeaders.KEY
- KafkaHeaders.TIMESTAMP(卡夫卡标题时间戳)
消息有效载荷是数据。
或者,您可以使用ProducerListener配置KafkaTemplate,以获得带有发送结果(成功或失败)的异步回调,而不是等待Future完成。以下列表显示了ProducerListener接口的定义:
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
默认情况下,该模板配置有LoggingProducerListener,它会记录错误,并且在发送成功时不执行任何操作。
为了方便起见,在您只想实现其中一个方法的情况下,提供了默认的方法实现。
请注意,send方法返回一个CompletableFuture<SendResult>。您可以向侦听器注册回调,以异步接收发送的结果。以下示例显示了如何执行此操作:
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
SendResult有两个财产,即ProducerRecord和RecordMetadata。有关这些对象的信息,请参阅KafkaAPI文档。
Throwable可以强制转换为KafkaProducerException;其failedProducerRecord属性包含失败的记录。
如果您希望阻止发送线程等待结果,您可以调用future的get()方法;建议使用带有超时的方法。如果您设置了linger.ms,您可能希望在等待之前调用flush(),或者为了方便起见,模板有一个带有autoFlush参数的构造函数,该参数会导致模板在每次发送时flush(()。只有在设置了linger.ms生产者属性并希望立即发送部分批次时,才需要刷新。
3.2 例子
本节显示了向 Kafka 发送消息的示例:
例 5.非阻塞(异步)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
阻止(同步)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
请注意,ExecutionException的原因是具有failedProducerRecord属性的KafkaProducerException。
3.3 使用RoutingKafkaTemplate
从2.5版本开始,您可以使用RoutingKafkaTemplate在运行时根据目标主题名称选择生产者。
该模板需要java.util.regex.Pattern到ProducerFactory<Object,Object>实例的映射。应该对该映射进行排序(例如LinkedHashMap),因为它是按顺序遍历的;您应该在开始时添加更具体的模式。
以下简单的Spring Boot应用程序提供了一个示例,说明如何使用相同的模板发送到不同的主题,每个主题使用不同的值序列化程序。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
3.4 使用
DefaultKafkaProducerFactory
如使用KafkaTemplate中所示,ProducerFactory用于创建生产者。
默认情况下,当不使用Transactions时,DefaultKafkaProducerFactory会创建一个供所有客户端使用的单例生产者,如Kafka生产者javadocs中所建议的那样。但是,如果您在模板上调用flush(),这可能会导致使用同一生成器的其他线程延迟。从2.3版本开始,DefaultKafkaProducerFactory有了一个新的属性producerPerThread。当设置为true时,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。
另请参阅KafkaTemplate事务性和非事务性发布。
创建DefaultKafkaProducerFactory时,可以通过调用只接受财产映射的构造函数从配置中获取键和/或值Serializer类(请参阅使用KafkaTemplate中的示例),也可以将Serializers实例传递给DefaultKaf kaProducer Factory构造函数(在这种情况下,所有Producer共享相同的实例)。或者,您可以提供供应商<Serializer>(从版本2.3开始),用于为每个生产商获取单独的Serializer实例:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
从2.5.10版开始,现在可以在创建工厂后更新生产者财产。例如,如果在凭据更改后必须更新SSL密钥/信任存储位置,这可能会很有用。这些变化不会影响现有的生产者实例;调用reset()关闭所有现有生产者,以便使用新的财产创建新生产者。注意:不能将事务性生产者工厂更改为非事务性,反之亦然。
现在提供了两种新方法:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
从2.8版开始,如果您将序列化程序作为对象提供(在构造函数中或通过setter),工厂将调用configure()方法以使用配置财产配置它们。
3.5 Reply Type Message<?>
当@KafkaListener返回消息<?>时,对于2.5之前的版本,有必要填充回复主题和关联id头。在本例中,我们使用请求中的回复主题标头:
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
这也显示了如何在回复记录上设置密钥。
从2.5版本开始,框架将检测这些头是否丢失,并用主题填充它们——根据@SendTo值确定的主题或传入的KafkaHeaders.REPLY_topic头(如果存在)。它还将回显传入的KafkaHeaders.CORRELATION_ID和KafkaHeaders.REPLY_PARTITION(如果存在)。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.KEY, 42)
.build();
}
3.6 使用
ReplyingKafkaTemplate
版权归原作者 Doker 多克 技术人的数码品牌 所有, 如有侵权,请联系我们删除。