0


09、Hadoop框架Zookeeper Java API

文章目录

Hadoop框架Zookeeper Java API

引入zookeeper依赖

  去Maven官网引入Zookeeper依赖。

  选择3.4.6版本,复制到IDEA的pom文件里

  1. <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.6</version></dependency>

  新建ZOOKEEPER包

  新建ZKJavaAPI

测试连接

1、新建连接

  这里需要抛出异常

  1. // 1、新建连接ZooKeeper zk =newZooKeeper("master:2181,node1:2181,node2:2181",100000,null);

2、创建临时节点

  这里需要抛出异常

  1. zk.create("/test1","abcdefg".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE
  2. ,CreateMode.PERSISTENT
  3. );

3、运行测试

  1. packagecom.liangzai.ZOOKEEPER;importorg.apache.zookeeper.CreateMode;importorg.apache.zookeeper.KeeperException;importorg.apache.zookeeper.ZooDefs;importorg.apache.zookeeper.ZooKeeper;importjava.io.IOException;publicclassZKJavaAPI{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,KeeperException{// 1、新建连接ZooKeeper zk =newZooKeeper("master:2181,node1:2181,node2:2181",100000,null);// 2、创建临时节点
  2. zk.create("/test1","abcdefg".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE
  3. ,CreateMode.EPHEMERAL
  4. );}}
  1. 控制台输出结果:
  2. log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
  3. log4j:WARN Please initialize the log4j system properly.
  4. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
  5. Process finished with exit code 0
  • 去ZK里查看运行结果
  1. #启动ZK
  2. zkCli.sh -server node1:2181
  3. #ZK Shell
  4. ls /
  5. get /test1

运行结果:

注意我这里并没有zk.close();
因为创建的是临时节点,断开了就会被删除
这里是运行成功的。
下面我将详细介绍。

ZKJavaAPI

名词解析

  • ZooDefs.Ids :控制所创建的ZNODE的权限
  • OPEN_ACL_UNSAFE :完全开放的ACL,任何连接的客户端都可以操作该属性znode
  • CREATOR_ALL_ACL :只有创建者才有ACL权限
  • READ_ACL_UNSAFE :只能读取ACL
  • CreateMode :创建的ZNODE的类型
  • PERSISTENT :永久创建,连接断开不会删除
  • EPHEMERAL :创建临时节点,连接断开即删除

创建永久节点

  1. ZooKeeper zk;@Before// 创建连接publicvoidinit()throwsIOException{
  2. zk =newZooKeeper("master:2181,node1:2181,node2:2181",100000,null);}@Test// 创建永久节点publicvoidcreatePersistentZNODE()throwsInterruptedException,KeeperException{/**
  3. * ZooDefs.Ids :控制所创建的ZNODE的权限
  4. * OPEN_ACL_UNSAFE : 完全开放的ACL,任何连接的客户端都可以操作该属性znode
  5. * CREATOR_ALL_ACL : 只有创建者才有ACL权限
  6. * READ_ACL_UNSAFE:只能读取ACL
  7. *
  8. * CreateMode : 创建的ZNODE的类型
  9. * PERSISTENT : 永久创建,连接断开不会删除
  10. * EPHEMERAL : 创建临时节点,连接断开即删除
  11. */
  12. zk.create("/test3","def".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE
  13. ,CreateMode.PERSISTENT
  14. );}@After// 关闭连接publicvoidclosed()throwsInterruptedException{
  15. zk.close();}
  1. ls /
  2. get /test3

运行结果:

可见@After注解关闭了ZK连接
test3被创建了
PERSISTENT :永久创建,连接断开不会删除

创建临时节点

  1. ZooKeeper zk;@Before// 创建连接publicvoidinit()throwsIOException{
  2. zk =newZooKeeper("master:2181,node1:2181,node2:2181",100000,null);}@Test// 创建临时节点publicvoidcreateEPHEMERALZNODE()throwsInterruptedException,KeeperException{
  3. zk.create("/test2","abc".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE
  4. ,CreateMode.EPHEMERAL
  5. );}@After// 关闭连接publicvoidclosed()throwsInterruptedException{
  6. zk.close();}
  1. ls /

运行结果:

可见后面@After注解里将ZK连接关闭了
EPHEMERAL : 创建临时节点,连接断开即删除
所以test2并没有被创建

获取节点数据

  1. ZooKeeper zk;@Before// 创建连接publicvoidinit()throwsIOException{
  2. zk =newZooKeeper("master:2181,node1:2181,node2:2181",100000,null);}@Test// 获取节点数据publicvoidgetZNODE()throwsInterruptedException,KeeperException{byte[] data = zk.getData("/test1",null,newStat());// 字节数组转String,这里toString没有意义System.out.println(newString(data));}@After// 关闭连接publicvoidclosed()throwsInterruptedException{
  3. zk.close();}
  1. get /test1

运行结果:

这里的watcher就是我们NameNode的故障转移机制

修改数据

  1. ZooKeeper zk;@Before// 创建连接publicvoidinit()throwsIOException{
  2. zk =newZooKeeper("master:2181,node1:2181,node2:2181",100000,null);}@Test// 修改数据publicvoidsetZNODE()throwsInterruptedException,KeeperException{
  3. zk.setData("/test1","liangzai".getBytes(),1);}@After// 关闭连接publicvoidclosed()throwsInterruptedException{
  4. zk.close();}
  1. get /test1

运行结果:

刚刚我们test1里面的数据是bacdef
运行后改成了liangzai

删除节点

  1. ZooKeeper zk;@Before// 创建连接publicvoidinit()throwsIOException{
  2. zk =newZooKeeper("master:2181,node1:2181,node2:2181",100000,null);}@Test// 删除节点publicvoiddeleteZNODE()throwsInterruptedException,KeeperException{
  3. zk.delete("/test3",0);}@After// 关闭连接publicvoidclosed()throwsInterruptedException{
  4. zk.close();}
  1. ls /

运行结果:

这里通过ls / 查看后
test3被删除了

事件

  1. ZooKeeper zk;@Before// 创建连接publicvoidinit()throwsIOException{
  2. zk =newZooKeeper("master:2181,node1:2181,node2:2181",100000,null);}@Test// 事件publicvoidtriggerWatcher()throwsInterruptedException,KeeperException{
  3. zk.exists("/test",newWatcher(){@Overridepublicvoidprocess(WatchedEvent event){System.out.println("/test 发生了变化");System.out.println(event.getPath());System.out.println(event.getState());System.out.println(event.getType());System.out.println(event.getWrapper());}});@After// 关闭连接publicvoidclosed()throwsInterruptedException{
  4. zk.close();}
  1. set /test node1

运行结果:

  1. /test 发生了变化
  2. /test
  3. SyncConnected
  4. NodeDataChanged
  5. 3,3,'/test

事件方便我们去监控NameNode的故障转移机制

完整代码

  1. packagecom.liangzai.ZOOKEEPER;importorg.apache.zookeeper.*;importorg.apache.zookeeper.data.Stat;importorg.junit.After;importorg.junit.Before;importorg.junit.Test;importjava.io.IOException;publicclassZKJavaAPI{ZooKeeper zk;@Before// 创建连接publicvoidinit()throwsIOException{
  2. zk =newZooKeeper("master:2181,node1:2181,node2:2181",100000,null);}@Test// 创建临时节点publicvoidcreateEPHEMERALZNODE()throwsInterruptedException,KeeperException{
  3. zk.create("/test2","abc".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE
  4. ,CreateMode.EPHEMERAL
  5. );}@Test// 创建永久节点publicvoidcreatePersistentZNODE()throwsInterruptedException,KeeperException{/**
  6. * ZooDefs.Ids :控制所创建的ZNODE的权限
  7. * OPEN_ACL_UNSAFE : 完全开放的ACL,任何连接的客户端都可以操作该属性znode
  8. * CREATOR_ALL_ACL : 只有创建者才有ACL权限
  9. * READ_ACL_UNSAFE:只能读取ACL
  10. *
  11. * CreateMode : 创建的ZNODE的类型
  12. * PERSISTENT : 永久创建,连接断开不会删除
  13. * EPHEMERAL : 创建临时节点,连接断开即删除
  14. */
  15. zk.create("/test3","def".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE
  16. ,CreateMode.PERSISTENT
  17. );}@Test// 获取节点数据publicvoidgetZNODE()throwsInterruptedException,KeeperException{byte[] data = zk.getData("/test1",null,newStat());// 字节数组转String,这里toString没有意义System.out.println(newString(data));}@Test// 修改数据publicvoidsetZNODE()throwsInterruptedException,KeeperException{
  18. zk.setData("/test1","liangzai".getBytes(),1);}@Test// 删除节点publicvoiddeleteZNODE()throwsInterruptedException,KeeperException{
  19. zk.delete("/test3",0);}@Test// 事件publicvoidtriggerWatcher()throwsInterruptedException,KeeperException{
  20. zk.exists("/test",newWatcher(){@Overridepublicvoidprocess(WatchedEvent event){System.out.println("/test 发生了变化");System.out.println(event.getPath());System.out.println(event.getState());System.out.println(event.getType());System.out.println(event.getWrapper());}});// 加入死循环,不会被退出 方便在控制台输出while(true){}}@After// 关闭连接publicvoidclosed()throwsInterruptedException{
  21. zk.close();}}

到底啦!觉得靓仔的文章对你学习Hadoop有所帮助的话,一波三连吧!q(≧▽≦q)

标签: zookeeper java hadoop

本文转载自: https://blog.csdn.net/hujieliang123/article/details/122905638
版权归原作者 liangzai2048 所有, 如有侵权,请联系我们删除。

“09、Hadoop框架Zookeeper Java API”的评论:

还没有评论