0


flink 整合rocketmq

下面代码结构
下面代码路径 :source->rocketmq->common->selector
DefaultTopicSelector.java 类

publicclassDefaultTopicSelector<T>implementsTopicSelector<T>{privatefinalString topicName;privatefinalString tagName;publicDefaultTopicSelector(finalString topicName){this(topicName,"");}publicDefaultTopicSelector(String topicName,String tagName){this.topicName = topicName;this.tagName = tagName;}@OverridepublicStringgetTopic(T tuple){return topicName;}@OverridepublicStringgetTag(T tuple){return tagName;}}

TopicSelector.java

import java.io.Serializable;

/**
 * Chooses the topic and tag a record is published under.
 *
 * @param <T> type of the records being routed
 */
public interface TopicSelector<T> extends Serializable {

    /**
     * @param tuple record about to be published
     * @return topic for this record
     */
    String getTopic(T tuple);

    /**
     * @param tuple record about to be published
     * @return tag for this record
     */
    String getTag(T tuple);
}

下面代码路径:source->rocketmq->common->serialization
AlarmEventSerializationSchema.java

importcom.alibaba.fastjson.JSON;importcom.bsj.flinkRisk.model.AlarmEvent;importcom.bsj.flinkRisk.utils.NumberToByteUtil;importjava.nio.charset.StandardCharsets;publicclassAlarmEventSerializationSchemaimplementsKeyValueSerializationSchema<AlarmEvent>{publicstaticfinalString DEFAULT_KEY_FIELD ="key";publicstaticfinalString DEFAULT_VALUE_FIELD ="value";publicString keyField;publicString valueField;publicAlarmEventSerializationSchema(){this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);}publicAlarmEventSerializationSchema(String keyField,String valueField){this.keyField = keyField;this.valueField = valueField;}@Overridepublicbyte[]serializeKey(AlarmEvent event){returnNumberToByteUtil.long2Bytes(event.getVehicleId());}@Overridepublicbyte[]serializeValue(AlarmEvent event){String data = JSON.toJSONString(event);return data.getBytes(StandardCharsets.UTF_8);}}

KeyValueDeserializationSchema.java

importorg.apache.flink.api.java.typeutils.ResultTypeQueryable;importjava.io.Serializable;publicinterfaceKeyValueDeserializationSchema<T>extendsResultTypeQueryable<T>,Serializable{TdeserializeKeyAndValue(byte[] key,byte[] value);}

KeyValueSerializationSchema.java

import java.io.Serializable;

/**
 * Turns a record of type {@code T} into the key and body bytes of a message.
 *
 * @param <T> type of the records consumed
 */
public interface KeyValueSerializationSchema<T> extends Serializable {

    /**
     * @param tuple record to serialize
     * @return bytes used as the message key
     */
    byte[] serializeKey(T tuple);

    /**
     * @param tuple record to serialize
     * @return bytes used as the message body
     */
    byte[] serializeValue(T tuple);
}

VehiclePosInfoDeserializationSchema.java

importlombok.extern.slf4j.Slf4j;importorg.apache.flink.api.common.typeinfo.TypeInformation;@Slf4jpublicclassVehiclePosInfoDeserializationSchemaimplementsKeyValueDeserializationSchema<VehiclePosInfo>{publicVehiclePosInfoDeserializationSchema(){}@OverridepublicVehiclePosInfodeserializeKeyAndValue(byte[] key,byte[] value){SpeedInfo speedInfo =null;YunDataSerialize yunDataSerialize =newYunDataSerialize();try{
          
            speedInfo =yunDataSerialize.SpeedinfoFromArray(value);// 2表示为1402的位置数据,传过来的不处理。if(speedInfo.getSource()==2){returnnull;}}catch(Exception e){
            log.error("数据解析异常,数据{}",HexStr.toStr(value),e);}returnconvertToVehicleState(speedInfo);}/**
     * 类型转换
     *
     * @param speedInfo
     * @return
     */privateVehiclePosInfoconvertToVehicleState(SpeedInfo speedInfo){if(speedInfo==null){returnnull;}VehiclePosInfo vehicleState =newVehiclePosInfo();
        vehicleState.setVehicleId(speedInfo.getVehicleId());
        vehicleState.setGroupId(speedInfo.getGroupId());
        vehicleState.setPlateColor(speedInfo.getPlateColor());
        vehicleState.setPlate(speedInfo.getPlate());
        vehicleState.setVehicleShape(speedInfo.getVehicleShape());
        vehicleState.setVehicleState(speedInfo.getVehicleState());
        vehicleState.setTerminalNo(speedInfo.getTerminalNo());
        vehicleState.setProtocolType(speedInfo.getProtocolType());
        vehicleState.setTerminalType(speedInfo.getTerminalType());
        vehicleState.setDevTime(speedInfo.getPos().getDevTime());
        vehicleState.setRecvTime(speedInfo.getPos().getRecvTime());
        vehicleState.setLon(speedInfo.getPos().getLon());
        ehicleState.setLat(speedInfo.getPos().getLat());
        vehicleState.setSpeed(speedInfo.getPos().getSpeed());
        vehicleState.setDirect(speedInfo.getPos().getDirect());
        vehicleState.setHigh(speedInfo.getPos().getHigh());
        vehicleState.setMileage(speedInfo.getPos().getMileage());
        vehicleState.setIsAcc(speedInfo.getPos().getIsAcc());
        vehicleState.setExtend(speedInfo.getPos().getExtend());
        vehicleState.setIsPos(speedInfo.getPos().getIsPos());
        vehicleState.setPosSource(speedInfo.getSource());
        vehicleState.setCarhrough(speedInfo.getCarhrough());}}

下面代码路径:source->rocketmq->example(这个只是例子,可以不用)

RocketMQFlinkExample.java

publicclassRocketMQFlinkExample{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(3000);Properties consumerProps =newProperties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR,"localhost:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP,"c002");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC,"gg");Properties producerProps =newProperties();
        producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR,"localhost:9876");int msgDelayLevel =RocketMQConfig.MSG_DELAY_LEVEL00;
        producerProps.setProperty(RocketMQConfig.MSG_DELAY_LEVEL,String.valueOf(msgDelayLevel));// TimeDelayLevel is not supported for batchingboolean batchFlag = msgDelayLevel <=0;DataStream<SpeedInfo> stream = env
                .addSource(newRocketMQSource(newVehiclePosInfoDeserializationSchema(), consumerProps)).name("transactions");

                stream.keyBy(SpeedInfo::getVehicleId).process(newKeyedProcessFunction<Long,SpeedInfo,AlarmEvent>(){@OverridepublicvoidprocessElement(SpeedInfo in,Context ctx,Collector<AlarmEvent> out)throwsException{AlarmEvent event=newAlarmEvent();
                        out.collect(event);}}).name("upper-processor").setParallelism(2).addSink(newRocketMQSink(newAlarmEventSerializationSchema("id","province"),newDefaultTopicSelector("zhisheng"), producerProps).withBatchFlushOnCheckpoint(batchFlag)).name("rocketmq-sink").setParallelism(2);

        env.execute("rocketmq-flink-example");}}

SimpleConsumer.java

publicclassSimpleConsumer{publicstaticvoidmain(String[] args){DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("g00003");
        consumer.setNamesrvAddr("localhost:9876");try{
            consumer.subscribe("zhisheng","*");}catch(MQClientException e){
            e.printStackTrace();}
        consumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context){for(MessageExt msg : msgs){System.out.println(msg.getKeys()+":"+newString(msg.getBody()));}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});try{
            consumer.start();}catch(MQClientException e){
            e.printStackTrace();}}}

SimpleProducer.java

publicclassSimpleProducer{publicstaticvoidmain(String[] args){DefaultMQProducer producer =newDefaultMQProducer("p001");
        producer.setNamesrvAddr("localhost:9876");try{
            producer.start();}catch(MQClientException e){
            e.printStackTrace();}for(int i =0; i <10000; i++){Message msg =newMessage("zhisheng","","id_"+ i,("country_X province_"+ i).getBytes());try{
                producer.send(msg);}catch(Exception e){
                e.printStackTrace();}System.out.println("send "+ i);try{Thread.sleep(10);}catch(InterruptedException e){
                e.printStackTrace();}}}}

下面代码路径:source->rocketmq

RocketMQConfig.java

publicclassRocketMQConfig{// Server ConfigpublicstaticfinalString NAME_SERVER_ADDR ="nameserver.address";// 必须publicstaticfinalString NAME_SERVER_POLL_INTERVAL ="nameserver.poll.interval";publicstaticfinalint DEFAULT_NAME_SERVER_POLL_INTERVAL =30000;// 30 secondspublicstaticfinalString BROKER_HEART_BEAT_INTERVAL ="brokerserver.heartbeat.interval";publicstaticfinalint DEFAULT_BROKER_HEART_BEAT_INTERVAL =30000;// 30 seconds// Producer related configpublicstaticfinalString PRODUCER_GROUP ="producer.group";publicstaticfinalString PRODUCER_RETRY_TIMES ="producer.retry.times";publicstaticfinalint DEFAULT_PRODUCER_RETRY_TIMES =3;publicstaticfinalString PRODUCER_TIMEOUT ="producer.timeout";publicstaticfinalint DEFAULT_PRODUCER_TIMEOUT =3000;// 3 seconds// Consumer related configpublicstaticfinalString CONSUMER_GROUP ="consumer.group";// 必须publicstaticfinalString CONSUMER_TOPIC ="consumer.topic";// 必须publicstaticfinalString CONSUMER_TAG ="consumer.tag";publicstaticfinalString DEFAULT_CONSUMER_TAG ="*";publicstaticfinalString CONSUMER_OFFSET_RESET_TO ="consumer.offset.reset.to";// offset 重制到publicstaticfinalString CONSUMER_OFFSET_LATEST ="latest";publicstaticfinalString CONSUMER_OFFSET_EARLIEST ="earliest";publicstaticfinalString CONSUMER_OFFSET_TIMESTAMP ="timestamp";publicstaticfinalString CONSUMER_OFFSET_FROM_TIMESTAMP ="consumer.offset.from.timestamp";// offset 重制到某个时间点publicstaticfinalString CONSUMER_OFFSET_PERSIST_INTERVAL ="consumer.offset.persist.interval";publicstaticfinalint DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL =5000;// 5 secondspublicstaticfinalString CONSUMER_PULL_POOL_SIZE ="consumer.pull.thread.pool.size";publicstaticfinalint DEFAULT_CONSUMER_PULL_POOL_SIZE =20;publicstaticfinalString CONSUMER_BATCH_SIZE ="consumer.batch.size";publicstaticfinalint DEFAULT_CONSUMER_BATCH_SIZE =500;publicstaticfinalString CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND ="consumer.delay.when.message.not.found";publicstaticfinalint DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND 
=10;publicstaticfinalString MSG_DELAY_LEVEL ="msg.delay.level";publicstaticfinalint MSG_DELAY_LEVEL00 =0;// no delaypublicstaticfinalint MSG_DELAY_LEVEL01 =1;// 1spublicstaticfinalint MSG_DELAY_LEVEL02 =2;// 5spublicstaticfinalint MSG_DELAY_LEVEL03 =3;// 10spublicstaticfinalint MSG_DELAY_LEVEL04 =4;// 30spublicstaticfinalint MSG_DELAY_LEVEL05 =5;// 1minpublicstaticfinalint MSG_DELAY_LEVEL06 =6;// 2minpublicstaticfinalint MSG_DELAY_LEVEL07 =7;// 3minpublicstaticfinalint MSG_DELAY_LEVEL08 =8;// 4minpublicstaticfinalint MSG_DELAY_LEVEL09 =9;// 5minpublicstaticfinalint MSG_DELAY_LEVEL10 =10;// 6minpublicstaticfinalint MSG_DELAY_LEVEL11 =11;// 7minpublicstaticfinalint MSG_DELAY_LEVEL12 =12;// 8minpublicstaticfinalint MSG_DELAY_LEVEL13 =13;// 9minpublicstaticfinalint MSG_DELAY_LEVEL14 =14;// 10minpublicstaticfinalint MSG_DELAY_LEVEL15 =15;// 20minpublicstaticfinalint MSG_DELAY_LEVEL16 =16;// 30minpublicstaticfinalint MSG_DELAY_LEVEL17 =17;// 1hpublicstaticfinalint MSG_DELAY_LEVEL18 =18;// 2h/**
     * 构建 producer 配置
     *
     * @param props    Properties
     * @param producer DefaultMQProducer
     */publicstaticvoidbuildProducerConfigs(Properties props,DefaultMQProducer producer){buildCommonConfigs(props, producer);String group = props.getProperty(PRODUCER_GROUP);if(StringUtils.isEmpty(group)){
            group = UUID.randomUUID().toString();}
         producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, group));

        producer.setRetryTimesWhenSendFailed(getInteger(props,
                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
        producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
        producer.setSendMsgTimeout(getInteger(props,
                PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));}/**
     * 构建 Consumer 配置
     *
     * @param props    Properties
     * @param consumer DefaultMQPushConsumer
     */publicstaticvoidbuildConsumerConfigs(Properties props,DefaultMQPullConsumer consumer){buildCommonConfigs(props, consumer);//消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);//        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setPersistConsumerOffsetInterval(getInteger(props,
                CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));}/**
     * 构建通用的配置
     *
     * @param props  Properties
     * @param client ClientConfig
     */privatestaticvoidbuildCommonConfigs(Properties props,ClientConfig client){String nameServers = props.getProperty(NAME_SERVER_ADDR);Validate.notEmpty(nameServers);
        client.setNamesrvAddr(nameServers);

        client.setPollNameServerInterval(getInteger(props,
                NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
        client.setHeartbeatBrokerInterval(getInteger(props,
                BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));}}

RocketMQSink.java

publicclassRocketMQSink<IN>extendsRichSinkFunction<IN>implementsCheckpointedFunction{privatestaticfinallong serialVersionUID =1L;privatestaticfinalLogger LOG =LoggerFactory.getLogger(RocketMQSink.class);privatetransientDefaultMQProducer producer;privateboolean async;// false by defaultprivateProperties props;privateTopicSelector<IN> topicSelector;privateKeyValueSerializationSchema<IN> serializationSchema;privateboolean batchFlushOnCheckpoint;// false by defaultprivateint batchSize =1000;privateList<Message> batchList;privateint messageDeliveryDelayLevel =RocketMQConfig.MSG_DELAY_LEVEL00;publicRocketMQSink(KeyValueSerializationSchema<IN> schema,TopicSelector<IN> topicSelector,Properties props){this.serializationSchema = schema;this.topicSelector = topicSelector;this.props = props;if(this.props !=null){this.messageDeliveryDelayLevel =RocketMQUtils.getInteger(this.props,RocketMQConfig.MSG_DELAY_LEVEL,RocketMQConfig.MSG_DELAY_LEVEL00);if(this.messageDeliveryDelayLevel <RocketMQConfig.MSG_DELAY_LEVEL00){this.messageDeliveryDelayLevel =RocketMQConfig.MSG_DELAY_LEVEL00;}elseif(this.messageDeliveryDelayLevel >RocketMQConfig.MSG_DELAY_LEVEL18){this.messageDeliveryDelayLevel =RocketMQConfig.MSG_DELAY_LEVEL18;}}}@Overridepublicvoidopen(Configuration parameters)throwsException{Validate.notEmpty(props,"Producer properties can not be empty");Validate.notNull(topicSelector,"TopicSelector can not be null");Validate.notNull(serializationSchema,"KeyValueSerializationSchema can not be null");

        producer =newDefaultMQProducer();
        producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));RocketMQConfig.buildProducerConfigs(props, producer);
        batchList =newLinkedList<>();if(batchFlushOnCheckpoint &&!((StreamingRuntimeContext)getRuntimeContext()).isCheckpointingEnabled()){
            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            batchFlushOnCheckpoint =false;}try{
            producer.start();}catch(MQClientException e){thrownewRuntimeException(e);}}Overridepublicvoidinvoke(IN input,Context context)throwsException{Message msg =prepareMessage(input);if(batchFlushOnCheckpoint){
            batchList.add(msg);if(batchList.size()>= batchSize){flushSync();}return;}if(async){try{
                producer.send(msg,newMessageQueueSelector(){// 每个节点轮着来@OverridepublicMessageQueueselect(List<MessageQueue> mqs,Message msg,Object arg){// arg就是send的第3个参数long vehicleId =(Long) arg;return mqs.get((int)(vehicleId % mqs.size()));}},msg.getKeys(),newSendCallback(){@OverridepublicvoidonSuccess(SendResult sendResult){
                        LOG.debug("Async send message success! result: {}", sendResult);}@OverridepublicvoidonException(Throwable throwable){if(throwable !=null){
                            LOG.error("Async send message failure!", throwable);}}});}catch(Exception e){
                LOG.error("Async send message failure!", e);}}else{try{SendResult result = producer.send(msg);
                LOG.debug("Sync send message result: {}", result);}catch(Exception e){
                LOG.error("Sync send message failure!", e);}}}/**
     * 解析消息
     *
     * @param input
     * @return
     */privateMessageprepareMessage(IN input){String topic = topicSelector.getTopic(input);String tag = topicSelector.getTag(input)!=null? topicSelector.getTag(input):"";byte[] k = serializationSchema.serializeKey(input);String key = k !=null?String.valueOf(NumberToByteUtil.bytes2Long(k)):"";byte[] value = serializationSchema.serializeValue(input);Validate.notNull(topic,"the message topic is null");Validate.notNull(value,"the message body is null");Message msg =newMessage(topic, tag, key, value);if(this.messageDeliveryDelayLevel >RocketMQConfig.MSG_DELAY_LEVEL00){
            msg.setDelayTimeLevel(this.messageDeliveryDelayLevel);}return msg;}publicRocketMQSink<IN>withAsync(boolean async){this.async = async;returnthis;}publicRocketMQSink<IN>withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint){this.batchFlushOnCheckpoint = batchFlushOnCheckpoint;returnthis;}publicRocketMQSink<IN>withBatchSize(int batchSize){this.batchSize = batchSize;returnthis;}@Overridepublicvoidclose()throwsException{if(producer !=null){flushSync();
            producer.shutdown();}}privatevoidflushSync()throwsException{if(batchFlushOnCheckpoint){synchronized(batchList){if(batchList.size()>0){
                    producer.send(batchList);
                    batchList.clear();}}}}@OverridepublicvoidsnapshotState(FunctionSnapshotContext context)throwsException{flushSync();}@OverridepublicvoidinitializeState(FunctionInitializationContext context)throwsException{// Nothing to do}}

RocketMQSource.java

publicclassRocketMQSource<OUT>extendsRichParallelSourceFunction<OUT>implementsCheckpointedFunction,ResultTypeQueryable<OUT>{privatestaticfinallong serialVersionUID =1L;privatestaticfinalLogger LOG =LoggerFactory.getLogger(RocketMQSource.class);privatetransientMQPullConsumerScheduleService pullConsumerScheduleService;privateDefaultMQPullConsumer consumer;privateKeyValueDeserializationSchema<OUT> schema;privateRunningChecker runningChecker;privatetransientListState<Tuple2<MessageQueue,Long>> unionOffsetStates;privateMap<MessageQueue,Long> offsetTable;privateMap<MessageQueue,Long> restoredOffsets;privateProperties props;privateString topic;privateString group;privatestaticfinalString OFFSETS_STATE_NAME ="topic-partition-offset-states";privatetransientvolatileboolean restored;publicRocketMQSource(KeyValueDeserializationSchema<OUT> schema,Properties props){this.schema = schema;this.props = props;}@Overridepublicvoidopen(Configuration parameters)throwsException{
        LOG.info("启动定位数据消费Mq");Validate.notEmpty(props,"Consumer properties can not be empty");Validate.notNull(schema,"KeyValueDeserializationSchema can not be null");this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);Validate.notEmpty(topic,"Consumer topic can not be empty");Validate.notEmpty(group,"Consumer group can not be empty");if(offsetTable ==null){
            offsetTable =newConcurrentHashMap<>();}if(restoredOffsets ==null){
            restoredOffsets =newConcurrentHashMap<>();}

        runningChecker =newRunningChecker();

        pullConsumerScheduleService =newMQPullConsumerScheduleService(group);
        consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();

        consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));RocketMQConfig.buildConsumerConfigs(props, consumer);}@Overridepublicvoidrun(SourceContext<OUT> context)throwsException{
        LOG.info("开始发送数据");// The lock that guarantees that record emission and state updates are atomic,// from the view of taking a checkpoint.finalObject lock = context.getCheckpointLock();int delayWhenMessageNotFound =getInteger(props,RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG,RocketMQConfig.DEFAULT_CONSUMER_TAG);int pullPoolSize =getInteger(props,RocketMQConfig.CONSUMER_PULL_POOL_SIZE,RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);int pullBatchSize =getInteger(props,RocketMQConfig.CONSUMER_BATCH_SIZE,RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
        pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
        pullConsumerScheduleService.registerPullTaskCallback(topic,newPullTaskCallback(){@OverridepublicvoiddoPullTask(MessageQueue mq,PullTaskContext pullTaskContext){try{long offset =getMessageQueueOffset(mq);if(offset <0){return;}PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize);boolean found =false;switch(pullResult.getPullStatus()){case FOUND:List<MessageExt> messages = pullResult.getMsgFoundList();for(MessageExt msg : messages){byte[] key = msg.getKeys()!=null? msg.getKeys().getBytes(StandardCharsets.UTF_8):null;byte[] value = msg.getBody();OUT data = schema.deserializeKeyAndValue(key, value);// output and state update are atomicsynchronized(lock){if(data!=null){
                                        context.collectWithTimestamp(data, msg.getBornTimestamp());}}}
                            found =true;break;case NO_MATCHED_MSG:
                            LOG.info("No matched message after offset {} for queue {}", offset, mq);break;case NO_NEW_MSG:break;case OFFSET_ILLEGAL:
                            LOG.info("Offset {} is illegal for queue {}", offset, mq);break;default:break;}synchronized(lock){putMessageQueueOffset(mq, pullResult.getNextBeginOffset());}if(found){
                        pullTaskContext.setPullNextDelayTimeMillis(0);// no delay when messages were found}else{
                        pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);}}catch(Exception e){thrownewRuntimeException(e);}}});try{
            pullConsumerScheduleService.start();}catch(MQClientException e){thrownewRuntimeException(e);}

        runningChecker.setRunning(true);awaitTermination();}privatevoidawaitTermination()throwsInterruptedException{while(runningChecker.isRunning()){Thread.sleep(50);}}privatelonggetMessageQueueOffset(MessageQueue mq)throwsMQClientException{Long offset = offsetTable.get(mq);// restoredOffsets(unionOffsetStates) is the restored global union state;// should only snapshot mqs that actually belong to usif(restored && offset ==null){
            offset = restoredOffsets.get(mq);}if(offset ==null){
            offset = consumer.fetchConsumeOffset(mq,false);if(offset <0){String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);switch(initialOffset){case CONSUMER_OFFSET_EARLIEST:
                        offset = consumer.minOffset(mq);break;case CONSUMER_OFFSET_LATEST:
                        offset = consumer.maxOffset(mq);break;case CONSUMER_OFFSET_TIMESTAMP:
                        offset = consumer.searchOffset(mq,getLong(props,RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP,System.currentTimeMillis()));break;default:thrownewIllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");}}}
        offsetTable.put(mq, offset);return offsetTable.get(mq);}privatevoidputMessageQueueOffset(MessageQueue mq,long offset)throwsMQClientException{
        offsetTable.put(mq, offset);
        consumer.updateConsumeOffset(mq, offset);}@Overridepublicvoidcancel(){
        LOG.info("cancel ...");
        runningChecker.setRunning(false);if(pullConsumerScheduleService !=null){
            pullConsumerScheduleService.shutdown();}

        offsetTable.clear();
        restoredOffsets.clear();}@Overridepublicvoidclose()throwsException{
        LOG.info("close ...");try{cancel();}finally{super.close();}}@OverridepublicvoidsnapshotState(FunctionSnapshotContext context)throwsException{if(!runningChecker.isRunning()){
            LOG.info("snapshotState() called on closed source; returning null.");return;}if(LOG.isDebugEnabled()){
            LOG.info("Snapshotting state {} ...", context.getCheckpointId());}

        unionOffsetStates.clear();if(LOG.isDebugEnabled()){
            LOG.info("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                    offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());}}// remove the unassigned queues in order to avoid read the wrong offset when the source restartSet<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
        offsetTable.entrySet().removeIf(item ->!assignedQueues.contains(item.getKey()));for(Map.Entry<MessageQueue,Long> entry : offsetTable.entrySet()){
            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));}@OverridepublicvoidinitializeState(FunctionInitializationContext context)throwsException{// called every time the user-defined function is initialized,// be that when the function is first initialized or be that// when the function is actually recovering from an earlier checkpoint.// Given this, initializeState() is not only the place where different types of state are initialized,// but also where state recovery logic is included.
        LOG.info("initialize State ...");this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(newListStateDescriptor<>(
                OFFSETS_STATE_NAME,TypeInformation.of(newTypeHint<Tuple2<MessageQueue,Long>>(){})));this.restored = context.isRestored();if(restored){if(restoredOffsets ==null){
                restoredOffsets =newConcurrentHashMap<>();}for(Tuple2<MessageQueue,Long> mqOffsets : unionOffsetStates.get()){if(!restoredOffsets.containsKey(mqOffsets.f0)|| restoredOffsets.get(mqOffsets.f0)< mqOffsets.f1){
                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);}}
            LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);}else{
            LOG.info("No restore state for the consumer.");}}@OverridepublicTypeInformation<OUT>getProducedType(){return schema.getProducedType();}}

RocketMQUtils.java

/**
 * Helpers for reading typed values from a {@link Properties} object.
 * Values are trimmed before parsing so surrounding whitespace is tolerated.
 */
public final class RocketMQUtils {

    /**
     * @param props        property source
     * @param key          property name
     * @param defaultValue value used when the property is absent
     * @return the property parsed as an {@code int}
     * @throws NumberFormatException if the property is present but not a valid integer
     */
    public static int getInteger(Properties props, String key, int defaultValue) {
        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)).trim());
    }

    /**
     * @param props        property source
     * @param key          property name
     * @param defaultValue value used when the property is absent
     * @return the property parsed as a {@code long}
     * @throws NumberFormatException if the property is present but not a valid long
     */
    public static long getLong(Properties props, String key, long defaultValue) {
        return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue)).trim());
    }

    /**
     * @param props        property source
     * @param key          property name
     * @param defaultValue value used when the property is absent
     * @return {@code true} only when the property equals "true" ignoring case
     */
    public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
        return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)).trim());
    }
}

RunningChecker.java

/**
 * Serializable holder for a volatile "running" flag, initially {@code false}.
 */
public class RunningChecker implements Serializable {

    private volatile boolean isRunning = false;

    /** @return the current value of the flag */
    public boolean isRunning() {
        return isRunning;
    }

    /** @param running new value of the flag */
    public void setRunning(boolean running) {
        isRunning = running;
    }
}

以下代码路径:source
RocketMQPosInfoSource.java

/**
 * rocketmq 定位数据流源
 */publicclassRocketMQPosInfoSourceextendsRocketMQSource<VehiclePosInfo>implementsResultTypeQueryable<VehiclePosInfo>{publicRocketMQPosInfoSource(KeyValueDeserializationSchema<VehiclePosInfo> schema,Properties props){super(schema, props);}}

本文转载自: https://blog.csdn.net/qq_39291431/article/details/127124696
版权归原作者 qq_39291431 所有, 如有侵权,请联系我们删除。

“flink 整合rocketmq”的评论:

还没有评论