节点类型
持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
可分为无序号和有序号的,顺序号可以被用于 为所有的事件进行全局排序,这样客户端可以通 过顺序号推断事件的顺序
短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
也可分为无序号和有序号的
Linux中创建节点
创建永久节点名称为jiedian1,值为dadada,无序号
create /jiedian1 "dadada"
创建子节点
create /jiedian1/zijiedian "dadada"
获取节点值
get -s /jiedian1
创建带序号的节点 只需要加-s
create -s /jiedian1/zijiedian "xuhao"
创建临时
create -e /jiedian2 "linshi"
临时带序号
create -e -s /jiedian2 "linshixuhao"
此时 quit 退出后,临时节点会消失
修改节点
set /jiedian1 "123"
监听器原理
监听那个节点数据变化,若变化通知客户端
1)首先要有一个main()线程
2)在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
3)通过connect线程将注册的监听事件发送给Zookeeper。
4)在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
5)Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
6)listener线程内部调用了process()方法。
使用命令行操作,注册一次监听一次
get -w /jiedian1
此时用其他机器对节点进行修改,此时就会通知当前客户端
整合代码
创建maven工程,添加依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
配置日志文件,在resource下创建log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
编写测试类
public class zkClient {
//不能有空格
private static String connectString = "192.168.6.100:2181,192.168.6.101:2181,192.168.6.102:2181";
//超时时间
private static int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
//接连集群
@Before
public void init() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new
Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 收到事件通知后的回调函数(用户的业务逻辑)
System.out.println(watchedEvent.getType() + "--"
+ watchedEvent.getPath());
// 再次启动监听
try {
List<String> children = zkClient.getChildren("/",
true);
for (String child : children) {
System.out.println(child);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 创建子节点
@Test
public void create() throws Exception {
// 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
String nodeCreated = zkClient.create("/lzq",
"jiedain".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
// 获取子节点并监听 注册一次生效一次
@Test
public void getChildren() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
// 延时阻塞
Thread.sleep(Long.MAX_VALUE);
}
}
写数据流程
写流程之写入请求直接发送给Leader节点,直接写,然后同步到从节点,当超过半数写完,返回给客户端
写流程之写入请求发送给follower节点,通知主节点写。。。。
服务器动态上下线监听
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知 到主节点服务器的上下线
先在集群上创建/servers 节点
create /servers "servers"
IDEA中,服务器端向 Zookeeper 注册代码
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
public class DistributeServer {
private static String connectString =
"192.168.6.100:2181,192.168.6.101:2181,192.168.6.102:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 创建到 zk 的客户端连接
public void getConnect() throws IOException{
zk = new ZooKeeper(connectString, sessionTimeout, new
Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
// 注册服务器
public void registServer(String hostname) throws Exception{
String create = zk.create(parentNode + "/server",
hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname +" is online "+ create);
}
// 业务功能
public void business(String hostname) throws Exception{
System.out.println(hostname + " is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeServer server = new DistributeServer();
server.getConnect();
// 2 利用 zk 连接注册服务器信息
server.registServer(args[0]);
// 3 启动业务功能
server.business(args[0]);
}
}
客户端
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DistributeClient {
private static String connectString =
"192.168.6.100:2181,192.168.6.101:2181,192.168.6.102:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 创建到 zk 的客户端连接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new
Watcher() {
@Override
public void process(WatchedEvent event) {
// 再次启动监听
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 获取服务器列表信息
public void getServerList() throws Exception {
// 1 获取服务器子节点信息,并且对父节点进行监听
List<String> children = zk.getChildren(parentNode, true);
// 2 存储服务器信息列表
ArrayList<String> servers = new ArrayList<>();
// 3 遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
byte[] data = zk.getData(parentNode + "/" + child,
false, null);
servers.add(new String(data));
}
// 4 打印服务器列表信息
System.out.println(servers);
}
// 业务功能
public void business() throws Exception{
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeClient client = new DistributeClient();
client.getConnect();
// 2 获取 servers 的子节点信息,从中获取服务器信息列表
client.getServerList();
// 3 业务进程启动
client.business();
}
}
此时修改或创建节点就会在控制台输出
版权归原作者 远走于梦游 所有, 如有侵权,请联系我们删除。