添加依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>最新版本</version>
</dependency>
zk 客户端与 zk 服务端的连接
public static void main(String[] args) throws InterruptedException, IOException {
/*
* 构造参数说明:
* *** 参数一:zookeeper 服务连接地址和端口,
* *** 参数二:会话超时时间,
* *** 参数三:watcher事件,如果不需要可以传null
*/
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 50000, (event) -> {
log.info("监听到 watcher 通知:{}", event.toString());
});
log.info("开始连接 zk 服务器");
log.info("连接状态:{}", zooKeeper.getState());
Thread.sleep(1000);
log.info("连接状态:{}", zooKeeper.getState());
}
zk 会话重连
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 50000, null);
long sessionId = zk.getSessionId();
String ssid = "0x" + Long.toHexString(sessionId);
System.out.println(ssid);
byte[] sessionPassword = zk.getSessionPasswd();
log.warn("客户端开始连接zookeeper服务器...");
log.warn("连接状态:{}", zk.getState());
Thread.sleep(1000);
log.warn("连接状态:{}", zk.getState());
Thread.sleep(200);
// 开始会话重连
log.warn("开始会话重连...");
ZooKeeper zkSession = new ZooKeeper("127.0.0.1:2181",
50000,
null,
sessionId,
sessionPassword);
log.warn("重新连接状态zkSession:{}", zkSession.getState());
Thread.sleep(1000);
log.warn("重新连接状态zkSession:{}", zkSession.getState());
}
zk 同步创建节点 必须带权限
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
// 使用 ZooDefs.Ids.CREATOR_ALL_ACL 一直报错Invalid ACL只能自己创建一个
/*
* acl 权限
* 参数一:perms:1:r, 2:w, 3:c, 4:d, 5:a
* 参数二:权限类型
*/
Id id = ZooDefs.Ids.ANYONE_ID_UNSAFE;
ACL acl = new ACL(ZooDefs.Perms.ALL, id);
List<ACL> aclList = new ArrayList<>();
aclList.add(acl);
/*
* 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
* 参数:
* path:创建的路径
* data:存储的数据的byte[]
* acl:控制权限策略
* *** Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
* *** Ids.CREATOR_ALL_ACL --> auth:user:password:cdrwa
* createMode:节点类型, 是一个枚举
* *** PERSISTENT:持久节点
* *** PERSISTENT_SEQUENTIAL:持久顺序节点
* *** EPHEMERAL:临时节点
* *** EPHEMERAL_SEQUENTIAL:临时顺序节点
* 返回节点名称
*/
// 同步创建节点
String nodeName = zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT);
log.info("创建节点 {} 成功", nodeName);
}
zk 异步创建节点 必须带权限
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
// 使用 ZooDefs.Ids.CREATOR_ALL_ACL 一直报错Invalid ACL只能自己创建一个
Id id = ZooDefs.Ids.ANYONE_ID_UNSAFE;
ACL acl = new ACL(ZooDefs.Perms.ALL, id);
List<ACL> aclList = new ArrayList<>();
aclList.add(acl);
// 异步创建创建节点,不支持子节点递归创建
zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.info("创建节点成功");
log.info("========== {}", rc); // 0 的时候才表示操作成功
log.info("========== {}", path);
log.info("========== {}", ctx);
log.info("========== {}", name);
}
}, "扩展数据,原样返回");
Thread.sleep(2000L); // 阻塞等待回调,不让程序结束,不然回调不打印
}
zk 设置节点数据
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
Thread.sleep(2000L);
// zookeeper.addAuthInfo(""); // 登录 acl 权限
/*
* 设置节点数据
* 参数一:节点
* 参数二:数据
* 参数三:版本号,通过 命令 [ls -s 节点] 获得 dataVersion 版本号
*/
Stat stat = zookeeper.setData("/test", "123".getBytes(), 0);
System.err.println(stat);
}
zk 同步删除节点
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
/*
* 同步删除节点,java api 没有 递归删除的操作
* 参数一:节点
* 参数二:版本号,通过 命令 [ls -s 节点] 获得 dataVersion 版本号
*/
zookeeper.delete("/test", 1);
}
zk 异步删除节点
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
/*
* 同步删除节点,java api 没有 递归删除的操作
* 参数一:节点
* 参数二:版本号,通过 命令 [ls -s 节点] 获得 dataVersion 版本号
* 参数三:异步回调
* 参数四:扩展数据,原样返回
*/
zookeeper.delete("/test", 0, new AsyncCallback.VoidCallback () {
@Override
public void processResult(int rc, String path, Object ctx) {
log.info("========== {}", rc); // 0 的时候才表示操作成功
log.info("========== {}", path);
log.info("========== {}", ctx);
}
}, "扩展数据,原样返回");
Thread.sleep(2000L);
}
zk 获取节点数据
// 没有 watcher 事件的示例
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
Thread.sleep(2000L);
log.warn("连接状态:{}", zookeeper.getState());
Stat stat = new Stat();
/*
* 参数一:节点路径
* 参数二:是否注册一个watch事件
* 参数三:获取数据后,节点存放stat的对象
*/
byte[] data = zookeeper.getData("/test", false, stat);
String result = new String(data);
log.info(result);
Thread.sleep(2000L);
}
// 带 watcher 事件的示例
private static final CountDownLatch countDown = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
Thread.sleep(2000L);
log.warn("连接状态:{}", zookeeper.getState());
Stat stat = new Stat();
/*
* 参数一:节点路径
* 参数二:是否注册一个watch事件
* 参数三:获取数据后,节点存放stat的对象
* return:返回节点数据
*/
byte[] data = zookeeper.getData("/test", (event) -> {
log.info("当节点发生变化时,才会触发此事件,监听到 watcher 通知:{}", event.toString());
countDown.countDown();
}, stat);
String result = new String(data);
log.info(result);
countDown.await(); // 不让线程结束,等 watcher 触发后,才让线程结束
log.info("执行完成");
}
zk 获取子节点
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
Thread.sleep(2000L);
log.warn("连接状态:{}", zookeeper.getState());
/*
* 参数一:节点路径
* 参数二:是否注册一个watch事件。watch事件前面的示例都有演示过,这里就不演示了
* return:返回子节点的路径名称
*/
List<String> childrenList = zookeeper.getChildren("/test", false);
log.info("子节点列表:{}", childrenList);
}
zk 判断节点是否存在
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
Thread.sleep(2000L);
log.warn("连接状态:{}", zookeeper.getState());
/*
* 参数一:节点路径
* 参数二:是否注册一个watch事件。watch事件前面的示例都有演示过,这里就不演示了
* return:返回节点的 stat 信息
*/
Stat stat = zookeeper.exists("/test", false);
if (null == stat) {
log.info("节点不存在");
} else {
log.info("节点存在 {}", stat);
}
}
zk ACL digest 用户权限
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
Thread.sleep(2000L);
log.warn("连接状态:{}", zookeeper.getState());
// 创建 digest 认证机制 的用户
Id zs = new Id("digest", DigestAuthenticationProvider.generateDigest("zhangsan:123456"));
Id ls = new Id("digest", DigestAuthenticationProvider.generateDigest("lisi:123456"));
// 指定 acl 权限
List<ACL> aclList = new ArrayList<>();
aclList.add(new ACL(ZooDefs.Perms.ALL, zs)); // 用户张三,有rwcda所有权限
aclList.add(new ACL(ZooDefs.Perms.READ, ls)); // 用户李四,有 r 权限
aclList.add(new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.WRITE, ls)); // 用户李四,有 cw 权限
// 创建节点
String nodeName = zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT);
log.info(nodeName);
// 操作节点
// 设置了 digest 权限的节点,如果需要操作节点,必须通过 addAuthInfo 进行登录才能操作节点
zookeeper.addAuthInfo("digest", "zhangsan:123456".getBytes());
byte[] data = zookeeper.getData("/test", false, new Stat());
log.info("数据:{}", new String(data));
}
zk ACL ip 权限
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
Thread.sleep(2000L);
log.warn("连接状态:{}", zookeeper.getState());
// 创建 ip 认证机制
Id localIp = new Id("ip", "127.0.0.1"); // 指定ip
Id otherIp = new Id("ip", "1.10.10.10"); // 指定ip
// 指定 acl 权限
List<ACL> aclList = new ArrayList<>();
aclList.add(new ACL(ZooDefs.Perms.ALL, localIp)); // 指定的ip 有所有权限
aclList.add(new ACL(ZooDefs.Perms.READ, otherIp)); // 指定的ip 只有 r 权限
// 创建节点
String nodeName = zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT);
log.info(nodeName);
// 操作节点
// 设置了 ip 权限的节点,如果需要操作节点,必须有相应权限的ip电脑才能进行操作,其他ip都无权操作
byte[] data = zookeeper.getData("/test", false, new Stat());
log.info("数据:{}", new String(data));
}
完。
本文转载自: https://blog.csdn.net/a1053765496/article/details/128028749
版权归原作者 又逢乱世 所有, 如有侵权,请联系我们删除。
版权归原作者 又逢乱世 所有, 如有侵权,请联系我们删除。