一、概述
1、为什么使用zk
随着应用规模的迅速扩张,单台机器的部署已经难以支撑用户大规模、高并发的请求了, 因此服务化、集群化、分布式概念应运而生。 针对这种场景,人们通常使用的做法就是将软件按照模块进行拆分,形成独立的子系统,然后在局域网内部署到多台机器上面, 形成了一个集群。 这种方式即可以分滩请求压力,又可以起到灾备的效果。
然而, 集群的维护和多节点应用程序的协作运行远比单机模式复杂,需要顾及到的细节问题实在太多,比如说同一分配置在多台机器上的同步, 客户端程序实时感知服务机状态,应用与应用之间的公共资源的互斥访问等等一系列的问题。 如果这些问题都依靠开发人员或维护人员去解决的话, 非旦消耗人力,而且也达不到实时准确的效果。
所幸的是,zookeeper能够给我们非常完美的解决这些问题,zookeeper天生的就是为解决分布式协调服务这个问题而来, 应用zookeeper,能够非常好的解决如下问题:
- 配置信息同步
- 分布式锁控制
- 消息的发布与订阅(典型的生产者消费者模型)
- 集群内节点状态的快速感知
2、概念
大数据生态系统里的很多组件的命名都是某种动物或者昆虫, 比如hadoop就是大象,hive是蜜蜂。zookeeper即动物园管理者, 顾名思义就是管理大数据生态系统各组件的管理员
Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。
3、节点模型
每个目录称为一个znode,具有唯一的路径标识
每个znode可以存储1MB数据
每个znode是有版本的,每个znode存储的数据可以设置版本,也就是说一个访问路径可以有多份数据
znode可以被监控包括这个目录节点数据修改以及子节点变化,一旦变化可以通知设置监控的客户端
4、特点
5、应用场景
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
统一命名服务
统一配置管理
统一集群管理
服务动态上下线
负载均衡管理
二、安装
1、window安装
官网下载:Apache ZooKeeper
找到页面中的in the achive链接打开
打开选择要下载的版本,这里选择3.7.0,注意从3.5.7开始要下载带bin的tar.gz
即apache-zookeeper-3.7.0-bin.tar.gz
下载下来解压,解压工具解压不了使用window自带的powershell解压即可
进入安装目录下的bin直接执行zkServer.cmd命令启动发现有错误,原因是默认配置是liunx配置,所以需要改配置
打开conf目录,将zoo_sample.cfg文件复制一份,文件名zoo.cfg
并且对zoo.cfg文件中的dataDir修改 dataDir=../data
然后在bin同级目录下创建data文件夹
进入bin目录,cmd中执行zkServer.cmd命令启动zk,然后重新打开一个cmd执行zkCli.cmd命令
查看根节点下的目录ls /
2、linux安装
下载:https://archive.apache.org/dist/zookeeper/
解压到/opt/module
tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
修改conf
#(1)将/opt/module/zookeeper-3.4.10/conf这个路径下的zoo_sample.cfg修改为zoo.cfg;
[atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
#(2)打开zoo.cfg文件,修改dataDir路径:
[atguigu@hadoop102 zookeeper-3.4.10]$ vim zoo.cfg
修改如下内容:
dataDir=/opt/module/zookeeper-3.4.10/zkData
#(3)在/opt/module/zookeeper-3.4.10/这个目录上创建zkData文件夹
[atguigu@hadoop102 zookeeper-3.4.10]$ mkdir zkData # 默认不会给你创建
启动
#(1)启动Zookeeper
[atguigu@hadoop102 zookeeper-3.4.10]$ /opt/module/zookeeper-3.4.10/bin/zkServer.sh start
#(2)查看进程是否启动
[atguigu@hadoop102 zookeeper-3.4.10]$ jps
4020 Jps
4001 QuorumPeerMain # 这个是
#(3)查看状态:
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: standalone
#(4)启动客户端:
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkCli.sh
#(5)退出客户端:
[zk: localhost:2181(CONNECTED) 0] quit
#(6)停止Zookeeper
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh stop
3、集群安装
1.集群规划
在hadoop102、hadoop103和hadoop104三个节点上部署Zookeeper。
2.解压安装
#(1)解压Zookeeper安装包到/opt/module/目录下
[atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
#(2)同步/opt/module/zookeeper-3.4.10目录内容到hadoop103、hadoop104
[atguigu@hadoop102 module]$ xsync zookeeper-3.4.10/
3.配置服务器编号
#(1)在/opt/module/zookeeper-3.4.10/这个目录下创建zkData
[atguigu@hadoop102 zookeeper-3.4.10]$ mkdir -p zkData
#(2)在/opt/module/zookeeper-3.4.10/zkData目录下创建一个myid的文件
[atguigu@hadoop102 zkData]$ touch myid
#添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码
#(3)编辑myid文件
[atguigu@hadoop102 zkData]$ vi myid
#在文件中添加与server对应的编号:
2
#(4)拷贝配置好的zookeeper到其他机器上
[atguigu@hadoop102 zkData]$ xsync myid
并分别在hadoop102、hadoop103上修改myid文件中内容为3、4
4.配置zoo.cfg文件
#(1)重命名/opt/module/zookeeper-3.4.10/conf这个目录下的zoo_sample.cfg为zoo.cfg
[atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
[atguigu@hadoop102 conf]$ vim zoo.cfg
#修改数据存储路径配置
dataDir=/opt/module/zookeeper-3.4.10/zkData
# client端口为2181,即如kafka用2181与zk交互
clientPort=2181
#增加如下配置
#######################cluster##########################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
#server.第几号服务器=服务器的ip:服务器与集群leader交互端口2888:选举leader的端口3888
# 这个第几号服务器是上面myid的内容
#(3)同步zoo.cfg配置文件
[atguigu@hadoop102 conf]$ xsync zoo.cfg
#(4)配置参数解读
集群操作
#(1)分别启动Zookeeper
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start
#(2)查看状态
[atguigu@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
[atguigu@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader
[atguigu@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
# 也可以通过以下方式加入到集群
./zkCli.sh ‐server 192.168.60.130:2181
./zkCli.sh ‐server 192.168.60.130:2182
./zkCli.sh ‐server 192.168.60.130:2183
注:配置参数
Zookeeper中的配置文件zoo.cfg中参数含义解读如下:
1.tickTime =2000:通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒
Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。
它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间。(session的最小超时时间是2*tickTime)
2.initLimit =10:LF初始通信时限
集群中的Follower跟随者服务器与Leader领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限。
3.syncLimit =5:LF同步通信时限
集群中Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。
4.dataDir:数据文件目录+数据持久化路径
主要用于保存Zookeeper中的数据。
5.clientPort =2181:客户端连接端口,即如kafka用2181与zk交互
监听客户端连接的端口。
6 、server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888server.A=B:C:D。
A是一个数字,表示这个是第几号服务器;
集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
B是这个服务器的ip地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
4、zk客户端可视化工具
4.1、ZooInspector
下载地址
https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
解压后进入目录ZooInspector\build,运行zookeeper-dev-ZooInspector.jar
4.2、taokeeper
基于zookeeper的监控管理工具taokeeper,由淘宝团队开源的zk管理中间件,
安装前要求服务前先配置nc 和 sshd
1.下载数据库脚本
wget https://github.com/downloads/alibaba/taokeeper/taokeeper.sql
三、命令行操作
1、节点类型
zookeeper中的节点有两种, 分别为临时节点和永久节点。 节点的类型在创建时即被确定, 并且不能改变。
PERSISTENT–持久化目录节点:客户端与zookeeper断开连接后,该节点依旧存在;
PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点:客户端与zookeeper断开连接后,该节点依旧存在,允许重复创建相同key,只是Zookeeper给该节点名称进行顺序编号;
EPHEMERAL-临时目录节点:客户端与zookeeper断开连接后,该节点被删除。。 虽然每个临时的Znode都会绑定到一个客户端会话, 但他们对所有的客户端还是可见的。另外, ZooKeeper的临时节点不允许拥有子节点。;
EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点:客户端与zookeeper断开连接后(一旦会话(Session)结束),该节点被删除,允许重复创建相同key,只是Zookeeper给该节点名称进行顺序编号;
2、zookeeper常用shell命令
2.1、命令概述
2.2、查看当前znode信息
#查看当前znode信息
[zk: localhost:2181(CONNECTED) 3] ls /
[dubbo, sanguo0000000003, services, zookeeper]
2.3、node详情
ls -s /命令或者stat / 命令都行
[zk: localhost:2181(CONNECTED) 2] ls -s /
[dubbo, sanguo0000000003, services, zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x66
cversion = 4
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 4
1)czxid-创建节点的事务zxid
每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。
事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
2)ctime - znode被创建的毫秒数(从1970年开始)
3)mzxid - znode最后更新的事务zxid
4)mtime - znode最后修改的毫秒数(从1970年开始)
5)pZxid-znode最后更新的子节点zxid
6)cversion - znode子节点变化号,znode子节点修改次数
7)dataversion - znode数据变化号
8)aclVersion - znode访问控制列表的变化号
9)ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
10)dataLength- znode的数据长度
11)numChildren - znode子节点数量
2.4、创建无序永久节点
[zk: localhost:2181(CONNECTED) 3] create /sanguo/shuguo "liubei"
2.5、创建有序永久节点
[zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000000
[zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo/zhangliao "zhangliao"
Created /sanguo/weiguo/zhangliao0000000001
[zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo/xuchu "xuchu"
Created /sanguo/weiguo/xuchu0000000002
如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。
2.6、创建 无序号临时节点
[zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo "zhouyu"
当前客户端可以查看值 ,退出客户端重新进入值删除了
2.7、创建有序号临时节点
[zk: localhost:2181(CONNECTED) 2] create -e -s /sanguo/wuguo "zhouyu"
2.8、修改节点值
[zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"
2.9、删除节点
[zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin
2.10、递归删除节点
[zk: localhost:2181(CONNECTED) 15] deleteall /sanguo/shuguo
2.11、节点监听
查看监听原理节
四、javaApi操作
1、原生api
创建maven引入依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>
</dependencies>
需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
1.1、创建zk客户端
public class TestZookeeper {
private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
private int sessionTimeout = 2000;
private ZooKeeper zooKeeper;
@Test
public void init() throws IOException {
zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
}
1.2、创建子节点
public class TestZookeeper {
// private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
private String connectionString = "127.0.0.1:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
//创建节点
@Test
public void create() throws InterruptedException, KeeperException {
// 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
String path = zkClient.create("/atguigu", "chenchen".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(path);
}
}
1.3、获取子节点并监听节点变化
public class TestZookeeper {
// private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
private String connectionString = "127.0.0.1:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {//监听节点路径的变化
List<String> children = null;
try {
System.out.println("********************************");
children = zkClient.getChildren("/", true);
for (int i = 0; i < children.size(); i++) {
System.out.println(children.get(i));
}
System.out.println("********************************");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
//获取子节点并监控节点的变化
@Test
public void getDataAndWatch() throws InterruptedException, KeeperException, IOException {
List<String> children = zkClient.getChildren("/", true);
for (int i = 0; i < children.size(); i++) {
System.out.println(children.get(i));
}
//让程序不要结束
System.in.read();
}
}
1.4、判断znode是否存在
public class TestZookeeper {
// private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
private String connectionString = "127.0.0.1:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
List<String> children = null;
try {
System.out.println("********************************");
children = zkClient.getChildren("/", true);
for (int i = 0; i < children.size(); i++) {
System.out.println(children.get(i));
}
System.out.println("********************************");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
//判断节点是否存在
@Test
public void exist() throws InterruptedException, KeeperException {
Stat exists = zkClient.exists("/dubbo", true);
if (exists == null) {
System.out.println("没有该节点");
}else{
System.out.println("该节点存在");
}
}
}
1.5、删除znode
public class TestZookeeper {
// private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
private String connectionString = "127.0.0.1:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {//监听节点路径的变化
List<String> children = null;
try {
System.out.println("********************************");
children = zkClient.getChildren("/", true);
for (int i = 0; i < children.size(); i++) {
System.out.println(children.get(i));
}
System.out.println("********************************");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
//删除节点
@Test
public void delete() throws InterruptedException, KeeperException {
zkClient.delete("/atguigu2",-1);
}
}
2、zookeeper 开源客户端curator
Curator 是 Apache ZooKeeper 的Java客户端库。Curator 项目的目标是简化 ZooKeeper 客户端的使用。
Apache Curator –
2.1、创建连接
创建maven项目引入依赖和日志文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>curator-zk</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!--日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
log4j.properties
log4j.rootLogger=off,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n
连接zk
@Before
public void testConnect() {
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//2.第二种方式
//CuratorFrameworkFactory.builder();
client = CuratorFrameworkFactory.builder()
.connectString("192.168.200.130:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.namespace("test")
.build();
//开启连接
client.start();
}
2.2、节点创建
/**
* 创建节点:create 持久 临时 顺序 数据
* 1. 基本创建 :create().forPath("")
* 2. 创建节点 带有数据:create().forPath("",data)
* 3. 设置节点的类型:create().withMode().forPath("",data)
* 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
*/
@Test
public void testCreate() throws Exception {
//2. 创建节点 带有数据
//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
String path = client.create().forPath("/app2", "hehe".getBytes());
System.out.println(path);
}
@Test
public void testCreate2() throws Exception {
//1. 基本创建
//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
String path = client.create().forPath("/app1");
System.out.println(path);
}
@Test
public void testCreate3() throws Exception {
//3. 设置节点的类型
//默认类型:持久化
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
System.out.println(path);
}
@Test
public void testCreate4() throws Exception {
//4. 创建多级节点 /app1/p1
//creatingParentsIfNeeded():如果父节点不存在,则创建父节点
String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
System.out.println(path);
}
2.3、节点查看
/**
* 查询节点:
* 1. 查询数据:get: getData().forPath()
* 2. 查询子节点: ls: getChildren().forPath()
* 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
*/
@Test
public void testGet1() throws Exception {
//1. 查询数据:get
byte[] data = client.getData().forPath("/app1");
System.out.println(new String(data));
}
@Test
public void testGet2() throws Exception {
// 2. 查询子节点: ls
List<String> path = client.getChildren().forPath("/");
System.out.println(path);
}
@Test
public void testGet3() throws Exception {
Stat status = new Stat();
System.out.println(status);
//3. 查询节点状态信息:ls -s
client.getData().storingStatIn(status).forPath("/app1");
System.out.println(status);
}
2.4、节点删除
/**
* 删除节点: delete deleteall
* 1. 删除单个节点:delete().forPath("/app1");
* 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
* 4. 回调:inBackground
* @throws Exception
*/
@Test
public void testDelete() throws Exception {
// 1. 删除单个节点
client.delete().forPath("/app1");
}
@Test
public void testDelete2() throws Exception {
//2. 删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDelete3() throws Exception {
//3. 必须成功的删除
client.delete().guaranteed().forPath("/app2");
}
@Test
public void testDelete4() throws Exception {
//4. 回调
client.delete().guaranteed().inBackground(new BackgroundCallback(){
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("我被删除了~");
System.out.println(event);
}
}).forPath("/app1");
}
2.5、节点修改
/**
* 修改数据
* 1. 基本修改数据:setData().forPath()
* 2. 根据版本修改: setData().withVersion().forPath()
* * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
*
* @throws Exception
*/
@Test
public void testSet() throws Exception {
client.setData().forPath("/app1", "itcast".getBytes());
}
@Test
public void testSetForVersion() throws Exception {
Stat status = new Stat();
//3. 查询节点状态信息:ls -s
client.getData().storingStatIn(status).forPath("/app1");
int version = status.getVersion();//查询出来的 3
System.out.println(version);
client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
}
2.6、节点监听
ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便
需要开发人员自己反复注册Watcher,比较繁琐。
Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
ZooKeeper提供了三种Watcher:
NodeCache : 只是监听某一个特定的节点
PathChildrenCache : 监控一个ZNode的子节点.
TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
/**
* 演示 NodeCache:给指定一个节点注册监听器
*/
@Test
public void testNodeCache() throws Exception {
//1. 创建NodeCache对象
final NodeCache nodeCache = new NodeCache(client,"/app1");
//2. 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了~");
//获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
nodeCache.start(true);
while (true){
}
}
/**演示PathChildrenCache
*/
@Test
public void testPathChildrenCache() throws Exception {
//1.创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
//2. 绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("子节点变化了~");
System.out.println(event);
//监听子节点的数据变更,并且拿到变更后的数据
//1.获取类型
PathChildrenCacheEvent.Type type = event.getType();
//2.判断类型是否是update
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("数据变了!!!");
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
}
});
//3. 开启
pathChildrenCache.start();
while (true){
}
}
/**
* 演示 TreeCache:监听某个节点自己和所有子节点们
*/
@Test
public void testTreeCache() throws Exception {
//1. 创建监听器
TreeCache treeCache = new TreeCache(client,"/app2");
//2. 注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("节点变化了");
System.out.println(event);
}
});
//3. 开启
treeCache.start();
while (true){
}
}
2.7、分布式锁
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。
但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
客户端获取锁时,在lock节点下创建临时顺序节点。
然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
如果发现比自己小的那个节点被删除,则客户端的
Watcher会收到相应通知,此时再次判断自己创建的节点
是否是lock子节点中序号最小的,如果是则获取到了锁,
如果不是则重复以上步骤继续获取到比自己小的一个节点
并注册监听。
分布式锁-模拟12306售票案例
Curator实现分布式锁API
在Curator中有五种锁方案:
- InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
- InterProcessMutex:分布式可重入排它锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
- InterProcessSemaphoreV2:共享信号量
创建线程进行加锁设置
public class Ticket12306 implements Runnable{
private int tickets = 10;//数据库的票数
private InterProcessMutex lock ;
@Override
public void run() {
while(true){
//获取锁
try {
lock.acquire(3, TimeUnit.SECONDS);
if(tickets > 0){
System.out.println(Thread.currentThread()+":"+tickets);
Thread.sleep(100);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
创建连接,并且初始化锁
public Ticket12306(){
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//2.第二种方式
//CuratorFrameworkFactory.builder();
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.149.135:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
//开启连接
client.start();
lock = new InterProcessMutex(client,"/lock");
}
运行多个线程进行测试
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//创建客户端
Thread t1 = new Thread(ticket12306,"携程");
Thread t2 = new Thread(ticket12306,"飞猪");
t1.start();
t2.start();
}
}
五、内部原理(面试题)
1、选举机制
1)半数机制:集群中半数以上机器存活,集群可用。所以Zookeeper适合安装奇数台服务器。
2)Zookeeper虽然在配置文件中并没有指定Master和Slave。但是,Zookeeper工作时,是有一个节点为Leader,其他则为Follower,Leader是通过内部的选举机制临时产生的。
3)以一个简单的例子来说明整个选举的过程。
假设有五台服务器组成的Zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么,
(1)服务器1启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是LOOKING状态。
(2)服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态。
(3)服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader。
(4)服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了。
(5)服务器5启动,同4一样当小弟。
2、ZooKeeper的监听原理是什么?
2.1、节点值变化监听
在 hadoop104 主机上注册监听/sanguo 节点数据变化
[zk: localhost:2181(CONNECTED) 26] get -w /sanguo
可能因为版本问题上面的命令会报错,可改用:
[zk: localhost:2181(CONNECTED) 26] get /sanguo [watch]
在 hadoop103 主机上修改/sanguo 节点的数据
[zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"
观察 hadoop104 主机收到数据变化的监听
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged
path:/sanguo
注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。
2.2、节点的子节点变化监听(路径变化)
在 hadoop104 主机上注册监听/sanguo 节点的子节点变化
[zk: localhost:2181(CONNECTED) 1] ls -w /sanguo
[shuguo, weiguo]
在 hadoop103 主机/sanguo 节点上创建子节点
[zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi"
Created /sanguo/jin
观察 hadoop104 主机收到子节点变化的监听
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged
path:/sanguo
注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。
- 监控数据变化用 set
- 监控子节点变化用 ls
3、写数据流程
4、ZooKeeper的部署方式有哪几种?
集群中的角色有哪些?集群最少需要几台机器?
(1)部署方式单机模式、集群模式
(2)角色:Leader和Follower
(3)集群最少需要机器数:3
六、实战案例
1、服务器动态上下线
需求
现在集群创建server节点
create /servers "servers"
服务器端向Zookeeper注册代码
public class DistributeServer {
private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeServer server = new DistributeServer();
//1 连接zookeeper集群
server.getConnect();
//2 注册节点
server.regist("test2");
//3 业务逻辑处理
server.business();
}
//业务逻辑处理
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
//注册节点
private void regist(String hostName) throws InterruptedException, KeeperException {
String path = zkClient.create
("/servers/server", hostName.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(path);
}
//连接zookeeper集群
private void getConnect() throws IOException {
zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
List<String> children = null;
try {
System.out.println("********************************");
children = zkClient.getChildren("/", true);
for (int i = 0; i < children.size(); i++) {
System.out.println(children.get(i));
}
System.out.println("********************************");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
客户端代码
public class DistributeClient {
private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeClient server = new DistributeClient();
//1 连接zookeeper集群
server.getConnect();
//2 注册节点
server.getChildren();
//3 业务逻辑处理
server.business();
}
//注册监听
private void getChildren() throws InterruptedException, KeeperException {
List<String> children = zkClient.getChildren("/servers", true);
//储存服务器节点主机名称集合
ArrayList<String> hosts = new ArrayList<>();
for (String child : children) {
byte[] data = zkClient.getData("/servers/" + child, false, null);
hosts.add(String.valueOf(data));
}
//将所有在线主机的名称打印出来
System.out.println(hosts);
}
//业务逻辑处理
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
//连接zookeeper集群
private void getConnect() throws IOException {
zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
getChildren();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
});
}
}
2、分布式锁案例
2.1、原生zk实现
锁实现
package com.herobin.flink.debug_local.zk;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLock {
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
// 超时时间
private int sessionTimeout = 2000;
private ZooKeeper zk;
private String rootNode = "locks";
private String subNode = "seq-";
// 当前 client 等待的子节点
private String waitPath;
//ZooKeeper 连接
private CountDownLatch connectLatch = new CountDownLatch(1);
//ZooKeeper 节点等待
private CountDownLatch waitLatch = new CountDownLatch(1);
// 当前 client 创建的子节点
private String currentNode;
// 和 zk 服务建立连接,并创建根节点
public DistributedLock() throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
if (event.getState() ==
Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件
if (event.getType() ==
Event.EventType.NodeDeleted && event.getPath().equals(waitPath))
{
waitLatch.countDown();
} }
});
// 等待连接建立
connectLatch.await();
//获取根节点状态
Stat stat = zk.exists("/" + rootNode, false);
//如果根节点不存在,则创建根节点,根节点类型为永久节点
if (stat == null) {
System.out.println("根节点不存在");
zk.create("/" + rootNode, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 加锁方法
public void zkLock() {
try {
//在根节点下创建临时顺序节点,返回值为创建的节点路径
currentNode = zk.create("/" + rootNode + "/" + subNode,
null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait 一小会, 让结果更清晰一些
Thread.sleep(10);
// 注意, 没有必要监听"/locks"的子节点的变化情况
List<String> childrenNodes = zk.getChildren("/" +
rootNode, false);
// 列表中只有一个子节点, 那肯定就是 currentNode , 说明 client 获得锁
if (childrenNodes.size() == 1) {
return;
} else {
//对根节点下的所有临时顺序节点进行从小到大排序
Collections.sort(childrenNodes);
//当前节点名称
String thisNode = currentNode.substring(("/" + rootNode + "/").length());
//获取当前节点的位置
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// index == 0, 说明 thisNode 在列表中最小, 当前 client 获得锁
return;
} else {
// 获得排名比 currentNode 前 1 位的节点
this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
// 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法
zk.getData(waitPath, true, new Stat());
//进入等待锁状态
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解锁方法
public void zkUnlock() {
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
锁测试(创建两个线程)
package com.herobin.flink.debug_local.zk;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
// 创建分布式锁 1
final DistributedLock lock1 = new DistributedLock();
// 创建分布式锁 2
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock1.zkLock();
System.out.println("线程 1 获取锁");
Thread.sleep(5 * 1000);
lock1.zkUnlock();
System.out.println("线程 1 释放锁");
} catch (Exception e) {
e.printStackTrace();
} }
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock2.zkLock();
System.out.println("线程 2 获取锁");
Thread.sleep(5 * 1000);
lock2.zkUnlock();
System.out.println("线程 2 释放锁");
} catch (Exception e) {
e.printStackTrace();
} }
}).start();
}
}
观察运行
线程 1 获取锁
线程 1 释放锁
线程 2 获取锁
线程 2 释放锁
2.2、Curator 框架实现分布式锁案例
查看API Curator章节
版权归原作者 程序三两行 所有, 如有侵权,请联系我们删除。