0


基于Curator(zookeeper)实现leader选举

在分布式计算中,leader election是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是leader或者coordinator。当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务leader。除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来,如下图所示,这个就是所谓的leader选举,而zookeeper作为leader选举的功能,在很多中间件中都有使用,比如kafka基于zookeeper实现leader选举,Hadoop、Spark等。

image-20220313221343466

Curator实现leader选举

除了作为集群节点的leader选举之外,leader选举还可以用在其他的场景,比如在分布式调度任务系统中,从可靠性角度出发,集群也是必不可少的。但往往,为了保证任务不会重复分配,分配任务的节点只能有一个,这种情况就需要从集群中选出一个Leader(老大)去任务池里取任务,如下图所示。

image-20220313221632203

本文就会介绍Curator基于Zookeeper封装的Leader选举工具类LeaderLatch与LeaderSelector的使用及原理分析,Curator有两种选举recipe(Leader Latch和Leader Election),两种实现机制上有一定的差异,后续会逐步说明。

1. LeaderLatch使用实战

首先我们实现定时调度任务。

Quartz中最重要的三个对象:Job、Trigger、Scheduler。

  • Job,表示任务
  • Trigger,配置调度参数
  • Scheduler,代表一个调度容器,一个调度容器中可以注册多个JobDetail和Trigger

我们首先引入相关依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId><version>2.5.3</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.2.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version></dependency>

接下来我们通过继承

SchedulerFactoryBean

从而可以进行一个定时任务的触发。引入

LeaderLatch

以及定义相关namespace。

publicclassZkSchedulerFactoryBeanextendsSchedulerFactoryBean{private LeaderLatch leaderLatch;privatefinal String LEADER_PATH ="/leader";//namespace}

编写该类的构造方法对

LeaderLatch

进行相关初始化:

首先需要关闭自动开启定时任务,然后初始化

LeaderLatch

的时候传入客户端以及相关路径,同时添加相关监听,以便leader挂掉之后可以监听新leader。

publicZkSchedulerFactoryBean()throws Exception {this.setAutoStartup(false);//应用启动的时候不自动开启定时任务

        leaderLatch =newLeaderLatch(getClient(), LEADER_PATH);
        leaderLatch.addListener(newDemoLeaderLatchListener(this));//当leader发生变化的时候,需要触发监听
        leaderLatch.start();}private CuratorFramework getClient(){
            CuratorFramework curatorFramework = CuratorFrameworkFactory
                    .builder().connectString("localhost:2181").sessionTimeoutMs(15000).connectionTimeoutMs(20000).retryPolicy(newExponentialBackoffRetry(1000,10)).build();
            curatorFramework.start();return curatorFramework;}

我们需要创建一个新的监听:通过构造器传入

SchedulerFactoryBean

以便控制定时任务启动和停止。如果抢占成功则开启定时任务,如果抢占失败则停止定时任务。

image-20220313223342812

publicclassDemoLeaderLatchListenerimplementsLeaderLatchListener{//控制定时任务启动和停止的方法private SchedulerFactoryBean schedulerFactoryBean;publicDemoLeaderLatchListener(SchedulerFactoryBean schedulerFactoryBean){this.schedulerFactoryBean = schedulerFactoryBean;}@OverridepublicvoidisLeader(){
        System.out.println(Thread.currentThread().getName()+"成为了leader");
        schedulerFactoryBean.setAutoStartup(true);
        schedulerFactoryBean.start();}@OverridepublicvoidnotLeader(){
        System.out.println(Thread.currentThread().getName()+"抢占leader失败,不执行任务");
        schedulerFactoryBean.setAutoStartup(false);
        schedulerFactoryBean.stop();}}

我们还需要在

ZkSchedulerFactoryBean

类中重写

startScheduler

destroy

方法。

@OverrideprotectedvoidstartScheduler(Scheduler scheduler,int startupDelay)throws SchedulerException {if(this.isAutoStartup()){super.startScheduler(scheduler, startupDelay);}}/**
 * 释放资源
 * @throws SchedulerException
 */@Overridepublicvoiddestroy()throws SchedulerException {
    CloseableUtils.closeQuietly(leaderLatch);super.destroy();}

上面完成之后,我们就可以去定义具体的定时任务了。

创建一个类继承

QuartzJobBean

即可,然后在

executeInternal

方法中定义我们需要执行的任务。

publicclassQuartzJobextendsQuartzJobBean{@OverrideprotectedvoidexecuteInternal(JobExecutionContext jobExecutionContext)throws JobExecutionException {
        System.out.println("开始执行定时任务");
        SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("当前执行的系统时间:"+ sdf.format(newDate()));}}

具体的定时任务创建完成后,我们就可以去定义我们的触发器了。

首先创建类,添加

Configuration

将该类交给Spring管理,然后需要声明我们之前定义的

ZkSchedulerFactoryBean

类,将其交给Spring容器管理,这样才能对我们的定时任务进行一个触发。然后声明触发器以及定时任务。(方法中的参数都会通过依赖注入的方式传入)

@ConfigurationpublicclassQuartzConfiguration{//触发@Beanpublic ZkSchedulerFactoryBean schedulerFactoryBean(JobDetail jobDetail, Trigger trigger)throws Exception {
        ZkSchedulerFactoryBean zkSchedulerFactoryBean =newZkSchedulerFactoryBean();
        zkSchedulerFactoryBean.setJobDetails(jobDetail);
        zkSchedulerFactoryBean.setTriggers(trigger);return zkSchedulerFactoryBean;}//定时任务@Beanpublic JobDetail jobDetail(){return JobBuilder.newJob(QuartzJob.class).storeDurably().build();}@Beanpublic Trigger trigger(JobDetail jobDetail){//定义一个简单执行器,一秒执行一次,重复执行。
        SimpleScheduleBuilder simpleScheduleBuilder =
                SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).repeatForever();return TriggerBuilder.newTrigger().forJob(jobDetail).withSchedule(simpleScheduleBuilder).build();}}

至此我们的代码全部编写完成,接下来就是测试了。首先我们需要开启两个springboot项目,注意需要自行修改下端口号。

image-20220313225416199

这个时候我们打开我们的zk服务器,然后启动两个项目即可。

我们可以发现leader2抢占了leader开始执行定时任务,leader1还在继续等待。

image-20220313231613299

image-20220313231620048

这时候查看我们ZK上面的节点信息:可以发现两个临时节点。

image-20220313231714443

我们手动将leader2停止,查看leader1的效果。(有可能不是实时的,会有一些延迟。)

image-20220313231817248

image-20220313231824170

以上便是我们实现高可用的一种简单方法。

2. LeaderSelector实战

LeaderSelector和Leader Latch最的差别在于,leader可以释放领导权以后,还可以继续参与竞争。

我们通过以下一个简单案例来了解一下。

publicclassSelectorClientExampleextendsLeaderSelectorListenerAdapterimplementsCloseable{privatefinal String name;privatefinal LeaderSelector leaderSelector;publicSelectorClientExample(String path, String name){
        leaderSelector =newLeaderSelector(getClient(), path,this);
        leaderSelector.autoRequeue();this.name = name;}@Overridepublicvoidclose()throws IOException {
        leaderSelector.close();}publicvoidstart(){
        leaderSelector.start();}@OverridepublicvoidtakeLeadership(CuratorFramework client)throws Exception {
        System.out.println(name +" 成为Leader");
        Thread.sleep(1000);}private CuratorFramework getClient(){
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder().connectString("localhost:2181").sessionTimeoutMs(15000).connectionTimeoutMs(20000).retryPolicy(newExponentialBackoffRetry(1000,10)).build();
        curatorFramework.start();return curatorFramework;}publicstaticvoidmain(String[] args)throws IOException {
        String path ="/leader";for(int i =0; i <10; i++){
            SelectorClientExample selectorClientExample =newSelectorClientExample(path,"Client:"+ i);
            selectorClientExample.start();}
        System.in.read();}}

这时候我们进行测试,查看节点信息可以发现各个节点在重复尝试竞争leader。

image-20220313232704571

项目地址

zk demo


本文转载自: https://blog.csdn.net/qq_41432730/article/details/123468940
版权归原作者 、楽. 所有, 如有侵权,请联系我们删除。

“基于Curator(zookeeper)实现leader选举”的评论:

还没有评论