0


聊聊MaxwellKafkaProducer

本文主要研究一下MaxwellKafkaProducer

MaxwellKafkaProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

  1. public class MaxwellKafkaProducer extends AbstractProducer {
  2. private final ArrayBlockingQueue<RowMap> queue;
  3. private final MaxwellKafkaProducerWorker worker;
  4. public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) {
  5. super(context);
  6. this.queue = new ArrayBlockingQueue<>(100);
  7. this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue);
  8. Thread thread = new Thread(this.worker, "maxwell-kafka-worker");
  9. thread.setDaemon(true);
  10. thread.start();
  11. }
  12. @Override
  13. public void push(RowMap r) throws Exception {
  14. this.queue.put(r);
  15. }
  16. @Override
  17. public StoppableTask getStoppableTask() {
  18. return this.worker;
  19. }
  20. @Override
  21. public KafkaProducerDiagnostic getDiagnostic() {
  22. return new KafkaProducerDiagnostic(worker, context.getConfig(), context.getPositionStoreThread());
  23. }
  24. }
  • MaxwellKafkaProducer继承了AbstractProducer,其构造器会创建容量为100的ArrayBlockingQueue以及MaxwellKafkaProducerWorker,并启动一个名为maxwell-kafka-worker的守护线程来运行该worker;其push方法则往queue中put数据

MaxwellKafkaProducerWorker

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

  1. class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
  2. static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);
  3. private final Producer<String, String> kafka;
  4. private final String topic;
  5. private final String ddlTopic;
  6. private final MaxwellKafkaPartitioner partitioner;
  7. private final MaxwellKafkaPartitioner ddlPartitioner;
  8. private final KeyFormat keyFormat;
  9. private final boolean interpolateTopic;
  10. private final ArrayBlockingQueue<RowMap> queue;
  11. private Thread thread;
  12. private StoppableTaskState taskState;
  13. private String deadLetterTopic;
  14. private final ConcurrentLinkedQueue<Pair<ProducerRecord<String,String>, KafkaCallback>> deadLetterQueue;
  15. public static MaxwellKafkaPartitioner makeDDLPartitioner(String partitionHashFunc, String partitionKey) {
  16. if ( partitionKey.equals("table") ) {
  17. return new MaxwellKafkaPartitioner(partitionHashFunc, "table", null, "database");
  18. } else {
  19. return new MaxwellKafkaPartitioner(partitionHashFunc, "database", null, null);
  20. }
  21. }
  22. public MaxwellKafkaProducerWorker(MaxwellContext context, String kafkaTopic, ArrayBlockingQueue<RowMap> queue,
  23. Producer<String,String> producer)
  24. {
  25. super(context);
  26. if ( kafkaTopic == null ) {
  27. this.topic = "maxwell";
  28. } else {
  29. this.topic = kafkaTopic;
  30. }
  31. this.interpolateTopic = this.topic.contains("%{");
  32. this.kafka = producer;
  33. String hash = context.getConfig().kafkaPartitionHash;
  34. String partitionKey = context.getConfig().producerPartitionKey;
  35. String partitionColumns = context.getConfig().producerPartitionColumns;
  36. String partitionFallback = context.getConfig().producerPartitionFallback;
  37. this.partitioner = new MaxwellKafkaPartitioner(hash, partitionKey, partitionColumns, partitionFallback);
  38. this.ddlPartitioner = makeDDLPartitioner(hash, partitionKey);
  39. this.ddlTopic = context.getConfig().ddlKafkaTopic;
  40. this.deadLetterTopic = context.getConfig().deadLetterTopic;
  41. this.deadLetterQueue = new ConcurrentLinkedQueue<>();
  42. if ( context.getConfig().kafkaKeyFormat.equals("hash") )
  43. keyFormat = KeyFormat.HASH;
  44. else
  45. keyFormat = KeyFormat.ARRAY;
  46. this.queue = queue;
  47. this.taskState = new StoppableTaskState("MaxwellKafkaProducerWorker");
  48. }
  49. public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic,
  50. ArrayBlockingQueue<RowMap> queue)
  51. {
  52. this(context, kafkaTopic, queue,
  53. new KafkaProducer<String,String>(kafkaProperties, new StringSerializer(), new StringSerializer()));
  54. }
  55. @Override
  56. public void run() {
  57. this.thread = Thread.currentThread();
  58. while ( true ) {
  59. try {
  60. drainDeadLetterQueue();
  61. RowMap row = queue.take();
  62. if (!taskState.isRunning()) {
  63. taskState.stopped();
  64. return;
  65. }
  66. this.push(row);
  67. } catch ( Exception e ) {
  68. taskState.stopped();
  69. context.terminate(e);
  70. return;
  71. }
  72. }
  73. }
  74. void drainDeadLetterQueue() {
  75. Pair<ProducerRecord<String, String>, KafkaCallback> pair;
  76. while ((pair = deadLetterQueue.poll()) != null) {
  77. sendAsync(pair.getLeft(), pair.getRight());
  78. }
  79. }
  80. //......
  81. }
  • MaxwellKafkaProducerWorker继承了AbstractAsyncProducer,实现了Runnable及StoppableTask接口;其run方法使用while循环,不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync再次发送

AbstractAsyncProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java

  1. public abstract class AbstractAsyncProducer extends AbstractProducer {
  2. public class CallbackCompleter {
  3. private InflightMessageList inflightMessages;
  4. private final MaxwellContext context;
  5. private final MaxwellConfig config;
  6. private final Position position;
  7. private final boolean isTXCommit;
  8. private final long messageID;
  9. public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean isTXCommit, MaxwellContext context, long messageID) {
  10. this.inflightMessages = inflightMessages;
  11. this.context = context;
  12. this.config = context.getConfig();
  13. this.position = position;
  14. this.isTXCommit = isTXCommit;
  15. this.messageID = messageID;
  16. }
  17. public void markCompleted() {
  18. inflightMessages.freeSlot(messageID);
  19. if(isTXCommit) {
  20. InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);
  21. if (message != null) {
  22. context.setPosition(message.position);
  23. long currentTime = System.currentTimeMillis();
  24. long age = currentTime - message.sendTimeMS;
  25. messagePublishTimer.update(age, TimeUnit.MILLISECONDS);
  26. messageLatencyTimer.update(Math.max(0L, currentTime - message.eventTimeMS - 500L), TimeUnit.MILLISECONDS);
  27. if (age > config.metricsAgeSlo) {
  28. messageLatencySloViolationCount.inc();
  29. }
  30. }
  31. }
  32. }
  33. }
  34. private InflightMessageList inflightMessages;
  35. public AbstractAsyncProducer(MaxwellContext context) {
  36. super(context);
  37. this.inflightMessages = new InflightMessageList(context);
  38. Metrics metrics = context.getMetrics();
  39. String gaugeName = metrics.metricName("inflightmessages", "count");
  40. metrics.register(gaugeName, (Gauge<Long>) () -> (long) inflightMessages.size());
  41. }
  42. public abstract void sendAsync(RowMap r, CallbackCompleter cc) throws Exception;
  43. @Override
  44. public final void push(RowMap r) throws Exception {
  45. Position position = r.getNextPosition();
  46. // Rows that do not get sent to a target will be automatically marked as complete.
  47. // We will attempt to commit a checkpoint up to the current row.
  48. if(!r.shouldOutput(outputConfig)) {
  49. if ( position != null ) {
  50. inflightMessages.addMessage(position, r.getTimestampMillis(), 0L);
  51. InflightMessageList.InflightMessage completed = inflightMessages.completeMessage(position);
  52. if (completed != null) {
  53. context.setPosition(completed.position);
  54. }
  55. }
  56. return;
  57. }
  58. // back-pressure from slow producers
  59. long messageID = inflightMessages.waitForSlot();
  60. if(r.isTXCommit()) {
  61. inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
  62. }
  63. CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);
  64. sendAsync(r, cc);
  65. }
  66. }
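  • AbstractAsyncProducer继承了AbstractProducer,其push方法对于不需要输出的row(position不为null时)会先执行inflightMessages.addMessage再执行completeMessage,并在返回值不为null时更新context的position;对于需要输出的row,则先通过inflightMessages.waitForSlot()获取messageID(起到反压作用),若该row是事务提交(isTXCommit)则执行inflightMessages.addMessage,最后创建CallbackCompleter并执行sendAsync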

小结

MaxwellKafkaProducer继承了AbstractProducer,其构造器会创建ArrayBlockingQueue以及MaxwellKafkaProducerWorker,并启动守护线程运行该worker;其push方法则往queue中put数据;MaxwellKafkaProducerWorker继承了AbstractAsyncProducer,实现了Runnable及StoppableTask接口;其run方法使用while循环,不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync再次发送

doc

  • MaxwellKafkaProducer

作者:codecraft

原文地址:https://segmentfault.com/a/1190000022534833

  1. 喜欢 0
标签: linq 数据库 c#

本文转载自: https://blog.csdn.net/weixin_33837884/article/details/135960326
版权归原作者 斯大林ak47 所有, 如有侵权,请联系我们删除。

“聊聊MaxwellKafkaProducer”的评论:

还没有评论