Zookeeper基本使用
概述
Zookeeper是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。
Zookeeper = 文件系统 + 通知机制
Zookeeper特点
- 一个领导者(Leader),多个跟随者(Follower)组成的集群
- 集群中只要有半数以上节点存活,Zookeeper集群就能够正常服务
- 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的
- 更新请求顺序进行,来自同一个Client的更新请求按其发送顺序依次执行
- 数据更新原子性,一次数据更新要么成功,要么失败
- 实时性,在一定时间范围内,Client能读到最新数据
数据结构
Zookeeper的数据模型结构与 Unix 文件系统很类似,整体上可以看作一棵树,每个节点称为一个ZNode
,每一个ZNode
默认能够存储1MB的数据,每个ZNode
都可以通过其路径唯一标识。
应用场景
- 统一命名服务:在分布式环境下,经常需要对应用或服务进行统一命名,便于识别。例如IP不容易记,使用域名。
- 统一配置管理
- 在分布式环境下,一般要求在一个集群下,所有节点的配置信息是一致的。在 Zookeeper 中,可以将配置信息写入到 Zookeeper 上的一个 Znode 中。
- 对配置文件修改后,希望能够快速同步到各个节点上。每个客户端服务器监听这个Znode,一旦Znode中的数据被修改,Zookeeper将通知各个客户端服务器。
- 统一集群管理:分布式环境中,实时掌握每个节点的状态是必要的,要求可以根据节点实时状态作出一些调整。Zookeeper可以实现实时监控节点状态变化。可将节点信息写入到Zookeeper的一个Znode,监听这个Znode可以获取它实时状态变化。
- 服务器动态上下线:客户端能够实时洞察到服务器上下线的变化。服务器将是否在线的状态写入到Znode中,客户端获取到当前在线服务器列表,并注册监听,当服务器有上下线时就会发起事件通知。
- 软负载均衡:在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。
Zookeeper安装
单机安装
在Zookeeper官网上下载压缩包,解压后配置文件在conf/zoo_sample.cfg
文件中。可以根据需要修改配置文件。
启动和关闭的相关文件在bin
目录下:
- 启动Zookeeper:
bin/zkServer.sh start
- 查看进程是否启动
jps
- 查看状态
$ bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /Users/xiaoming/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone
- 启动客户端
bin/zkCli.sh
- 退出客户端
quit
- 停止Zookeeper
bin/zkServer.sh stop
配置文件参数
# 通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒
# Zookeeper使用的基本时间,服务器之间和客户端和服务器之间维持心跳的时间间隔,也就是每个ticketTime就会发送一次心跳,时间单位为毫秒。
# 用于心跳机制,并且设置最小的session超时时间为两倍心跳时间
tickTime=2000
# LF初始通信时限(初次连接时间)
# 集群中的Follower跟随者服务器与Leader领导者服务器之间初始连接时能容忍的最多心跳数(ticketTime的数量),用来限定集群中Zookeeper服务器连接到Leader的时限
initLimit=10
# LF同步通信时限
# 集群中Leader和Follower之间的最大响应时间单位,
syncLimit=5
# 数据文件目录+数据持久化路径
# 主要用于保存Zookeeper中的数据
dataDir=/tmp/zookeeper
# 客户端连接的端口,用于监听客户端连接的端口
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to “0” to disable auto purge feature
#autopurge.purgeInterval=1
集群安装部署
集群模式下需要在每台服务器存储zookeeper数据的文件中创建一个myid
文件,这个文件只有一个数据就是该节点的值,例如server1则写1,server2则写2。Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg
里面的配置信息比较从而判断到底是哪个server。
同时,需要在配置文件最后加入集群信息:
server.A=B:C:D
- B是这个服务器的ip地址
- C是这个服务器与集群中的Leader服务器交换信息的端口
- D是万一集群中Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器互相通信的端口
配置示例:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/Users/xiaoming/Zookeeper/server1/zkData
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to “0” to disable auto purge feature
#autopurge.purgeInterval=1
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
之后只需要在各个服务器启动Zookeeper即可。可以通过bin/zkServer.sh status
查看当前节点状态。
Shell命令操作
- 启动客户端
bin/zkCli.sh
- 显示所有操作命令
help
- 查看当前znode中所包含的内容
ls /
- 查看当前节点数据并能看到更新次数等数据
ls2 /
- 创建普通节点
create /app1 “hello app1”
create /app1/server101 “127.0.0.1”
- 获取节点的值
get /app1
- 创建短暂节点(在当前客户端可以查看到,退出当前客户端然后再重启客户端会发现该节点已经被删除)
create -e /app-emphemeral 888
- 创建带有序号的节点(会按照顺序依次在创建的节点后面添加递增序号)
create -s /app2/aa 888
Created /app2/bb0000000001
- 修改节点数据值
set /app1 999
- 节点的值变化监听
- 在 104 主机上注册监听
/app1
节点数据变化
[zk: localhost:2181(CONNECTED) 26] get -w /app1 watch
- 在 103 主机上修改
/app1
节点的数据
[zk: localhost:2181(CONNECTED) 5] set /app1 777
- 观察 104 主机收到数据变化的监听
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/app1
- 节点的字节点变化监听(路径变化)
- 在 104 主机上注册监听/app1 节点的子节点变化
[zk: localhost:2181(CONNECTED) 1] ls /app1 watch
[aa0000000001, server101]
- 在 103 主机/app1 节点上创建子节点
[zk: localhost:2181(CONNECTED) 6] create /app1/bb 666
Created /app1/bb
- 观察 104 主机收到子节点变化的监听
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/app1
- 删除节点(如果存在字节点,需要先删除字节点)
delete /app1/bb
- 递归删除节点
rmr /app2
- 查看节点状态
stat /app1
内部原理
- 半数机制:集群中半数以上的机器存活,集群可用。所以Zookeeper适合安装奇数台服务器。
- Zookeeper虽然在配置文件中并没有指定
Master
和Slave
。但是Zookeeper工作时,是有一个节点为Leader
,其他则为Follower
,Leader
是通过内部的选举机制临时产生的。
选举机制
Leader选举是保证分布式数据一致性的关键所在。当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举:
- 服务器初始化启动
- 服务器运行期间无法和Leader保持连接
服务器启动时期的Leader选举
若进行Leader选举,则至少需要两台机器,这里选取3台机器组成的服务器集群为例。在集群初始化阶段,当有一台服务器Server1启动时,其单独无法进行和完成Leader选举,当第二台服务器Server2启动时,此时两台机器可以相互通信,每台机器都试图找到Leader,于是进入Leader选举过程。选举过程如下:
- 每个Server发出一个投票:由于是初始情况,Server1和Server2都会将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的
myid
和ZXID
,使用(myid, ZXID)
来表示,此时Server1的投票为(1, 0)
,Server2的投票为(2, 0)
,然后各自将这个投票发给集群中其他机器。 - 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自LOOKING状态的服务器。
- 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行PK,PK规则如下:
- 优先检查ZXID。ZXID比较大的服务器优先作为Leader。
- 如果ZXID相同,那么就比较myid。myid较大的服务器作为Leader服务器。
对于Server1而言,它的投票是(1, 0)
,接收Server2的投票为(2, 0)
,首先会比较两者的ZXID
,均为0,再比较myid
,此时Server2的myid
最大,于是更新自己的投票为(2, 0)
,然后重新投票,对于Server2而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。
- 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于Server1、Server2而言,都统计出集群中已经有两台机器接受了
(2, 0)
的投票信息,此时便认为已经选出了Leader。 - 改变服务器状态。一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么就变更为
FOLLOWING
,如果是Leader,就变更为LEADING
。
服务器运行期间的Leader选举
在Zookeeper运行期间,Leader与非Leader服务器各司其职,即便当有非Leader服务器宕机或新加入,此时也不会影响Leader,但是一旦Leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮Leader选举,其过程和启动时期的Leader选举过程基本一致。假设正在运行的有Server1、Server2、Server3三台服务器,当前Leader是Server2,若某一时刻Leader挂了,此时便开始Leader选举。选举过程如下:
- 变更状态。Leader挂后,余下的非Observer服务器都会讲自己的服务器状态变更为
LOOKING
,然后开始进入Leader选举过程。 - 每个Server会发出一个投票。在运行期间,每个服务器上的
ZXID
可能不同,此时假定Server1的ZXID
为123,Server3的ZXID
为122;在第一轮投票中,Server1和Server3都会投自己,产生投票(1, 123)
,(3, 122)
,然后各自将投票发送给集群中所有机器。 - 接收来自各个服务器的投票。与启动时过程相同。
- 处理投票。与启动时过程相同,此时,Server1将会成为Leader。
- 统计投票。与启动时过程相同。
- 改变服务器的状态。与启动时过程相同。
节点类型
主要有两种类型:
持久(Persistent)
:客户端与服务器端断开连接后,创建的节点不删除短暂(Ephemeral)
:客户端和服务器断开连接后,创建的节点不删除
持久化节点
分为持久化目录节点和持久化顺序编号目录节点:
- 持久化目录节点:客户端与Zookeeper断开连接后,该节点依旧存在。
- 持久化顺序编号目录节点:客户端与Zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号。
临时节点
分为临时目录节点和临时顺序编号目录节点。
- 临时目录节点:客户端与Zookeeper断开连接后,该节点被删除。
- 临时顺序编号目录节点:客户端与Zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号。
创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。
Stat结构体
使用stat
命令可以查看节点的状态,其返回的数据代表的意义如下:
- czxid:引起这个znode创建的
zxid
,创建节点事务的zxid
每次修改 ZooKeeper 状态都会收到一个
zxid
形式的时间戳,也就是 ZooKeeper 事务 ID。 事务 ID 是 ZooKeeper 中所有修改总的次序。每个修改都有唯一的zxid
,如果zxid1
小 于zxid2
,那么zxid1
在zxid2
之前发生。 - ctime:znode被创建的毫秒数(从1970年开始)
- mzxid:znode最后更行的zxid
- mtime:最后修改的毫秒数(从1970年开始)
- pZxid:znode做后更新的字节点zxid
- cversion:znode字节点变化号,znode字节点修改次数
- dataversion:znode数据变化号
- aclVersion:znode访问控制列表的变化号
- ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
- dataLength:znode的数据长度
- numChildren:znode字节点数量
监听器原理
- 首先有一个
main()
线程 - 在
main()
线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connect),一个负责监听(listener) - 通过
connect
线程将注册的监听事件发送给Zookeeper - 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中
- Zookeeper监听到有数据或者路径变化,就会将这个消息发送给
listener
线程 listener
线程内部调用了process()
方法
常见的监听:
- 监听节点数据的变化:
get -w path
- 监听字节点增减的变化:
ls -w path
写数据流程
API使用
需要引入如下配置:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class TestZookeeper {
//连接的服务器,用”,”分割,注意不能有空格
private static String connectString = “127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183”;
private static int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
/**
* 创建Zookeeper客户端
* @throws IOException
*/
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//收到事件通知后的回调函数(用户的业务逻辑)
System.out.println(watchedEvent.getType()+”—“+watchedEvent.getPath());
//再次启动监听
try {
List<String> children = zkClient.getChildren(“/“, true);
for (String child:children) {
System.out.println(child);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
/**
* 创建字节点
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void create() throws KeeperException, InterruptedException {
//参数1:创建节点的路径
//参数2:节点数据,需要使用字节数组
//参数3:节点权限
//参数4:节点的类型
String nodeCreated = zkClient.create(“/idea”,”hello,idea”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println(nodeCreated);
}
/**
* 获取子节点
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void getChildren() throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren(“/“, true);
for(String child:children) {
System.out.println(child);
}
//延时阻塞
Thread.sleep(Long.MAX_VALUE);
}
/**
* 判断znode是否存在
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void exist() throws KeeperException, InterruptedException {
Stat exists = zkClient.exists("/idea", false);
System.out.println(exists == null?"not exist":"exist");
}
}
监听服务器节点动态上下线案例
在分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
从zookeeper角度来说,服务器和客户端其实都属于zookeepr的客户端。
具体实现
服务器端
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private static String connectString = “127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183”;
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = “/servers”;//提前创建好该节点
/**
* 创建到zk的客户端连接
* @throws IOException
*/
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
/**
* 注册服务器
* @param hostname
* @throws KeeperException
* @throws InterruptedException
*/
public void registServer(String hostname) throws KeeperException, InterruptedException {
String create = zk.create(parentNode + “/server”, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname+” is online” + create);
}
/**
* 业务功能
* @param hostname
* @throws InterruptedException
*/
public void business(String hostname) throws InterruptedException {
System.out.println(hostname + " is working...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//获取zk连接
DistributeServer server = new DistributeServer();
server.getConnect();
//利用zk连接注册服务器信息
server.registServer(args[0]);
//启动业务功能
server.business(args[0]);
}
}
客户端
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private static String connectString = “127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183”;
private static int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
private String parentNode = “/servers”;//提前创建好该节点
private volatile ArrayList<String> serversList = new ArrayList<>();
//创建到zk的客户端连接
public void getConnect() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//启动监听
try {
getServerList();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public void getServerList() throws KeeperException, InterruptedException {
//获取服务器字节点信息,并且对父节点进行监听
List<String> children = zkClient.getChildren(parentNode, true);
ArrayList<String> servers = new ArrayList<>();
for(String child:children) {
byte[] data = zkClient.getData(parentNode + “/“ + child, false, null);
servers.add(new String(data));
}
//把servers赋给成员serversList,以提供给各业务线程使用
serversList = servers;
System.out.println(serversList);
}
//业务功能
public void business() throws InterruptedException {
System.out.println(“client is working…”);
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
// 获取 zk 连接
DistributeClient client = new DistributeClient();
client.getConnect();
// 获取 servers 的子节点信息,从中获取服务器信息列表
client.getServerList();
// 业务进程启动
client.business();
}
}