2019独角兽企业重金招聘Python工程师标准>>>
curator-recipes模块提供了zookeeper的一些典型应用场景的使用参考。在curator中,事件监听的支持由curator-recipes模块提供。下面将对这些监听器进行介绍。
监听客户端连接状态
Curator客户端与zookeeper连接的过程其实是一个异步过程,Curator为我们提供了一个监听器监听连接的状态,根据连接的状态做相应的处理。
private CountDownLatch countDownLatch = new CountDownLatch(1);@Testpublic void testClientConnStateListener() throws InterruptedException {int retryIntervalMs = 1000;RetryPolicy retryPolicy = new RetryForever(retryIntervalMs);CuratorFramework testConnStateListenerClient = CuratorFrameworkFactory.builder().connectString(ZookeeperHelper.zkAddress).sessionTimeoutMs(ZookeeperHelper.sessionTimeout).retryPolicy(retryPolicy).build();//添加监听器testConnStateListenerClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {if (newState == ConnectionState.CONNECTED) {try {System.out.println("connected established");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}countDownLatch.countDown(); //释放锁} else {System.out.println("connection state : " + newState.name());}}});testConnStateListenerClient.start();//加锁,暂不往下执行countDownLatch.await();testConnStateListenerClient.close();}
NodeCache
可以监到当前节点数据的变化
@Testpublic void testNodeDataListener() throws Exception {String node_to_listen = "/listened_node";client.create().creatingParentContainersIfNeeded() //自动递归创建父节点.withMode(CreateMode.PERSISTENT).forPath(node_to_listen);NodeCache nodeCache = new NodeCache(client, node_to_listen, false);nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("Node data is changed, new data: " +new String(nodeCache.getCurrentData().getData()));}});nodeCache.start();Thread.sleep(1000);client.setData().forPath(node_to_listen, "new data".getBytes());//更新节点的数据Thread.sleep(1000);nodeCache.close();client.delete().deletingChildrenIfNeeded().forPath(node_to_listen);}
PathChildrenCache
(1)可以监听到当前节点下的孩子节点的变化,但是孩子节点下面的孩子节点的事情不能监听。 (2)可以监听到的事件:节点创建、节点数据的变化、节点删除等
@Testpublic void testChildrenNodeListener() throws Exception {String parent_node = "/parent_node";String child_node = parent_node + "/child";PathChildrenCache pathChildrenCache = new PathChildrenCache(client, parent_node, false);PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case CHILD_ADDED: //子节点被添加System.out.println("CHILD_ADDED: " + event.getData().getPath());break;case CHILD_REMOVED: //子节点被删除System.out.println("CHILD_REMOVED: " + event.getData().getPath());break;case CHILD_UPDATED: //子节点数据变化System.out.println("CHILD_UPDATED: " + event.getData().getPath());break;default:break;}}};pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);pathChildrenCache.start();client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(child_node);Thread.sleep(1000);client.setData().forPath(child_node, "new data".getBytes());//更新节点的数据Thread.sleep(1000);pathChildrenCache.close();client.delete().deletingChildrenIfNeeded().forPath(parent_node);}
TreeCache
(1)可以监听到指定节点下所有节点的变化。比如当前节点是”/node”,添加了TreeCacheListener后,不仅可以监听节点 "/node/child" 节点的变化,还能监听孙子节点 "/node/child/grandson"的变化。
(2)可以监听到的事件:节点创建、节点数据的变化、节点删除等
@Testpublic void testTreeListener() throws Exception {String parent_path = "/tree_node_parent";client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(parent_path);Thread.sleep(1000);TreeCache treeCache = new TreeCache(client, parent_path);treeCache.start();treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {switch (event.getType()) {case NODE_ADDED:System.out.println("TreeNode added: " + event.getData().getPath() + " , data: " + new String(event.getData().getData()));break;case NODE_UPDATED:System.out.println("TreeNode updated: " + event.getData().getPath() + " , data: " + new String(event.getData().getData()));break;case NODE_REMOVED:System.out.println("TreeNode removed: " + event.getData().getPath());break;default:break;}}});//创建孩子节点String child_path = parent_path + "/child";client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(child_path);Thread.sleep(1000);//创建孙子节点String grandson_path = child_path + "/grandson";client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(grandson_path);Thread.sleep(1000);//更新孙子节点数据client.setData().forPath(grandson_path, "new_data".getBytes());Thread.sleep(1000);//删除孙子节点client.delete().deletingChildrenIfNeeded().forPath(grandson_path);Thread.sleep(1000);treeCache.close();client.delete().deletingChildrenIfNeeded().forPath(parent_path);}