Zookeeper 初识
初识Zookeeper
Zookeeper是Hadoop项目下的一个子项目,是一个树形目录服务,翻译过来就是动物管理员,负责管理Hadoop(大象)、Hive(小猪)的管理员,简称zk.Zookeeper是一个
分布式的、开源的分布式应用程序的协调服务
ZooKeeper主要功能包括:
- 配置管理
- 分布式锁
- 集群管理
Zookeeper 安装
在这里我已经提前上传好了Zookeeper的安装包,和三台虚拟机,并设置好了SSH免密登录、Jdk以及Hosts,这是一个很重要的前提,主要是为了后续的配置方便
我的host设置如下
我的Zookeeper安装包
正常流程先解压Zookeeper
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz
因为个人习惯,我在这里将其移动到了其他位置
mv apache-zookeeper-3.7.1-bin /opt/module/zookeeper
欧克在这里已经复制成功了。我们进入到zookeeper下的conf目录下准备修改配置文件,修改配置文件前,我们先需要修改配置文件的名字,让内部存在一个zoo.cfg,因为zookeeper在启动时会检测这个文件,然后根据这个文件进行配置,但是不建议直接修改,还是复制来的保险,模板文件方便后续恢复什么的
cp zoo_sample.cfg zoo.cfg
复制完成:
现在开始修改Zookeeper的配置文件,zookeeper 的配置文件不多我在这里直接修改
# 心跳检测时间 2s 2 * 1000tickTime=2000# 初始化时 连接到服务器端的间隔次数 总时间为 2 * 10 = 20 sinitLimit=10# ZK Leader 和follower之间通讯的次数 总时间为 5 * 2 = 10ssyncLimit=5# 这里表示zookeeper存储的位置,默认为/tmp/zookeeper下,这个目录下的数据有可能会在磁盘空间不足或服务器重启时自动被linux清理# 我提前在/opt/data 下创建了zookeeper文件夹,将数据存放在这里dataDir=/opt/data/zookeeper
# 指定日志文件存放位置dataLogDir=/opt/logs/zookeeper
# the port at which the clients will connect# 这里表示暴露给客户端进行连接的端口clientPort=2181# the maximum number of client connections.# increase this if you need to handle more clients#maxClientCnxns=60## Be sure to read the maintenance section of the # administrator guide before turning on autopurge.## http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance## The number of snapshots to retain in dataDir#autopurge.snapRetainCount=3# Purge task interval in hours# Set to "0" to disable auto purge feature#autopurge.purgeInterval=1## Metrics Providers## https://prometheus.io Metrics Exporter#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider#metricsProvider.httpPort=7000#metricsProvider.exportJvmInfo=true
server.1=hadoop132-father:2888:3888
server.2=hadoop133:2888:3888
server.3=hadoop134:2888:3888
server.1的说明:
集群模式中, 集群中的每台机器都需要感知其它机器, 在 zoo.cfg 配置文件中, 可以按照如下格式进行配置, 每一行代表一台服务器配置:
server.id=host:port:portid 被称为 Server ID, 用来标识服务器在集群中的序号。同时每台 ZooKeeper 服务器上, 都需要在数据目录(即 dataDir 指定的目录) 下创建一个 myid 文件, 该文件只有一行内容, 即对应于每台服务器的Server ID。
ZooKeeper 集群中, 每台服务器上的 zoo.cfg 配置文件内容一致。
server.1 的 myid 文件内容就是 “1”。每个服务器的 myid 内容都不同, 且需要保证和自己的 zoo.cfg 配置文件中 “server.id=host:port:port” 的 id 值一致。
id 的范围是 1 ~ 255。
随后我们需要去创建myid 文件,
myid文件的位置需要与dataDir指定的目录相同,此外其内容应该与zookeeper中配置的一样
OK 完成,随后就是向相关配置复制过去即可,方式有很多种,我这里采用的
scp
scp -r zookeeper root@hadoop133:$PWDscp -r zookeeper root@hadoop134:$PWD
之后我直接偷懒,不想每次启动zookeeper都到这个文件夹下,我直接配置环境变量
# 编写环境变量vim /etc/profile
# 在环境变量中追加上这几行exportZOOKEEPER_HOME=/opt/module/zookeeper
exportPATH=$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf
# 添加完刷新环境变量source /etc/profile
启动Zookeeper!
zkServer.sh start
一套下来成了嘛?咱们查看一下运行状态
这是没有启动起来嘛?也不是,启动起来了,但是由于咱们是集群模式,我们把另外两个全部启动在进行查看
OK集群搭建结束,下班!
Zookeeper 命令操作
Zookeeper数据模型
Zookeeper是一个属性目录服务,其数据模型和Unix的文件系统目录树很相似,拥有一个层次化结构
这里面的每一个节点都被称为ZNode,
每个节点上都会保存自己的数据和节点信息
节点可以拥有子节点,同时也允许少量数据(1M)存储在该节点下
节点可以分为4大类:
- PERSISTENT 持久化节点
- EPHEMERAL 临时节点 -e
- PERSISTENT_SEQUENTIAL 持久化顺序节点 -s
- EPHEMERAL_SEQUENTIAL 临时顺序节点 -es
服务端命令
服务端命令比较简单 主要是对zookeeper服务进行操作[请注意我这里是配置了环境变量]
# 启动ZK服务
zkServer.sh start
# 查看Zookeeper 服务状态
zkServer.sh stauts
# 停止Zookeeper 服务
zkServer.sh stop
# 重启Zookeeper服务
zkServer.sh restart
客户端命令
连接服务端
zkCli.sh -server IP:PORT
我这里尝试连接hadoop132-father的数据
zkCli.sh -server hadoop132-father:2181
运行结果如下
现在已经连接上了zookeeper
退出
退出就比较简单可以直接输入
quit
即可,这样子可以直接退出
ok在这里已经退出成功了,当然也可以暴力进行,直接
ctrl+c
直接退出
查看
查看本质上Linux的命令行基本相似,直接进行即可
ls path
但是区别是我们不能
ls zookeeper
进行访问,必须是绝对路径,如果我们强行输入:
正确的输入方法:
这里有自动补全机制,所以大家不用担心没有办法偷懒
有时候需要查看详细信息,那么需要进行操作,推荐使用
ls2 path
直接进行,但是我在自己尝试的时候发现没有这个命令,搞了半天是被取代了
# 查看详细信息 老版本
ls2
# 其他方法 我的版本是3.7.1 算是一个比较新的版本了 已经没有ls2命令了ls -s path
创建
创建就比较简单了直接create
create path [value]
这里是可以进行赋值的,我们在设置值的时候在进行演示
创建一个节点,名字是wxk
OK 创建成功,再再根节点下创建一个wxk2
wxk2 节点也创建成功。当然我们也可以创建为某个节点创建一个子结点,例如我要在wxk下创建一个子节点 叫做wxk3
在这里也创建成功了
在上文中我们提到了临时节点和顺序节点,那么这个该如何设置?
临时节点创建:
create -e path
# 临时顺序节点
create -es path
创建临时节点:
在这里我们直接看看不出区别,有什么特性也不知道,其实
当本次会话结束后,临时节点就会被删除
,我们退出重进一下:
现在已经没有了,而我们之前c创建的节点仍然存在,这些仍然存在的节点是持久化节点.现在创建几个顺序节点
现在我们直接重启,看看hu会不会序号是否会发生变化
序号生成时啥样就是啥样不受影响
赋值 与 取值
赋值一般想到的都是set方法,再zookeeper中也是如此,而获取一般都是get,这个也是:
# 赋值 / 更新值set path value
# 获取值
get path
这里讲wxk2进行赋值,赋值为
this_is_wxk
那么如何更新? 更新无非就是在设置一次嘛,这里将值修改为
this_is_xiaoming
删除
删除delete,老方法了,大家都熟悉了
delete path
在这里直接删除wxk这个节点
啥?删除不掉,他说内部不为空,
说明delete删除的时候不能够迭代,不能够把有子节点的节点给删除掉
,那咱们就先把子节点给删除了试试
作为一个懒蛋,确实不想进去一个一个删,我只想偷懒,OK这里还有一个删除命令:
deleteall path
deleteall
可以删除带有子节点的节点,由于wxk下的删完了,咱们创建一个在进行删除:
OK下机
JavaAPI操作
下机15s后重新连接了xdm
Curator
Curator是Zookeeper的Java客户端库,其目标是简化Zookeeper客户端的使用,现在是Apache的顶级项目 地址Apache Curator
常用操作:
建立连接
添加节点
删除节点
修改节点
查询节点
Watch事件监听
前置环境
maven
<dependencies><!--测试单元--><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13</version><scope>test</scope></dependency><!--curator--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.3.0</version></dependency><!--日志--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.21</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
日志文件配置
log4j.rootLogger=off,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n
连接Zookeeper集群
@TestpublicvoidtestConnect(){/*
* @param connectString 连接的字符串,zk Server的地址和端口,如果是集群那就用逗号隔开
* @param sessionTimeoutMs 会话超时时间 单位 ms
* @param connectionTimeoutMs 连接超时时间 单位 ms
* @param retryPolicy 重试策略
*/String hosts ="hadoop132-father:2181,hadoop133:2181,hadoop134:2181";int sessionTime =10*1000;int connectTime =10*1000;RetryPolicy retryPolicy =newExponentialBackoffRetry(3000,10);CuratorFramework client =CuratorFrameworkFactory.newClient(hosts, sessionTime, connectTime, retryPolicy);// 启动客户端
client.start();//打印连接的状态finalCuratorFrameworkState state = client.getState();System.out.println(state);}
运行结果如下:
当然也可以这样子
@TestpublicvoidtestConnect(){/*
* @param connectString 连接的字符串,zk Server的地址和端口,如果是集群那就用逗号隔开
* @param sessionTimeoutMs 会话超时时间 单位 ms
* @param connectionTimeoutMs 连接超时时间 单位 ms
* @param retryPolicy 重试策略
*/String hosts ="hadoop132-father:2181,hadoop133:2181,hadoop134:2181";int sessionTime =10*1000;int connectTime =10*1000;RetryPolicy retryPolicy =newExponentialBackoffRetry(3000,10);CuratorFramework client =CuratorFrameworkFactory.builder().connectString(hosts).sessionTimeoutMs(sessionTime).connectionTimeoutMs(connectTime).retryPolicy(retryPolicy).namespace("wxk")// 设置工作空间 设置之后所有的操作都是以次为根节点.build();
client.start();finalCuratorFrameworkState state = client.getState();System.out.println(state);}
二者的效果是相同的
创建节点
关于节点就有很多种情况:临时节点、持久化节点、有序、无需、有值、未设置值多种情况
创建普通的节点
在这里我将展示这个类的方法,后续只展示关键方法
publicclassZookeeperTest{privateCuratorFramework client;@BeforepublicvoidtestConnect(){/*
* @param connectString 连接的字符串,zk Server的地址和端口,如果是集群那就用逗号隔开
* @param sessionTimeoutMs 会话超时时间 单位 ms
* @param connectionTimeoutMs 连接超时时间 单位 ms
* @param retryPolicy 重试策略
*/String hosts ="hadoop132-father:2181,hadoop133:2181,hadoop134:2181";int sessionTime =10*1000;int connectTime =10*1000;RetryPolicy retryPolicy =newExponentialBackoffRetry(3000,10);// CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, sessionTime, connectTime, retryPolicy);
client =CuratorFrameworkFactory.builder().connectString(hosts).sessionTimeoutMs(sessionTime).connectionTimeoutMs(connectTime).retryPolicy(retryPolicy).namespace("wxk")// 设置工作空间 设置之后所有的操作都是以次为根节点.build();
client.start();}@TestpublicvoidbaseCreate()throwsException{//注意 这个forPath还有一个参数可以设置值,在这里是选择的是仅有一个参数的方法String s = client.create().forPath("/test");System.out.println(s);}@Afterpublicvoidclose(){if(client !=null){
client.close();}}}
运行结果:
在这里不仅创建好了wxk还在下面创建了test这个节点,我们查看这个节点是否有值【在Linux Client客户端上直接创建,如果不赋值,那么就是null】
get /wxk/test # 这里是在zookeeper 中进行的
这里返回的不是null而是我们我们的IP。
如果创建节点,没有指定数据,那么默认将当前客户端的ip作为数据存储
创建节点并赋予数据
@TestpublicvoidcreateNodeDate()throwsException{String s = client.create().forPath("/have_data","我有数据".getBytes(StandardCharsets.UTF_8));System.out.println(s);}
运行之后查看数据
创建临时节点,并获取它的值
@TestpublicvoidcreateNodeMode()throwsException{//创建临时节点String s = client.create().withMode(CreateMode.EPHEMERAL).forPath("/have_mode","我有数据".getBytes(StandardCharsets.UTF_8));System.out.println(s);// 但是临时节点有个问题就是会话结束就会管理,那么咱们在这里获取一波finalbyte[] bytes = client.getData().forPath("/have_mode");String c =newString(bytes);System.out.println(c);}
查看结果:
创建多级节点
我们知道,在Client中不能够创建多级节点,如果创建多级节点,就会直接报错:
但是在Java API就可以实现
@TestpublicvoidcreateNeed()throwsException{finalString s = client.create().creatingParentsIfNeeded().forPath("/qqq/ttt");System.out.println(s);}
查看运行结果:
下载成功
查询节点
查询数据 – – – Get
查询数据使用Get方法,需要注意的是,这里返回的结果是byte数组类型,需要使用
new String()
进行转换成字符串
@TestpublicvoidgetData()throwsException{finalbyte[] bytes = client.getData().forPath("/qqq/ttt");System.out.println(newString(bytes));}
运行结果如下:
查询子节点
我们在client中可以使用
ls /
来查看子节点,那么Java API 如何实现? Java API实现主要是通过
getChildern()
方法
@TestpublicvoidgetChildren()throwsException{// 在上文中我们已经传见了/qqq/ttt节点,所以这个查询的结果是ttt
client.getChildren().forPath("/qqq").forEach(System.out::println);}
那么如果直接查询
/
呢?这个结果是什么?难道是这个?
那么先测试一下:
@TestpublicvoidgetRootsCh()throwsException{
client.getChildren().forPath("/").forEach(System.out::println);}
运行结果如下:
为什么和我们在客户端查询的不一样? –如果你产生这个疑问,那么就是前面讲的时候没有仔细看,在前面我们已经指定了我们的工作空间是
/wxk
这个路径,所以我们我们在客户端上输入
/
其实就是查询
/wxk
下的路径
你看一毛一样
查询节点的状态
查询节点状态相对来说比较麻烦一点,因为数据封装的比较难顶,这里直接贴方法
@TestpublicvoidgetState()throwsException{Stat state =newStat();// 先输出一次 便于后续对比System.out.println(state);
client.getData().storingStatIn(state).forPath("/qqq");// 第二次进行输出System.out.println(state);}
运行结果:
我们查看查看
Stat
源码,发现就是对各种方法的一个封装,但是返回的太简陋了
修改节点
修改节点使用的是
SetData
方法,在这里我修改的是一个通过API创建的值,这个值在默认情况下是IP,这里我将IP修改成其他值
@TestpublicvoidchangeData()throwsException{
client.setData().forPath("/qqq/ttt","我不是IP".getBytes(StandardCharsets.UTF_8));}
根据版本进行修改
我们在进行修改操作的时候,如果并发度比较低基本上都美有问题,但是如果并发度比较高,一个数据可能有多个线程需要进行修改,该怎么办?好比Java 中原子类的保证原子的方式:判断当前版本号与需要修改时的版本号是否一致,如果一致,那么直接进行修改,如果不一致,则直接抛出异常
@TestpublicvoidchangeByVersion()throwsException{Stat stat =newStat();
client.getData().storingStatIn(stat).forPath("/qqq/ttt");System.out.println("version = "+ stat.getVersion());// 100 是我随便输入的 还有就是这个的修改不到100次
client.setData().withVersion(100).forPath("/qqq/ttt","我就是IP".getBytes(StandardCharsets.UTF_8));}
运行结果:
直接就给你说 :版本号不对
删除节点
普通的删除
@TestpublicvoiddeleteNode()throwsException{
client.delete().forPath("/node1");}
尝试是否能直接删除父节点(下面存在子节点)
@TestpublicvoiddeleteNode()throwsException{
client.delete().forPath("/qqq");}
运行结果:
节点不为空,直接删除失败
迭代删除
这个用法与上文的创建多级节点相似,
@TestpublicvoiddeleteFor()throwsException{
client.delete().deletingChildrenIfNeeded().forPath("/qqq");}
这里运行成功,在Client中进行检查
qqq
节点已经被删除了
必须成功的删除
@TestpublicvoiddeleteMustSuc()throwsException{
client.delete().guaranteed().forPath("/");}
回调删除
@TestpublicvoiddeleteCall()throwsException{
client.delete().inBackground(newBackgroundCallback(){@OverridepublicvoidprocessResult(CuratorFramework client,CuratorEvent event)throwsException{System.out.println("被删除了:-(");System.out.println(event);}}).forPath("/test");}
运行结果
相关的数据都被封装在了event中
事件监听机制 – Watch
Zookeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,Zookeeper服务端会将时间通知到感兴趣的客户端上,该机制是Zookeeper实现分布式协调服务的重要特性。Zookeeper中引入了Watcher机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态发生变化,会通知所有的订阅者
Zookeeper引入了三种watcher:
- NodeCache:只是监听某一特定的节点
- PathChildrenCache:监听一个ZNode的节点的子节点
- TreeCache:可以监听某个节点及其子节点,相当于是NodeCache 和PathChildrenCache的组合
但是很可惜,我采用的是高版本得curator,我的是5.0,这三个类已经被弃用了,这里直接采用
CuratorCache
来实现:
@TestpublicvoidNodeCacheTest(){CuratorCache cache =CuratorCache.build(client,"/");
cache.listenable().addListener(newCuratorCacheListener(){@Overridepublicvoidevent(Type type,ChildData oldData,ChildData data){System.out.println(type.name());switch(type.name()){case"NODE_CREATED"://新节点被创建if(data !=null){System.out.println("创建了节点:"+ data.getPath());}break;case"NODE_CHANGED":// 节点被修改if(oldData !=null){System.out.println("修改前得数据: "+newString(oldData.getData())+" , 修改后得数据为"+newString(data.getData()));}else{System.out.println("这是第一次修改数据,修改后得数据为"+newString(data.getData()));}break;case"NODE_DELETED":// 节点被删除System.out.println("节点{path='"+ oldData.getPath()+"'} 已被删除");break;default:break;}}});
cache.start();try{TimeUnit.SECONDS.sleep(100);}catch(InterruptedException e){
e.printStackTrace();}}
直接全部管用:
分布式锁
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点
- 客户端获取锁的时候,在lock节点下创建
临时顺序
节点 - 然后获取lock下面得所有子节点,客户端获取到所有得子节点,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁,使用完后就将这个锁给删除
- 如果发现自己创建的节点并非是所有子节点中最小得,说明自己还没有获得到锁,此时客户端需要找到比自己小得那个节点,同时对其注册事件监听器,监听删除时间
- 如果发现比自己小的那个节点被删除,则客户端watcher会受到相应的通知,此时在判断自己创建的节点是否是lock子结点中最小得,如果是,则获取到了锁,如果不是则重复上述步骤继续获取到比自己小的节点并进行监听
实现分布式锁得API
一共有5中方案:
- InterProcessSemaphoreMutex: 分布式排他锁(非可重入锁)
- InterProcessMutex:分布式可重入排他锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
- InterProcessSemaphoreV2:共享信号量
在这里进行一个简单的举例:这是一个与票相关得类
packagewxk.test;importorg.apache.curator.RetryPolicy;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.CuratorFrameworkFactory;importorg.apache.curator.framework.recipes.locks.InterProcessMutex;importorg.apache.curator.retry.ExponentialBackoffRetry;importjava.io.PrintWriter;importjava.util.concurrent.TimeUnit;/**
* @author wxk
* @date 2023/05/18/15:15
*/publicclassTrick12306implementsRunnable{privateint tickets=10;privateInterProcessMutex lock;privateCuratorFramework client;publicTrick12306(){String hosts ="hadoop132-father:2181,hadoop133:2181,hadoop134:2181";int sessionTime =10*1000;int connectTime =10*1000;RetryPolicy retryPolicy =newExponentialBackoffRetry(3000,10);
client =CuratorFrameworkFactory.builder().connectString(hosts).sessionTimeoutMs(sessionTime).connectionTimeoutMs(connectTime).retryPolicy(retryPolicy).build();
client.start();
lock =newInterProcessMutex(client,"/lock");}@Overridepublicvoidrun(){while(true){//获取锁try{
lock.acquire(5,TimeUnit.SECONDS);if(tickets >0){System.out.println(Thread.currentThread()+":"+tickets);
tickets--;}}catch(Exception e){
e.printStackTrace();System.out.println(e);}finally{try{
lock.release();}catch(Exception e){
e.printStackTrace();}}}}}
下面是售票得方法:
packagewxk.test;importorg.junit.Test;/**
* @author wxk
* @date 2023/05/18/15:17
*/publicclassLockTest{publicstaticvoidmain(String[] args){Trick12306 ticket =newTrick12306();Thread t1 =newThread(ticket,"携程");Thread t2 =newThread(ticket,"飞猪");
t1.start();
t2.start();}}
运行结果如下:
虽然说有报错,但是不影响正常使用,这里的报错是因为没有拿到锁但是进行了所释放操作。我们查看我们的数据正常输出,没有发生超卖显现,如果想避免报错,可以将时间设置的稍微长一些,下面是我将时间修改成10s得输出结果
版权归原作者 不知落叶何时落 所有, 如有侵权,请联系我们删除。