0


ZooKeeper Java Api 操作

添加依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>最新版本</version>
</dependency>

zk 客户端与 zk 服务端的连接

    public static void main(String[] args) throws InterruptedException, IOException {
        /*
         * 构造参数说明:
         * *** 参数一:zookeeper 服务连接地址和端口,
         * *** 参数二:会话超时时间,
         * *** 参数三:watcher事件,如果不需要可以传null
         */
        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 50000, (event) -> {
            log.info("监听到 watcher 通知:{}", event.toString());
        });
        log.info("开始连接 zk 服务器");
        log.info("连接状态:{}", zooKeeper.getState());

        Thread.sleep(1000);

        log.info("连接状态:{}", zooKeeper.getState());
    }

zk 会话重连

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 50000, null);

        long sessionId = zk.getSessionId();

        String ssid = "0x" + Long.toHexString(sessionId);
        System.out.println(ssid);
        byte[] sessionPassword = zk.getSessionPasswd();

        log.warn("客户端开始连接zookeeper服务器...");
        log.warn("连接状态:{}", zk.getState());
        Thread.sleep(1000);
        log.warn("连接状态:{}", zk.getState());

        Thread.sleep(200);

        // 开始会话重连
        log.warn("开始会话重连...");

        ZooKeeper zkSession = new ZooKeeper("127.0.0.1:2181",
                50000,
                null,
                sessionId,
                sessionPassword);
        log.warn("重新连接状态zkSession:{}", zkSession.getState());
        Thread.sleep(1000);
        log.warn("重新连接状态zkSession:{}", zkSession.getState());
    }

zk 同步创建节点 必须带权限

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);

        // 使用 ZooDefs.Ids.CREATOR_ALL_ACL 一直报错Invalid ACL只能自己创建一个
        /*
         * acl 权限
         * 参数一:perms:1:r, 2:w, 3:c, 4:d, 5:a
         * 参数二:权限类型
         */
        Id id = ZooDefs.Ids.ANYONE_ID_UNSAFE;
        ACL acl = new ACL(ZooDefs.Perms.ALL, id);
        List<ACL> aclList = new ArrayList<>();
        aclList.add(acl);
        
        /*
         * 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
         * 参数:
         * path:创建的路径
         * data:存储的数据的byte[]
         * acl:控制权限策略
         *  ***        Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
         *  ***        Ids.CREATOR_ALL_ACL --> auth:user:password:cdrwa
         * createMode:节点类型, 是一个枚举
         *     ***        PERSISTENT:持久节点
         *     ***        PERSISTENT_SEQUENTIAL:持久顺序节点
         *     ***        EPHEMERAL:临时节点
         *     ***        EPHEMERAL_SEQUENTIAL:临时顺序节点
         * 返回节点名称
         */
        // 同步创建节点
        String nodeName = zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT);
        log.info("创建节点 {} 成功", nodeName);
    }

zk 异步创建节点 必须带权限

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);

        // 使用 ZooDefs.Ids.CREATOR_ALL_ACL 一直报错Invalid ACL只能自己创建一个
        Id id = ZooDefs.Ids.ANYONE_ID_UNSAFE;
        ACL acl = new ACL(ZooDefs.Perms.ALL, id);
        List<ACL> aclList = new ArrayList<>();
        aclList.add(acl);
        // 异步创建创建节点,不支持子节点递归创建
        zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT,
                new AsyncCallback.StringCallback() {
                    @Override
                    public void processResult(int rc, String path, Object ctx, String name) {
                        log.info("创建节点成功");
                        log.info("========== {}", rc);    // 0 的时候才表示操作成功
                        log.info("========== {}", path);
                        log.info("========== {}", ctx);
                        log.info("========== {}", name);
                    }
                }, "扩展数据,原样返回");

        Thread.sleep(2000L);    // 阻塞等待回调,不让程序结束,不然回调不打印
    }

zk 设置节点数据

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);

        Thread.sleep(2000L);

//        zookeeper.addAuthInfo("");    // 登录 acl 权限
        /*
         * 设置节点数据
         * 参数一:节点
         * 参数二:数据
         * 参数三:版本号,通过 命令 [ls -s 节点] 获得 dataVersion 版本号
         */
        Stat stat = zookeeper.setData("/test", "123".getBytes(), 0);
        System.err.println(stat);
    }

zk 同步删除节点

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
        /*
         * 同步删除节点,java api 没有 递归删除的操作
         * 参数一:节点
         * 参数二:版本号,通过 命令 [ls -s 节点] 获得 dataVersion 版本号
         */
        zookeeper.delete("/test", 1);
    }

zk 异步删除节点

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
        /*
         * 同步删除节点,java api 没有 递归删除的操作
         * 参数一:节点
         * 参数二:版本号,通过 命令 [ls -s 节点] 获得 dataVersion 版本号
         * 参数三:异步回调
         * 参数四:扩展数据,原样返回
         */
        zookeeper.delete("/test", 0, new AsyncCallback.VoidCallback () {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                log.info("========== {}", rc);    // 0 的时候才表示操作成功
                log.info("========== {}", path);
                log.info("========== {}", ctx);
            }
        }, "扩展数据,原样返回");

        Thread.sleep(2000L);
    }

zk 获取节点数据

    // 没有 watcher 事件的示例    
    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
        Thread.sleep(2000L);
        log.warn("连接状态:{}", zookeeper.getState());

        Stat stat = new Stat();
        /*
         * 参数一:节点路径
         * 参数二:是否注册一个watch事件
         * 参数三:获取数据后,节点存放stat的对象
         */
        byte[] data = zookeeper.getData("/test", false, stat);
        String result = new String(data);
        log.info(result);
        Thread.sleep(2000L);
    }
    // 带 watcher 事件的示例
    private static final CountDownLatch countDown = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
        Thread.sleep(2000L);
        log.warn("连接状态:{}", zookeeper.getState());

        Stat stat = new Stat();
        /*
         * 参数一:节点路径
         * 参数二:是否注册一个watch事件
         * 参数三:获取数据后,节点存放stat的对象
         * return:返回节点数据
         */
        byte[] data = zookeeper.getData("/test", (event) -> {
            log.info("当节点发生变化时,才会触发此事件,监听到 watcher 通知:{}", event.toString());
            countDown.countDown();
        }, stat);

        String result = new String(data);
        log.info(result);

        countDown.await();    // 不让线程结束,等 watcher 触发后,才让线程结束
        log.info("执行完成");
    }

zk 获取子节点

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
        Thread.sleep(2000L);
        log.warn("连接状态:{}", zookeeper.getState());

        /*
         * 参数一:节点路径
         * 参数二:是否注册一个watch事件。watch事件前面的示例都有演示过,这里就不演示了
         * return:返回子节点的路径名称
         */
        List<String> childrenList = zookeeper.getChildren("/test", false);
        log.info("子节点列表:{}", childrenList);
    }

zk 判断节点是否存在

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
        Thread.sleep(2000L);
        log.warn("连接状态:{}", zookeeper.getState());

        /*
         * 参数一:节点路径
         * 参数二:是否注册一个watch事件。watch事件前面的示例都有演示过,这里就不演示了
         * return:返回节点的 stat 信息
         */
        Stat stat = zookeeper.exists("/test", false);
        if (null == stat) {
            log.info("节点不存在");
        } else {
            log.info("节点存在 {}", stat);
        }
    }

zk ACL digest 用户权限

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
        Thread.sleep(2000L);
        log.warn("连接状态:{}", zookeeper.getState());

        // 创建 digest 认证机制 的用户
        Id zs = new Id("digest", DigestAuthenticationProvider.generateDigest("zhangsan:123456"));
        Id ls = new Id("digest", DigestAuthenticationProvider.generateDigest("lisi:123456"));

        // 指定 acl 权限
        List<ACL> aclList = new ArrayList<>();
        aclList.add(new ACL(ZooDefs.Perms.ALL, zs));    // 用户张三,有rwcda所有权限
        aclList.add(new ACL(ZooDefs.Perms.READ, ls));   // 用户李四,有 r 权限
        aclList.add(new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.WRITE, ls));   // 用户李四,有 cw 权限

        // 创建节点
        String nodeName = zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT);
        log.info(nodeName);

        // 操作节点
        // 设置了 digest 权限的节点,如果需要操作节点,必须通过 addAuthInfo 进行登录才能操作节点
        zookeeper.addAuthInfo("digest", "zhangsan:123456".getBytes());
        byte[] data = zookeeper.getData("/test", false, new Stat());
        log.info("数据:{}", new String(data));
    }

zk ACL ip 权限

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 50000, null);
        Thread.sleep(2000L);
        log.warn("连接状态:{}", zookeeper.getState());

        // 创建 ip 认证机制
        Id localIp = new Id("ip", "127.0.0.1");        // 指定ip
        Id otherIp = new Id("ip", "1.10.10.10");    // 指定ip

        // 指定 acl 权限
        List<ACL> aclList = new ArrayList<>();
        aclList.add(new ACL(ZooDefs.Perms.ALL, localIp));    // 指定的ip 有所有权限
        aclList.add(new ACL(ZooDefs.Perms.READ, otherIp));    // 指定的ip 只有 r 权限

        // 创建节点
        String nodeName = zookeeper.create("/test", "数据".getBytes(), aclList, CreateMode.PERSISTENT);
        log.info(nodeName);

        // 操作节点
        // 设置了 ip 权限的节点,如果需要操作节点,必须有相应权限的ip电脑才能进行操作,其他ip都无权操作
        byte[] data = zookeeper.getData("/test", false, new Stat());
        log.info("数据:{}", new String(data));
    }

完。

标签: java zookeeper

本文转载自: https://blog.csdn.net/a1053765496/article/details/128028749
版权归原作者 又逢乱世 所有, 如有侵权,请联系我们删除。

“ZooKeeper Java Api 操作”的评论:

还没有评论