一:事件监听
原生zookeeper的事件监听采用Watcher实现,不过Watcher监听是一次性的,如果需要继续监听该事件,必须重新注册。Curator中采用cache来封装了对事件的监听,包括监听节点,监听子节点等,下面分别做介绍
1.1 NodeCache
NodeCache主要用来监听节点本身的变化,当节点的状态发生变更后,回调NodeCachaListener
public interface NodeCacheListener{ /** * Called when a change has occurred */ public void nodeChanged() throws Exception; }
看一个例子:
public class NodeCacheExample { private static final String PATH = "/nodeCache"; private static CountDownLatch latch = new CountDownLatch(1); static NodeCache nodeCache; static CuratorFramework client; static { client = CuratorFrameworkFactory.newClient( "host:2181", 5000, 5000, new ExponentialBackoffRetry( 1000, 3)); client.start(); } public static void initCache() throws Exception { client.create().forPath(PATH); client.setData().forPath(PATH, "节点的初始值".getBytes()); nodeCache = new NodeCache(client, PATH); EnsurePath ensurePath = client.newNamespaceAwareEnsurePath(PATH); ensurePath.ensure(client.getZookeeperClient()); //设置成true,那么nodeCache在第一次启动的时候就会到zookeeper上去获取节点的数据内容,并保存在cache中 nodeCache.start(true); startCache(nodeCache); } private static void startCache(final NodeCache cache) throws Exception { ChildData data = cache.getCurrentData(); System.out.println("第一次启动获取到的内容:" + new String(data.getData())); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("NodeCache changed,data is " + new String(cache.getCurrentData().getData())); latch.countDown(); } }); Thread.sleep(2000); if(client.checkExists().forPath(PATH) != null){ System.out.println("node is exist,准备给节点设置新的内容"); client.setData().forPath(PATH, "节点新内容".getBytes()); } } public static void main(String[] args) throws Exception { initCache(); latch.await(); } }
运行结果如下:
第一次启动获取到的内容:节点的初始值 node is exist,准备给节点设置新的内容 NodeCache changed,data is 节点新内容
1.2 PathChildrenCache
PathChildrenCache主要用来监听子节点,它有几个构造函数,参数最多的是下面这个
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService) { this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService)); }
参数解释:
client:一个Curator客户端实例
path:监听路径
cacheData:boolean类型,如果是true,那么curator客户端在请求服务端的时候,会将监听节点的内容保存起来。而zookeeper原生的watcher监听是不会返回节点的内容的,只会返回节点状态,路径等
dataIsCompressed:表示是否对数据进行压缩
executorService:说明可以使用线程池来处理事件通知,当子节点数据发生变化时,会回调
public interface PathChildrenCacheListener { /** * Called when a change has occurred * * @param client the client * @param event describes the change * @throws Exception errors */ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception; }
并且不会对二级节点进行监听,来看一个例子
import java.util.concurrent.CountDownLatch; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; /** * * Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, * 会包含最新的子节点, 子节点的数据和状态 * * @author tanjie * */ public class PathCacheExample { private static final String PATH = "/cache"; static PathChildrenCache cache; static CuratorFramework client; static CountDownLatch latch = new CountDownLatch(1); static CountDownLatch coutCountDownLatch = new CountDownLatch(5); static { client = CuratorFrameworkFactory.newClient( "host:2181", 5000, 5000, new ExponentialBackoffRetry( 1000, 3)); client.start(); } private static void startCache() throws Exception { cache = new PathChildrenCache(client, PATH, true); cache.start(); // 给当前节点创建子节点 for (int i = 1; i <= 5; i++) { String newPath = PATH + "/child_" + i; String childNodeName = "child_" + i; if (client.checkExists().forPath(PATH) != null) { client.create().creatingParentsIfNeeded().forPath(newPath, childNodeName.getBytes()); } coutCountDownLatch.countDown(); } coutCountDownLatch.await(); addlistener(cache); for (final ChildData childData : cache.getCurrentData()) { System.out.println("pathChildCache:" + "路径:" + childData.getPath() + ",内容:" + new String(childData.getData())); } System.out.println("对父节点进行设置新值不会有通知事件发生........start"); client.setData().forPath(PATH,"哈哈".getBytes()); System.out.println("对父节点设置值 end.........."); //改变子节点的数据 System.out.println("准备删除子节点........start"); client.delete().forPath( PATH + "/child_1"); System.out.println("删除子节点........end,会有节点事件通知返回"); Thread.sleep(2000); //改变二级节点的内容 for(int j=1;j<=2;j++){ String newPath = PATH + "/child_2/" + j; String childNodeName = "child_2" + j; if (client.checkExists().forPath(PATH) != null) { client.create().forPath(newPath, childNodeName.getBytes()); } } addlistener(cache); System.out.println("准备删除二级节点......start"); client.delete().forPath( PATH + "/child_2/" + 1); System.out.println("删除二级节点......end..不会有事件监听返回"); latch.countDown(); } private static void addlistener(final PathChildrenCache cache) { final PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorClient, PathChildrenCacheEvent event) throws Exception { System.out.println("childNode path:" + event.getData().getPath() + ",childNode data : " + new String(event.getData().getData())); } }; cache.getListenable().addListener(pathChildrenCacheListener); } public static void main(String[] args) throws Exception { startCache(); latch.await(); } }
运行结果如下:
pathChildCache:路径:/cache/child_1,内容:child_1 pathChildCache:路径:/cache/child_2,内容:child_2 pathChildCache:路径:/cache/child_3,内容:child_3 pathChildCache:路径:/cache/child_4,内容:child_4 pathChildCache:路径:/cache/child_5,内容:child_5 对父节点进行设置新值不会有通知事件发生........start 对父节点设置值 end.......... 准备删除子节点........start childNode path:/cache/child_1,childNode data : child_1 删除子节点........end,会有节点事件通知返回 准备删除二级节点......start 删除二级节点......end..不会有事件监听返回
1.3 TreeCache
TreeCache即能监听节点也能监听节点的子节点变更
public class TreeCacheExample { private static final String PATH = "/treeNodeCache"; static TreeCache treeCache; static CuratorFramework client; static { client = CuratorFrameworkFactory.newClient( "host:2181", 5000, 5000, new ExponentialBackoffRetry( 1000, 3)); client.start(); } public static void initCache() throws Exception { treeCache = new TreeCache(client, PATH); client.create().forPath(PATH); treeCache.start(); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { if(null != event.getType()){ System.out.println("节点状态发送了改变.内容为:" + new String(event.getData().getData())); } } }); //添加子节点 for(int i=1;i<=5;i++){ String newPath = PATH + "/child_" + i; client.create().forPath(newPath,("child_"+i).getBytes()); } Thread.sleep(3000); client.setData().forPath(PATH+ "/child_2", "我是给第二个子节点赋值".getBytes()); if(null != client.checkExists().forPath(PATH)){ client.setData().forPath(PATH,"我是来对父节点赋值的".getBytes()); } Thread.sleep(1000); client.setData().forPath(PATH,"我是来对父节点第二次赋值的".getBytes()); } public static void main(String[] args) throws Exception { initCache(); Thread.sleep(Integer.MAX_VALUE); } }
运行结果如下:
节点状态发送了改变.内容为:child_1 节点状态发送了改变.内容为:child_2 节点状态发送了改变.内容为:child_3 节点状态发送了改变.内容为:child_4 节点状态发送了改变.内容为:child_5 节点状态发送了改变.内容为:我是给第二个子节点赋值 节点状态发送了改变.内容为:我是来对父节点赋值的 节点状态发送了改变.内容为:我是来对父节点第二次赋值的
可以看到,只有监听的节点状态发送了变化,监听事件就会执行回调,并且也能监听器父节点且不用反复注册监听
相关推荐
ZooKeeper原理与实战 PPT内容
zookeeper 使用 Curator 进行增、删、改、查、监听、分布式锁
【书籍学习】Netty、Redis、Zookeeper高并发实战-netty-redis-zookeeper # netty-redis-zookeeper 【书籍学习】Netty、Redis、Zookeeper高并发实战
Marathon+Mesos+Zookeeper+Docker实战,强烈推荐下载学习
zookeeper节点数据的监听与读写操作
netty-redis-zookeeper高并发实战学习-netty-redis-zookeeper
《Netty、Redis、ZooKeeper高并发实战》-netty-redis-zookeeper
zookeeper实战,讲解zookeeer原理和Curator架构
netty、redis、zookeeper高并发实战-源代码
非常强的一套Zookeeper集群实战,包含了全套的学习代码,学习笔记还有...内容从Zookeeper入门教学,本地安装,Zookeeper集群实战,项目需求,Zookeeper企业面试实战,Zookeeper算法实战,Zookeeper核心源码分析等内容。
Zookeeper实战,不错!
ZooKeeper是以Fast Paxos算法为基础的,Paxos 算法存在活锁的问题,即当有多个proposer交错提交时,有可能互相排斥导致没有一个proposer能提交成功,而Fast Paxos作了一些优化,通过选举产生一个leader,只有leader...
Linux Centos7 环境搭建Docker部署Zookeeper服务实战
zookeeper 事件监听机制 zookeeper 集群搭建 一致性协议:zab协议 zookeeper的leader选举 observer角色及其配置 zookeeperAPI连接集群 zookeeper 开源客户端curator介绍 zookeeper四字监控命令 zookeeper图形化的...
本文深入探讨了Zookeeper在分布式系统中的关键应用,特别是在实现分布式锁(包括非公平锁、公平锁和共享锁)、Leader选举和Spring Cloud Zookeeper注册中心等方面的实战应用。通过具体案例,我们理解了Zookeeper的...
今天小编就为大家分享一篇关于zookeeper监听器原理的详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
zookeeper学gn习过程自己总结的基本概念,运行原理,使用方法和应用场景等笔记信息,对于初学者来说可以很快入门zookeeper,上手实战
大数据之Zookeeper视频 欢迎下载
java学习路线 第一阶段:Java核心基础 此阶段为入职java必备知识,必须牢牢掌握,把基础砸实是学习的根基,会让后面的内容学习变得游刃有余 2. Java设计模式 Java程序员核心技术必备,设计模式,快速通透!...