import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
- 简单的分布式队列
- @author jerome_s@qq.com
- @date 2016/8/30 20:19
*/
public class DistributedSimpleQueue {
protected final ZkClient zkClient;
/**
- 根节点路径
*/
protected final String root;
/**
- 顺序节点的前缀
*/
protected static final String Node_NAME = “qn_”;
public DistributedSimpleQueue(ZkClient zkClient, String root) {
this.zkClient = zkClient;
this.root = root;
}
/**
- 获取队列的大小
- @return
*/
public int getQueueSize() {
return zkClient.getChildren(root).size();
}
/**
- 向队列提交数据
- @param element 提交的数据
- @return
- @throws Exception
*/
public boolean offer(T element) throws Exception {
String nodeFullPath = root.concat(“/”).concat(Node_NAME);
try {
// 创建持久的顺序节点
zkClient.createPersistentSequential(nodeFullPath, element);
} catch (ZkNoNodeException e) {
zkClient.createPersistent(root);
offer(element);
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
return true;
}
public List element() {
List list = zkClient.getChildren(root);
// 排序队列 根据名称由小到大
Collections.sort(list, new Comparator() {
public int compare(String lhs, String rhs) {
return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
}
});
List res = new ArrayList<>();
try {
for (String nodeName : list) {
String nodeFullPath = root.concat(“/”).concat(nodeName);
res.add((T)zkClient.readData(nodeFullPath));
}
} catch (ZkNoNodeException e) {
// 其他客户端消费了 继续循环
}
return res;
}
/**
- 从队列获取数据
- @return
- @throws Exception
*/
public T poll() throws Exception {
try {
List list = zkClient.getChildren(root);
if (list.size() == 0) {
return null;
}
// 排序队列 根据名称由小到大
Collections.sort(list, new Comparator() {
public int compare(String lhs, String rhs) {
return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
}
});
for (String nodeName : list) {
String nodeFullPath = root.concat(“/”).concat(nodeName);
try {
T node = (T) zkClient.readData(nodeFullPath);
zkClient.delete(nodeFullPath);
return node;
} catch (ZkNoNodeException e) {
// 其他客户端消费了 继续循环
}
}
return null;
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
}
private String getNodeNumber(String str, String nodeName) {
int index = str.lastIndexOf(nodeName);
if (index >= 0) {
index += Node_NAME.length();
return index <= str.length() ? str.substring(index) : “”;
}
return str;
}
}
测试用例:
package com.queue;
import com.queue.model.User;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import java.util.List;
/**
- 测试简单的分布式队列
- @author jerome_s@qq.com
- @date 2016/8/30 19:48
*/
public class TestDistributedSimpleQueue {
public static void main(String[] args) {
ZkClient zkClient = new ZkClient(“192.168.40.138:2181”, 10000, 500000, new SerializableSerializer());
DistributedSimpleQueue queue = new DistributedSimpleQueue<>(zkClient, “/queue”);
User user1 = new User();
user1.setId(“1”);
user1.setName(“jerome1”);
User user2 = new User();
user2.setId(“2”);
user2.setName(“jerome2”);
try {
queue.offer(user1);
queue.offer(user2);
System.out.println(“queue.offer end!”);
List element = queue.element();
System.out.println(element.toString());
User u1 = queue.poll();
User u2 = queue.poll();
System.out.println("queue.poll() u1 = " + u1.toString());
System.out.println("queue.poll() u2 = " + u2.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
/* console:
queue.offer end!
queue.poll() u1 = User{name=‘jerome1’, id=‘1’}
queue.poll() u2 = User{name=‘jerome2’, id=‘2’}
*/
}
运行结果:
写在最后
还有一份JAVA核心知识点整理(PDF):JVM,JAVA集合,JAVA多线程并发,JAVA基础,Spring原理,微服务,Netty与RPC,网络,日志,Zookeeper,Kafka,RabbitMQ,Hbase,MongoDB,Cassandra,设计模式,负载均衡,数据库,一致性哈希,JAVA算法,数据结构,加密算法,分布式缓存,Hadoop,Spark,Storm,YARN,机器学习,云计算…
加入社区:https://bbs.csdn.net/forums/4304bb5a486d4c3ab8389e65ecb71ac0
ps://img-blog.csdnimg.cn/20200329203257407.png)
写在最后
还有一份JAVA核心知识点整理(PDF):JVM,JAVA集合,JAVA多线程并发,JAVA基础,Spring原理,微服务,Netty与RPC,网络,日志,Zookeeper,Kafka,RabbitMQ,Hbase,MongoDB,Cassandra,设计模式,负载均衡,数据库,一致性哈希,JAVA算法,数据结构,加密算法,分布式缓存,Hadoop,Spark,Storm,YARN,机器学习,云计算…
[外链图片转存中…(img-Tq1KEQYl-1725749210915)]
加入社区:https://bbs.csdn.net/forums/4304bb5a486d4c3ab8389e65ecb71ac0
版权归原作者 2401_86402673 所有, 如有侵权,请联系我们删除。