0


zookeeper之分布式队列

import java.util.ArrayList;

import java.util.Collections;

import java.util.Comparator;

import java.util.List;

/**

*/

public class DistributedSimpleQueue {

protected final ZkClient zkClient;

/**

  • 根节点路径

*/

protected final String root;

/**

  • 顺序节点的前缀

*/

protected static final String Node_NAME = “qn_”;

public DistributedSimpleQueue(ZkClient zkClient, String root) {

this.zkClient = zkClient;

this.root = root;

}

/**

  • 获取队列的大小
  • @return

*/

public int getQueueSize() {

return zkClient.getChildren(root).size();

}

/**

  • 向队列提交数据
  • @param element 提交的数据
  • @return
  • @throws Exception

*/

public boolean offer(T element) throws Exception {

String nodeFullPath = root.concat(“/”).concat(Node_NAME);

try {

// 创建持久的顺序节点

zkClient.createPersistentSequential(nodeFullPath, element);

} catch (ZkNoNodeException e) {

zkClient.createPersistent(root);

offer(element);

} catch (Exception e) {

throw ExceptionUtil.convertToRuntimeException(e);

}

return true;

}

public List element() {

List list = zkClient.getChildren(root);

// 排序队列 根据名称由小到大

Collections.sort(list, new Comparator() {

public int compare(String lhs, String rhs) {

return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));

}

});

List res = new ArrayList<>();

try {

for (String nodeName : list) {

String nodeFullPath = root.concat(“/”).concat(nodeName);

res.add((T)zkClient.readData(nodeFullPath));

}

} catch (ZkNoNodeException e) {

// 其他客户端消费了 继续循环

}

return res;

}

/**

  • 从队列获取数据
  • @return
  • @throws Exception

*/

public T poll() throws Exception {

try {

List list = zkClient.getChildren(root);

if (list.size() == 0) {

return null;

}

// 排序队列 根据名称由小到大

Collections.sort(list, new Comparator() {

public int compare(String lhs, String rhs) {

return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));

}

});

for (String nodeName : list) {

String nodeFullPath = root.concat(“/”).concat(nodeName);

try {

T node = (T) zkClient.readData(nodeFullPath);

zkClient.delete(nodeFullPath);

return node;

} catch (ZkNoNodeException e) {

// 其他客户端消费了 继续循环

}

}

return null;

} catch (Exception e) {

throw ExceptionUtil.convertToRuntimeException(e);

}

}

private String getNodeNumber(String str, String nodeName) {

int index = str.lastIndexOf(nodeName);

if (index >= 0) {

index += Node_NAME.length();

return index <= str.length() ? str.substring(index) : “”;

}

return str;

}

}

测试用例:

package com.queue;

import com.queue.model.User;

import org.I0Itec.zkclient.ZkClient;

import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.util.List;

/**

  • 测试简单的分布式队列
  • @author jerome_s@qq.com
  • @date 2016/8/30 19:48

*/

public class TestDistributedSimpleQueue {

public static void main(String[] args) {

ZkClient zkClient = new ZkClient(“192.168.40.138:2181”, 10000, 500000, new SerializableSerializer());

DistributedSimpleQueue queue = new DistributedSimpleQueue<>(zkClient, “/queue”);

User user1 = new User();

user1.setId(“1”);

user1.setName(“jerome1”);

User user2 = new User();

user2.setId(“2”);

user2.setName(“jerome2”);

try {

queue.offer(user1);

queue.offer(user2);

System.out.println(“queue.offer end!”);

List element = queue.element();

System.out.println(element.toString());

User u1 = queue.poll();

User u2 = queue.poll();

System.out.println("queue.poll() u1 = " + u1.toString());

System.out.println("queue.poll() u2 = " + u2.toString());

} catch (Exception e) {

e.printStackTrace();

}

}

/* console:

queue.offer end!

queue.poll() u1 = User{name=‘jerome1’, id=‘1’}

queue.poll() u2 = User{name=‘jerome2’, id=‘2’}

*/

}
运行结果:

写在最后

还有一份JAVA核心知识点整理(PDF):JVM,JAVA集合,JAVA多线程并发,JAVA基础,Spring原理,微服务,Netty与RPC,网络,日志,Zookeeper,Kafka,RabbitMQ,Hbase,MongoDB,Cassandra,设计模式,负载均衡,数据库,一致性哈希,JAVA算法,数据结构,加密算法,分布式缓存,Hadoop,Spark,Storm,YARN,机器学习,云计算…

image

加入社区:https://bbs.csdn.net/forums/4304bb5a486d4c3ab8389e65ecb71ac0
ps://img-blog.csdnimg.cn/20200329203257407.png)

写在最后

还有一份JAVA核心知识点整理(PDF):JVM,JAVA集合,JAVA多线程并发,JAVA基础,Spring原理,微服务,Netty与RPC,网络,日志,Zookeeper,Kafka,RabbitMQ,Hbase,MongoDB,Cassandra,设计模式,负载均衡,数据库,一致性哈希,JAVA算法,数据结构,加密算法,分布式缓存,Hadoop,Spark,Storm,YARN,机器学习,云计算…

[外链图片转存中…(img-Tq1KEQYl-1725749210915)]

加入社区:https://bbs.csdn.net/forums/4304bb5a486d4c3ab8389e65ecb71ac0


本文转载自: https://blog.csdn.net/2401_86402673/article/details/142008113
版权归原作者 2401_86402673 所有, 如有侵权,请联系我们删除。

“zookeeper之分布式队列”的评论:

还没有评论