0


Flink RocketMQ Connector实现

Flink内置了很多Connector,可以满足大部分场景。但是还是有一些场景无法满足,比如RocketMQ。需要消费RocketMQ的消息,需要自定时Source。

一、自定义FlinkRocketMQConsumer

参考FlinkKafkaConsumer:

  1. public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T>{}
  2. public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {}
  3. public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction implements ParallelSourceFunction<OUT> {}
  4. public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {}
  5. public interface SourceFunction<T> extends Function, Serializable {
  6. void run(SourceFunction.SourceContext<T> var1) throws Exception;
  7. void cancel();
  8. @Public
  9. public interface SourceContext<T> {
  10. void collect(T var1);
  11. @PublicEvolving
  12. void collectWithTimestamp(T var1, long var2);
  13. @PublicEvolving
  14. void emitWatermark(Watermark var1);
  15. @PublicEvolving
  16. void markAsTemporarilyIdle();
  17. Object getCheckpointLock();
  18. void close();
  19. }
  20. }

可以看到,自定义的Source,只需要实现SourceFunction。

创建FlinkRocketMQConsumer,实现SourceFunction,重写run()和cancel()方法

  1. public class FlinkRocketMQConsumer implements SourceFunction<String> {
  2. @Override
  3. public void run(SourceContext<String> sourceContext) throws Exception {
  4. }
  5. @Override
  6. public void cancel() {
  7. }
  8. }

需要准备一个RocketMQ的消费者客户端,在pom中添加如下依赖:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.7.0</version>
  5. <scope>provided</scope>
  6. </dependency>

对于FlinkRocketMQConsumer来说,需要初始化一个consumer,代码如下:

  1. public class FlinkRocketMQConsumer implements SourceFunction<String> {
  2. private static final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("0320TopicTestConsumerGroup");
  3. }

这样,在FlinkRocketMQConsumer类加载的时候,就会初始化一个consumer。

另外,还需要对consumer进行初始化,需要知道nameSrvAddr和topic,所以添加一个构造方法,对consumer进行初始化

  1. public class FlinkRocketMQConsumer implements SourceFunction<String> {
  2. private String nameSrvAddr;
  3. private String topic;
  4. public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {
  5. this.nameSrvAddr = nameSrvAddr;
  6. this.topic = topic;
  7. }
  8. ...
  9. }

重写run方法和cancel方法

  1. @Override
  2. public void run(SourceContext<String> ctx) throws Exception {
  3. consumer.setNamesrvAddr(nameSrvAddr);
  4. consumer.subscribe(topic, "*");
  5. consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {
  6. msgs.forEach(msg -> {
  7. ctx.collect(new String(msg.getBody(), Charset.forName("UTF-8")));
  8. });
  9. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  10. });
  11. consumer.start();
  12. // 需要先走到 consumer.start() 后,才会走 consumer.registerMessageListener 方法,但是这个时候,意味着 run 方法已经走完,ctx已经关闭
  13. // 这个时候在 consumer.registerMessageListener 方法中,调用 ctx 会显示已关闭
  14. // 所以,不能让程序走完
  15. while (true) {
  16. Thread.sleep(10);
  17. }
  18. }
  19. @Override
  20. public void cancel() {
  21. consumer.shutdown();
  22. }

完整代码如下:

  1. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import java.nio.charset.Charset;
  6. /**
  7. * @author Johnson
  8. * @version 1.0
  9. * @description
  10. * @create 2023-03-20 10:02
  11. */
  12. public class FlinkRocketMQConsumer implements SourceFunction<String> {
  13. private static final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("0320TopicTestConsumerGroup");
  14. private String nameSrvAddr;
  15. private String topic;
  16. public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {
  17. this.nameSrvAddr = nameSrvAddr;
  18. this.topic = topic;
  19. }
  20. @Override
  21. public void run(SourceContext<String> ctx) throws Exception {
  22. consumer.setNamesrvAddr(nameSrvAddr);
  23. consumer.subscribe(topic, "*");
  24. consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {
  25. msgs.forEach(msg -> {
  26. ctx.collect(new String(msg.getBody(), Charset.forName("UTF-8")));
  27. });
  28. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  29. });
  30. consumer.start();
  31. // 需要先走到 consumer.start() 后,才会走 consumer.registerMessageListener 方法,但是这个时候,意味着 run 方法已经走完,ctx已经关闭
  32. // 这个时候在 consumer.registerMessageListener 方法中,调用 ctx 会显示已关闭
  33. // 所以,不能让程序走完
  34. while (true) {
  35. Thread.sleep(10);
  36. }
  37. }
  38. @Override
  39. public void cancel() {
  40. consumer.shutdown();
  41. }
  42. }

二、方法调用

  1. package rocketmq;
  2. import com.source.FlinkRocketMQConsumer;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. /**
  6. * @author Johnson
  7. * @version 1.0
  8. * @description
  9. * @create 2023-03-21 10:30
  10. */
  11. public class FlinkRocketMQConsumerDemo {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. DataStreamSource<String> rmqDS = env.addSource(new FlinkRocketMQConsumer("***:9876", "test_rmq"));
  15. rmqDS .print("**********");
  16. env.execute("FlinkRocketMQConsumerDemo");
  17. }
  18. }

到这来,就可以正常消费RocketMQ中的数据,控制台输出如下。

三、隐患

在FlinkRocketMQConsumer中,为了正常调用SourceContext(ctx),使用可一个线程一直占用,不让run方法结束,目前是可以正常运行,但是能不能经受得起时间检验,会不会给以后埋下隐患,还有待观察。

关于这一点,是否有更好的实现方法,欢迎各位技术大佬留言发表见解。。。

标签: flink rocketmq

本文转载自: https://blog.csdn.net/Johnson8702/article/details/129684391
版权归原作者 Johnson8702 所有, 如有侵权,请联系我们删除。

“Flink RocketMQ Connector实现”的评论:

还没有评论