0


java发送数据到kafka

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.apache.kafka.clients.producer.Callback;
  6. import org.apache.kafka.clients.producer.KafkaException;
  7. import org.apache.kafka.common.serialization.StringSerializer;
  8. import java.util.Properties;
  9. public class KafkaTransactionalProducer {
  10. public static void main(String[] args) {
  11. Properties props = new Properties();
  12. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.1.1:9000,4.4.4.4:8888");
  13. props.put("security.protocol", "SASL_PLAINTEXT");
  14. props.put("sasl.mechanism", "SCRAM-SHA-256");
  15. props.put("sasl.kerberos.service.name", "target");
  16. props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ee\" password=\"eres\";");
  17. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  18. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  19. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  20. props.put(ProducerConfig.ACKS_CONFIG, "all");
  21. props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-unique-transactional-id"); // 自定义唯一事务ID
  22. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  23. try {
  24. producer.initTransactions(); // 初始化事务
  25. producer.beginTransaction(); // 开始事务
  26. String topic = "your_topic";
  27. String message = "{\"key\": \"value\"}"; // 要发送的消息
  28. ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
  29. producer.send(record, new Callback() {
  30. public void onCompletion(RecordMetadata metadata, Exception exception) {
  31. if (exception != null) {
  32. System.err.println("Error sending message: " + exception.getMessage());
  33. } else {
  34. System.out.println("Message sent successfully to topic " + metadata.topic() +
  35. " partition " + metadata.partition() +
  36. " with offset " + metadata.offset());
  37. }
  38. }
  39. });
  40. producer.commitTransaction(); // 提交事务
  41. } catch (KafkaException e) {
  42. producer.abortTransaction(); // 事务失败,回滚
  43. e.printStackTrace();
  44. } finally {
  45. producer.close();
  46. }
  47. }
  48. }
  49. 把这些代码改成一个独立的应用服务有自己的端口通过post请求传递json报文,然后给kafka发送消息

要将 Kafka 生产者代码改成一个独立的应用服务,能够通过 POST 请求接收 JSON 报文并将其发送到 Kafka,你可以使用 Spring Boot 框架来实现这个功能。Spring Boot 提供了便捷的 REST API 开发和配置管理功能。

以下是一个基于 Spring Boot 的示例项目,展示了如何实现这个需求:

1. 创建 Spring Boot 项目

首先,你需要创建一个 Spring Boot 项目。如果你使用 Spring Initializr,可以选择以下依赖:

  • Spring Web
  • Spring Kafka

2. 添加 Maven 依赖

  1. pom.xml

文件中,确保包含以下依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.kafka</groupId>
  8. <artifactId>spring-kafka</artifactId>
  9. </dependency>
  10. </dependencies>

3. 配置 Kafka 生产者

  1. application.properties

文件中配置 Kafka 生产者属性:

  1. spring.kafka.producer.bootstrap-servers=10.1.1.1:9000,4.4.4.4:8888
  2. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  3. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  4. spring.kafka.producer.enable-idempotence=true
  5. spring.kafka.producer.acks=all
  6. spring.kafka.producer.transaction-id-prefix=my-unique-transactional-id
  7. spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
  8. spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
  9. spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ee" password="eres";
  10. spring.kafka.producer.properties.sasl.kerberos.service.name=target

4. 实现 Kafka 生产者服务

创建一个

  1. KafkaProducerService

类来处理消息发送:

  1. package com.example.kafkaproducer.service;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.apache.kafka.clients.producer.Callback;
  6. import org.apache.kafka.clients.producer.KafkaException;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.kafka.core.ProducerFactory;
  9. import org.springframework.kafka.core.KafkaTemplate;
  10. import org.springframework.kafka.core.ProducerFactory;
  11. import org.springframework.stereotype.Service;
  12. @Service
  13. public class KafkaProducerService {
  14. private final KafkaTemplate<String, String> kafkaTemplate;
  15. @Autowired
  16. public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
  17. this.kafkaTemplate = kafkaTemplate;
  18. }
  19. public void sendMessage(String topic, String message) {
  20. kafkaTemplate.executeInTransaction(kafkaTemplate -> {
  21. kafkaTemplate.send(topic, message).addCallback(new org.springframework.util.concurrent.ListenableFutureCallback<>() {
  22. @Override
  23. public void onSuccess(RecordMetadata result) {
  24. System.out.println("Message sent successfully to topic " + result.topic() +
  25. " partition " + result.partition() +
  26. " with offset " + result.offset());
  27. }
  28. @Override
  29. public void onFailure(Throwable ex) {
  30. System.err.println("Error sending message: " + ex.getMessage());
  31. }
  32. });
  33. return null;
  34. });
  35. }
  36. }

5. 创建 REST Controller

实现一个 REST 控制器来处理 POST 请求:

  1. package com.example.kafkaproducer.controller;
  2. import com.example.kafkaproducer.service.KafkaProducerService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.*;
  5. @RestController
  6. @RequestMapping("/api")
  7. public class KafkaController {
  8. private final KafkaProducerService kafkaProducerService;
  9. @Autowired
  10. public KafkaController(KafkaProducerService kafkaProducerService) {
  11. this.kafkaProducerService = kafkaProducerService;
  12. }
  13. @PostMapping("/send")
  14. public String sendMessage(@RequestBody String message) {
  15. String topic = "your_topic";
  16. kafkaProducerService.sendMessage(topic, message);
  17. return "Message sent to Kafka";
  18. }
  19. }

6. 启动 Spring Boot 应用

在你的

  1. Application

类中启动 Spring Boot 应用:

  1. package com.example.kafkaproducer;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class KafkaProducerApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(KafkaProducerApplication.class, args);
  8. }
  9. }

7. 运行应用并测试

启动 Spring Boot 应用后,你可以通过 POST 请求将 JSON 消息发送到

  1. http://localhost:8080/api/send

,比如使用

  1. curl

  1. curl -X POST http://localhost:8080/api/send -H "Content-Type: application/json" -d '{"key": "value"}'

这样,Kafka 生产者服务就可以通过 REST API 接收消息并将其发送到 Kafka 了。

上面的用java的1.8实现

如果你使用 Java 1.8,你可以按照以下步骤来实现一个独立的应用服务,该服务通过 POST 请求接收 JSON 报文并将其发送到 Kafka。我们将不使用 Spring Boot,而是使用更基础的 Java Servlet 技术来完成这个任务。

1. 创建 Maven 项目

确保你的

  1. pom.xml

包含必要的依赖:

  1. <dependencies>
  2. <!-- Spring Boot Starter Web -->
  3. <dependency>
  4. <groupId>org.springframework.boot</groupId>
  5. <artifactId>spring-boot-starter-web</artifactId>
  6. </dependency>
  7. <!-- Spring Boot Starter Kafka -->
  8. <dependency>
  9. <groupId>org.springframework.kafka</groupId>
  10. <artifactId>spring-kafka</artifactId>
  11. </dependency>
  12. <!-- JSON处理 -->
  13. <dependency>
  14. <groupId>com.google.code.gson</groupId>
  15. <artifactId>gson</artifactId>
  16. <version>2.10.1</version>
  17. </dependency>
  18. </dependencies>

2. 配置 Kafka 生产者

创建

  1. KafkaProducerConfig

类来配置 Kafka 生产者:

  1. import org.apache.kafka.clients.producer.ProducerConfig;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import java.util.Properties;
  4. public class KafkaProducerConfig {
  5. public static Properties getProducerProperties() {
  6. Properties props = new Properties();
  7. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.1.1:9000,4.4.4.4:8888");
  8. props.put("security.protocol", "SASL_PLAINTEXT");
  9. props.put("sasl.mechanism", "SCRAM-SHA-256");
  10. props.put("sasl.kerberos.service.name", "target");
  11. props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ee\" password=\"eres\";");
  12. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  13. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  15. props.put(ProducerConfig.ACKS_CONFIG, "all");
  16. props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-unique-transactional-id");
  17. return props;
  18. }
  19. }

3. 实现 Kafka 生产者 Servlet

创建

  1. KafkaProducerServlet

类来处理 POST 请求并将消息发送到 Kafka:

  1. import com.google.gson.Gson;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.apache.kafka.clients.producer.Callback;
  6. import org.apache.kafka.clients.producer.KafkaException;
  7. import javax.servlet.ServletException;
  8. import javax.servlet.annotation.WebServlet;
  9. import javax.servlet.http.HttpServlet;
  10. import javax.servlet.http.HttpServletRequest;
  11. import javax.servlet.http.HttpServletResponse;
  12. import java.io.IOException;
  13. @WebServlet("/api/send")
  14. public class KafkaProducerServlet extends HttpServlet {
  15. private KafkaProducer<String, String> producer;
  16. @Override
  17. public void init() throws ServletException {
  18. super.init();
  19. producer = new KafkaProducer<>(KafkaProducerConfig.getProducerProperties());
  20. }
  21. @Override
  22. protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
  23. String topic = "your_topic";
  24. StringBuilder stringBuilder = new StringBuilder();
  25. String line;
  26. while ((line = req.getReader().readLine()) != null) {
  27. stringBuilder.append(line);
  28. }
  29. String message = stringBuilder.toString();
  30. ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
  31. producer.send(record, new Callback() {
  32. @Override
  33. public void onCompletion(RecordMetadata metadata, Exception exception) {
  34. if (exception != null) {
  35. System.err.println("Error sending message: " + exception.getMessage());
  36. } else {
  37. System.out.println("Message sent successfully to topic " + metadata.topic() +
  38. " partition " + metadata.partition() +
  39. " with offset " + metadata.offset());
  40. }
  41. }
  42. });
  43. resp.setContentType("text/plain");
  44. resp.getWriter().write("Message sent to Kafka");
  45. }
  46. @Override
  47. public void destroy() {
  48. super.destroy();
  49. if (producer != null) {
  50. producer.close();
  51. }
  52. }
  53. }

4. 配置

  1. web.xml

如果你不使用注解,可以在

  1. web.xml

中配置 Servlet:

  1. <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
  4. http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
  5. version="4.0">
  6. <servlet>
  7. <servlet-name>KafkaProducerServlet</servlet-name>
  8. <servlet-class>com.example.KafkaProducerServlet</servlet-class>
  9. </servlet>
  10. <servlet-mapping>
  11. <servlet-name>KafkaProducerServlet</servlet-name>
  12. <url-pattern>/api/send</url-pattern>
  13. </servlet-mapping>
  14. </web-app>

5. 部署和运行

将你的应用打包成一个 WAR 文件并部署到支持 Servlet 的服务器(如 Apache Tomcat)。然后,你可以通过 POST 请求将 JSON 消息发送到

  1. http://localhost:8080/your-app/api/send

例如,使用

  1. curl

来测试:

  1. bashcurl -X POST http://localhost:8080/your-app/api/send -H "Content-Type: application/json" -d '{"key": "value"}'

这样,你就完成了将 Kafka 生产者代码改成一个独立的应用服务,通过 POST 请求接收 JSON 报文并发送到 Kafka 的任务。

上面的用java的1.8springboot实现

要使用 Java 1.8 和 Spring Boot 实现一个独立的应用服务,通过 POST 请求接收 JSON 报文并将其发送到 Kafka,你可以按照以下步骤进行。

1. 创建 Spring Boot 项目

可以使用 Spring Initializr 来创建一个新的 Spring Boot 项目。选择以下依赖:

  • Spring Web
  • Spring Kafka

2. 添加 Maven 依赖

  1. pom.xml

文件中添加 Kafka 和 Spring Boot 相关的依赖:

  1. <dependencies>
  2. <!-- Spring Boot Starter Web -->
  3. <dependency>
  4. <groupId>org.springframework.boot</groupId>
  5. <artifactId>spring-boot-starter-web</artifactId>
  6. </dependency>
  7. <!-- Spring Boot Starter Kafka -->
  8. <dependency>
  9. <groupId>org.springframework.kafka</groupId>
  10. <artifactId>spring-kafka</artifactId>
  11. </dependency>
  12. <!-- JSON处理 -->
  13. <dependency>
  14. <groupId>com.google.code.gson</groupId>
  15. <artifactId>gson</artifactId>
  16. <version>2.10.1</version>
  17. </dependency>
  18. </dependencies>

3. 配置 Kafka 生产者

  1. application.properties

文件中配置 Kafka 生产者属性:

  1. spring.kafka.producer.bootstrap-servers=10.1.1.1:9000,4.4.4.4:8888
  2. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  3. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  4. spring.kafka.producer.enable-idempotence=true
  5. spring.kafka.producer.acks=all
  6. spring.kafka.producer.transaction-id-prefix=my-unique-transactional-id
  7. spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
  8. spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
  9. spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ee" password="eres";
  10. spring.kafka.producer.properties.sasl.kerberos.service.name=target

4. 创建 Kafka 生产者服务

创建一个

  1. KafkaProducerService

类来处理消息发送:

  1. package com.example.kafkaproducer.service;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.kafka.core.ProducerFactory;
  4. import org.springframework.kafka.core.KafkaTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.kafka.annotation.KafkaListener;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8. import org.springframework.stereotype.Service;
  9. @Service
  10. public class KafkaProducerService {
  11. private final KafkaTemplate<String, String> kafkaTemplate;
  12. @Autowired
  13. public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
  14. this.kafkaTemplate = kafkaTemplate;
  15. }
  16. public void sendMessage(String topic, String message) {
  17. kafkaTemplate.executeInTransaction(kafkaTemplate -> {
  18. kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<>() {
  19. @Override
  20. public void onSuccess(RecordMetadata result) {
  21. System.out.println("Message sent successfully to topic " + result.topic() +
  22. " partition " + result.partition() +
  23. " with offset " + result.offset());
  24. }
  25. @Override
  26. public void onFailure(Throwable ex) {
  27. System.err.println("Error sending message: " + ex.getMessage());
  28. }
  29. });
  30. return null;
  31. });
  32. }
  33. }

5. 创建 REST Controller

创建一个 REST 控制器来处理 POST 请求:

  1. package com.example.kafkaproducer.controller;
  2. import com.example.kafkaproducer.service.KafkaProducerService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.*;
  5. @RestController
  6. @RequestMapping("/api")
  7. public class KafkaController {
  8. private final KafkaProducerService kafkaProducerService;
  9. @Autowired
  10. public KafkaController(KafkaProducerService kafkaProducerService) {
  11. this.kafkaProducerService = kafkaProducerService;
  12. }
  13. @PostMapping("/send")
  14. public String sendMessage(@RequestBody String message) {
  15. String topic = "your_topic";
  16. kafkaProducerService.sendMessage(topic, message);
  17. return "Message sent to Kafka";
  18. }
  19. }

6. 启动 Spring Boot 应用

在你的

  1. Application

类中启动 Spring Boot 应用:

  1. package com.example.kafkaproducer;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class KafkaProducerApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(KafkaProducerApplication.class, args);
  8. }
  9. }

7. 运行应用并测试

启动 Spring Boot 应用后,你可以通过 POST 请求将 JSON 消息发送到

  1. http://localhost:8080/api/send

,例如使用

  1. curl

  1. package com.example.kafkaproducer;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class KafkaProducerApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(KafkaProducerApplication.class, args);
  8. }
  9. }

额外提示

  1. 异常处理: 在实际应用中,你可能需要添加更完善的异常处理和日志记录。
  2. 安全性: 确保你的应用具有适当的安全配置,尤其是对外暴露的 API。

这样,你就完成了使用 Java 1.8 和 Spring Boot 实现 Kafka 生产者服务的任务,通过 POST 请求接收 JSON 报文并将其发送到 Kafka。

public void onSuccess(RecordMetadata result) { 我这个继承的参数不是RecordMetadata result

如果你的

  1. onSuccess

方法的参数不是

  1. RecordMetadata result

,那么你可能在使用

  1. ListenableFutureCallback

接口的实现时遇到了问题。在 Kafka 的生产者 API 中,

  1. ListenableFutureCallback

接口通常用于处理

  1. send

方法的回调。

请确保你正确地实现了

  1. ListenableFutureCallback

接口。以下是一个完整的示例,展示了如何实现 Kafka 生产者的回调处理,包括

  1. onSuccess

  1. onFailure

方法。

完整示例:KafkaProducerService 类

在这个示例中,我们将使用

  1. ListenableFutureCallback

处理 Kafka 生产的结果。

  1. package com.example.kafkaproducer.service;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.apache.kafka.clients.producer.Callback;
  6. import org.apache.kafka.clients.producer.KafkaException;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.kafka.core.KafkaTemplate;
  9. import org.springframework.kafka.core.ProducerFactory;
  10. import org.springframework.kafka.core.KafkaTemplate;
  11. import org.springframework.stereotype.Service;
  12. import org.springframework.util.concurrent.ListenableFutureCallback;
  13. @Service
  14. public class KafkaProducerService {
  15. private final KafkaTemplate<String, String> kafkaTemplate;
  16. @Autowired
  17. public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
  18. this.kafkaTemplate = kafkaTemplate;
  19. }
  20. public void sendMessage(String topic, String message) {
  21. kafkaTemplate.executeInTransaction(kafkaTemplate -> {
  22. kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<>() {
  23. @Override
  24. public void onSuccess(RecordMetadata result) {
  25. System.out.println("Message sent successfully to topic " + result.topic() +
  26. " partition " + result.partition() +
  27. " with offset " + result.offset());
  28. }
  29. @Override
  30. public void onFailure(Throwable ex) {
  31. System.err.println("Error sending message: " + ex.getMessage());
  32. }
  33. });
  34. return null;
  35. });
  36. }
  37. }

关键点

  1. KafkaTemplate 的回调: 使用 kafkaTemplate.send().addCallback 方法可以处理消息发送的结果。addCallback 方法接受一个 ListenableFutureCallback 对象来处理成功和失败的回调。
  2. 实现 ListenableFutureCallback: ListenableFutureCallback 是 Spring 提供的接口,用于处理异步操作的成功和失败结果。你需要实现 onSuccessonFailure 方法。
  3. ProducerRecordRecordMetadata: ProducerRecord 用于构建消息记录,RecordMetadata 用于接收发送消息后的元数据。

使用

  1. Callback

接口的示例

如果你直接使用 Kafka 的

  1. Callback

接口而不是 Spring 的

  1. ListenableFutureCallback

,示例如下:

  1. import org.apache.kafka.clients.producer.Callback;
  2. import org.apache.kafka.clients.producer.RecordMetadata;
  3. public class KafkaProducerCallback implements Callback {
  4. @Override
  5. public void onCompletion(RecordMetadata metadata, Exception exception) {
  6. if (exception != null) {
  7. System.err.println("Error sending message: " + exception.getMessage());
  8. } else {
  9. System.out.println("Message sent successfully to topic " + metadata.topic() +
  10. " partition " + metadata.partition() +
  11. " with offset " + metadata.offset());
  12. }
  13. }
  14. }

然后在

  1. KafkaProducerService

中使用这个

  1. Callback

类:

  1. public void sendMessage(String topic, String message) {
  2. kafkaTemplate.executeInTransaction(kafkaTemplate -> {
  3. kafkaTemplate.send(topic, message).addCallback(new KafkaProducerCallback());
  4. return null;
  5. });
  6. }

这样,你就可以确保 Kafka 消息发送的结果被正确地处理,无论是成功还是失败。

public void onSuccess(RecordMetadata result) { 我这个继承的参数shi Object o怎么回事

如果你在实现 Kafka 生产者的回调时遇到了

  1. onSuccess

方法的参数类型为

  1. Object o

的问题,这可能是由于使用了不同的回调接口或配置。

在 Kafka 中,

  1. send

方法的回调接口通常是

  1. Callback

,其

  1. onCompletion

方法的签名如下:

  1. void onCompletion(RecordMetadata metadata, Exception exception);

如果你的实现中

  1. onSuccess

方法的参数类型是

  1. Object

,可能你在使用的是与 Kafka 原生 API 不匹配的回调接口或在自定义实现时出现了问题。

确保使用正确的回调接口

以下是如何正确实现 Kafka 的

  1. Callback

接口,并处理

  1. onSuccess

  1. onFailure

方法:

1. Kafka
  1. Callback

接口的正确实现

  1. import org.apache.kafka.clients.producer.Callback;
  2. import org.apache.kafka.clients.producer.RecordMetadata;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. public class KafkaProducerCallback implements Callback {
  5. @Override
  6. public void onCompletion(RecordMetadata metadata, Exception exception) {
  7. if (exception != null) {
  8. System.err.println("Error sending message: " + exception.getMessage());
  9. } else {
  10. System.out.println("Message sent successfully to topic " + metadata.topic() +
  11. " partition " + metadata.partition() +
  12. " with offset " + metadata.offset());
  13. }
  14. }
  15. }
2. 使用 Kafka
  1. Callback

  1. send

方法

在 Kafka 生产者服务中使用上述回调:

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. public class KafkaProducerService {
  9. private final KafkaTemplate<String, String> kafkaTemplate;
  10. @Autowired
  11. public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
  12. this.kafkaTemplate = kafkaTemplate;
  13. }
  14. public void sendMessage(String topic, String message) {
  15. kafkaTemplate.executeInTransaction(kafkaTemplate -> {
  16. kafkaTemplate.send(topic, message).addCallback(new KafkaProducerCallback());
  17. return null;
  18. });
  19. }
  20. }

自定义实现中的

  1. Object o

问题

如果你看到

  1. onSuccess(Object o)

,这可能是你在自定义回调或在其他上下文中使用了不同的回调接口。请检查以下几点:

  1. 检查回调接口类型:确保你的回调接口实现的是 Kafka 的 Callback 接口,而不是其他的接口或自定义接口。
  2. 回调方法的签名:确保 onSuccess 方法的签名与 Callback 接口的要求匹配。如果你的代码中 onSuccess 方法接受 Object 类型的参数,可能需要更改为 RecordMetadata 类型。
  3. 代码示例:确保在代码中,send 方法的回调处理与 Kafka 的回调接口一致。

结论

确保你使用的回调接口正确且实现了 Kafka 的

  1. Callback

接口,以便

  1. onCompletion

方法可以正确处理消息的发送结果。如果你遇到不同类型的回调接口,可能需要检查代码的其他部分或依赖项以确保一致

标签: linq c#

本文转载自: https://blog.csdn.net/qq_33192454/article/details/141939629
版权归原作者 qq_33192454 所有, 如有侵权,请联系我们删除。

“java发送数据到kafka”的评论:

还没有评论