Zookeeper基本使用

概述

Zookeeper是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。

Zookeeper = 文件系统 + 通知机制

Zookeeper特点

  • 一个领导者(Leader),多个跟随者(Follower)组成的集群
  • 集群中只要有半数以上节点存活,Zookeeper集群就能够正常服务
  • 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的
  • 更新请求顺序进行,来自同一个Client的更新请求按其发送顺序依次执行
  • 数据更新原子性,一次数据更新要么成功,要么失败
  • 实时性,在一定时间范围内,Client能读到最新数据

数据结构

Zookeeper的数据模型结构与 Unix 文件系统很类似,整体上可以看作一棵树,每个节点称为一个ZNode,每一个ZNode默认能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识。

Zookerper节点结构

应用场景

  1. 统一命名服务:在分布式环境下,经常需要对应用或服务进行统一命名,便于识别。例如IP不容易记,使用域名。
  2. 统一配置管理
  • 在分布式环境下,一般要求在一个集群下,所有节点的配置信息是一致的。在 Zookeeper 中,可以将配置信息写入到 Zookeeper 上的一个 Znode 中。
  • 对配置文件修改后,希望能够快速同步到各个节点上。每个客户端服务器监听这个Znode,一旦Znode中的数据被修改,Zookeeper将通知各个客户端服务器。
  1. 统一集群管理:分布式环境中,实时掌握每个节点的状态是必要的,要求可以根据节点实时状态作出一些调整。Zookeeper可以实现实时监控节点状态变化。可将节点信息写入到Zookeeper的一个Znode,监听这个Znode可以获取它实时状态变化。
  2. 服务器动态上下线:客户端能够实时洞察到服务器上下线的变化。服务器将是否在线的状态写入到Znode中,客户端获取到当前在线服务器列表,并注册监听,当服务器有上下线时就会发起事件通知。
  3. 软负载均衡:在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。

Zookeeper安装

单机安装

在Zookeeper官网上下载压缩包,解压后配置文件在conf/zoo_sample.cfg文件中。可以根据需要修改配置文件。

启动和关闭的相关文件在bin目录下:

  • 启动Zookeeper:
bin/zkServer.sh start
  • 查看进程是否启动
jps
  • 查看状态
$ bin/zkServer.sh status

/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /Users/xiaoming/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone
  • 启动客户端
bin/zkCli.sh
  • 退出客户端
quit
  • 停止Zookeeper
bin/zkServer.sh stop

配置文件参数

# 通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒
# Zookeeper使用的基本时间,服务器之间和客户端和服务器之间维持心跳的时间间隔,也就是每个ticketTime就会发送一次心跳,时间单位为毫秒。
# 用于心跳机制,并且设置最小的session超时时间为两倍心跳时间
tickTime=2000
# LF初始通信时限(初次连接时间)
# 集群中的Follower跟随者服务器与Leader领导者服务器之间初始连接时能容忍的最多心跳数(ticketTime的数量),用来限定集群中Zookeeper服务器连接到Leader的时限
initLimit=10
# LF同步通信时限
# 集群中Leader和Follower之间的最大响应时间单位,
syncLimit=5
# 数据文件目录+数据持久化路径
# 主要用于保存Zookeeper中的数据
dataDir=/tmp/zookeeper
# 客户端连接的端口,用于监听客户端连接的端口
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to “0” to disable auto purge feature
#autopurge.purgeInterval=1

集群安装部署

集群模式下需要在每台服务器存储zookeeper数据的文件中创建一个myid文件,这个文件只有一个数据就是该节点的值,例如server1则写1,server2则写2。Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server

同时,需要在配置文件最后加入集群信息:

server.A=B:C:D
  • B是这个服务器的ip地址
  • C是这个服务器与集群中的Leader服务器交换信息的端口
  • D是万一集群中Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器互相通信的端口

配置示例:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/Users/xiaoming/Zookeeper/server1/zkData
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to “0” to disable auto purge feature
#autopurge.purgeInterval=1
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

之后只需要在各个服务器启动Zookeeper即可。可以通过bin/zkServer.sh status查看当前节点状态。

Shell命令操作

  1. 启动客户端
bin/zkCli.sh
  1. 显示所有操作命令
help
  1. 查看当前znode中所包含的内容
ls /
  1. 查看当前节点数据并能看到更新次数等数据
ls2 /
  1. 创建普通节点
create /app1 “hello app1”
create /app1/server101 “127.0.0.1”
  1. 获取节点的值
get /app1
  1. 创建短暂节点(在当前客户端可以查看到,退出当前客户端然后再重启客户端会发现该节点已经被删除)
create -e /app-emphemeral 888
  1. 创建带有序号的节点(会按照顺序依次在创建的节点后面添加递增序号)
create -s /app2/aa 888

Created /app2/bb0000000001
  1. 修改节点数据值
set /app1 999
  1. 节点的值变化监听
  • 在 104 主机上注册监听/app1 节点数据变化
 [zk: localhost:2181(CONNECTED) 26] get  -w /app1 watch
  • 在 103 主机上修改/app1节点的数据
 [zk: localhost:2181(CONNECTED) 5] set /app1 777 
  • 观察 104 主机收到数据变化的监听
WATCHER:: 
WatchedEvent state:SyncConnected type:NodeDataChanged path:/app1
  1. 节点的字节点变化监听(路径变化)
  • 在 104 主机上注册监听/app1 节点的子节点变化
[zk: localhost:2181(CONNECTED) 1] ls /app1 watch 
[aa0000000001, server101] 
  • 在 103 主机/app1 节点上创建子节点
[zk: localhost:2181(CONNECTED) 6] create /app1/bb 666 
Created /app1/bb 
  • 观察 104 主机收到子节点变化的监听
WATCHER:: 
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/app1
  1. 删除节点(如果存在字节点,需要先删除字节点)
delete /app1/bb
  1. 递归删除节点
rmr /app2
  1. 查看节点状态
stat /app1

内部原理

  1. 半数机制:集群中半数以上的机器存活,集群可用。所以Zookeeper适合安装奇数台服务器。
  2. Zookeeper虽然在配置文件中并没有指定MasterSlave。但是Zookeeper工作时,是有一个节点为Leader,其他则为Follower,Leader是通过内部的选举机制临时产生的。

选举机制

Leader选举是保证分布式数据一致性的关键所在。当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举:

  1. 服务器初始化启动
  2. 服务器运行期间无法和Leader保持连接

服务器启动时期的Leader选举

若进行Leader选举,则至少需要两台机器,这里选取3台机器组成的服务器集群为例。在集群初始化阶段,当有一台服务器Server1启动时,其单独无法进行和完成Leader选举,当第二台服务器Server2启动时,此时两台机器可以相互通信,每台机器都试图找到Leader,于是进入Leader选举过程。选举过程如下:

  1. 每个Server发出一个投票:由于是初始情况,Server1和Server2都会将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的myidZXID,使用(myid, ZXID)来表示,此时Server1的投票为(1, 0),Server2的投票为(2, 0),然后各自将这个投票发给集群中其他机器。
  2. 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自LOOKING状态的服务器。
  3. 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行PK,PK规则如下:
  • 优先检查ZXID。ZXID比较大的服务器优先作为Leader。
  • 如果ZXID相同,那么就比较myid。myid较大的服务器作为Leader服务器。
    对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较两者的ZXID,均为0,再比较myid,此时Server2的myid最大,于是更新自己的投票为(2, 0),然后重新投票,对于Server2而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。
  1. 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于Server1、Server2而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出了Leader。
  2. 改变服务器状态。一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么就变更为FOLLOWING,如果是Leader,就变更为LEADING

服务器运行期间的Leader选举

在Zookeeper运行期间,Leader与非Leader服务器各司其职,即便当有非Leader服务器宕机或新加入,此时也不会影响Leader,但是一旦Leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮Leader选举,其过程和启动时期的Leader选举过程基本一致。假设正在运行的有Server1、Server2、Server3三台服务器,当前Leader是Server2,若某一时刻Leader挂了,此时便开始Leader选举。选举过程如下:

  1. 变更状态。Leader挂后,余下的非Observer服务器都会讲自己的服务器状态变更为LOOKING,然后开始进入Leader选举过程。
  2. 每个Server会发出一个投票。在运行期间,每个服务器上的ZXID可能不同,此时假定Server1的ZXID为123,Server3的ZXID为122;在第一轮投票中,Server1和Server3都会投自己,产生投票(1, 123)(3, 122),然后各自将投票发送给集群中所有机器。
  3. 接收来自各个服务器的投票。与启动时过程相同。
  4. 处理投票。与启动时过程相同,此时,Server1将会成为Leader。
  5. 统计投票。与启动时过程相同。
  6. 改变服务器的状态。与启动时过程相同。

节点类型

主要有两种类型:

  • 持久(Persistent):客户端与服务器端断开连接后,创建的节点不删除
  • 短暂(Ephemeral):客户端和服务器断开连接后,创建的节点不删除

持久化节点

分为持久化目录节点和持久化顺序编号目录节点:

  • 持久化目录节点:客户端与Zookeeper断开连接后,该节点依旧存在。
  • 持久化顺序编号目录节点:客户端与Zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号。

临时节点

分为临时目录节点和临时顺序编号目录节点。

  • 临时目录节点:客户端与Zookeeper断开连接后,该节点被删除。
  • 临时顺序编号目录节点:客户端与Zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号。

创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

Zookeeper节点类型

Stat结构体

使用stat命令可以查看节点的状态,其返回的数据代表的意义如下:

  1. czxid:引起这个znode创建的zxid,创建节点事务的zxid

    每次修改 ZooKeeper 状态都会收到一个zxid形式的时间戳,也就是 ZooKeeper 事务 ID。 事务 ID 是 ZooKeeper 中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小 于zxid2,那么zxid1zxid2之前发生。

  2. ctime:znode被创建的毫秒数(从1970年开始)
  3. mzxid:znode最后更行的zxid
  4. mtime:最后修改的毫秒数(从1970年开始)
  5. pZxid:znode做后更新的字节点zxid
  6. cversion:znode字节点变化号,znode字节点修改次数
  7. dataversion:znode数据变化号
  8. aclVersion:znode访问控制列表的变化号
  9. ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
  10. dataLength:znode的数据长度
  11. numChildren:znode字节点数量

监听器原理

  1. 首先有一个main()线程
  2. main()线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connect),一个负责监听(listener)
  3. 通过connect线程将注册的监听事件发送给Zookeeper
  4. 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中
  5. Zookeeper监听到有数据或者路径变化,就会将这个消息发送给listener线程
  6. listener线程内部调用了process()方法

Zookerper监听器原理

常见的监听:

  • 监听节点数据的变化:get -w path
  • 监听字节点增减的变化:ls -w path

写数据流程

Zookeeper写数据流程

API使用

需要引入如下配置:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.6</version>
</dependency>

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.12.1</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>compile</scope>
</dependency>
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class TestZookeeper {

    //连接的服务器,用”,”分割,注意不能有空格
    private static String connectString = “127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183”;
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;

    /**
     * 创建Zookeeper客户端
     * @throws IOException
     */
    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //收到事件通知后的回调函数(用户的业务逻辑)
                System.out.println(watchedEvent.getType()+”—“+watchedEvent.getPath());

                //再次启动监听
                try {
                    List<String> children = zkClient.getChildren(“/“, true);
                    for (String child:children) {
                        System.out.println(child);
                    }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * 创建字节点
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void create() throws KeeperException, InterruptedException {
        //参数1:创建节点的路径
        //参数2:节点数据,需要使用字节数组
        //参数3:节点权限
        //参数4:节点的类型
        String nodeCreated = zkClient.create(“/idea”,”hello,idea”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        System.out.println(nodeCreated);
    }

    /**
     * 获取子节点
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void getChildren() throws KeeperException, InterruptedException {
        List<String> children = zkClient.getChildren(“/“, true);
        for(String child:children) {
            System.out.println(child);
        }

        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 判断znode是否存在
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void exist() throws KeeperException, InterruptedException {
        Stat exists = zkClient.exists("/idea", false);
        System.out.println(exists == null?"not exist":"exist");
    }
}

监听服务器节点动态上下线案例

在分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

Zookeeper动态节点上下线

从zookeeper角度来说,服务器和客户端其实都属于zookeepr的客户端。

具体实现

服务器端

import org.apache.zookeeper.*;

import java.io.IOException;

public class DistributeServer {
    private static String connectString = “127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183”;
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = “/servers”;//提前创建好该节点

    /**
     * 创建到zk的客户端连接
     * @throws IOException
     */
    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

    /**
     * 注册服务器
     * @param hostname
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void registServer(String hostname) throws KeeperException, InterruptedException {
        String create = zk.create(parentNode + “/server”, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname+” is online” + create);
    }

    /**
     * 业务功能
     * @param hostname
     * @throws InterruptedException
     */
    public void business(String hostname) throws InterruptedException {
        System.out.println(hostname + " is working...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //获取zk连接
        DistributeServer server = new DistributeServer();
        server.getConnect();
        //利用zk连接注册服务器信息
        server.registServer(args[0]);
        //启动业务功能
        server.business(args[0]);
    }
}

客户端

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {
    private static String connectString = “127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183”;
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;
    private String parentNode = “/servers”;//提前创建好该节点
    private volatile ArrayList<String> serversList = new ArrayList<>();

    //创建到zk的客户端连接
    public void getConnect() throws IOException {
         zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //启动监听
                try {
                    getServerList();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public void getServerList() throws KeeperException, InterruptedException {
        //获取服务器字节点信息,并且对父节点进行监听
        List<String> children = zkClient.getChildren(parentNode, true);
        ArrayList<String> servers = new ArrayList<>();

        for(String child:children) {
            byte[] data = zkClient.getData(parentNode + “/“ + child, false, null);
            servers.add(new String(data));
        }

        //把servers赋给成员serversList,以提供给各业务线程使用
        serversList = servers;
        System.out.println(serversList);
    }

    //业务功能
    public void business() throws InterruptedException {
        System.out.println(“client is working…”);
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        // 获取 zk 连接
        DistributeClient client = new DistributeClient();
        client.getConnect();
        // 获取 servers 的子节点信息,从中获取服务器信息列表
        client.getServerList();
        // 业务进程启动
        client.business();
    }
}