0


使用kafka-clients操作数据(java)

一、添加依赖

  1. <!-- kafka-clients: Apache Kafka Java client library (provides the producer API used in the examples below) -->
  2. <dependency>
  3. <groupId>org.apache.kafka</groupId>
  4. <artifactId>kafka-clients</artifactId>
  5. <version>3.5.1</version>
  6. </dependency>

二、生产者

自定义分区器(可选;不需要自定义分区逻辑时可忽略本节)

  1. import org.apache.kafka.clients.producer.Partitioner;
  2. import org.apache.kafka.common.Cluster;
  3. import java.util.Map;
  4. public class MyPatitioner implements Partitioner {
  5. @Override
  6. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  7. String msgStr = value.toString();
  8. if(msgStr.contains("a")){
  9. return 1;
  10. }
  11. return 0;
  12. }
  13. @Override
  14. public void close() {
  15. }
  16. @Override
  17. public void configure(Map<String, ?> configs) {
  18. }
  19. }

1、普通消息

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. //配置
  3. Properties properties = new Properties();
  4. //连接参数
  5. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
  6. //序列化
  7. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  8. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  9. //关联自定义分区器 可选
  10. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");
  11. //优化参数 可选
  12. //缓冲器大小 32M
  13. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
  14. //批次大小
  15. properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
  16. //Linger.ms
  17. properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
  18. //压缩
  19. properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
  20. //acks
  21. properties.put(ProducerConfig.ACKS_CONFIG, "-1");
  22. //重试次数
  23. properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  24. //创建生产者
  25. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  26. //异步发送数据
  27. for (int i = 0; i < 10; i++) {
  28. //给first主题发消息
  29. kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));
  30. //回调异步发送
  31. kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {
  32. @Override
  33. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  34. if (e == null) {
  35. System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());
  36. }
  37. }
  38. });
  39. kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {
  40. @Override
  41. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  42. if (e == null) {
  43. System.out.println("主题:" + recordMetadata.topic() + "分区" + recordMetadata.partition() + "a");
  44. }
  45. }
  46. });
  47. Thread.sleep(500);
  48. }
  49. //同步
  50. for (int i = 0; i < 10; i++) {
  51. //给first主题发消息
  52. kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();
  53. }
  54. //关闭资源
  55. kafkaProducer.close();
  56. }
  1. root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
  2. a0
  3. hello0
  4. hello20
  5. a1
  6. hello1
  7. hello21
  8. a2
  9. hello2
  10. hello22
  11. a3
  12. hello3
  13. hello23
  14. a4
  15. hello4
  16. hello24
  17. a5
  18. hello5
  19. hello25
  20. a6
  21. hello6
  22. hello26
  23. a7
  24. hello7
  25. hello27
  26. a8
  27. hello8
  28. hello28
  29. a9
  30. hello9
  31. hello29
  32. sync_hello0
  33. sync_hello1
  34. sync_hello2
  35. sync_hello3
  36. sync_hello4
  37. sync_hello5
  38. sync_hello6
  39. sync_hello7
  40. sync_hello8
  41. sync_hello9

2、事务消息

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. //配置
  3. Properties properties = new Properties();
  4. //连接参数
  5. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
  6. //序列化
  7. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  8. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  9. //关联自定义分区器 可选
  10. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");
  11. //优化参数 可选
  12. //缓冲器大小 32M
  13. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
  14. //批次大小
  15. properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
  16. //Linger.ms
  17. properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
  18. //压缩
  19. properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
  20. //acks
  21. properties.put(ProducerConfig.ACKS_CONFIG, "-1");
  22. //重试次数
  23. properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  24. //指定事务ID
  25. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");
  26. properties.put("enable.idempotence", "true");
  27. //创建生产者
  28. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  29. //事务消息 初始化
  30. kafkaProducer.initTransactions();
  31. //开始事务
  32. kafkaProducer.beginTransaction();
  33. try {
  34. kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();
  35. //提交事务
  36. kafkaProducer.commitTransaction();
  37. } catch (Exception e) {
  38. //终止事务
  39. kafkaProducer.abortTransaction();
  40. } finally {
  41. //关闭资源
  42. kafkaProducer.close();
  43. }
  44. }
  1. root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
  2. Transactions
标签: kafka java linq

本文转载自: https://blog.csdn.net/qq_29752857/article/details/131995916
版权归原作者 好奇的菜鸟 所有, 如有侵权,请联系我们删除。

“使用kafka-clients操作数据(java)”的评论:

还没有评论