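Flink ships with many built-in connectors that cover most scenarios, but some scenarios are still not covered, RocketMQ being one of them. To consume messages from RocketMQ, we need a custom Source.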
1. A custom FlinkRocketMQConsumer
Take FlinkKafkaConsumer as a reference. Its class hierarchy is:
public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T>{}
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {}
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction implements ParallelSourceFunction<OUT> {}
public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {}
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceFunction.SourceContext<T> var1) throws Exception;

    void cancel();

    @Public
    public interface SourceContext<T> {
        void collect(T var1);

        @PublicEvolving
        void collectWithTimestamp(T var1, long var2);

        @PublicEvolving
        void emitWatermark(Watermark var1);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}
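As the hierarchy shows, everything comes down to SourceFunction, so a custom Source only needs to implement SourceFunction. (A plain SourceFunction always runs with parallelism 1; a parallel source would implement ParallelSourceFunction or extend RichParallelSourceFunction instead.)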
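For comparison, the SourceFunction Javadoc illustrates the contract with a counting source: run() loops and emits under the checkpoint lock until it is done, or until cancel(), called from another thread, clears a volatile flag. The sketch below mirrors that pattern so it can run without Flink on the classpath; MiniSourceContext and MiniSource are hypothetical stand-ins for SourceFunction.SourceContext and SourceFunction, reduced to the methods used here, and CountingSourceDemo is a hypothetical driver.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SourceFunction.SourceContext, reduced to the two methods used here.
interface MiniSourceContext<T> {
    void collect(T element);
    Object getCheckpointLock();
}

// Hypothetical stand-in for SourceFunction: run() blocks on the source thread,
// cancel() is invoked from a different thread.
interface MiniSource<T> {
    void run(MiniSourceContext<T> ctx) throws Exception;
    void cancel();
}

// Mirrors the counting-source pattern from the SourceFunction Javadoc.
class CountingSource implements MiniSource<Long> {
    private final long limit;
    private volatile boolean isRunning = true; // written by cancel(), read by run()
    private long count = 0L;

    CountingSource(long limit) {
        this.limit = limit;
    }

    @Override
    public void run(MiniSourceContext<Long> ctx) {
        while (isRunning && count < limit) {
            // Emission and the state update happen atomically with respect to checkpoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
        // Returning from run() means "this source is finished".
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

class CountingSourceDemo {
    // Bounded run: run() returns by itself once 'limit' elements have been emitted.
    static List<Long> runToEnd(long limit) {
        List<Long> out = new ArrayList<>();
        Object lock = new Object();
        new CountingSource(limit).run(new MiniSourceContext<Long>() {
            @Override public void collect(Long element) { out.add(element); }
            @Override public Object getCheckpointLock() { return lock; }
        });
        return out;
    }

    // Unbounded run on a worker thread: returns true if cancel() made run() return.
    static boolean cancelStops() {
        CountingSource source = new CountingSource(Long.MAX_VALUE);
        Object lock = new Object();
        MiniSourceContext<Long> discard = new MiniSourceContext<Long>() {
            @Override public void collect(Long element) { /* drop the element */ }
            @Override public Object getCheckpointLock() { return lock; }
        };
        Thread worker = new Thread(() -> source.run(discard));
        worker.start();
        try {
            Thread.sleep(50);
            source.cancel();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(runToEnd(3)); // [0, 1, 2]
        System.out.println(cancelStops());
    }
}
```

The key point is that run() owns the thread for the whole lifetime of the source, and cancel() only signals it to stop.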
Create FlinkRocketMQConsumer, implement SourceFunction, and override the run() and cancel() methods:
public class FlinkRocketMQConsumer implements SourceFunction<String> {
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
    }

    @Override
    public void cancel() {
    }
}
We also need a RocketMQ consumer client. Add the following dependency to the pom:
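<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
    <!-- Default (compile) scope: the Flink runtime does not provide rocketmq-client, so it must be packaged into the job jar -->
</dependency>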
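FlinkRocketMQConsumer needs a consumer instance, declared as follows:
public class FlinkRocketMQConsumer implements SourceFunction<String> {
    private static final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("0320TopicTestConsumerGroup");
}
Because the field is static, the consumer is created when the FlinkRocketMQConsumer class is loaded. A static field is not part of the serialized function object (SourceFunction extends Serializable), but it is also shared by every instance of the class loaded by the same class loader.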
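Flink serializes the source function with Java serialization on the client and deserializes it on the TaskManagers, so every non-static, non-transient field has to be serializable as well. Keeping the client out of the serialized form is presumably why the consumer is held in a static field. A common alternative is a transient field that is created on the worker, for example at the start of run(). The JDK-only sketch below shows the difference; FakeClient, PlainFieldSource, TransientFieldSource and SerializationDemo are hypothetical stand-ins, not Flink or RocketMQ classes, and FakeClient merely plays the role of a client object that does not implement Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical client class that, like many network clients, does not implement Serializable.
class FakeClient {
    final String group;
    FakeClient(String group) { this.group = group; }
}

// Client in an ordinary instance field: serializing the function fails.
class PlainFieldSource implements Serializable {
    private final String topic;
    private final FakeClient client = new FakeClient("demoGroup");
    PlainFieldSource(String topic) { this.topic = topic; }
}

// Client in a transient field: skipped by serialization, created lazily on the worker.
class TransientFieldSource implements Serializable {
    private final String topic;
    private transient FakeClient client;
    TransientFieldSource(String topic) { this.topic = topic; }

    // In a real source this would happen at the start of run() (or in open()).
    FakeClient client() {
        if (client == null) {
            client = new FakeClient("demoGroup");
        }
        return client;
    }
}

class SerializationDemo {
    // Serializes and deserializes obj; a non-serializable field raises NotSerializableException.
    static Object roundTrip(Object obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    // true if obj survives a serialization round trip.
    static boolean serializes(Object obj) {
        try {
            roundTrip(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // After deserialization the transient field is null and is recreated on first use.
    static boolean transientClientRecreated() {
        try {
            TransientFieldSource copy = (TransientFieldSource) roundTrip(new TransientFieldSource("test_rmq"));
            return copy.client() != null;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new PlainFieldSource("test_rmq")));     // false
        System.out.println(serializes(new TransientFieldSource("test_rmq"))); // true
    }
}
```

The static field used in the article also avoids the serialization failure; the trade-off is that one consumer object is then shared by all instances of the class in that JVM, whereas a transient field gives each source instance its own client.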
Configuring the consumer also requires the NameServer address (nameSrvAddr) and the topic, so add a constructor that receives them; they are applied to the consumer in run():
public class FlinkRocketMQConsumer implements SourceFunction<String> {
    private String nameSrvAddr;
    private String topic;

    public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {
        this.nameSrvAddr = nameSrvAddr;
        this.topic = topic;
    }
    ...
}
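Then override run() and cancel(). RocketMQ invokes the message listener on its own threads, so emission is guarded by the checkpoint lock, and a volatile flag lets cancel() end the loop that keeps run() alive:
private volatile boolean running = true;

@Override
public void run(SourceContext<String> ctx) throws Exception {
    consumer.setNamesrvAddr(nameSrvAddr);
    consumer.subscribe(topic, "*");
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        // The listener runs on RocketMQ's consume threads, concurrently and outside run();
        // emit under the checkpoint lock so only one thread uses ctx at a time.
        synchronized (ctx.getCheckpointLock()) {
            msgs.forEach(msg -> ctx.collect(new String(msg.getBody(), StandardCharsets.UTF_8)));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    // The listener is only called after consumer.start(), asynchronously.
    // If run() returned here, Flink would consider the source finished and close ctx,
    // and later ctx.collect() calls from the listener would fail.
    // So block until cancel() is called.
    while (running) {
        Thread.sleep(10);
    }
}

@Override
public void cancel() {
    running = false;
    consumer.shutdown();
}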
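The threading in run() can be tried out without a RocketMQ broker. In the JDK-only sketch below, FakePushConsumer is a hypothetical stand-in for DefaultMQPushConsumer that delivers messages from its own thread pool after start(), SimpleContext is a hypothetical stand-in for SourceContext with a deliberately non-thread-safe collect(), and BridgingSource and BridgingDemo are likewise hypothetical. The listener emits under the checkpoint lock, and run() blocks on a volatile flag until cancel() clears it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in for SourceContext: collect() is deliberately not thread-safe.
class SimpleContext {
    private final Object lock = new Object();
    private final List<String> emitted = new ArrayList<>();

    void collect(String record) { emitted.add(record); }
    Object getCheckpointLock() { return lock; }

    int emittedCount() {
        synchronized (lock) { return emitted.size(); }
    }
}

// Hypothetical stand-in for DefaultMQPushConsumer: after start(), the registered listener
// is invoked from the client's own thread pool, not from the thread that called start().
class FakePushConsumer {
    private final ExecutorService consumeThreads = Executors.newFixedThreadPool(4);
    private Consumer<String> listener;

    void registerMessageListener(Consumer<String> listener) { this.listener = listener; }

    // Simulates 'messages' deliveries arriving after start().
    void start(int messages) {
        for (int i = 0; i < messages; i++) {
            String msg = "msg-" + i;
            consumeThreads.submit(() -> listener.accept(msg));
        }
    }

    void shutdown() { consumeThreads.shutdown(); }
}

class BridgingSource {
    private final FakePushConsumer consumer = new FakePushConsumer();
    private volatile boolean running = true;

    void run(SimpleContext ctx, int messages) throws InterruptedException {
        consumer.registerMessageListener(msg -> {
            // Several consume threads may get here at once: emit under the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(msg);
            }
        });
        consumer.start(messages);
        // Block until cancel(); returning would mark the source as finished.
        while (running) {
            Thread.sleep(10);
        }
    }

    void cancel() {
        running = false;
        consumer.shutdown();
    }
}

class BridgingDemo {
    // Runs the source on its own thread, waits for n records, cancels, and returns the count.
    static int runAndCancel(int n) {
        SimpleContext ctx = new SimpleContext();
        BridgingSource source = new BridgingSource();
        Thread sourceThread = new Thread(() -> {
            try {
                source.run(ctx, n);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sourceThread.start();
        try {
            long deadline = System.currentTimeMillis() + 5000;
            while (ctx.emittedCount() < n && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            source.cancel();
            sourceThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ctx.emittedCount();
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel(1000));
    }
}
```

Without the synchronized block, the four pool threads would call ArrayList.add concurrently, which can silently lose records; the lock is what makes the concurrent listener safe to use with a single-caller context.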
The complete code is as follows:
package com.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import java.nio.charset.StandardCharsets;

/**
 * @author Johnson
 * @version 1.0
 * @description
 * @create 2023-03-20 10:02
 */
public class FlinkRocketMQConsumer implements SourceFunction<String> {
    // static: not serialized with the function, but shared by all instances in the same class loader
    private static final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("0320TopicTestConsumerGroup");

    private final String nameSrvAddr;
    private final String topic;
    // Written by cancel() on another thread, read by the loop in run()
    private volatile boolean running = true;

    public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {
        this.nameSrvAddr = nameSrvAddr;
        this.topic = topic;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        consumer.setNamesrvAddr(nameSrvAddr);
        consumer.subscribe(topic, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // Called on RocketMQ's consume threads; emit under the checkpoint lock
            synchronized (ctx.getCheckpointLock()) {
                msgs.forEach(msg -> ctx.collect(new String(msg.getBody(), StandardCharsets.UTF_8)));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        // Messages arrive asynchronously after start(); returning from run() would end
        // the source and close ctx, so block here until cancel() is called.
        while (running) {
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
        consumer.shutdown();
    }
}
2. Using the source
package rocketmq;

import com.source.FlinkRocketMQConsumer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author Johnson
 * @version 1.0
 * @description
 * @create 2023-03-21 10:30
 */
public class FlinkRocketMQConsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> rmqDS = env.addSource(new FlinkRocketMQConsumer("***:9876", "test_rmq"));
        rmqDS.print("**********");
        env.execute("FlinkRocketMQConsumerDemo");
    }
}
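At this point the job consumes data from RocketMQ normally, and each message is printed to the console with the "**********" sink identifier as a prefix.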
3. Potential risks
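In FlinkRocketMQConsumer, to keep the SourceContext (ctx) usable, the source thread is kept occupied so that run() never finishes. This works for now, but whether it will stand the test of time, or cause problems later, remains to be seen. Blocking in run() until cancel() is called is itself the usual pattern for a SourceFunction (Flink's own example sources loop in run() until cancelled); the open questions are rather how ctx is accessed from other threads and what delivery guarantees the source provides.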
If you know a better way to implement this, suggestions are welcome in the comments.
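One possible alternative to calling ctx.collect() directly from RocketMQ's listener threads is a handoff queue: the listener only enqueues, and the thread running run() is the only one that emits. A bounded queue also makes the listener threads block when downstream is slow. The sketch below is a JDK-only illustration of that pattern under these assumptions, not a drop-in connector; QueueHandoffSource, onMessage(), QueueHandoffDemo and the List used in place of SourceContext are all hypothetical. Note that neither this version nor FlinkRocketMQConsumer coordinates RocketMQ offsets with Flink checkpoints: the listener returns CONSUME_SUCCESS as soon as a message is emitted (or enqueued), so records still in flight when the job fails can be lost.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical handoff source: listener threads only enqueue; the source thread alone emits.
class QueueHandoffSource {
    // Bounded: when it is full, put() blocks the listener threads (backpressure).
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
    private volatile boolean running = true;

    // What the MessageListenerConcurrently callback would call for each message (assumption).
    void onMessage(String msg) throws InterruptedException {
        queue.put(msg);
    }

    // Source thread; 'out' stands in for SourceContext.collect().
    void run(List<String> out, Object checkpointLock) throws InterruptedException {
        while (running) {
            // Poll with a timeout so that cancel() is noticed promptly.
            String msg = queue.poll(100, TimeUnit.MILLISECONDS);
            if (msg == null) {
                continue;
            }
            synchronized (checkpointLock) {
                out.add(msg);
            }
        }
        // Anything still queued at this point is dropped on cancel.
    }

    void cancel() { running = false; }
}

class QueueHandoffDemo {
    // Feeds n messages from 4 "listener" threads and returns what the source emitted.
    static List<String> feed(int n) {
        QueueHandoffSource source = new QueueHandoffSource();
        List<String> out = new ArrayList<>();
        Object lock = new Object();
        Thread sourceThread = new Thread(() -> {
            try {
                source.run(out, lock);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sourceThread.start();
        ExecutorService listeners = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < n; i++) {
                String msg = "msg-" + i;
                listeners.submit(() -> { source.onMessage(msg); return null; });
            }
            listeners.shutdown();
            listeners.awaitTermination(5, TimeUnit.SECONDS);
            long deadline = System.currentTimeMillis() + 5000;
            while (System.currentTimeMillis() < deadline) {
                synchronized (lock) {
                    if (out.size() == n) break;
                }
                Thread.sleep(5);
            }
            source.cancel();
            sourceThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (lock) {
            return new ArrayList<>(out);
        }
    }

    public static void main(String[] args) {
        List<String> got = feed(500);
        System.out.println(got.size() + " records, distinct: " + new HashSet<>(got).size());
    }
}
```

With 500 messages and a queue capacity of 100, the listener threads repeatedly block in put() until the source thread catches up, which is the backpressure behaviour the bounded queue is meant to provide.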
Copyright belongs to the original author, Johnson8702. In case of infringement, please contact us for removal.