在分布式计算中,leader election是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是leader或者coordinator。当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务leader。除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来,如下图所示,这个就是所谓的leader选举,而zookeeper作为leader选举的功能,在很多中间件中都有使用,比如kafka基于zookeeper实现leader选举,Hadoop、Spark等。
Curator实现leader选举
除了作为集群节点的leader选举之外,leader选举还可以用在其他的场景,比如在分布式调度任务系统中,从可靠性角度出发,集群也是必不可少的。但往往,为了保证任务不会重复分配,分配任务的节点只能有一个,这种情况就需要从集群中选出一个Leader(老大)去任务池里取任务,如下图所示。
本文就会介绍Curator基于Zookeeper封装的Leader选举工具类LeaderLatch与LeaderSelector的使用及原理分析,Curator有两种选举recipe(Leader Latch和Leader Election),两种实现机制上有一定的差异,后续会逐步说明。
1. LeaderLatch使用实战
首先我们实现定时调度任务。
Quartz中最重要的三个对象:Job、Trigger、Scheduler。
- Job,表示任务
- Trigger,配置调度参数
- Scheduler,代表一个调度容器,一个调度容器中可以注册多个JobDetail和Trigger
我们首先引入相关依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId><version>2.5.3</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.2.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version></dependency>
接下来我们通过继承
SchedulerFactoryBean
从而可以进行一个定时任务的触发。引入
LeaderLatch
以及定义相关namespace。
publicclassZkSchedulerFactoryBeanextendsSchedulerFactoryBean{private LeaderLatch leaderLatch;privatefinal String LEADER_PATH ="/leader";//namespace}
编写该类的构造方法对
LeaderLatch
进行相关初始化:
首先需要关闭自动开启定时任务,然后初始化
LeaderLatch
的时候传入客户端以及相关路径,同时添加相关监听,以便leader挂掉之后可以监听新leader。
publicZkSchedulerFactoryBean()throws Exception {this.setAutoStartup(false);//应用启动的时候不自动开启定时任务
leaderLatch =newLeaderLatch(getClient(), LEADER_PATH);
leaderLatch.addListener(newDemoLeaderLatchListener(this));//当leader发生变化的时候,需要触发监听
leaderLatch.start();}private CuratorFramework getClient(){
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder().connectString("localhost:2181").sessionTimeoutMs(15000).connectionTimeoutMs(20000).retryPolicy(newExponentialBackoffRetry(1000,10)).build();
curatorFramework.start();return curatorFramework;}
我们需要创建一个新的监听:通过构造器传入
SchedulerFactoryBean
以便控制定时任务启动和停止。如果抢占成功则开启定时任务,如果抢占失败则停止定时任务。
publicclassDemoLeaderLatchListenerimplementsLeaderLatchListener{//控制定时任务启动和停止的方法private SchedulerFactoryBean schedulerFactoryBean;publicDemoLeaderLatchListener(SchedulerFactoryBean schedulerFactoryBean){this.schedulerFactoryBean = schedulerFactoryBean;}@OverridepublicvoidisLeader(){
System.out.println(Thread.currentThread().getName()+"成为了leader");
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.start();}@OverridepublicvoidnotLeader(){
System.out.println(Thread.currentThread().getName()+"抢占leader失败,不执行任务");
schedulerFactoryBean.setAutoStartup(false);
schedulerFactoryBean.stop();}}
我们还需要在
ZkSchedulerFactoryBean
类中重写
startScheduler
及
destroy
方法。
@OverrideprotectedvoidstartScheduler(Scheduler scheduler,int startupDelay)throws SchedulerException {if(this.isAutoStartup()){super.startScheduler(scheduler, startupDelay);}}/**
* 释放资源
* @throws SchedulerException
*/@Overridepublicvoiddestroy()throws SchedulerException {
CloseableUtils.closeQuietly(leaderLatch);super.destroy();}
上面完成之后,我们就可以去定义具体的定时任务了。
创建一个类继承
QuartzJobBean
即可,然后在
executeInternal
方法中定义我们需要执行的任务。
publicclassQuartzJobextendsQuartzJobBean{@OverrideprotectedvoidexecuteInternal(JobExecutionContext jobExecutionContext)throws JobExecutionException {
System.out.println("开始执行定时任务");
SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("当前执行的系统时间:"+ sdf.format(newDate()));}}
具体的定时任务创建完成后,我们就可以去定义我们的触发器了。
首先创建类,添加
Configuration
将该类交给Spring管理,然后需要声明我们之前定义的
ZkSchedulerFactoryBean
类,将其交给Spring容器管理,这样才能对我们的定时任务进行一个触发。然后声明触发器以及定时任务。(方法中的参数都会通过依赖注入的方式传入)
@ConfigurationpublicclassQuartzConfiguration{//触发@Beanpublic ZkSchedulerFactoryBean schedulerFactoryBean(JobDetail jobDetail, Trigger trigger)throws Exception {
ZkSchedulerFactoryBean zkSchedulerFactoryBean =newZkSchedulerFactoryBean();
zkSchedulerFactoryBean.setJobDetails(jobDetail);
zkSchedulerFactoryBean.setTriggers(trigger);return zkSchedulerFactoryBean;}//定时任务@Beanpublic JobDetail jobDetail(){return JobBuilder.newJob(QuartzJob.class).storeDurably().build();}@Beanpublic Trigger trigger(JobDetail jobDetail){//定义一个简单执行器,一秒执行一次,重复执行。
SimpleScheduleBuilder simpleScheduleBuilder =
SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).repeatForever();return TriggerBuilder.newTrigger().forJob(jobDetail).withSchedule(simpleScheduleBuilder).build();}}
至此我们的代码全部编写完成,接下来就是测试了。首先我们需要开启两个springboot项目,注意需要自行修改下端口号。
这个时候我们打开我们的zk服务器,然后启动两个项目即可。
我们可以发现leader2抢占了leader开始执行定时任务,leader1还在继续等待。
这时候查看我们ZK上面的节点信息:可以发现两个临时节点。
我们手动将leader2停止,查看leader1的效果。(有可能不是实时的,会有一些延迟。)
以上便是我们实现高可用的一种简单方法。
2. LeaderSelector实战
LeaderSelector和Leader Latch最的差别在于,leader可以释放领导权以后,还可以继续参与竞争。
我们通过以下一个简单案例来了解一下。
publicclassSelectorClientExampleextendsLeaderSelectorListenerAdapterimplementsCloseable{privatefinal String name;privatefinal LeaderSelector leaderSelector;publicSelectorClientExample(String path, String name){
leaderSelector =newLeaderSelector(getClient(), path,this);
leaderSelector.autoRequeue();this.name = name;}@Overridepublicvoidclose()throws IOException {
leaderSelector.close();}publicvoidstart(){
leaderSelector.start();}@OverridepublicvoidtakeLeadership(CuratorFramework client)throws Exception {
System.out.println(name +" 成为Leader");
Thread.sleep(1000);}private CuratorFramework getClient(){
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder().connectString("localhost:2181").sessionTimeoutMs(15000).connectionTimeoutMs(20000).retryPolicy(newExponentialBackoffRetry(1000,10)).build();
curatorFramework.start();return curatorFramework;}publicstaticvoidmain(String[] args)throws IOException {
String path ="/leader";for(int i =0; i <10; i++){
SelectorClientExample selectorClientExample =newSelectorClientExample(path,"Client:"+ i);
selectorClientExample.start();}
System.in.read();}}
这时候我们进行测试,查看节点信息可以发现各个节点在重复尝试竞争leader。
项目地址
zk demo
版权归原作者 、楽. 所有, 如有侵权,请联系我们删除。