zookeeper相当于一个中介,服务器和客户端对于它来说都是客户
所以服务器和客户端都可以理解成本质是服务器,但是都可以当作是客户端,因为都是客户
服务器执行的是create操作,客户端执行的是get -w监听操作,这是本质区别
- 单机部署:一般用来检验 Zookeeper 基础功能,熟悉 Zookeeper 各种基础操作及特性。
- 伪集群部署:在单台机器上部署集群,方便在本地验证集群模式下的各种功能。
- 集群部署:一般在生产环境使用,具备一致性、分区容错性。
一、概述
// 主要见讲义的前4页
主要是:
统一命名服务、统一配置管理、统一集群管理、服务器动态上下线、软负载均衡
二、安装
分为本地安装和集群安装
主要都是分为客户端和服务器
// 本地主要看讲义6-7页// 集群安装看8-9页
- 这里我们主要讲集群安装
- 集群规划
- 解压在目标目录地址下,/opt/module
- 配置服务器编号,十分重要,在解压的文件夹中创建文件夹zkData,然后在里面创建myid文件每一个主机都有自己的一个id
- 配置解压文价夹中的conf文件夹中的zoo_sample,改名为zoo.cfg
- 里面修改tmp存储路径,避免之后数据因为到期被缓存清除
- 同时增加服务器之间的配置,比如server.2=hadoop102:2888:3888
- 编写统一开启关闭集群zookeeper的脚本
三、集群操作
3.1、选举机制【面试重点】
主要分为第一次启动和非第一次启动两种情况下的机制内容
同时要分清楚那三种ID
之后那些ID还会用到
3.2、客户端命令行操作
这里需要强调的是启动客户端的时候,不是简单的bin/zxCli.sh
需要加上你选用主机当作客户端的ip地址,因为我们已经做了host的映射了
这里直接
bin/zxCli.sh -server hadooop102:2181
// 具体的命令内容看讲义11-12页
3.3、节点类型
1、节点数据信息
进入客户端可以查看节点信息
// 通过 ls ///具体看讲义12-13页
2、节点类型
- 持久节点客户端和服务器断开后,创建的节点不删除1. 带有序号: create -s /sanguo “weiguo”2. 不带序号: create /sanguo “weiguo”
- 临时节点客户端和服务器断开后,创建的节点删除1. 带有序号:create -e -s /sanguo “weiguo”2. 不带序号:create -e /sanguo “weiguo”
// 主要看讲义13-14页
3.4、监听器原理【重点】
客户端监听服务器
主要监听:
- 节点数据的变化
- 子节点增减的变化
但是需要注意,注册一次,就是那个代码写一次,只能监听一次,想再次监听,就要再次注册
<img
// 主要看讲义的15-16页
3.5、客户端API操作
- 先启动集群的zookeeper
- 创建maven工程
- 配置pom文件
- 创建实现类,里面主要是初始化和create创建节点等操作
1、创建节点
这里我们首先写的是创建节点以及初始化
需要注意的是,我们init上面是@Before,因为要先初始化,不能也是@Test,要不然zkClient没有指定的对象,会报空指针
publicclass zkClient {// 注意:逗号前后不能有空格privateString connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";privatestaticint sessionTimeout =2000;privateZooKeeper zkClient =null;@Beforepublicvoidinit()throwsIOException{
zkClient =newZooKeeper(connectString, sessionTimeout,newWatcher(){@Overridepublicvoidprocess(WatchedEvent watchedEvent){}});}@Testpublicvoidcreate()throwsInterruptedException,KeeperException{String nodeCreated = zkClient.create("/atguigu","shuaige".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}
2、获取子节点并监听节点变化【有分析流程
本来单纯的如果想获取子节点,监听节点的一次变化,注意是一次!那可以这么写代码
其中getchildren中的Watcher参数在上面 init 的里面写到了
可以看下面代码
publicclass zkClient {// 注意:逗号前后不能有空格privateString connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";privatestaticint sessionTimeout =2000;privateZooKeeper zkClient =null;@Beforepublicvoidinit()throwsIOException{
zkClient =newZooKeeper(connectString, sessionTimeout,newWatcher(){@Overridepublicvoidprocess(WatchedEvent watchedEvent){}});}@Testpublicvoidgetchildren()throwsInterruptedException,KeeperException{List<String> children = zkClient.getChildren("/",true);for(String child : children){System.out.println(child);}}}
但是有新的问题了
我们如果在集群中进行操作,添加或者删除节点,我们在想怎么可以实时地可以在API中监听到
我们就想着给其加一个延时阻塞,因为其实getchildren这个调用地方法相当于是个线程,我们假如延时他结束地时间是不是就可以一直持续地收到监听到客户端地消息了
就是这个
// 延时阻塞
// 这个是为了可以有一段延时,操作增加节点后还能监控到
Thread.sleep(Long.MAX_VALUE);
具体在getchildren中这么写
@Testpublicvoidgetchildren()throwsInterruptedException,KeeperException{List<String> children = zkClient.getChildren("/",true);for(String child : children){System.out.println(child);}// 延时阻塞// 这个是为了可以有一段延时,操作增加节点后还能监控到Thread.sleep(Long.MAX_VALUE);}
不过我们进行测试,发现还是不可以监听到节点地动态变化
我们发现关键在于zkClient.getChildren(“/”, true);中的true表示的是问你是否开始观察,但是我们知道监听器有一个特点,是注册一次,只能监听一次
我们每一次都相当于还是只能监听一次,那怎么更改呢?
我们可以在初始的init中进行更改,init表示每一次只要执行了zk的api指令,就会走监听器的重写方法【只要zookeeper发生了变化就会触动Java程序的监听器,相当于都是重新注册一次】
所以我们可以吧zkClient.getChildren(“/”, true);这几行放在init 方法里面
就是这几行放在上面 init 中
List<String> children = zkClient.getChildren("/",true);for(String child : children){System.out.println(child);}
最后展示如下:
其中try catch是直接抛出来即可
publicclass zkClient {// 注意:逗号前后不能有空格privateString connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";privatestaticint sessionTimeout =2000;privateZooKeeper zkClient =null;@Beforepublicvoidinit()throwsIOException{
zkClient =newZooKeeper(connectString, sessionTimeout,newWatcher(){@Overridepublicvoidprocess(WatchedEvent watchedEvent){// 收到事件通知后的回调函数(用户的业务逻辑)System.out.println(watchedEvent.getType()+"--"+ watchedEvent.getPath());// 再次启动监听try{List<String> children = zkClient.getChildren("/",true);for(String child : children){System.out.println(child);}}catch(Exception e){
e.printStackTrace();}}});}@Testpublicvoidgetchildren()throwsInterruptedException,KeeperException{List<String> children = zkClient.getChildren("/",true);for(String child : children){System.out.println(child);}}}
3、判断 Znode 是否存在
// 判断 znode 是否存在@Testpublicvoidexist()throwsException{Stat stat = zkClient.exists("/atguigu",false);System.out.println(stat ==null?"not exist":"exist");}
3.6、客户端向服务端写数据流程
// 主要见讲义第19页
四、服务器动态上下线监听案例
我们需要模拟的是服务器在zookeeper上创建节点
1、设计思路
每一次运行API程序都是相当于:
- 模拟服务器和zookeeper的一次连接,就是一次服务器上线,重新运行的时候相当于之前创建的服务器节点已经下线挂掉,因为我们创建的是临时节点
- 模拟客户端对zookeeper上连接的服务器的监听,同时处理一些工作process(这里单纯只是睡眠)
2、程序思路
写API程序的思路
- 我们就是缺啥,写啥,如果包里面有写好的方法,就直接ctrl+p然后看看参数,去用
- 如果包里面没有封装好的方法,就自己自定义写一个新方法,然后“ 。var ”。直接补全程序,最后抛出来异常即可
2.1 服务器
思路:
- 获取zookeeper的连接,也就是获取对象
- 注册,也就是创建一个服务器路径,注册服务器到集群zookeeper中
- 启动业务逻辑,这里没有别的需求,所以就是“线程睡觉”
我们主体思路就是上述,因为我们的流程就是先是服务器连接到集群,然后注册通知集群,也就是create节点,最后去完成其余业务逻辑
所以我们上述三步是在main方法中完成的,然后我们再单独地实现三步骤中的每一个自定义方法,因为封装的包里面没有我们所需要的方法
- 其中第一步创建连接获取对象,其实想法就是new一个zookeeper对象,里面的参数可以通过ctrl + p看到就是
(connectString, sessionTimeout, new Watcher{}),里面的connectString, sessionTimeout可以通过【ctrl + alt + f 升级为全局变量,进行赋值】
- 而第二步的注册,相当于就是create一个线程,一个节点,在里面定义节点的路径,节点的内容,节点的类型。但是需要注意的是我们在main方法中需要给自定义的regist加一个参数,一位内自定义方法中我们用到这个参数当作节点的内容,至于args[0],怎么来,在maven工程中点击运行那个图标中的edit,在里面:
具体代码展示:
publicclassDistributeServer{privateString connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";privateint sessionTimeout =2000;privateZooKeeper zk;publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{DistributeServer server =newDistributeServer();//1 获取zk连接(也就是获取对象)
server.getConnect();//2 注册,也就是创建一个服务器路径,注册服务器到zk集群
server.regist(args[0]);//3 启动业务逻辑(睡觉)
server.business();}privatevoidbusiness()throwsInterruptedException{Thread.sleep(Long.MAX_VALUE);}privatevoidregist(String hostname)throwsInterruptedException,KeeperException{String create = zk.create("/servers/"+hostname, hostname.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);// 打印一下已经创建路径,注册服务器System.out.println(hostname +" is online");}voidgetConnect()throwsIOException{
zk =newZooKeeper(connectString, sessionTimeout,newWatcher(){@Overridepublicvoidprocess(WatchedEvent watchedEvent){}});}}
2.2 客户端
客户端的代码思路同上,还是没有的方法就自己创建
主要思路是:
- 获取zk连接 getConnect()
- 监听服务器下面子节点的增加和删除变化:getSeverList()监听的方法我们在上面3.5节中也有详细的说明,但是这里再强调一下,可以自己重新再getchildren中写new Watcher也可以利用在zookeeper连接时候的初始化方法中,也就是new ZooKeeper(connectString, sessionTimeout, new Watcher()中的这个watcher但是为了持续可以监听,需要在zookeeper那个初始化方法中再重新调用一次getSeverList这个方法,就是我们编写监听服务器子节点的方法,然后抛出异常
- 业务逻辑(睡觉): business()
main方法中主体思路
//1 获取zk连接
client.getConnect();//2 监听服务器下面子节点的增加和删除变化
client.getSeverList();//3 业务逻辑(睡觉)
client.business();
接下来看详细代码:
publicclassDistributeClient{privateString connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";privateint sessionTimeout =2000;privateZooKeeper zk;publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{DistributeClient client =newDistributeClient();//1 获取zk连接
client.getConnect();//2 监听服务器下面子节点的增加和删除变化
client.getSeverList();//3 业务逻辑(睡觉)
client.business();}privatevoidbusiness()throwsInterruptedException{Thread.sleep(Long.MAX_VALUE);}privatevoidgetSeverList()throwsInterruptedException,KeeperException{List<String> children = zk.getChildren("/servers",true);ArrayList<String> servers =newArrayList<>();for(String child : children){byte[] data = zk.getData("/servers/"+ child,false,null);//我们想要把所有,注意是所有服务器中监听到的节点值都封装到一个集合中,最后统一打印
servers.add(newString(data));}//打印System.out.println(servers);}privatevoidgetConnect()throwsIOException{
zk =newZooKeeper(connectString, sessionTimeout,newWatcher(){@Overridepublicvoidprocess(WatchedEvent watchedEvent){try{getSeverList();}catch(InterruptedException e){
e.printStackTrace();}catch(KeeperException e){
e.printStackTrace();}}});}}
五、分布式锁案例
首先明确一下分布式锁的概念:
比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源
"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁
通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。
1、需求分析
- 客户端要访问zookeeper集群,让其创建带序号的临时节点
- 创建节点后,需要判断此节点是不是当前序号最小的节点,如果不是最小的就要监听前面的节点
- 同时去获取锁,以及可以释放锁
所以整体框架在代码中先可以这么写:
publicclassDistributedLock{publicDistributedLock(){// 获取连接// 判断根节点/locks是否存在}//对zookeeper加锁publicvoidzklock(){//创建对应的临时带序号的节点//判断创建的节点是不是序号最小的节点,如果是,获取到锁,如果不是,监听序号的前一个节点}//解锁publicvoidunZklock(){//删除节点}}
2、分布式锁代码实现
说实话这部分比较难一些,有些绕
主要需要理清思路,加锁其实就是相当于在/locks路径下创建节点,是创建有序号的临时节点
这样我们就可以通过对节点序号的截取,然后判断它是不是第一个节点,如果不是的话,那我们就去监听他的前一个节点
释放锁,其实就是删除当前节点
我们不要单纯的将眼光放在一台机器上,我们只是在一台机器模拟实际两台机器去同时创建节点以及进行相关操作,所以在测试代码中我们new 了两个thread线程
在分布式锁的代码中重点去看zklock是怎么实现的,另外我们为了可以确认zk是否连接上,我们可以加一个这个api:
private CountDownLatch connectLatch = new CountDownLatch(1);
这个是为了确保上面代码是执行完的,程序才会往下走,可以具体看下面代码的实现,在process中进行了判断
publicclassDistributedLock{privatefinalString connectString ="hadoop102:2181,hadoop103:2181, hadoop104:2181";privatefinalint sessionTimeout =2000;privatefinalZooKeeper zk;privateString waitPath;// 为了可以确认zk是否连接上,我们可以加一个这个api//这个是为了确保zk连接上后,程序才往下走,在new zookeeper的下一行代码就是起到这个作用privateCountDownLatch connectLatch =newCountDownLatch(1);privateCountDownLatch waitLatch =newCountDownLatch(1);privateString currentNode;publicDistributedLock()throwsIOException,InterruptedException,KeeperException{// 获取连接
zk =newZooKeeper(connectString, sessionTimeout,newWatcher(){@Overridepublicvoidprocess(WatchedEvent watchedEvent){//相当于监听到了,连接上了zk,以及后面监听到了前一个节点下线了//connectLatch 如果连接上zk,可以释放if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
connectLatch.countDown();}//waitLatch 需要释放:如果是删除节点,并且是删除的是前一个节点if(watchedEvent.getType()==Event.EventType.NodeDeleted&& watchedEvent.getPath().equals(waitPath)){
waitLatch.countDown();}}});//等待zk正常连接后,代码才会往下走
connectLatch.await();// 判断根节点/locks是否存在// 返回的是一个状态Stat stat = zk.exists("/locks",false);if(stat ==null){//创建一个根节点
zk.create("/locks","locks".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}//对zookeeper加锁publicvoidzklock(){}//解锁publicvoidunZklock(){}}
下面是加锁的过程
//对zookeeper加锁publicvoidzklock(){//创建对应的临时带序号的节点try{
currentNode = zk.create("/locks/"+"seq-",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//判断创建的节点是不是序号最小的节点,如果是,获取到锁,如果不是,监听序号的前一个节点// 先看看有多少节点List<String> children = zk.getChildren("/locks",false);//如果children只有一个值,那就直接获取锁,如果有多个节点,那就判断谁的序号最小if(children.size()==1){return;}else{//先进行排序Collections.sort(children);//获取节点名称:类似"seq-0000000000"String thisNode = currentNode.substring("/locks/".length());//因为参数要求int类型,所以我们用的是长度//通过seq-0000000000获取该节点在children集合中的位置int index = children.indexOf(thisNode);//判断if(index ==-1){System.out.println("数据异常");}elseif(index ==0){//集合中只有一个节点,那么直接获取锁了return;}else{//需要监听前一个节点变化
waitPath ="/locks/"+ children.get(index -1);
zk.getData(waitPath,true,null);//等待监听,监听不结束,就需要一直卡在这里
waitLatch.await();return;}}}catch(KeeperException e){
e.printStackTrace();}catch(InterruptedException e){
e.printStackTrace();}}
解锁的过程
//解锁publicvoidunZklock(){//删除节点try{
zk.delete(currentNode,-1);//版本号给-1或者1都行}catch(InterruptedException e){
e.printStackTrace();}catch(KeeperException e){
e.printStackTrace();}}
最后创建两个线程,模拟是两个机器同时相差间隔很短的时间连接服务器,这时候就会使用到锁
前面一个线程的锁没有释放的时候,后面的客户端就没法访问数据
publicclassDistributedLockTest{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{//模拟两个客户端都要创建锁// 加上final 是因为匿名内部类想用方法中定义的局部变量必须加final,jdk1.8之后不用加了finalDistributedLock lock1 =newDistributedLock();finalDistributedLock lock2 =newDistributedLock();//两个客户端都要创建锁newThread(newRunnable(){@Overridepublicvoidrun(){try{
lock1.zklock();System.out.println("线程一 启动锁,获取到锁");Thread.sleep(5*1000);
lock1.unZklock();System.out.println("线程一 释放锁");}catch(InterruptedException e){
e.printStackTrace();}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){try{
lock2.zklock();System.out.println("线程二 启动锁,获取到锁");Thread.sleep(5*1000);
lock2.unZklock();System.out.println("线程二 释放锁");}catch(InterruptedException e){
e.printStackTrace();}}}).start();}}
最后结果:
3、成熟框架:Curator
1、首先添加相关依赖
dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.3.0</version></dependency>
2、代码实现
- 首先在main方法中创建分布式锁,用封装好的api实现。第一个参数是获取一个客户端,第二个参数是需要监控的路径
//创建分布式锁1InterProcessMutex lock1 =newInterProcessMutex(getCuratorFramework(),"/locks");
- 其中第一个参数获取客户端我们封装到自定的方法中,定义好client后client.start,最后返回client
privatestaticCuratorFrameworkgetCuratorFramework(){//下面这个就是设置重试的次数,第一个参数是对应多少秒之后重试,第二个参数是重试多少次ExponentialBackoffRetry policy =newExponentialBackoffRetry(3000,3);//retrypolicy相当于就是连接失败之后,间隔多少秒之后再进行下一次的重试// connectionTimeoutMs那两个都是超时连接时间// 注意最后是。builder().var,不是单纯的。var,要不返回的是builderCuratorFramework client =CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181, hadoop104:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy).build();//启动客户端 client.start();System.out.println("zookeeper 启动成功");return client;}
- 模拟两台机器,创建双线程,需要注意的是这个框架可以实现重入锁,可以看最后的实现结果。这里我们需要注意的是获取锁和释放锁的代码:
//获取到锁lock1.acquire();//释放锁lock1.release();
具体代码如下:
publicclassCuratorLockTest{publicstaticvoidmain(String[] args){//创建分布式锁1InterProcessMutex lock1 =newInterProcessMutex(getCuratorFramework(),"/locks");//创建分布式锁2InterProcessMutex lock2 =newInterProcessMutex(getCuratorFramework(),"/locks");//创建多线程newThread(newRunnable(){@Overridepublicvoidrun(){try{
lock1.acquire();System.out.println("线程1 获取到锁");
lock1.acquire();System.out.println("线程1 再次获取到锁");Thread.sleep(5*1000);
lock1.release();System.out.println("线程1 释放锁");
lock1.release();System.out.println("线程1 再次释放锁");}catch(Exception e){
e.printStackTrace();}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){try{
lock2.acquire();System.out.println("线程2 获取到锁");
lock2.acquire();System.out.println("线程2 再次获取到锁");Thread.sleep(5*1000);
lock2.release();System.out.println("线程2 释放锁");
lock2.release();System.out.println("线程2 再次释放锁");}catch(Exception e){
e.printStackTrace();}}}).start();}privatestaticCuratorFrameworkgetCuratorFramework(){//下面这个就是设置重试的次数,第一个参数是对应多少秒之后重试,第二个参数是重试多少次ExponentialBackoffRetry policy =newExponentialBackoffRetry(3000,3);//retrypolicy相当于就是连接失败之后,间隔多少秒之后再进行下一次的重试// 注意最后是。builder().var,不是单纯的。var,要不返回的是builderCuratorFramework client =CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181, hadoop104:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy).build();//启动客户端
client.start();System.out.println("zookeeper 启动成功");return client;}}
3、结果
六、企业面试真题
七、一致性算法基础
// zookeeper是如何保证数据一致性的?// 底层主要用到了ZAB算法,在源码讲义的前面几页有讲
试,第二个参数是重试多少次
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000,3);
//retrypolicy相当于就是连接失败之后,间隔多少秒之后再进行下一次的重试
// 注意最后是。builder().var,不是单纯的。var,要不返回的是builder
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181, hadoop104:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy).build();
//启动客户端
client.start();
System.out.println("zookeeper 启动成功");
return client;
}
}
### 3、结果
[外链图片转存中...(img-UkTzkcuH-1709966390135)]
# 六、企业面试真题
<img src="C:\Users\Whacky\AppData\Roaming\Typora\typora-user-images\image-20231201210126943.png" alt="image-20231201210126943" style="zoom: 50%;" />
# 七、一致性算法基础
```java
// zookeeper是如何保证数据一致性的?
// 底层主要用到了ZAB算法,在源码讲义的前面几页有讲
版权归原作者 Whacky-u 所有, 如有侵权,请联系我们删除。