环境安装 ubuntu 安装 java8 参考
1 2 apt update apt install openjdk-8-jdk
Zookeeper 下载,官方地址 apache-zookeeper-3.6.0-bin.tar.gz
解压文件
1 tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
进去解压目录重新命名配置文件
1 mv zoo_sample.cfg zoo.cfg
修改数据文件目录
1 dataDir=/users/yanrs/zookeeper
服务端启动
1 2 ./bin/zkServer.sh start # 后台启动 # ./bin/zkServer.sh start-foreground # 前台启动
客户端启动
可以使用以下命令查看服务是否正常启动。
1 2 3 ps -ef | grep zookeeper # 方法1 lsof -i:2181 # 方法2,查端口是否被占用 netstat -anp tcp | grep 2181 # 方法3,查端口是否被占用
简单操作 查看根节点信息 使用 ls / 查看根节点下所有的 znode
1 2 3 [zk: localhost:2181(CONNECTED) 0] ls / [zookeeper] [zk: localhost:2181(CONNECTED) 1]
创建节点 使用 create 创建 worders 节点,并且指定了一个空字符串,这说明我们在创建的时候不希望在 worders 节点中保存数据。这种方式创建的是持久化节点。
1 2 3 4 5 [zk: localhost:2181(CONNECTED) 1] create /worders "" Created /worders [zk: localhost:2181(CONNECTED) 2] ls / [worders, zookeeper] [zk: localhost:2181(CONNECTED) 3]
创建有序的持久化节点(只需加上 -s 即可)
1 2 3 [zk: localhost:2181(CONNECTED) 15] create -s /test testdata Created /test0000000001 [zk: localhost:2181(CONNECTED) 16]
创建临时节点(客户端断开,临时节点就会消失)
1 2 [zk: localhost:2181(CONNECTED) 21] create -e /test_tmp testdata Created /test_tmp
创建有序的临时节点
1 2 [zk: localhost:2181(CONNECTED) 22] create -e -s /test_tmp testdata Created /test_tmp0000000006
总结:创建有序节点只需加上 -s 即可,创建临时节点只需加上 -e 即可
删除节点 使用 delete 加路径来删除节点信息
1 2 3 4 [zk: localhost:2181(CONNECTED) 1] delete /worders [zk: localhost:2181(CONNECTED) 2] ls / [zookeeper] [zk: localhost:2181(CONNECTED) 3]
可以使用 deleteall 来递归删除节点信息
1 [zk: localhost:2181(CONNECTED) 49] deleteall /test # 不必担心test下面是否还有子路径,全都删除
停止服务
会话 会话状态的转换依赖于发生在客户端与服务之间的各种事情
一个会话从 NOT_CONNECTED 状态开始,当 ZooKeeper 客户端初始化后转换到 CONNECTING 状态(图中的箭头1)。正常情况下,成功与 ZooKeeper 服务器建立连接后,会话转换到 CONNECTED 状态(箭头2)。当客户端与 ZooKeeper 服务器断开连接或者无法收到服务器的响应时,它就会转换回 CONNECTING 状态(箭头3)并尝试发现其他 ZooKeeper 服务器。如果可以发现另一个服务器或重连到原来的服务器,当服务器确认会话有效后,状态又会转换回 CONNECTED 状态。否则,它将会声明会话过期,然后转换到 CLOSED状态(箭头4)。应用也可以显式地关闭会话(箭头4和箭头5)。
创建一个会话时,你需要设置会话超时这个重要的参数,这个参数设置了 ZooKeeper 服务允许会话被声明为超时之前存在的时间。如果经过时间 t 之后服务接收不到这个会话的任何消息,服务就会声明会话过期。而在客户端侧,如果经过 t/3 的时间未收到任何消息,客户端将向服务器发送心跳消息。在经过 2t/3 时间后,ZooKeeper 客户端开始寻找其他的服务器,而此时它还有 t/3 时间去寻找。
在仲裁模式下,客户端有多个服务器可以连接,而在独立模式下,客户端只能尝试重新连接单个服务器。在仲裁模式中,应用需要传递可用的服务器列表给客户端,告知客户端可以连接的服务器信息并选择一个进行连接。当尝试连接到一个不同的服务器时,非常重要的是,这个服务器的 ZooKeeper 状态要与最后连接的服务器的 ZooKeeper 状态保持最新。客户端不能连接到这样的服务器:它未发现更新而客户端却已经发现的更新。ZooKeeper 通过在服务中排序更新操作来决定状态是否最新。ZooKeeper 确保每一个变化相对于所有其他已执行的更新是完全有序的。因此,如果一个客户端在位置 i 观察到一个更新,它就不能连接到只观察到 i<i 的服务器上 。在 ZooKeeper 实现中,系统根据每一个更新建立的顺序来分配给事务标识符。
仲裁模式 搭建集群 在 conf/zoo.cfg 下添加配置信息
1 2 3 server.1=127.0.0.1:2222:2223 server.2=127.0.0.1:3333:4444 server.3=127.0.0.1:4444:4445
每一个 server.n 项指定了编号为 n 的 ZooKeeper 服务器使用的地址和端口号。每个 server.n 项通过冒号分隔为三部分,第一部分为服务器 n 的 IP 地址或主机名(hostname),第二部分和第三部分为 TCP 端口号,分别用于仲裁通信和群首选举 。因为我们在同一个机器上运行三个服务器进程,所以我们需要在每一项中使用不同的端口号。通常,我们在不同的服务器上运行每个服务器进程,因此每个服务器项的配置可以使用相同的端口号。
复制 conf/zoo.cfg 文件,在 conf 下创建 z1/z1.cfg。复制 conf/zoo.cfg 文件,在 conf 下创建 z2/z2.cfg,并将其中的端口改为 2182,复制 conf/zoo.cfg 文件,在 conf 下创建 z3/z3.cfg,并将其中的端口改为 2183,目录结构如下:
1 2 3 4 5 6 7 8 9 10 11 root@iZbp18at9sb673q1srsg3oZ:~/apache-zookeeper-3.6.0-bin/conf# tree . . ├── configuration.xsl ├── log4j.properties ├── z1 │?? └── z1.cfg ├── z2 │?? └── z2.cfg ├── z3 │?? └── z3.cfg └── zoo.cfg
切换到 dataDir 所在的目录,分别创建以下目录,用于存放数据文件
1 2 3 4 5 6 mkdir z1 mkdir z1/data mkdir z2 mkdir z2/data mkdir z3 mkdir z3/data
当启动一个服务器时,我们需要知道启动的是哪个服务器。一个服务器通过读取data目录下一个名为myid的文件来获取服务器ID信息。可以通过以下命令来创建这些文件:
1 2 3 echo 1 > z1/data/myid echo 2 > z2/data/myid echo 3 > z3/data/myid
目录结构如下:
1 2 3 4 5 6 7 8 9 10 11 root@iZbp18at9sb673q1srsg3oZ:/users/yanrs/zookeeper# tree . . ├── z1 │?? └── data │?? └── myid ├── z2 │?? └── data │?? └── myid └── z3 └── data └── myid
切换到 conf/z1 下启动 z1 服务
1 ../../bin/zkServer.sh start ./z1.cfg
查看 log 文件夹中的日志,因为我们只启动了三个 ZooKeepe r服务器中的一个,所以整个服务还无法运行。在日志中我们将会看到以下形式的记录:
这个服务器疯狂地尝试连接到其他服务器,然后失败,如果我们启动另一个服务器,我们可以构成仲裁的法定人数。切换到 conf/z2 目录,执行以下命令,启动 z2。
1 ../../bin/zkServer.sh start ./z2.cfg
查看 z1 和 z2 的状态,可以看到 z2 成为了 leader,z1 成为了 follower
访问集群 在 bin 目录下访问集群,访问集群时,我们需要加上 server 的地址,连接后使用 Ctrl + c 进行停止,反复多次连接会发现端口在 2181 和 2182 之间切换,还会注意到尝试 2183 端口后连接失败的消息,之后为成功连接到某一个服务器端口的消息。客户端以随机顺序连接到连接串中的服务器。这样可以用 ZooKeeper 来实现一个简单的负载均衡。不过,客户端无法指定优先选择的服务器来进行连接。
1 ./zkCli.sh -server 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
分布式锁 假设有一个应用由 n 个进程组成,这些进程尝试获取一个锁。再次强调,ZooKeeper 并未直接暴露原语,因此我们使用 ZooKeeper 的接口来管理 znode,以此来实现锁。为了获得一个锁,每个进程 p 尝试创建 znode,名为 /lock。如果进程 p 成功创建了 znode,就表示它获得了锁并可以继续执行其临界区域的代码。不过一个潜在的问题是进程 p 可能崩溃,导致这个锁永远无法释放。在这种情况下,没有任何其他进程可以再次获得这个锁,整个系统可能因死锁而失灵。为了避免这种情况,我们不得不在创建这个节点时指定 /lock 为临时节点。
其他进程因 znode 存在而创建 /lock 失败。当收到 /lock 删除的通知时,如果进程 p 还需要继续获取锁,它就继续尝试创建 /lock 的步骤,如果其他进程已经创建了,就继续监听节点。
实现主从模式 主节点 创建主节点 因为只有一个进程会成为主节点,所以一个进程成为 ZooKeeper 的主节点后必须锁定管理权。为此,进程需要创建一个临时 znode,名为 /master:
1 ./zkCli.sh -server 127.0.0.1:2181
连接 2181 并创建 master 节点,其中 -e 指定该节点为临时节点,一个临时节点会在会话过期或关闭时自动被删除。,”” 引号中的为节点数据信息。
1 2 3 [zk: 127.0.0.1:2181(CONNECTED) 0] create -e /master "test data: im 2181" Created /master [zk: 127.0.0.1:2181(CONNECTED) 1]
查看创建的节点的信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 [zk: 127.0.0.1:2181(CONNECTED) 7] ls / # 查看 zk 树的节点信息 [master, zookeeper] [zk: 127.0.0.1:2181(CONNECTED) 8] get /master # 查看 /master 节点信息 test data: im 2181 [zk: 127.0.0.1:2181(CONNECTED) 9] stat /master # 查看 /master 的元数据信息 cZxid = 0x10000000f ctime = Tue Apr 28 18:31:40 CST 2020 mZxid = 0x10000000f mtime = Tue Apr 28 18:31:40 CST 2020 pZxid = 0x10000000f cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x100004792a60003 dataLength = 18 numChildren = 0 [zk: 127.0.0.1:2181(CONNECTED) 10]
抢占主节点 连接 2182 并尝试创建 master 节点
1 ./zkCli.sh -server 127.0.0.1:2181
会提示节点已经存在
1 2 3 [zk: 127.0.0.1:2182(CONNECTED) 1] create -e /master "test data: im 2182" Node already exists: /master [zk: 127.0.0.1:2182(CONNECTED) 2]
监视主节点 使用 stat -w + 节点名称来为节点创建监视点
1 2 3 4 5 6 7 8 9 10 11 12 13 [zk: 127.0.0.1:2182(CONNECTED) 5] stat -w /master # 为 /master 节点创建监视点 cZxid = 0x10000000f ctime = Tue Apr 28 18:31:40 CST 2020 mZxid = 0x10000000f mtime = Tue Apr 28 18:31:40 CST 2020 pZxid = 0x10000000f cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x100004792a60003 dataLength = 18 numChildren = 0 [zk: 127.0.0.1:2182(CONNECTED) 6]
删除 /master 节点,会看到监视点发出节点被删除的信息
1 2 3 4 5 6 [zk: 127.0.0.1:2182(CONNECTED) 7] delete /master # 删除节点 WATCHER:: WatchedEvent state:SyncConnected type:NodeDeleted path:/master # 监听器效果 [zk: 127.0.0.1:2182(CONNECTED) 8]
抢占主节点 在次运行创建主节点信息时,就能创建了。因为之前的已经被删除了。
1 2 3 [zk: 127.0.0.1:2182(CONNECTED) 8] create -e /master "test data: im 2182" Created /master [zk: 127.0.0.1:2182(CONNECTED) 9]
从节点,任务和分配 创建三个父znode,这三个节点为持久性节点,不包含任何数据
1 2 3 4 create /workers "" create /tasks "" create /assign "" ls /
在真实的应用中,这些 znode 可能由主进程在分配任务前创建,也可能由一个引导程序创建,不管这些节点是如何创建的,一旦这些节点存在了,主节点就需要监视 /workers 和 /tasks 的子节点的变化情况:
1 2 ls -w /workers ls -w /tasks
从节点首先要通知主节点,告知从节点可以执行任务。从节点通过在 /workers 子节点下创建临时性的 znode 来进行通知,并在子节点中使用庄机名来标识自己:
1 create -e /workers/zk.yanrs.com "zk.yanrs.com:2181"
从节点创建后主节点就能收到监听信息
1 2 3 WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/workers
后面操作略,主要流程信息如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 从节点告知主节点可以执行任务,并传递自己的身份(便于后续主节点分配任务给自己) create -e /workers/zk.yanrs.com "zk.yanrs.com:2181" 从节点创建 /assign/zk.yanrs.com 节点来接收分配的任务,并监听状态 create /assign/zk.yanrs.com "" ls -w /assign/zk.yanrs.com 客户端添加任务,并监听任务状态 create -s /tasks/task- "cmd" ls -w /tasks/task-0000000000 主节点分配任务,这时从节点会接收到通知 create /assign/zk.yanrs.com/task-0000000000 "" 如果任务是给自己的,那么就执行任务,完成后添加 status 子节点,并标记为完成 create /tasks/task-0000000000/status "done" # 客户端检查执行结果 get /tasks/task-0000000000
Stat 结构体 Znode 维护了一个 stat 结构,这个 stat 包含数据变化的版本号、访问控制列表变化、还有时间戳。版本号和时间戳一起,可让 Zookeeper 验证缓存和协调更新。每次 znode 的数据发生了变化,版本号就增加。例如,无论何时客户端检索数据,它也一起检索数据的版本号。并且当客户端执行更新或删除时,客户端必须提供他正在改变的znode 的版本号。如果它提供的版本号和真实的数据版本号不一致,更新将会失败。
查看某个目录的 stat 信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 [zk: localhost:2181(CONNECTED) 25] stat test_tmp Path must start with / character [zk: localhost:2181(CONNECTED) 26] stat /test_tmp cZxid = 0x9 ctime = Sun May 17 21:16:29 CST 2020 mZxid = 0x9 mtime = Sun May 17 21:16:29 CST 2020 pZxid = 0x9 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x10010118bfa0000 dataLength = 8 numChildren = 0
Stat 结构体:
1 2 3 4 5 6 7 8 9 10 11 czxid - 创建节点的事务的 zxid(ZooKeeper Transaction Id),即创建节点的时候自动生成的一个 ID ctime - znode 被创建的毫秒数(从1970年开始) mzxid - znode 最后更新的zxid mtime - znode 最后修改的毫秒数(从1970年开始) pZxid - znode 最后更新的子节点 zxid cversion - znode子节点变化号,znode子节点修改次数 dataversion - znode数据变化号 aclVersion - znode访问控制列表的变化号 ephemeralOwner - 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。 dataLength - znode的数据长度 numChildren - znode子节点数量
Znode Znode = path + data + stat
Java API 简单操作 演示连接,创建节点,获取节点信息。源码地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 package com.atguigu.zookeeper;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.rmi.server.ExportException;public class Demo1 { private static final String CONNECTSRING="127.0.0.1:2181" ; private static final String PATH="/root" ; private static final int TIMEOUT= 20 * 1000 ; public ZooKeeper startZK () throws IOException { return new ZooKeeper(CONNECTSRING, TIMEOUT, new Watcher() { @Override public void process (WatchedEvent watchedEvent) { System.out.println("连接监听" ); } }); } public void stopZk (ZooKeeper zooKeeper) throws InterruptedException { if (zooKeeper!=null ){ zooKeeper.close(); } } public String createZNode (ZooKeeper zooKeeper, String path, String data) throws KeeperException, InterruptedException { return zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } public String getZNode (ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException { byte [] data = zooKeeper.getData(path, false , new Stat()); return new String(data); } public static void main (String[] args) throws Exception { Demo1 demo1 = new Demo1(); ZooKeeper zooKeeper = demo1.startZK(); if (zooKeeper.exists(PATH, false )==null ){ demo1.createZNode(zooKeeper, PATH, "test data" ); String zNode = demo1.getZNode(zooKeeper, PATH); System.out.println(zNode); }else { System.out.println("znode exists!!" ); } } }
监听器 在获取值的时候加入一个监听器,如果值发生变化就触发 triggerValue 函数。源码地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 package com.atguigu.zookeeper;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;public class Watch1 { private static final String CONNECTSRING="127.0.0.1:2181" ; private static final String PATH="/root" ; private static final int TIMEOUT= 20 * 1000 ; private ZooKeeper zooKeeper; public ZooKeeper getZooKeeper () { return zooKeeper; } public void setZooKeeper (ZooKeeper zooKeeper) { this .zooKeeper = zooKeeper; } public ZooKeeper startZK () throws IOException { return new ZooKeeper(CONNECTSRING, TIMEOUT, new Watcher() { @Override public void process (WatchedEvent watchedEvent) { System.out.println("连接监听" ); } }); } public void stopZk () throws InterruptedException { if (this .zooKeeper!=null ){ this .zooKeeper.close(); } } public String createZNode (String path, String data) throws KeeperException, InterruptedException { return this .zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } public String getZNode (String path) throws KeeperException, InterruptedException { byte [] data = this .zooKeeper.getData(path, new Watcher() { @Override public void process (WatchedEvent watchedEvent) { String newValue = null ; try { newValue = triggerValue(path); } catch (Exception e) { e.printStackTrace(); } System.out.println("newValue:" + newValue); } }, new Stat()); return new String(data); } private String triggerValue (String path) throws KeeperException, InterruptedException { byte [] data = this .zooKeeper.getData(path, false , new Stat()); return new String(data); } public static void main (String[] args) throws Exception { Watch1 watch1 = new Watch1(); watch1.setZooKeeper(watch1.startZK()); if (watch1.getZooKeeper().exists(PATH, false )==null ){ watch1.createZNode(PATH, "test data" ); String zNode = watch1.getZNode(PATH); System.out.println(zNode); }else { System.out.println("znode exists!!" ); } Thread.sleep(Integer.MAX_VALUE); } }
测试的时候,先将之前的 znode 节点删除,然后运行程序重新创建,创建后重新设置值,就能看到监听器监听到的变化,但是如果再次设置值的时候就监听不到了,原因是每个监听器只通知一次。
运行程序
查看监听器的输出
上面中的监听器通知一次后就结束了,如果一直监听某个节点呢?源码地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 package com.atguigu.zookeeper;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;public class Watch2 { private static final String CONNECTSRING="127.0.0.1:2181" ; private static final String PATH="/root" ; private static final int TIMEOUT= 20 * 1000 ; private ZooKeeper zooKeeper; public ZooKeeper getZooKeeper () { return zooKeeper; } public void setZooKeeper (ZooKeeper zooKeeper) { this .zooKeeper = zooKeeper; } public ZooKeeper startZK () throws IOException { return new ZooKeeper(CONNECTSRING, TIMEOUT, new Watcher() { @Override public void process (WatchedEvent watchedEvent) { System.out.println("连接监听" ); } }); } public void stopZk () throws InterruptedException { if (this .zooKeeper!=null ){ this .zooKeeper.close(); } } public String createZNode (String path, String data) throws KeeperException, InterruptedException { return this .zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } public String getZNode (String path) throws KeeperException, InterruptedException { byte [] data = this .zooKeeper.getData(path, new Watcher() { @Override public void process (WatchedEvent watchedEvent) { String newValue = null ; try { triggerValue(path); } catch (Exception e) { e.printStackTrace(); } } }, new Stat()); String oldValue = new String(data); System.out.println("old value: " + oldValue); return oldValue; } private String triggerValue (String path) throws KeeperException, InterruptedException { byte [] data = this .zooKeeper.getData(path, new Watcher() { @Override public void process (WatchedEvent watchedEvent) { try { triggerValue(path); } catch (Exception e) { e.printStackTrace(); } } }, new Stat()); String newValue = new String(data); System.out.println("newValue: " + newValue); return newValue; } public static void main (String[] args) throws Exception { Watch2 watch1 = new Watch2(); watch1.setZooKeeper(watch1.startZK()); if (watch1.getZooKeeper().exists(PATH, false )==null ){ watch1.createZNode(PATH, "test data" ); String zNode = watch1.getZNode(PATH); System.out.println(zNode); }else { System.out.println("znode exists!!" ); } Thread.sleep(Integer.MAX_VALUE); } }
上面程序只要一直改变 /root 节点的值,就会一直处罚监听器,并且在监听器中再次设置一个监听器,下次改变的时候就能发现。这样就实现了一直监听某个节点的需求。