Flink SQL Redis Connector

Developers who work with Flink regularly know that connecting Flink to Redis is fairly cumbersome, and parsing Redis data is even more so. If Redis could be connected through Flink SQL like an ordinary database, with its data displayed in tabular form, it would be very convenient.

After many days of work, I have finally finished a Flink SQL Redis connector and verified that it can parse Redis data with SQL. The code and execution results are shown below; the complete code is available on my GitHub: https://github.com/niuhu3/flink_sql_redis_connector.git

The connector has been submitted to Flink; see [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org).

I hope you can fork and star the project. I will keep updating this connector, and you are welcome to try it out. If you run into any problems, you can report them to me or in the community, and feel free to contact me with ideas.

1. Usage examples and explanation

1. Reading data example

CREATE TABLE orders (
  `order_id` STRING,
  `price` STRING,
  `order_time` STRING,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hgetall',
  'key' = 'orders'
);

select * from orders;

# Cluster mode
create table redis_sink (
site_id STRING,
inverter_id STRING,
start_time STRING,
PRIMARY KEY(site_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'mode' = 'cluster',
'cluster.nodes' = 'test3:7001,test3:7002,test3:7003,test3:8001,test3:8002,test3:8003',
'password' = '123123',
'command' = 'hgetall',
'key' = 'site_inverter'
);

cluster.nodes defines the cluster nodes as host:port pairs, for example: host1:p1,host2:p2,host3:p3

Note: a Redis table must define a primary key, which can be a single column or a composite key.

The following shows the result of reading with SQL: the Redis data is parsed directly into the tabular form we need.

2. Writing data example

1. generate source data
CREATE TABLE order_source (
  `order_number` BIGINT,
  `price` DECIMAL(32,2),
  `order_time` TIMESTAMP(3),
   PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'number-of-rows' = '5',
'fields.order_number.min' = '1',
'fields.order_number.max' = '20',
'fields.price.min' = '1001',
'fields.price.max' = '1100'
);

2. define redis sink table 

CREATE TABLE orders (
  `order_number` STRING,
  `price` STRING,
  `order_time` STRING,
   PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'single',
  'single.host' = '192.168.10.101',
  'single.port' = '6379',
  'password' = 'xxxxxx',
  'command' = 'hmset',
  'key' = 'orders'
);

3. insert data to redis sink table (cast data type to string)

insert into orders
    select
        cast(order_number as STRING) order_number,
        cast(price as STRING) price,
        cast(order_time as STRING) order_time
    from order_source;
    

A Redis table does not store data types, so all fields must be cast to STRING before being written to Redis. The result of writing to Redis is shown below. The Redis key is the concatenation of key + primary-key column name + primary-key value, which makes every record unique; this is also why a Redis table must define a primary key.
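To make that key layout concrete, here is a minimal, self-contained sketch of the concatenation, assuming ':' as the separator (the actual separator comes from the RedisSplitSymbol constants, which are not reproduced in this post):

public class RedisKeyExample {
    public static void main(String[] args) {
        String tableKey = "orders";             // from WITH ('key' = 'orders')
        String[] pkCols = {"order_number"};     // primary-key column names
        String[] pkVals = {"5"};                // primary-key values taken from the row
        StringBuilder redisKey = new StringBuilder(tableKey);
        for (int i = 0; i < pkCols.length; i++) {
            // append ":pkCol:pkVal" for each primary-key column
            redisKey.append(':').append(pkCols[i]).append(':').append(pkVals[i]);
        }
        System.out.println(redisKey);           // prints: orders:order_number:5
    }
}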

3. Currently supported features

  1. The connector currently supports multiple read and write commands:

     Read:   get    hget     hgetall     hscan   lrange    smembers    zrange

     Write:  set    hset     hmset       lpush   rpush     sadd

  2. For hash data, the most commonly used type, fuzzy matching is supported: providing only the table name queries the whole table (see the sketch below).
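For illustration, the fuzzy match amounts to appending '*' to the table name and reading every matching hash. A minimal standalone Jedis sketch, with placeholder host, port and password:

import java.util.Set;
import redis.clients.jedis.Jedis;

public class FuzzyMatchExample {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("192.168.10.101", 6379)) { // placeholder host/port
            jedis.auth("xxxxxx");                               // placeholder password
            // table name only -> match every row key, e.g. "orders:order_number:5"
            Set<String> keys = jedis.keys("orders" + "*");      // note: KEYS is O(N) and blocking
            for (String k : keys) {
                System.out.println(k + " -> " + jedis.hgetAll(k));
            }
        }
    }
}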

4. Connector options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | connector name |
| mode | required | (none) | String | redis cluster mode (single or cluster) |
| single.host | optional | (none) | String | redis single mode machine host |
| single.port | optional | (none) | int | redis single mode running port |
| password | optional | (none) | String | redis database password |
| command | required | (none) | String | redis write data or read data command |
| key | required | (none) | String | redis key |
| expire | optional | (none) | Int | set key ttl |
| field | optional | (none) | String | get a value with field when using the hget command |
| cursor | optional | (none) | Int | cursor when using the hscan command (e.g. 1, 2) |
| start | optional | 0 | Int | start index when using the lrange command |
| end | optional | 10 | Int | end index when using the lrange command |
| connection.max.wait-mills | optional | (none) | Int | redis connection parameter |
| connection.timeout-ms | optional | (none) | Int | redis connection parameter |
| connection.max-total | optional | (none) | Int | redis connection parameter |
| connection.max-idle | optional | (none) | Int | redis connection parameter |
| connection.test-on-borrow | optional | (none) | Boolean | redis connection parameter |
| connection.test-on-return | optional | (none) | Boolean | redis connection parameter |
| connection.test-while-idle | optional | (none) | Boolean | redis connection parameter |
| so.timeout-ms | optional | (none) | Int | redis connection parameter |
| max.attempts | optional | (none) | Int | redis connection parameter |
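In code, these options map to ConfigOption constants on the RedisOptions class referenced by the factory below. That class is not reproduced in this post, so the following is only a minimal sketch of what a few of the declarations typically look like in Flink, with names matching the factory's references and defaults taken from the table above:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch of the RedisOptions constants used by the factory below.
// Exact names and defaults are assumptions based on the option table above.
public class RedisOptions {
    public static final ConfigOption<String> MODE =
            ConfigOptions.key("mode").stringType().noDefaultValue()
                    .withDescription("redis cluster mode (single or cluster)");

    public static final ConfigOption<String> SINGLE_HOST =
            ConfigOptions.key("single.host").stringType().noDefaultValue()
                    .withDescription("redis single mode machine host");

    public static final ConfigOption<Integer> SINGLE_PORT =
            ConfigOptions.key("single.port").intType().noDefaultValue()
                    .withDescription("redis single mode running port");

    public static final ConfigOption<Integer> START =
            ConfigOptions.key("start").intType().defaultValue(0)
                    .withDescription("start index when using the lrange command");
}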

2. The factory class for dynamic reading and writing

import org.apache.flink.common.RedisOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.sink.RedisDynamicTableSink;
import org.apache.flink.source.RedisDynamicTableSource;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RedisSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {

    
    private ReadableConfig options;
    public RedisSourceSinkFactory(){}

    public RedisSourceSinkFactory(ReadableConfig options){
        this.options = options;
    }

    
    // Implementation of DynamicTableSourceFactory; this interface must be implemented to read data with Flink SQL
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        options = helper.getOptions();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        return new RedisDynamicTableSource(options,columnNames,primaryKey);

    }
    
    // Implementation of DynamicTableSinkFactory; this must also be implemented to write data to redis with Flink SQL
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options,columnNames,primaryKey);
    }

    @Override
    public String factoryIdentifier() {
        return "redis";
    }

    // required options of the sql connector
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.PASSWORD);
        options.add(RedisOptions.KEY);
        options.add(RedisOptions.MODE);
        return options;
    }
    
    // optional options of the sql connector
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.SINGLE_HOST);
        options.add(RedisOptions.SINGLE_PORT);
        options.add(RedisOptions.CLUSTER_NODES);
        options.add(RedisOptions.FIELD);
        options.add(RedisOptions.CURSOR);
        options.add(RedisOptions.EXPIRE);
        options.add(RedisOptions.COMMAND);
        options.add(RedisOptions.START);
        options.add(RedisOptions.END);
        options.add(RedisOptions.CONNECTION_MAX_TOTAL);
        options.add(RedisOptions.CONNECTION_MAX_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_WHILE_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_ON_BORROW);
        options.add(RedisOptions.CONNECTION_TEST_ON_RETURN);
        options.add(RedisOptions.CONNECTION_TIMEOUT_MS);
        options.add(RedisOptions.TTL_SEC);
        options.add(RedisOptions.LOOKUP_ADDITIONAL_KEY);
        options.add(RedisOptions.LOOKUP_CACHE_MAX_ROWS);
        options.add(RedisOptions.LOOKUP_CACHE_TTL_SEC);

        return options;
    }
}
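One detail the factory code does not show: Flink discovers DynamicTableSource/SinkFactory implementations via Java SPI, so the connector jar must contain a service file named META-INF/services/org.apache.flink.table.factories.Factory whose content is the fully qualified name of the factory class, for example (the package here is an assumption, since the factory's package declaration is omitted above):

org.apache.flink.factory.RedisSourceSinkFactory

Without this file, 'connector' = 'redis' cannot be resolved when the table is created.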

3. Redis Source reader class

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.util.Preconditions;

import java.util.List;

public class RedisDynamicTableSource implements ScanTableSource {

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSource(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);

    }

    @Override
    public DynamicTableSource copy() {

        return new RedisDynamicTableSource(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table source";
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.all();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

        RedisSourceFunction redisSourceFunction = new RedisSourceFunction(this.options, this.columns, this.primaryKey);
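        // the 'false' flag marks this source as unbounded (a streaming source)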
        return SourceFunctionProvider.of(redisSourceFunction,false);
    }
}

The source supports reading redis string, set, zset and hash data and parses it into RowData for Flink:

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanResult;

import java.util.*;

public class RedisSourceFunction extends RichSourceFunction<RowData>{

    private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String value;
    private String field;
    private String[] fields;
    private String cursor;
    private Integer start;
    private Integer end;
    private String[] keySplit;
    private int position = 1; // instance field (not static): each source instance tracks its own write position
    private GenericRowData rowData;
    public RedisSourceFunction(ReadableConfig options, List<String> columns, List<String> primaryKey){
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);

    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {

        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password,"password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key,"key is null,please set value for key");
        String[] keyArr = key.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
        String command = options.get(RedisOptions.COMMAND);

        Preconditions.checkNotNull(command,"command is null,please set value for command");

        // if the configured command is a write command, this table is write-only: the source has nothing to read
        List<String> sourceCommand = Arrays.asList(RedisCommandOptions.SET, RedisCommandOptions.HSET, RedisCommandOptions.HMSET, RedisCommandOptions.LPUSH,
                RedisCommandOptions.RPUSH, RedisCommandOptions.SADD);
        if(sourceCommand.contains(command.toUpperCase())){ return;}

        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode,"mode is null,please set value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);

        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        if(mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())){

            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()){
                        case RedisCommandOptions.GET:
                            value = jedis.get(key);
                            rowData = new GenericRowData(2);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            rowData.setField(1,BinaryStringData.fromString(value));
                            break;

                        case RedisCommandOptions.HGET:
                            field = options.get(RedisOptions.FIELD);
                            value = jedis.hget(key, field);
                            rowData = new GenericRowData(3);
                            keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                            for (int i = 0; i < primaryKey.size(); i++) {
                                // pk value i sits at index 2*(i+1) of "key:pkCol:pkVal:..." split on ':'
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            rowData.setField(primaryKey.size(),BinaryStringData.fromString(value));
                            break;

                        case RedisCommandOptions.HGETALL:
                            if (keyArr.length > 1){
                                for (String str : keyArr) {
                                    rowData = new GenericRowData(columns.size());
                                    keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                                    for (int i = 0; i < primaryKey.size(); i++) {
                                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                                    }

                                    for (int i = primaryKey.size(); i < columns.size(); i++) {
                                        String value = jedis.hget(str, columns.get(i));
                                        rowData.setField(i,BinaryStringData.fromString(value));
                                    }
                                    ctx.collect(rowData);
                                }

                            }else if(key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)){
                                rowData = new GenericRowData(columns.size());
                                keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                                for (int i = 0; i < primaryKey.size(); i++) {
                                    rowData.setField(i,BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                                }

                                for (int i = primaryKey.size(); i < columns.size(); i++) {
                                    String value = jedis.hget(key, columns.get(i));
                                    rowData.setField(i,BinaryStringData.fromString(value));
                                }

                                ctx.collect(rowData);

                            }else{
                                //Fuzzy matching ,gets the data of the entire table
                                String fuzzyKey = new StringBuffer(key).append("*").toString();
                                Set<String> keys = jedis.keys(fuzzyKey);
                                for (String keyStr : keys) {
                                    keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                                    rowData = new GenericRowData(columns.size());
                                    for (int i = 0; i < primaryKey.size(); i++) {
                                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                                    }

                                    for (int i = primaryKey.size(); i < columns.size(); i++) {
                                        String value = jedis.hget(keyStr, columns.get(i));
                                        rowData.setField(i,BinaryStringData.fromString(value));
                                    }

                                    ctx.collect(rowData);

                                }
                            }

                            break;

                        case RedisCommandOptions.HSCAN:
                            cursor = options.get(RedisOptions.CURSOR);
                            ScanResult<Map.Entry<String, String>> entries = jedis.hscan(key, cursor);
                            List<Map.Entry<String, String>> result = entries.getResult();
                            keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }

                            position = primaryKey.size();
                            for (int i = 0; i < result.size(); i++) {
                                value = result.get(i).getValue();
                                rowData.setField(position,BinaryStringData.fromString(value));
                                position++;
                            }
                            break;

                        case RedisCommandOptions.LRANGE:
                            start = options.get(RedisOptions.START);
                            end = options.get(RedisOptions.END);
                            List<String> list = jedis.lrange(key, start, end);
                            rowData = new GenericRowData(list.size() +1);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            list.forEach(s -> {
                                rowData.setField(position,BinaryStringData.fromString(s));
                                position++;});

                            break;

                        case RedisCommandOptions.SMEMBERS:
                            Set<String> smembers = jedis.smembers(key);
                            rowData = new GenericRowData(smembers.size() +1);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            smembers.forEach(s -> {
                                rowData.setField(position,BinaryStringData.fromString(s));
                                position++;});
                            break;

                        case RedisCommandOptions.ZRANGE:
                            start = options.get(RedisOptions.START);
                            end = options.get(RedisOptions.END);
                            Set<String> sets = jedis.zrange(key, start, end);
                            rowData = new GenericRowData(sets.size() +1);
                            rowData.setField(0,BinaryStringData.fromString(key));
                            sets.forEach(s -> {
                                rowData.setField(position,BinaryStringData.fromString(s));
                                position++;});
                            break;

                        default:
                            LOG.error("Cannot process such data type: {}", command);
                            break;
                    }

                    if(!command.toUpperCase().equals(RedisCommandOptions.HGETALL)){
                        ctx.collect(rowData);
                    }

            }else if(mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())){
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];

            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);

            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts, testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()){
                case RedisCommandOptions.GET:
                    value = jedisCluster.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    rowData.setField(1,BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedisCluster.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                    for (int i = 0; i < primaryKey.size(); i++) {
                        // pk value i sits at index 2*(i+1) of "key:pkCol:pkVal:..." split on ':'
                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    rowData.setField(primaryKey.size(),BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1){
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }

                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(str, columns.get(i));
                                rowData.setField(i,BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }

                    }else if(key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)){
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i,BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                        }

                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedisCluster.hget(key, columns.get(i));
                            rowData.setField(i,BinaryStringData.fromString(value));
                        }

                        ctx.collect(rowData);

                    }else{
                        //Fuzzy matching ,gets the data of the entire table
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedisCluster.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i,BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }

                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(keyStr, columns.get(i));
                                rowData.setField(i,BinaryStringData.fromString(value));
                            }

                            ctx.collect(rowData);

                        }
                    }

                    break;

                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedisCluster.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i,BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }

                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position,BinaryStringData.fromString(value));
                        position++;
                    }
                    break;

                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedisCluster.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() +1);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    list.forEach(s -> {
                        rowData.setField(position,BinaryStringData.fromString(s));
                        position++;});

                    break;

                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedisCluster.smembers(key);
                    rowData = new GenericRowData(smembers.size() +1);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    smembers.forEach(s -> {
                        rowData.setField(position,BinaryStringData.fromString(s));
                        position++;});
                    break;

                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedisCluster.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() +1);
                    rowData.setField(0,BinaryStringData.fromString(key));
                    sets.forEach(s -> {
                        rowData.setField(position,BinaryStringData.fromString(s));
                        position++;});
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

            if(!command.toUpperCase().equals(RedisCommandOptions.HGETALL)){
                ctx.collect(rowData);
            }

        }else{
            LOG.error("Unsupport such {} mode",mode);
        }

    }

    @Override
    public void cancel() {

        if(jedis != null){
            jedis.close();
        }

        if(jedisCluster != null){
            jedisCluster.close();
        }

    }
}
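The RedisUtil helper referenced above is not included in the post. Judging from the call sites, it wraps Jedis pool construction; the following is a rough sketch of what the single-mode method might look like (the signature is inferred from the calls above, so treat it as an assumption rather than the author's implementation):

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {
    // sketch only: signature inferred from the RedisUtil.getSingleJedisPool(...) calls above
    public static JedisPool getSingleJedisPool(String mode, String host, Integer port,
                                               Integer maxTotal, Integer maxIdle, Integer maxWaitMills,
                                               Boolean testOnBorrow, Boolean testOnReturn, Boolean testWhileIdle) {
        JedisPoolConfig config = new JedisPoolConfig();
        // apply only the pool settings the user actually configured
        if (maxTotal != null)      config.setMaxTotal(maxTotal);
        if (maxIdle != null)       config.setMaxIdle(maxIdle);
        if (maxWaitMills != null)  config.setMaxWaitMillis(maxWaitMills);
        if (testOnBorrow != null)  config.setTestOnBorrow(testOnBorrow);
        if (testOnReturn != null)  config.setTestOnReturn(testOnReturn);
        if (testWhileIdle != null) config.setTestWhileIdle(testWhileIdle);
        return new JedisPool(config, host, port);
    }
}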

4. Redis sink writer class

package org.apache.flink.sink;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class RedisDynamicTableSink implements DynamicTableSink {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RedisDynamicTableSink.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSink(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.DELETE)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();

    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        RedisSinkFunction myRedisSinkFunction = new RedisSinkFunction(this.options,this.columns,this.primaryKey);
        return SinkFunctionProvider.of(myRedisSinkFunction);

    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options,this.columns,this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table sink";
    }
}
The sink function that actually writes each RowData to Redis:

package org.apache.flink.sink;

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import java.util.List;

public class RedisSinkFunction extends RichSinkFunction<RowData>{

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private String fields;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String[] fieldsArr;
    private StringBuffer redisTableKey;
    private String value;

    public RedisSinkFunction(ReadableConfig options, List<String> columns, List<String> primaryKey){

        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public void invoke(RowData rowData, Context context) throws Exception {
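        // note: the connection pool is created on every invoke() call below; a production version would set it up once in open()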

        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password,"password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key,"key is null,please set value for key");
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command,"command is null,please set value for command");
        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(command,"mode is null,please set value for mode");

        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);

        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {

            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()){
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedis.set(String.valueOf(key),String.valueOf(value));
                    break;

                case RedisCommandOptions.HSET:

                    String field = columns.get(1);
                    //construct redis key:table_name:primary key col name: primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if(primaryKey.size() <= 1){
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        }else{
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1){
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }

                    value = rowData.getString(1).toString();
                    jedis.hset(String.valueOf(redisTableKey),String.valueOf(field),String.valueOf(value));
                    break;

                case RedisCommandOptions.HMSET:
                    //construct redis key:table_name:primary key col name: primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                    for (int i = 0; i < primaryKey.size(); i++) {
                        if(primaryKey.size() <= 1){
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        }else{
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() -1){
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }

                    }
                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))){
                            value = rowData.getString(i).toString();
                            jedis.hset(String.valueOf(redisTableKey),String.valueOf(columns.get(i)),String.valueOf(value));
                        }
                    }

                    break;

                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedis.lpush(key,value);

                    break;

                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedis.rpush(key,value);

                    break;

                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedis.sadd(key,value);
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

        }else if(mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())){
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];

            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);

            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts, testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()){
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedisCluster.set(String.valueOf(key),String.valueOf(value));
                    break;

                case RedisCommandOptions.HSET:

                    String field = columns.get(1);
                    //construct redis key:table_name:primary key col name: primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                    for (int i = 0; i < primaryKey.size(); i++) {
                        if(primaryKey.size() <= 1){
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        }else{
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1){
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }

                    value = rowData.getString(1).toString();
                    jedisCluster.hset(String.valueOf(redisTableKey),String.valueOf(field),String.valueOf(value));
                    break;

                case RedisCommandOptions.HMSET:
                    //construct redis key:table_name:primary key col name: primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);

                    for (int i = 0; i < primaryKey.size(); i++) {
                        if(primaryKey.size() <= 1){
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        }else{
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1){
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }

                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))){
                            value = rowData.getString(i).toString();
                            jedisCluster.hset(String.valueOf(redisTableKey),String.valueOf(columns.get(i)),String.valueOf(value));
                        }
                    }

                    break;

                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.lpush(key,value);

                    break;

                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.rpush(key,value);

                    break;

                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedisCluster.sadd(key,value);
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

        }else{
            LOG.error("Unsupport such {} mode",mode);
        }

    }

    @Override
    public void close() throws Exception {
        if(jedis != null){
            jedis.close();
        }

        if(jedisCluster != null){
            jedisCluster.close();
        }

    }
}
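Likewise, the RedisClusterMode, RedisCommandOptions and RedisSplitSymbol helpers referenced throughout are small constant holders that the post never shows. A plausible sketch, with the separator values inferred from the 'host1:p1,host2:p2' nodes format and the key layout described earlier (treat the exact values as assumptions):

public class RedisSplitSymbol {
    // inferred: 'cluster.nodes' entries are comma-separated; host/port and key segments use ':'
    public static final String CLUSTER_NODES_SPLIT = ",";
    public static final String CLUSTER_HOST_PORT_SPLIT = ":";
}

public class RedisCommandOptions {
    // read commands
    public static final String GET = "GET";
    public static final String HGET = "HGET";
    public static final String HGETALL = "HGETALL";
    public static final String HSCAN = "HSCAN";
    public static final String LRANGE = "LRANGE";
    public static final String SMEMBERS = "SMEMBERS";
    public static final String ZRANGE = "ZRANGE";
    // write commands
    public static final String SET = "SET";
    public static final String HSET = "HSET";
    public static final String HMSET = "HMSET";
    public static final String LPUSH = "LPUSH";
    public static final String RPUSH = "RPUSH";
    public static final String SADD = "SADD";
}

public enum RedisClusterMode { SINGLE, CLUSTER }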

If you are not sure why the code above is written this way, you can refer to my previous post:

Flink SQL - User-defined Sources & Sinks (CSDN blog)

Finally, once again, I hope you will support the project on GitHub or in the community so that this connector can be officially open-sourced.

Tags: flink sql redis

This article is reposted from: https://blog.csdn.net/bigdatakenan/article/details/139836576
Copyright belongs to the original author 源码挖掘机. If there is any infringement, please contact us for removal.
