概述
首先我们来看一下Flink系统为自定义数据源提供的一些功能。
我们可以从下面这个图看出来数据源的source和sink类的继承关系
当我们要实现自定义数据源的时候,我们需要先实现DynamicTableSourceFactory, DynamicTableSinkFactory这两个工厂类,在工厂类里面去实现参数定义和数据源的创建,然后在数据源DynamicTableSource和DynamicTableSink里面去初始化数据源的一些信息,最终在source类型的数据源的ScanRuntimeProvider或者LookupTableSource或者sink类型的数据源的SinkRuntimeProvider实现类里面去实现具体功能。
具体实现
第一步 实现工厂类
Dynamic Table Factories
动态表工厂用于根据目录和会话信息为外部存储系统配置动态表连接器。
`org.apache.flink.table.factories.DynamicTableSourceFactory` 可以用来构造一个 `DynamicTableSource`。
`org.apache.flink.table.factories.DynamicTableSinkFactory` 可以用来构造一个 `DynamicTableSink`。
默认情况下,使用 `connector` 选项的值作为工厂标识符,并通过 Java 的服务提供者接口(SPI)来发现工厂。
在 JAR 文件中,可以将对新实现的引用添加到服务文件中:
**需要在 resources 目录下新建 META-INF/services/ 目录,并且新建 org.apache.flink.table.factories.Factory 文件:**
**META-INF/services/org.apache.flink.table.factories.Factory**
然后在文件里面添加实现类的路径:com.flink.sql.connector.http.table.HttpDynamicTableFactory,只需要添加这个类路径就行,其他的不需要
如果没有这个文件,或者文件里面没有上述内容,flink系统就识别不到我们自定义的数据源信息
类文件的内容如下:HttpDynamicTableFactory.class
package com.flink.sql.connector.http.table;
import com.flink.sql.connector.http.HttpSourceInfo;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
/**
 * Factory for the {@code http} connector.
 *
 * <p>Implements both the source and the sink factory in one class. They could also be
 * implemented separately, but combining them avoids a lot of duplicated code.
 */
public class HttpDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /** Value of the {@code connector} option that selects this factory. */
    public static final String IDENTIFIER = "http";

    // Every connector option has to be declared here before it can be used in a SQL WITH clause.
    public static final ConfigOption<String> URL = ConfigOptions.key("url")
            .stringType().noDefaultValue().withDescription("the http request url.");
    public static final ConfigOption<String> HEADERS = ConfigOptions.key("headers")
            .stringType().noDefaultValue().withDescription("the http header.");
    private static final ConfigOption<String> BODY = ConfigOptions.key("body")
            .stringType().noDefaultValue().withDescription("the http body params.");
    private static final ConfigOption<String> TYPE = ConfigOptions.key("type")
            .stringType().noDefaultValue().withDescription("the http type.");

    public HttpDynamicTableFactory() {
    }

    /**
     * Builds the scan source for a table declared with {@code 'connector' = 'http'}.
     *
     * @param context catalog table and session information supplied by the planner
     * @return a source configured from the table options and the discovered decoding format
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // Discover the format BEFORE validating: discovery is what makes the helper aware of the
        // format's own (prefixed) options, so validating first would reject them as unknown.
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);
        helper.validate();
        ReadableConfig config = helper.getOptions();
        this.validateConfigOptions(config, true);
        HttpSourceInfo httpSourceInfo = this.getHttpSource(config);
        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        return new HttpDynamicTableSource(httpSourceInfo, decodingFormat, producedDataType);
    }

    /**
     * Builds the sink for a table declared with {@code 'connector' = 'http'}.
     *
     * @param context catalog table and session information supplied by the planner
     * @return a sink configured from the table options and the discovered encoding format
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // Same ordering as for the source: discover the format first, then validate all options.
        final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
                SerializationFormatFactory.class,
                FactoryUtil.FORMAT);
        helper.validate();
        ReadableConfig config = helper.getOptions();
        // A static 'body' option is not demanded for sinks: HttpSinkFunction.invoke builds the
        // request body from each row instead of using the configured one.
        this.validateConfigOptions(config, false);
        HttpSourceInfo httpSourceInfo = this.getHttpSource(config);
        TableSchema tableSchema = context.getCatalogTable().getSchema();
        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType producedDataType = tableSchema.toPhysicalRowDataType();
        return new HttpDynamicTableSink(httpSourceInfo, encodingFormat, producedDataType, tableSchema);
    }

    /**
     * Collects the HTTP options into a single holder object that is later used to build the
     * actual HTTP request.
     */
    private HttpSourceInfo getHttpSource(ReadableConfig readableConfig) {
        String url = readableConfig.get(URL);
        String headers = readableConfig.get(HEADERS);
        String body = readableConfig.get(BODY);
        String type = readableConfig.get(TYPE);
        return new HttpSourceInfo(url, type, headers, body);
    }

    /** Returns the connector type string that identifies this factory. */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /** Options that must be present in the WITH clause. */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TYPE);
        // Both create* methods call the non-optional format discovery, so the format is mandatory.
        requiredOptions.add(FactoryUtil.FORMAT);
        return requiredOptions;
    }

    /** Options that may be present in the WITH clause. */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(HEADERS);
        optionalOptions.add(BODY);
        return optionalOptions;
    }

    /**
     * Connector-specific option checks that go beyond "is the option present".
     *
     * @param config the resolved table options
     * @param requireBodyForPost whether a POST request must come with a configured body
     * @throws IllegalStateException if the url is missing, or a required POST body is missing
     */
    private void validateConfigOptions(ReadableConfig config, boolean requireBodyForPost) {
        String url = config.get(URL);
        // Explicit null checks: Optional.of(null) would throw a message-less NullPointerException.
        Preconditions.checkState(url != null, "Cannot handle such http url: " + url);
        String type = config.get(TYPE);
        if (requireBodyForPost && "POST".equalsIgnoreCase(type)) {
            String body = config.get(BODY);
            Preconditions.checkState(body != null, "Cannot handle such http post body: " + body);
        }
    }
}
第二步:实现数据源类的具体方法
Dynamic Table Source
根据定义,动态表可以随时间变化。
在读取动态表时,内容可以被认为是:
- 一个更改日志(有限或无限),所有更改都会被持续消费,直到更改日志用完。这由 `ScanTableSource` 接口表示。
- 一个不断变化的或非常大的外部表,其内容通常不会被完全读取,而是在必要时查询单个值。这由 `LookupTableSource` 接口表示。
一个类可以同时实现这两个接口。规划器根据指定的查询决定它们的使用。
**扫描表源
ScanTableSource
**
ScanTableSource
在运行时扫描来自外部存储系统的所有行。
扫描的行不必只包含插入,还可以包含更新和删除。因此,表源可用于读取(有限或无限)变更日志。返回的更改日志模式指示计划程序在运行时可以预期的一组更改。
对于常规的批处理场景,源可以发出有限的仅插入行流。
对于常规流式处理方案,源可以发出无限制的仅插入行流。
对于变更数据捕获 (CDC) 方案,源可以发出带有插入、更新和删除行的有界或无界流。
**查找表源
LookupTableSource
**
LookupTableSource
在运行时通过一个或多个键查找外部存储系统的行。
与
ScanTableSource
相比,源不必读取整个表,并且可以在必要时从(可能不断变化的)外部表中懒惰地获取单个值。
与
ScanTableSource
相比,
LookupTableSource
当前仅支持发出仅插入更改。
我们的例子是HTTP类型的数据源,所以采用全表扫描获取的类型去实现。
代码实现类:HttpDynamicTableSource.class
package com.flink.sql.connector.http.table;
import com.flink.sql.connector.http.HttpSourceFunction;
import com.flink.sql.connector.http.HttpSourceInfo;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Objects;
/**
 * Scan source of the HTTP connector: holds the request settings, the decoding format and the
 * produced row type, and hands the planner an {@link HttpSourceFunction} built from them.
 */
public class HttpDynamicTableSource implements ScanTableSource {

    private HttpSourceInfo httpSourceInfo;
    private DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private DataType producedDataType;

    public HttpDynamicTableSource() {
    }

    public HttpDynamicTableSource(HttpSourceInfo httpSourceInfo,
                                  DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
                                  DataType producedDataType) {
        this.httpSourceInfo = httpSourceInfo;
        this.decodingFormat = decodingFormat;
        this.producedDataType = producedDataType;
    }

    /** The changelog mode is whatever the decoding format reports. */
    @Override
    public ChangelogMode getChangelogMode() {
        return decodingFormat.getChangelogMode();
    }

    /**
     * The central method of this class: creates the runtime decoder and returns a provider
     * wrapping the source function that contains the actual request logic.
     */
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        DeserializationSchema<RowData> rowDecoder =
                decodingFormat.createRuntimeDecoder(scanContext, producedDataType);
        HttpSourceFunction sourceFunction = HttpSourceFunction.builder()
                .setUrl(httpSourceInfo.getUrl())
                .setBody(httpSourceInfo.getBody())
                .setType(httpSourceInfo.getType())
                .setDeserializer(rowDecoder)
                .build();
        // second argument 'true' marks the source as bounded
        return SourceFunctionProvider.of(sourceFunction, true);
    }

    @Override
    public DynamicTableSource copy() {
        return new HttpDynamicTableSource(httpSourceInfo, decodingFormat, producedDataType);
    }

    @Override
    public String asSummaryString() {
        return "HTTP Table Source";
    }

    /** Two sources are equal when request info, decoding format and produced type all match. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof HttpDynamicTableSource)) {
            return false;
        }
        HttpDynamicTableSource other = (HttpDynamicTableSource) o;
        boolean sameInfo = Objects.equals(httpSourceInfo, other.httpSourceInfo);
        return sameInfo
                && Objects.equals(decodingFormat, other.decodingFormat)
                && Objects.equals(producedDataType, other.producedDataType);
    }

    @Override
    public int hashCode() {
        return Objects.hash(httpSourceInfo, decodingFormat, producedDataType);
    }
}
代码实现类:HttpSourceFunction.class
package com.flink.sql.connector.http;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Source function of the HTTP connector: performs one HTTP request, extracts the
 * {@code "response"} field of the JSON reply and emits the row decoded from it.
 */
public class HttpSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
    private static final String HTTP_POST = "POST";
    private static final String HTTP_GET = "GET";

    private String url;
    private String body;
    private String type;
    private DeserializationSchema<RowData> deserializer;

    public HttpSourceFunction() {
    }

    public HttpSourceFunction(String url, String body, String type, DeserializationSchema<RowData> deserializer) {
        this.url = url;
        this.body = body;
        this.type = type;
        this.deserializer = deserializer;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        deserializer.open(() -> getRuntimeContext().getMetricGroup());
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return deserializer.getProducedType();
    }

    public static HttpSourceFunction.Builder builder() {
        return new HttpSourceFunction.Builder();
    }

    /**
     * The important part: issues the HTTP request and emits the decoded result.
     *
     * <p>The request itself is delegated to the {@code DtHttpClient} helper (GET when the
     * configured type is GET, POST otherwise). Failures are printed and swallowed, so a failed
     * request results in no row being emitted rather than a failed job.
     *
     * @param sourceContext collector for the decoded row
     */
    @Override
    public void run(SourceContext<RowData> sourceContext) throws Exception {
        try {
            // equalsIgnoreCase is null-safe, so a missing type falls through to POST as before.
            String response = HTTP_GET.equalsIgnoreCase(type)
                    ? DtHttpClient.get(url)
                    : DtHttpClient.post(url, body);
            JSONObject jsonObject = JSONObject.parseObject(response);
            String payload = jsonObject == null ? null : jsonObject.getString("response");
            if (payload == null) {
                // Report the real cause instead of an anonymous NullPointerException.
                throw new IOException("HTTP response from " + url + " has no \"response\" field");
            }
            RowData row = deserializer.deserialize(payload.getBytes(StandardCharsets.UTF_8));
            // deserialize may return null for a message it cannot decode; never emit null.
            if (row != null) {
                sourceContext.collect(row);
            }
        } catch (Throwable t) {
            t.printStackTrace(); // print and continue
        }
    }

    /** Nothing to interrupt: {@link #run} performs a single request and then returns. */
    @Override
    public void cancel() {
    }

    /** Fluent builder for {@link HttpSourceFunction}. */
    public static class Builder {
        private String url;
        private String body;
        private String type;
        private DeserializationSchema<RowData> deserializer;

        public Builder() {
        }

        public Builder setUrl(String url) {
            this.url = url;
            return this;
        }

        public Builder setBody(String body) {
            this.body = body;
            return this;
        }

        public Builder setType(String type) {
            this.type = type;
            return this;
        }

        public Builder setDeserializer(DeserializationSchema<RowData> deserializer) {
            this.deserializer = deserializer;
            return this;
        }

        /**
         * Creates the source function.
         *
         * @throws IllegalArgumentException if url or type is blank, or if the body is blank for
         *         a request type other than GET
         */
        public HttpSourceFunction build() {
            // A GET request never sends the body (see run), so only non-GET requests need one.
            boolean bodyRequired = !HTTP_GET.equalsIgnoreCase(type);
            if (StringUtils.isBlank(url) || StringUtils.isBlank(type)
                    || (bodyRequired && StringUtils.isBlank(body))) {
                throw new IllegalArgumentException("params has null");
            }
            return new HttpSourceFunction(this.url, this.body, this.type, this.deserializer);
        }
    }
}
Dynamic Table Sink
根据定义,动态表可以随时间变化。
在编写动态表时,可以始终将内容视为更改日志(有限或无限),其中所有更改都被连续写出,直到更改日志用完为止。返回的更改日志模式 指示接收器在运行时接受的更改集。
对于常规批处理场景,接收器可以仅接受仅插入行并写出有界流。
对于常规的流式处理方案,接收器可以仅接受仅插入行,并且可以写出无界流。
对于变更数据捕获 (CDC) 场景,接收器可以使用插入、更新和删除行写出有界或无界流。
代码实现类:HttpDynamicTableSink.class
package com.flink.sql.connector.http.table;
import com.flink.sql.connector.http.HttpSinkFunction;
import com.flink.sql.connector.http.HttpSourceInfo;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Objects;
/**
 * Sink of the HTTP connector: holds the request settings, the encoding format and the table
 * schema, and hands the planner an {@link HttpSinkFunction} built from them.
 */
public class HttpDynamicTableSink implements DynamicTableSink {
    private HttpSourceInfo httpSourceInfo;
    private EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private DataType producedDataType;
    private TableSchema tableSchema;

    public HttpDynamicTableSink() {
    }

    public HttpDynamicTableSink(HttpSourceInfo httpSourceInfo,
                                EncodingFormat<SerializationSchema<RowData>> encodingFormat,
                                DataType producedDataType,
                                TableSchema tableSchema) {
        this.httpSourceInfo = httpSourceInfo;
        this.encodingFormat = encodingFormat;
        this.producedDataType = producedDataType;
        this.tableSchema = tableSchema;
    }

    /** Accepts whatever changelog mode the planner requests and returns it unchanged. */
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return changelogMode;
    }

    /**
     * The important part: creates the runtime encoder and returns a provider wrapping the sink
     * function that performs the actual output.
     */
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        final SerializationSchema<RowData> serializer = encodingFormat.createRuntimeEncoder(
                context,
                producedDataType);
        // NOTE: the builder method is named setDeserializer, but it takes the serializer.
        HttpSinkFunction httpSinkFunction =
                HttpSinkFunction.builder().setUrl(httpSourceInfo.getUrl())
                        .setBody(httpSourceInfo.getBody()).setDeserializer(serializer)
                        .setType(httpSourceInfo.getType()).setFields(tableSchema.getFieldNames())
                        .setDataTypes(tableSchema.getFieldDataTypes()).build();
        return SinkFunctionProvider.of(httpSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new HttpDynamicTableSink(this.httpSourceInfo, this.encodingFormat,
                this.producedDataType, this.tableSchema);
    }

    @Override
    public String asSummaryString() {
        return "HTTP Table Sink";
    }

    /**
     * Compares every field that {@link #hashCode()} hashes, including the table schema, so that
     * both methods use the same notion of identity.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (!(o instanceof HttpDynamicTableSink)) {
            return false;
        } else {
            HttpDynamicTableSink that = (HttpDynamicTableSink) o;
            return Objects.equals(this.httpSourceInfo, that.httpSourceInfo)
                    && Objects.equals(this.encodingFormat, that.encodingFormat)
                    && Objects.equals(this.producedDataType, that.producedDataType)
                    && Objects.equals(this.tableSchema, that.tableSchema);
        }
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.httpSourceInfo, this.encodingFormat,
                this.producedDataType, this.tableSchema);
    }
}
代码实现类:HttpSinkFunction.class
package com.flink.sql.connector.http;
import com.flink.sql.util.PluginUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import java.io.IOException;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.util.HashMap;
/**
 * Sink function of the HTTP connector: converts every incoming row into a field-name to value
 * map, renders it as a string and POSTs it to the configured url.
 */
public class HttpSinkFunction extends RichSinkFunction<RowData> {
    private static final long serialVersionUID = 1L;

    private String url;
    private String body;
    private String type;
    private SerializationSchema<RowData> serializer;
    private String[] fields;
    private DataType[] dataTypes;
    // Formatters used to render DATE and TIMESTAMP columns as strings.
    private final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");
    private final SimpleDateFormat dateTimeFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public HttpSinkFunction() {
    }

    public HttpSinkFunction(String url, String body, String type,
                            SerializationSchema<RowData> serializer,
                            String[] fields,
                            DataType[] dataTypes) {
        this.url = url;
        this.body = body;
        this.type = type;
        this.serializer = serializer;
        this.fields = fields;
        this.dataTypes = dataTypes;
    }

    /**
     * The important part: writes one row out.
     *
     * <p>Rows arrive one at a time and each one triggers its own POST. If throughput matters
     * more than latency, rows could instead be buffered and sent in batches once a time or
     * size threshold is reached.
     *
     * @param value the row to write
     * @param context sink context (unused)
     */
    @Override
    public void invoke(RowData value, Context context) throws Exception {
        Object[] objValue = transform(value);
        HashMap<String, Object> map = new HashMap<>();
        for (int i = 0; i < fields.length; i++) {
            map.put(fields[i], objValue[i]);
        }
        // The request body is derived from the row; the configured 'body' field is not used here.
        String body = PluginUtil.objectToString(map);
        DtHttpClient.post(url, body);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        serializer.open(() -> getRuntimeContext().getMetricGroup());
    }

    @Override
    public void close() throws IOException {
    }

    public static HttpSinkFunction.Builder builder() {
        return new HttpSinkFunction.Builder();
    }

    /** Fluent builder for {@link HttpSinkFunction}. */
    public static class Builder {
        private String url;
        private String body;
        private String type;
        private SerializationSchema<RowData> serializer;
        private String[] fields;
        private DataType[] dataTypes;

        public Builder() {
        }

        public Builder setUrl(String url) {
            this.url = url;
            return this;
        }

        public Builder setFields(String[] fields) {
            this.fields = fields;
            return this;
        }

        public Builder setBody(String body) {
            this.body = body;
            return this;
        }

        public Builder setType(String type) {
            this.type = type;
            return this;
        }

        public Builder setDataTypes(DataType[] dataTypes) {
            this.dataTypes = dataTypes;
            return this;
        }

        /** Sets the row serializer (the method name is historical). */
        public Builder setDeserializer(SerializationSchema<RowData> serializer) {
            this.serializer = serializer;
            return this;
        }

        /**
         * Creates the sink function.
         *
         * @throws IllegalArgumentException if url or type is blank
         */
        public HttpSinkFunction build() {
            // The body is not validated: invoke() builds the request body from each row, so a
            // sink table does not need a static 'body' option.
            if (StringUtils.isBlank(url) || StringUtils.isBlank(type)) {
                throw new IllegalArgumentException("params has null");
            }
            return new HttpSinkFunction(this.url, this.body, this.type, this.serializer,
                    this.fields, this.dataTypes);
        }
    }

    /**
     * Converts a {@link RowData} into plain Java values, one per column, so that the row can be
     * rendered as ordinary JSON instead of forcing the receiving side to understand Flink's
     * internal row representation.
     *
     * @param record the row to convert
     * @return the column values in schema order; SQL NULL becomes {@code null}
     */
    public Object[] transform(RowData record) {
        Object[] values = new Object[dataTypes.length];
        for (int i = 0; i < dataTypes.length; i++) {
            values[i] = this.typeConvertion(dataTypes[i].getLogicalType(), record, i);
        }
        return values;
    }

    /**
     * Reads the column at {@code pos} using the accessor that matches its logical type.
     *
     * @throws UnsupportedOperationException for logical types without a conversion
     */
    private Object typeConvertion(LogicalType type, RowData record, int pos) {
        if (record.isNullAt(pos)) {
            return null;
        } else {
            switch (type.getTypeRoot()) {
                case BOOLEAN:
                    // booleans are emitted as 1 / 0
                    return record.getBoolean(pos) ? 1L : 0L;
                case TINYINT:
                    return record.getByte(pos);
                case SMALLINT:
                    return record.getShort(pos);
                case INTEGER:
                    return record.getInt(pos);
                case BIGINT:
                    return record.getLong(pos);
                case FLOAT:
                    return record.getFloat(pos);
                case DOUBLE:
                    return record.getDouble(pos);
                case CHAR:
                case VARCHAR:
                    return record.getString(pos).toString();
                case DATE:
                    // the int read here is interpreted as days since the epoch
                    return this.dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay(record.getInt(pos))));
                case TIMESTAMP_WITHOUT_TIME_ZONE:
                    int timestampPrecision = ((TimestampType) type).getPrecision();
                    return this.dateTimeFormatter.format(new Date(record.getTimestamp(pos, timestampPrecision)
                            .toTimestamp().getTime()));
                case DECIMAL:
                    int decimalPrecision = ((DecimalType) type).getPrecision();
                    int decimalScale = ((DecimalType) type).getScale();
                    return record.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal();
                default:
                    throw new UnsupportedOperationException("Unsupported type:" + type);
            }
        }
    }
}
最后在FlinkSQL里面使用:
-- Source table backed by the custom 'http' connector (GET request).
CREATE TABLE source_table(
    id BIGINT,
    name STRING,
    descs STRING,
    valuess DOUBLE
)
WITH(
    'connector' = 'http',
    -- the scheme separator must be '://'
    'url' = 'http://10.36.248.26:8080/test_flink?username=test',
    'type' = 'GET',
    -- required: the factory discovers the decoding format from the 'format' option
    'format' = 'json'
);
-- Sink table backed by the custom 'http' connector (POST request).
CREATE TABLE sink_table(
    id BIGINT,
    name STRING,
    descs STRING,
    valuess DOUBLE
)
WITH(
    'connector' = 'http',
    -- the scheme separator must be '://'
    'url' = 'http://10.36.248.26:8080/post_flink',
    'type' = 'POST',
    -- required: the factory discovers the encoding format from the 'format' option
    'format' = 'json'
);
源码地址:https://download.csdn.net/download/tianhouquan/87424283
版权归原作者 Yaphets丶混世大魔王 所有, 如有侵权,请联系我们删除。