0


zookeeper

简介

概念:zookeeper是一个分布式应用程序的协调服务。

作用:配置管理、分布式锁、集群管理

Zookeeper命令操作

zookeeper数据模型介绍:

zookeeper是一个树形数据结构。每一个节点被称为 ZNode,每个节点会保存自己的数据和节点信息,并允许少量的数据存储到节点下。

节点分为四类:

  • persistent 持久化节点
  • ephemeral 临时节点:-e
  • persistent_sequential 持久化顺序节点:-s
  • ephemeral_sequentia 临时顺序节点: -es

Zookeeper服务端命令

  • 启动 ZooKeeper 服务: ./zkServer.sh start
  • 查看 ZooKeeper 服务状态: ./zkServer.sh status
  • 停止 ZooKeeper 服务: ./zkServer.sh stop
  • 重启 ZooKeeper 服务: ./zkServer.sh restart

Zookeeper客户端命令

  • 连接服务端命令 ./zkCli.sh -server ip:port
  • 断开连接命令 quit
  • 查看命令帮助 help
  • 显示指定目录下节点 ls 目录
  • 创建节点 create /节点path value
  • 删除节点 delete /节点path
  • 删除带有子节点的节点 deleteall /节点path
  • 设置节点值 set /节点path value
  • 创建临时节点 create -e /节点path
  • 创建顺序节点 create -s /节点path
  • 创建临时顺序节点 create -es /节点path
  • 查看节点详细信息 ls -s /节点path

Curator

curator是 Apache XooKeeper的java客户端库。

curator api操作

  1. public class CuratorTest {
  2. private CuratorFramework client;
  3. /**
  4. * 建立连接
  5. */
  6. @Before//注解是一个 JUnit 注解,用于指示应在测试类中的每个测试方法之前执行带注解的方法。换句话说,该方法将在类中的每个测试方法之前运行。@Before
  7. public void testConnect() {
  8. /*
  9. *
  10. * @param connectString 连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181"
  11. * @param sessionTimeoutMs 会话超时时间 单位ms
  12. * @param connectionTimeoutMs 连接超时时间 单位ms
  13. * @param retryPolicy 重试策略
  14. */
  15. /* //重试策略
  16. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
  17. //1.第一种方式
  18. CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",
  19. 60 * 1000, 15 * 1000, retryPolicy);*/
  20. //重试策略
  21. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  22. //2.第二种方式
  23. //CuratorFrameworkFactory.builder();
  24. client = CuratorFrameworkFactory.builder()
  25. .connectString("192.168.149.135:2181")
  26. .sessionTimeoutMs(60 * 1000)
  27. .connectionTimeoutMs(15 * 1000)
  28. .retryPolicy(retryPolicy)
  29. .namespace("itheima")
  30. .build();
  31. //开启连接
  32. client.start();
  33. }
  34. //==============================create=============================================================================
  35. /**
  36. * 创建节点:create 持久 临时 顺序 数据
  37. * 1. 基本创建 :create().forPath("")
  38. * 2. 创建节点 带有数据:create().forPath("",data)
  39. * 3. 设置节点的类型:create().withMode().forPath("",data)
  40. * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
  41. */
  42. @Test
  43. public void testCreate() throws Exception {
  44. //2. 创建节点 带有数据
  45. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
  46. String path = client.create().forPath("/app2", "hehe".getBytes());
  47. System.out.println(path);
  48. }
  49. @Test
  50. public void testCreate2() throws Exception {
  51. //1. 基本创建
  52. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
  53. String path = client.create().forPath("/app1");
  54. System.out.println(path);
  55. }
  56. @Test
  57. public void testCreate3() throws Exception {
  58. //3. 设置节点的类型
  59. //默认类型:持久化
  60. String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
  61. System.out.println(path);
  62. }
  63. @Test
  64. public void testCreate4() throws Exception {
  65. //4. 创建多级节点 /app1/p1
  66. //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
  67. String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
  68. System.out.println(path);
  69. }
  70. //===========================get================================================================================
  71. /**
  72. * 查询节点:
  73. * 1. 查询数据:get: getData().forPath()
  74. * 2. 查询子节点: ls: getChildren().forPath()
  75. * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
  76. */
  77. @Test
  78. public void testGet1() throws Exception {
  79. //1. 查询数据:get
  80. byte[] data = client.getData().forPath("/app1");
  81. System.out.println(new String(data));
  82. }
  83. @Test
  84. public void testGet2() throws Exception {
  85. // 2. 查询子节点: ls
  86. List<String> path = client.getChildren().forPath("/");
  87. System.out.println(path);
  88. }
  89. @Test
  90. public void testGet3() throws Exception {
  91. Stat status = new Stat();
  92. System.out.println(status);
  93. //3. 查询节点状态信息:ls -s
  94. client.getData().storingStatIn(status).forPath("/app1");
  95. System.out.println(status);
  96. }
  97. //===========================set================================================================================
  98. /**
  99. * 修改数据
  100. * 1. 基本修改数据:setData().forPath()
  101. * 2. 根据版本修改: setData().withVersion().forPath()
  102. * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
  103. *
  104. * @throws Exception
  105. */
  106. @Test
  107. public void testSet() throws Exception {
  108. client.setData().forPath("/app1", "itcast".getBytes());
  109. }
  110. @Test
  111. public void testSetForVersion() throws Exception {
  112. Stat status = new Stat();
  113. //3. 查询节点状态信息:ls -s
  114. client.getData().storingStatIn(status).forPath("/app1");
  115. int version = status.getVersion();//查询出来的 3
  116. System.out.println(version);
  117. client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
  118. }
  119. //===========================delete================================================================================
  120. /**
  121. * 删除节点: delete deleteall
  122. * 1. 删除单个节点:delete().forPath("/app1");
  123. * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
  124. * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
  125. * 4. 回调:inBackground
  126. * @throws Exception
  127. */
  128. @Test
  129. public void testDelete() throws Exception {
  130. // 1. 删除单个节点
  131. client.delete().forPath("/app1");
  132. }
  133. @Test
  134. public void testDelete2() throws Exception {
  135. //2. 删除带有子节点的节点
  136. client.delete().deletingChildrenIfNeeded().forPath("/app4");
  137. }
  138. @Test
  139. public void testDelete3() throws Exception {
  140. //3. 必须成功的删除
  141. client.delete().guaranteed().forPath("/app2");
  142. }
  143. @Test
  144. public void testDelete4() throws Exception {
  145. //4. 回调
  146. client.delete().guaranteed().inBackground(new BackgroundCallback(){
  147. @Override
  148. public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
  149. System.out.println("我被删除了~");
  150. System.out.println(event);
  151. }
  152. }).forPath("/app1");
  153. }
  154. @After
  155. public void close() {
  156. if (client != null) {
  157. client.close();
  158. }
  159. }
  160. }

Watch事件监听

  1. public class CuratorWatcherTest {
  2. private CuratorFramework client;
  3. /**
  4. * 建立连接
  5. */
  6. @Before
  7. public void testConnect() {
  8. /*
  9. *
  10. * @param connectString 连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181"
  11. * @param sessionTimeoutMs 会话超时时间 单位ms
  12. * @param connectionTimeoutMs 连接超时时间 单位ms
  13. * @param retryPolicy 重试策略
  14. */
  15. /* //重试策略
  16. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
  17. //1.第一种方式
  18. CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",
  19. 60 * 1000, 15 * 1000, retryPolicy);*/
  20. //重试策略
  21. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  22. //2.第二种方式
  23. //CuratorFrameworkFactory.builder();
  24. client = CuratorFrameworkFactory.builder()
  25. .connectString("192.168.149.135:2181")
  26. .sessionTimeoutMs(60 * 1000)
  27. .connectionTimeoutMs(15 * 1000)
  28. .retryPolicy(retryPolicy)
  29. .namespace("itheima")
  30. .build();
  31. //开启连接
  32. client.start();
  33. }
  34. @After
  35. public void close() {
  36. if (client != null) {
  37. client.close();
  38. }
  39. }
  40. /**
  41. * 演示 NodeCache:给指定一个节点注册监听器
  42. */
  43. @Test
  44. public void testNodeCache() throws Exception {
  45. //1. 创建NodeCache对象
  46. final NodeCache nodeCache = new NodeCache(client,"/app1");
  47. //2. 注册监听
  48. nodeCache.getListenable().addListener(new NodeCacheListener() {
  49. @Override
  50. public void nodeChanged() throws Exception {
  51. System.out.println("节点变化了~");
  52. //获取修改节点后的数据
  53. byte[] data = nodeCache.getCurrentData().getData();
  54. System.out.println(new String(data));
  55. }
  56. });
  57. //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
  58. nodeCache.start(true);
  59. while (true){
  60. }
  61. }
  62. /**
  63. * 演示 PathChildrenCache:监听某个节点的所有子节点们
  64. */
  65. @Test
  66. public void testPathChildrenCache() throws Exception {
  67. //1.创建监听对象
  68. PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
  69. //2. 绑定监听器
  70. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  71. @Override
  72. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  73. System.out.println("子节点变化了~");
  74. System.out.println(event);
  75. //监听子节点的数据变更,并且拿到变更后的数据
  76. //1.获取类型
  77. PathChildrenCacheEvent.Type type = event.getType();
  78. //2.判断类型是否是update
  79. if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
  80. System.out.println("数据变了!!!");
  81. byte[] data = event.getData().getData();
  82. System.out.println(new String(data));
  83. }
  84. }
  85. });
  86. //3. 开启
  87. pathChildrenCache.start();
  88. while (true){
  89. }
  90. }
  91. /**
  92. * 演示 TreeCache:监听某个节点自己和所有子节点们
  93. */
  94. @Test
  95. public void testTreeCache() throws Exception {
  96. //1. 创建监听器
  97. TreeCache treeCache = new TreeCache(client,"/app2");
  98. //2. 注册监听
  99. treeCache.getListenable().addListener(new TreeCacheListener() {
  100. @Override
  101. public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
  102. System.out.println("节点变化了");
  103. System.out.println(event);
  104. }
  105. });
  106. //3. 开启
  107. treeCache.start();
  108. while (true){
  109. }
  110. }
  111. }

ZooKeeper分布式锁原理

  1. public class Ticket12306 implements Runnable{
  2. private int tickets = 10;//数据库的票数
  3. private InterProcessMutex lock ;
  4. public Ticket12306(){
  5. //重试策略
  6. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  7. //2.第二种方式
  8. //CuratorFrameworkFactory.builder();
  9. CuratorFramework client = CuratorFrameworkFactory.builder()
  10. .connectString("192.168.149.135:2181")
  11. .sessionTimeoutMs(60 * 1000)
  12. .connectionTimeoutMs(15 * 1000)
  13. .retryPolicy(retryPolicy)
  14. .build();
  15. //开启连接
  16. client.start();
  17. lock = new InterProcessMutex(client,"/lock");
  18. }
  19. @Override
  20. public void run() {
  21. while(true){
  22. //获取锁
  23. try {
  24. lock.acquire(3, TimeUnit.SECONDS);
  25. if(tickets > 0){
  26. System.out.println(Thread.currentThread()+":"+tickets);
  27. Thread.sleep(100);
  28. tickets--;
  29. }
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. }finally {
  33. //释放锁
  34. try {
  35. lock.release();
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }
  41. }
  42. }
  1. public static void main(String[] args) {
  2. Ticket12306 ticket12306 = new Ticket12306();
  3. //创建客户端
  4. Thread t1 = new Thread(ticket12306,"携程");
  5. Thread t2 = new Thread(ticket12306,"飞猪");
  6. t1.start();
  7. t2.start();
  8. }

本文转载自: https://blog.csdn.net/weixin_45104798/article/details/135159742
版权归原作者 byydzj 所有, 如有侵权,请联系我们删除。

“zookeeper”的评论:

还没有评论