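Project introduction
PowerJob (formerly OhMyScheduler) is a new-generation distributed scheduling and computing framework that makes it easy to schedule jobs and to run complex tasks as distributed computations.
Project address
Quick start
https://www.yuque.com/powerjob/guidence/nyio9g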
Source code analysis
Server startup
The server is started from PowerJobServerApplication.

public static void main(String[] args) {
    pre();
    AkkaStarter.init();
    VertXStarter.init();
    // Start SpringBoot application.
    try {
        SpringApplication.run(PowerJobServerApplication.class, args);
    } catch (Throwable t) {
        log.error(TIPS);
        throw t;
    }
}

AkkaStarter.init() starts the actorSystem and registers FriendRequestHandler as a message handler. It loads the configuration file oms-server.akka.conf, and the server port is set to 10086.

public static void init() {
    Stopwatch stopwatch = Stopwatch.createStarted();
    log.info("[PowerJob] PowerJob's akka system start to bootstrap...");
    // One issue was overlooked: machines may have no access to the public network
    // unless you set up your own NTP server.
    // TimeUtils.check();
    // Parse the configuration file
    Config akkaFinalConfig = parseConfig();
    actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
    actorSystem.actorOf(FriendRequestHandler.defaultProps(), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
    log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch);
}

VertXStarter.init() mainly creates a new Vertx object via Vertx.vertx(); the port is set to 10010.
- In Initializer#initHandler, the Vert.x handler WorkerRequestHttpHandler and the Akka message handler are registered.

@Component
@ConditionalOnExpression("'${execution.env}'!='test'")
public class Initializer {

    @PostConstruct
    public void initHandler() {
        // init akka
        AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME);
        // init vert.x
        VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler());
    }
}

WorkerRequestHttpHandler creates an HttpServer and sets up its routes.

@Slf4j
public class WorkerRequestHttpHandler extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        Properties properties = PropertyUtils.getProperties();
        int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT)));
        HttpServerOptions options = new HttpServerOptions();
        HttpServer server = vertx.createHttpServer(options);
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.post(ProtocolConstant.SERVER_PATH_HEARTBEAT).handler(ctx -> {
            WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class);
            fetchWorkerRequestHandler().processWorkerHeartbeat(heartbeat);
            success(ctx);
        });
        router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT).blockingHandler(ctx -> {
            TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class);
            try {
                fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req);
                out(ctx, AskResponse.succeed(null));
            } catch (Exception e) {
                log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e);
                out(ctx, AskResponse.failed(ExceptionUtils.getMessage(e)));
            }
        });
        router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT).blockingHandler(ctx -> {
            WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class);
            fetchWorkerRequestHandler().processWorkerLogReport(req);
            success(ctx);
        });
        server.requestHandler(router).listen(port);
    }

    private static void out(RoutingContext ctx, Object msg) {
        ctx.response()
                .putHeader(OmsConstant.HTTP_HEADER_CONTENT_TYPE, OmsConstant.JSON_MEDIA_TYPE)
                .end(JsonObject.mapFrom(msg).encode());
    }

    private static void success(RoutingContext ctx) {
        out(ctx, ResultDTO.success(null));
    }
}

Client startup
PowerJobWorker implements InitializingBean and runs PowerJobWorker#init, which connects to the server.

ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());
serverDiscoveryService.start(timingPool);

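- The service is started by ServerDiscoveryService#start, which performs service discovery. ServerDiscoveryService#acquire calls http://127.0.0.1:7700/server/acquire?appId=2&currentServer=null&protocol=AKKA and obtains the server address 10.132.17.10:10086.
- The client also starts its own Akka service, loading the configuration from oms-worker.akka.conf and using port 27777.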
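
To make the shape of the discovery request concrete, here is a minimal sketch that assembles the same URL from its parts. AcquireUrlSketch and its build method are hypothetical helpers written for this article, not PowerJob classes; only the path and the three query parameters come from the URL shown above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of PowerJob): builds the discovery URL a worker calls.
final class AcquireUrlSketch {

    static String build(String serverAddress, long appId, String currentServer, String protocol) {
        // The literal "null" mirrors the URL above, where no server is known yet.
        String current = currentServer == null ? "null" : currentServer;
        return "http://" + serverAddress + "/server/acquire"
                + "?appId=" + appId
                + "&currentServer=" + URLEncoder.encode(current, StandardCharsets.UTF_8)
                + "&protocol=" + URLEncoder.encode(protocol, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(build("127.0.0.1:7700", 2L, null, "AKKA"));
    }
}
```

The sketch URL-encodes the parameter values, so a currentServer such as 10.132.17.10:10086 would have its colon escaped; whether the real client does the same is not shown in the article.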
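Client: service discovery
- The client calls http://127.0.0.1:7700/server/acquire?appId=2&currentServer=null&protocol=AKKA to get the address of the current Akka server.
- The server responds in ServerController#acquireServer. It builds a Ping request and sends it to the path akka://[email protected]:10086/user/friend_actor; if the call succeeds, the address is considered usable.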

private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {
    if (downServerCache.contains(serverAddress)) {
        return null;
    }
    if (StringUtils.isEmpty(serverAddress)) {
        return null;
    }
    Ping ping = new Ping();
    ping.setCurrentTime(System.currentTimeMillis());
    ActorSelection serverActor = AkkaStarter.getFriendActor(serverAddress);
    try {
        CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
        AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        downServerCache.remove(serverAddress);
        if (response.isSuccess()) {
            return JsonUtils.parseObject(response.getData(), JSONObject.class).getString(protocol);
        }
    } catch (Exception e) {
        log.warn("[ServerElection] server({}) was down.", serverAddress);
    }
    downServerCache.add(serverAddress);
    return null;
}

public static ActorSelection getFriendActor(String address) {
    String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
    return actorSystem.actorSelection(path);
}

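
getFriendActor fills a path template with three values. The sketch below shows that formatting step in isolation. The template string is an assumption inferred from the actor paths quoted in this article (the real AKKA_PATH constant is not shown), and "demo-system" is a placeholder actor-system name.

```java
// Hypothetical illustration of how an Akka actor path could be assembled.
final class AkkaPathSketch {

    // Assumed template; the actual value of AKKA_PATH is not shown in the article.
    static final String AKKA_PATH = "akka://%s@%s/user/%s";

    static String actorPath(String systemName, String address, String actorName) {
        return String.format(AKKA_PATH, systemName, address, actorName);
    }

    public static void main(String[] args) {
        // "demo-system" is a placeholder, not the real actor-system name.
        System.out.println(actorPath("demo-system", "10.132.17.10:10086", "friend_actor"));
    }
}
```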
FriendRequestHandler#onReceivePing handles the Ping requests sent to FriendRequestHandler.

private void onReceivePing(Ping ping) {
    getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf());
}

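
The liveness check above combines a request with a timeout and a cache of servers already known to be down. The following sketch reproduces that pattern with the JDK only, swapping the Akka ask for a plain CompletableFuture. PingCheckSketch, its timeout value, and the ping function are all hypothetical stand-ins.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// JDK-only stand-in for the "ping with timeout plus down-server cache" pattern.
final class PingCheckSketch {

    static final long PING_TIMEOUT_MS = 200; // assumed value, for illustration only

    // Returns the address if it answered in time, otherwise null (and remembers it as down).
    static String activeAddress(String address, Set<String> downCache,
                                Function<String, CompletableFuture<Boolean>> ping) {
        if (address == null || address.isEmpty() || downCache.contains(address)) {
            return null;
        }
        try {
            boolean ok = ping.apply(address).get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (ok) {
                return address;
            }
        } catch (Exception e) {
            // Timeout, interruption or a failed future: treat the server as down.
        }
        downCache.add(address);
        return null;
    }

    public static void main(String[] args) {
        Set<String> down = new HashSet<>();
        Function<String, CompletableFuture<Boolean>> ping = addr -> {
            if (addr.startsWith("10.")) {
                return CompletableFuture.completedFuture(true);
            }
            return new CompletableFuture<Boolean>(); // never completes, forcing a timeout
        };
        System.out.println(activeAddress("10.132.17.10:10086", down, ping));
        System.out.println(activeAddress("192.0.2.1:10086", down, ping));
        System.out.println(down);
    }
}
```

Once an address is in the cache, later calls return immediately without waiting for another timeout, which is the point of passing downServerCache around.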
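Server: executing a job
- The entry point is JobController#runImmediately. The request first passes through the aspect DesignateServerAspect#execute, and InstanceService#create then creates the job instance. Next it passes through the aspect UseCacheLockAspect#execute and is dispatched by DispatchService#dispatch.
- If a suitable worker is found, the request entity is built and sent. If no worker is found, InstanceManager#processFinishedInstance is called. Whether an instance completes or fails, some follow-up handling is needed, such as logging and alerting.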

// Get the list of currently most suitable workers
List<WorkerInfo> suitableWorkers = workerClusterQueryService.getSuitableWorkers(jobInfo);
if (CollectionUtils.isEmpty(suitableWorkers)) {
    log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", jobId, instanceId);
    instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, now);
    instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
    return;
}
List<String> workerIpList = suitableWorkers.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
// Build the job scheduling request
ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList);
// Send the request (unreliable, so a background thread has to poll the status periodically)
WorkerInfo taskTracker = suitableWorkers.get(0);
String taskTrackerAddress = taskTracker.getAddress();
transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req);
log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req);
// Update the status
instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now);
// Load the cache
instanceMetadataService.loadJobInfo(instanceId, jobInfo);

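
The core decision in the dispatch code above is small: with no suitable worker the instance fails, otherwise the first worker in the list becomes the TaskTracker. A minimal sketch of just that branch, using plain address strings instead of WorkerInfo (DispatchSketch is a hypothetical name, not a PowerJob class):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical reduction of the dispatch decision to plain strings.
final class DispatchSketch {

    // Empty result means "no worker available"; otherwise the first worker is the TaskTracker.
    static Optional<String> chooseTaskTracker(List<String> suitableWorkers) {
        if (suitableWorkers == null || suitableWorkers.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(suitableWorkers.get(0));
    }

    public static void main(String[] args) {
        System.out.println(chooseTaskTracker(Collections.emptyList()));
        System.out.println(chooseTaskTracker(Arrays.asList("10.132.17.10:27777", "10.132.17.11:27777")));
    }
}
```

Because the first element is always picked, the ordering produced by getSuitableWorkers effectively decides which worker coordinates the instance.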
- Because the protocol is Akka, AkkaTransporter is used. It builds the client address akka://[email protected]:27777/user/worker and sends the message there. The object sent is a ServerScheduleJobReq.

public void tell(String address, PowerSerializable object) {
    ActorSelection taskTrackerActor = AkkaStarter.getWorkerActor(address);
    taskTrackerActor.tell(object, null);
}

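- The client receives the request in TaskTrackerActor#onReceiveServerScheduleJobReq, which initializes the tracker through CommonTaskTracker#initTaskTracker and starts a scheduled task that runs the Dispatcher. To dispatch a task, TaskTracker#dispatchTask uses Akka to send a TaskTrackerStartTaskReq object to the processor_tracker handler. ProcessorTrackerActor#onReceiveTaskTrackerStartTaskReq wraps the task information in a ProcessorRunnable and finds the corresponding BasicProcessor to process it.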

ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader();
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue, workerRuntime);
try {
    threadPool.submit(processorRunnable);
    success = true;
} catch (RejectedExecutionException ignore) {
    log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
            instanceId, newTask.getTaskId(), newTask.getTaskName());
} catch (Exception e) {
    log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
}

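
The RejectedExecutionException branch above only fires when the pool has no free thread and its queue is full. The sketch below demonstrates that behavior with a deliberately tiny pool (one thread, queue capacity one). The sizes are chosen for the demonstration and say nothing about how PowerJob configures its own pool; SubmitSketch is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Shows when a bounded ThreadPoolExecutor rejects a submission.
final class SubmitSketch {

    // Returns true if the pool accepted the task, false if it was rejected.
    static boolean trySubmit(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.submit(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(trySubmit(pool, blocking)); // taken by the single thread
        System.out.println(trySubmit(pool, blocking)); // parked in the queue
        System.out.println(trySubmit(pool, blocking)); // thread busy and queue full
        release.countDown();
        pool.shutdown();
    }
}
```

With the default rejection policy the third submission throws, so a caller can report failure and let the task be dispatched elsewhere, as the warning message in the original code describes.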
- After the task has been submitted successfully, a ProcessorReportTaskStatusReq is sent.

// 2. Reply that the task was received successfully
if (success) {
    ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
    reportReq.setInstanceId(instanceId);
    reportReq.setSubInstanceId(newTask.getSubInstanceId());
    reportReq.setTaskId(newTask.getTaskId());
    reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
    reportReq.setReportTime(System.currentTimeMillis());
    taskTrackerActorRef.tell(reportReq, null);
    log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
            instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
}

Client: sending heartbeats
PowerJobWorker implements InitializingBean, and its init method runs timingPool.scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, 15, TimeUnit.SECONDS);. The reporter resolves the server's actor address, akka://[email protected]:10086/user/server_actor, and sends a heartbeat to it.

// Send the request
String serverPath = AkkaUtils.getServerActorPath(currentServer);
if (StringUtils.isEmpty(serverPath)) {
    return;
}
ActorSelection actorSelection = workerRuntime.getActorSystem().actorSelection(serverPath);
actorSelection.tell(heartbeat, null);

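
The periodic heartbeat relies on scheduleAtFixedRate with an initial delay of 0. The sketch below shows the same scheduling call with the JDK's ScheduledExecutorService, using a short period so it finishes quickly. HeartbeatScheduleSketch and runBeats are hypothetical names, and the Runnable stands in for WorkerHealthReporter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Fixed-rate scheduling sketch; the Runnable stands in for the heartbeat reporter.
final class HeartbeatScheduleSketch {

    // Fires `beat` at a fixed rate until `count` beats have run or `timeoutMs` elapses.
    // Returns true if all beats ran in time.
    static boolean runBeats(int count, long periodMs, long timeoutMs, Runnable beat) {
        ScheduledExecutorService timingPool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(count);
        try {
            // Initial delay 0: the first beat fires immediately, as in the call above.
            timingPool.scheduleAtFixedRate(() -> {
                beat.run();
                done.countDown();
            }, 0, periodMs, TimeUnit.MILLISECONDS);
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timingPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean ok = runBeats(3, 50, 5000,
                () -> System.out.println("heartbeat at " + System.currentTimeMillis()));
        System.out.println("all beats sent: " + ok);
    }
}
```

One property of scheduleAtFixedRate worth keeping in mind: if the task throws, later executions are suppressed, so a heartbeat reporter generally needs to catch its own exceptions.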
- On the server, AbWorkerRequestHandler#processWorkerHeartbeat handles heartbeat requests. After receiving a heartbeat, the server updates the worker's status in ClusterStatusHolder#updateStatus.

public void updateStatus(WorkerHeartbeat heartbeat) {
    String workerAddress = heartbeat.getWorkerAddress();
    long heartbeatTime = heartbeat.getHeartbeatTime();
    WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
        WorkerInfo wf = new WorkerInfo();
        wf.refresh(heartbeat);
        return wf;
    });
    long oldTime = workerInfo.getLastActiveTime();
    if (heartbeatTime < oldTime) {
        log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
        return;
    }
    workerInfo.refresh(heartbeat);
    List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
    if (!CollectionUtils.isEmpty(containerInfos)) {
        containerInfos.forEach(containerInfo -> {
            Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
            infos.put(workerAddress, containerInfo);
        });
    }
}

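
The update logic above has two parts: register a worker on its first heartbeat, and ignore heartbeats that are older than the one already recorded. A reduced sketch that tracks only the last active time per address (ClusterStatusSketch is a hypothetical class; the real one stores a full WorkerInfo plus container data):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Reduced heartbeat bookkeeping: one timestamp per worker address.
final class ClusterStatusSketch {

    private final Map<String, Long> lastActiveTime = new ConcurrentHashMap<>();

    // Returns true if the heartbeat was applied, false if it was older than the recorded one.
    boolean updateStatus(String workerAddress, long heartbeatTime) {
        // First heartbeat from this worker registers it with its own timestamp.
        long oldTime = lastActiveTime.computeIfAbsent(workerAddress, ignore -> heartbeatTime);
        if (heartbeatTime < oldTime) {
            return false; // expired heartbeat, e.g. delivered out of order
        }
        lastActiveTime.put(workerAddress, heartbeatTime);
        return true;
    }

    Long lastActive(String workerAddress) {
        return lastActiveTime.get(workerAddress);
    }

    public static void main(String[] args) {
        ClusterStatusSketch status = new ClusterStatusSketch();
        System.out.println(status.updateStatus("10.132.17.10:27777", 100L));
        System.out.println(status.updateStatus("10.132.17.10:27777", 50L));
        System.out.println(status.lastActive("10.132.17.10:27777"));
    }
}
```

As in the original, the check and the write are separate steps, so two concurrent heartbeats from the same worker could interleave. In this reduced form, lastActiveTime.merge(workerAddress, heartbeatTime, Math::max) would make the update atomic.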
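Summary
- Akka is said to be a fairly simple framework, but it is written in Scala and differs considerably between versions, so it is hard to use well without Scala experience.
- Vert.x is a fairly simple framework. This blog post is a reasonable introduction:
https://blog.csdn.net/qq_42985872/article/details/128494611
- So far I have only worked through the source code of the five features above. My understanding of a job scheduling framework is simply that the server tells the client when to run. The author also added many job-status checks to make sure jobs run to completion, which makes the framework more thorough but also more complex.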
Reposted from: https://blog.csdn.net/qq_42985872/article/details/128500740
Copyright belongs to the original author, 秋装什么. In case of infringement, please contact us for removal.