<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.3.6</version> </dependency>
package com; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import; public class BaseZookeeper implements Watcher{ private ZooKeeper zookeeper; /** * 超时时间 */ private static final int SESSION_TIME_OUT = 2000; private CountDownLatch countDownLatch = new CountDownLatch(1); @Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { System.out.println("Watch received event"); countDownLatch.countDown(); } } /**连接zookeeper * @param host * @throws Exception */ public void connectZookeeper(String host) throws Exception{ zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); countDownLatch.await(); System.out.println("zookeeper connection success"); } /** * 创建节点 * @param path * @param data * @throws Exception */ public String createNode(String path,String data) throws Exception{ return this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } /** * 获取路径下所有子节点 * @param path * @return * @throws KeeperException * @throws InterruptedException */ public List<String> getChildren(String path) throws KeeperException, InterruptedException{ List<String> children = zookeeper.getChildren(path, false); return children; } /** * 获取节点上面的数据 * @param path 路径 * @return * @throws KeeperException * @throws InterruptedException */ public String getData(String path) throws KeeperException, InterruptedException{ byte[] data = zookeeper.getData(path, false, null); if (data == null) { return ""; } return new String(data); } /** * 设置节点信息 * @param path 路径 * @param data 数据 * @return * @throws KeeperException * @throws InterruptedException */ public Stat setData(String path,String data) throws KeeperException, 
InterruptedException{ Stat stat = zookeeper.setData(path, data.getBytes(), -1); return stat; } /** * 删除节点 * @param path * @throws InterruptedException * @throws KeeperException */ public void deleteNode(String path) throws InterruptedException, KeeperException{ zookeeper.delete(path, -1); } /** * 获取创建时间 * @param path * @return * @throws KeeperException * @throws InterruptedException */ public String getCTime(String path) throws KeeperException, InterruptedException{ Stat stat = zookeeper.exists(path, false); return String.valueOf(stat.getCtime()); } /** * 获取某个路径下孩子的数量 * @param path * @return * @throws KeeperException * @throws InterruptedException */ public Integer getChildrenNum(String path) throws KeeperException, InterruptedException{ int childenNum = zookeeper.getChildren(path, false).size(); return childenNum; } /** * 关闭连接 * @throws InterruptedException */ public void closeConnection() throws InterruptedException{ if (zookeeper != null) { zookeeper.close(); } } }
/**
 * Minimal usage example: connects, prints the children of the root node and
 * always closes the connection afterwards.
 */
public class Demo {

    /**
     * Entry point of the example.
     *
     * @param args unused
     * @throws Exception if connecting or listing the children fails
     */
    public static void main(String[] args) throws Exception {
        BaseZookeeper zookeeper = new BaseZookeeper();
        try {
            // NOTE(review): the connect string is empty here; presumably the real
            // host:port value was elided -- confirm before running.
            zookeeper.connectZookeeper("");
            List<String> children = zookeeper.getChildren("/");
            System.out.println(children);
        } finally {
            // Release the client even when connecting or listing fails.
            zookeeper.closeConnection();
        }
    }
}
// Logger used by the static helper methods of ZKApi.
static Logger logg = LoggerFactory.getLogger(ZKApi.class);

// Connect string handed to the ZooKeeper constructor. It was declared twice,
// which does not compile; a single declaration is kept.
// NOTE(review): the value is empty here; presumably the real host:port was elided -- confirm.
private static final String zkServerPath = "";

// Session timeout in milliseconds handed to the ZooKeeper constructor.
private static final Integer timeOut = 5000;

// Shared Stat instance that getData/getChildren calls fill in with node metadata.
private static Stat stat = new Stat();
/**
 * Watcher callback that reacts to node events.
 *
 * <ul>
 *   <li>NodeDataChanged: re-reads the changed node, prints its value and version.</li>
 *   <li>NodeChildrenChanged: prints the current children of the changed node.</li>
 *   <li>NodeCreated / NodeDeleted: no output.</li>
 * </ul>
 * Each handled event type counts down the shared latch once. Connections opened
 * here are closed again before the method returns.
 *
 * @param event the event delivered by the ZooKeeper client
 */
public void process(WatchedEvent event) {
    try {
        Event.EventType type = event.getType();
        if (type == Event.EventType.NodeDataChanged) {
            ZooKeeper zk = ZKApi.getZkConnect();
            try {
                // Read the node the event refers to instead of a hard-coded path,
                // matching how the children branch below uses event.getPath().
                byte[] resByt = zk.getData(event.getPath(), false, stat);
                // getData may hand back null for a node without data.
                String resStr = resByt == null
                        ? ""
                        : new String(resByt, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("更改后的值:" + resStr);
                System.out.println("版本号的变化:" + stat.getVersion());
                System.out.println("-------");
                countDown.countDown();
            } finally {
                zk.close();
            }
        } else if (type == Event.EventType.NodeChildrenChanged) {
            System.out.println("NodeChildrenChanged");
            ZooKeeper zk = ZKApi.getZkConnect();
            try {
                List<String> srcChildList = zk.getChildren(event.getPath(), false);
                for (String child : srcChildList) {
                    System.out.println(child);
                }
                countDown.countDown();
            } finally {
                zk.close();
            }
        } else if (type == Event.EventType.NodeCreated) {
            countDown.countDown();
        } else if (type == Event.EventType.NodeDeleted) {
            // The original tested NodeCreated a second time, which could never match.
            countDown.countDown();
        }
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the calling thread can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/* @param connectString zk连接地址以及端口号,格式如 host:port;如果有多个zk,则使用逗号分隔 @param sessionTimeout session超时时间,单位ms @param watcher 监听器,使用watcher必须实现接口Watcher并实现process方法 @param sessionId session id,可以用作恢复会话的参数 @param sessionPasswd session password,可以用作恢复会话的参数 @param canBeReadOnly zk3.4添加的只读模式 */ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd) public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
/**
 * Opens a new ZooKeeper client with a fresh ZKApi instance as its watcher and
 * logs the client's state at debug level.
 *
 * <p>The client is returned right after construction; the sample log output in
 * this article shows the state as CONNECTING at that point.
 *
 * @return the newly created client
 * @throws IOException if the client cannot be created
 */
public static ZooKeeper getZkConnect() throws IOException {
    final ZKApi eventHandler = new ZKApi();
    final ZooKeeper client = new ZooKeeper(zkServerPath, timeOut, eventHandler);
    logg.debug("连接状态:{}", client.getState());
    return client;
}
DEBUG [main] - zookeeper.disableAutoWatchReset is false DEBUG [main] - 连接状态:CONNECTING
/**
 * Demonstrates re-attaching to an existing session: opens a client, waits for the
 * session to be established, then opens a second client with the first one's
 * session id and password, logging the state along the way.
 *
 * @throws IOException if a client cannot be created
 * @throws InterruptedException if one of the waits is interrupted
 */
public static void recoveryConnect() throws IOException, InterruptedException {
    ZooKeeper zooKeeper = new ZooKeeper(zkServerPath, timeOut, new ZKApi());
    logg.debug("开始连接服务器 . . .");
    logg.debug("连接状态:{}", zooKeeper.getState());

    // Give the client time to establish its session first. Reading the id and
    // password straight after construction yields the not-yet-assigned values
    // (the sample log shows sessionId=0), so the second client would start a
    // brand-new session instead of re-attaching.
    Thread.sleep(1000);
    long sessionId = zooKeeper.getSessionId();
    byte[] sessionPasswd = zooKeeper.getSessionPasswd();

    logg.debug("开始重连 . . . ");
    ZooKeeper zooSession = new ZooKeeper(zkServerPath, timeOut, new ZKApi(), sessionId, sessionPasswd);
    logg.debug("重连状态:{}", zooSession.getState());
    // Short pause so the second state log can show the outcome of the reconnect.
    Thread.sleep(200);
    logg.debug("重连状态:{}", zooSession.getState());
}
DEBUG [main] - 开始连接服务器 . . . DEBUG [main] - 连接状态:CONNECTING DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67 INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/ Will not attempt to authenticate using SASL (unknown error) INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/, initiating session DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/ INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/, sessionid = 0x10000ea59aa0011, negotiated timeout = 5000 DEBUG [main] - 开始重连 . . . INFO [main] - Initiating client connection, connectString= sessionTimeout=5000 watcher=ZKApi@73a28541 sessionId=0 sessionPasswd=<hidden> DEBUG [main] - 重连状态:CONNECTING DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67 INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/ Will not attempt to authenticate using SASL (unknown error) INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/, initiating session DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/ INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/, sessionid = 0x10000ea59aa0012, negotiated timeout = 5000 DEBUG [main] - 重连状态:CONNECTED
/** @param path 创建的节点路径 @param data 节点数据 @param acl 权限列表 @param createMode 指定创建节点的类型 @param cb 异步回调对象 @param ctx 传递给回调的上下文对象 */ public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
public static void createZkNode1() throws IOException, KeeperException, InterruptedException { ZooKeeper zk = getZkConnect(); String result = zk.create("/test1", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//创建一个/test的持续节点 System.out.println(result); //输出/test1 public static void createZkNode2() throws IOException, KeeperException, InterruptedException { ZooKeeper zk = getZkConnect(); String ctx = "{'create': 'success'}"; zk.create("/test2", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,new CreateCallBack() ,ctx); new Thread().sleep(2000);//需要暂停一会,否则创建失败 }
public Stat setData(final String path, byte data[], int version) public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
public static void setZkNode1() throws IOException, KeeperException, InterruptedException{ ZooKeeper zk = getZkConnect(); Stat stat = zk.setData("/test1", "modifyed-data".getBytes(), 0); System.out.println(stat.getVersion()); } public static void setZkNode2() throws IOException, KeeperException, InterruptedException{ ZooKeeper zk = getZkConnect(); String ctx = "{'modify': 'success'}"; zk.setData("/test1", "modifyed-data".getBytes(),0,new ModifyCalback(),ctx); new Thread().sleep(1000);//必须加上,否则回掉不成功 }
public void delete(final String path, int version) public void delete(final String path, int version, VoidCallback cb, Object ctx)
public static void deleteZkNode1() throws IOException, KeeperException, InterruptedException { ZooKeeper zk = getZkConnect(); zk.delete("/test1",1);//不能够删除子节点 } public static void deleteZkNode2() throws IOException, InterruptedException { ZooKeeper zk = getZkConnect(); String ctx = "{'delete': 'success'}"; zk.delete("/test2",0,new DeleteCallBack(),ctx);//不能够删除子节点 new Thread().sleep(1000);//必须加上,否则回掉不成功 }
public byte[] getData(String path, boolean watch, Stat stat) public byte[] getData(final String path, Watcher watcher, Stat stat) public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx) public void getData(String path, boolean watch, DataCallback cb, Object ctx)
/** Latch that keeps the reading method alive until the watcher counts it down. */
public static CountDownLatch countDown = new CountDownLatch(1);

/**
 * Reads {@code /test1} with the default watcher enabled, prints the value and then
 * blocks until the latch is counted down. The client is closed afterwards.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server signals an error
 * @throws InterruptedException if the wait or the close is interrupted
 */
public static void selectData1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        byte[] data = zk.getData("/test1", true, stat);
        // getData may hand back null for a node without data.
        String s = data == null ? "" : new String(data, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("value: " + s);
        countDown.await();
    } finally {
        zk.close();
    }
}
if (event.getType() == Event.EventType.NodeDataChanged) { ZooKeeper zk = null; zk = ZKApi.getZkConnect(); byte[] resByt = new byte[0]; resByt = zk.getData("/test1", false, stat); String resStr = new String(resByt); System.out.println("更改后的值:" + resStr); System.out.println("版本号的变化:" + stat.getVersion()); System.out.println("-------"); countDown.countDown();
public List<String> getChildren(final String path, Watcher watcher) public List<String> getChildren(String path, boolean watch) public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx) public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx) public List<String> getChildren(final String path, Watcher watcher, Stat stat) public List<String> getChildren(String path, boolean watch, Stat stat) public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx) public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
/** Latch that keeps the reading method alive until the watcher counts it down. */
public static CountDownLatch countDown = new CountDownLatch(1);

/**
 * Lists the children of {@code /test} with the default watcher enabled, prints each
 * child name and then blocks until the latch is counted down. The client is closed
 * afterwards.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server signals an error
 * @throws InterruptedException if the wait or the close is interrupted
 */
public static void selectchildData1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        List<String> srcChildList = zk.getChildren("/test", true, stat);
        for (String child : srcChildList) {
            System.out.println(child);
        }
        countDown.await();
    } finally {
        zk.close();
    }
}
if(event.getType() == Event.EventType.NodeChildrenChanged){ System.out.println("NodeChildrenChanged"); ZooKeeper zk = null; zk = ZKApi.getZkConnect(); List<String> srcChildList = zk.getChildren(event.getPath(), false); for (String child:srcChildList){ System.out.println(child); }
/**
 * Asynchronously lists the children of {@code /test} without a watch; the result is
 * delivered to a {@code ChildrenCallback} together with the context string.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException declared for parity with the synchronous variant
 * @throws InterruptedException if the wait or the close is interrupted
 */
public static void selectchildData2() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        String ctx = "{'selectChild': 'success'}";
        zk.getChildren("/test", false, new ChildrenCallback(), ctx);
        // Pause so the asynchronous request can finish before the client is closed.
        Thread.sleep(1000);
    } finally {
        zk.close();
    }
}
/**
 * Prints the path of every node in the tree, starting at the root.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server signals an error
 * @throws InterruptedException if a call is interrupted
 */
public static void selectchildData3() throws IOException, KeeperException, InterruptedException {
    getChild("/");
}

/**
 * Prints {@code path} and, depth-first, the path of every node below it.
 *
 * <p>One client is opened for the whole traversal and closed when it ends; the
 * previous version opened a new client for every visited node and never closed any.
 *
 * @param path absolute path of the node to start from
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server signals an error
 * @throws InterruptedException if a call is interrupted
 */
public static void getChild(String path) throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        printSubtree(zk, path);
    } finally {
        zk.close();
    }
}

/** Recursively prints {@code path} and its descendants using the given client. */
private static void printSubtree(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
    System.out.println(path);
    List<String> childrenList = zk.getChildren(path, false, stat);
    // Null must be tested before isEmpty(); the original order could never guard anything.
    if (childrenList == null || childrenList.isEmpty()) {
        return;
    }
    for (String s : childrenList) {
        // The root already ends with a slash, so no separator is added for it.
        String childPath = path.equals("/") ? path + s : path + "/" + s;
        printSubtree(zk, childPath);
    }
}
/**
 * Checks whether {@code /ff} exists, with the default watcher enabled, and prints
 * the stat returned by the lookup.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server signals an error
 * @throws InterruptedException if the call is interrupted
 */
public static void existNode() throws IOException, KeeperException, InterruptedException {
    final ZooKeeper client = getZkConnect();
    final Stat nodeStat = client.exists("/ff", true);
    // Prints "null" when the node does not exist.
    System.out.println(nodeStat);
}
public static void oneSelfACL() throws Exception { ZooKeeper zk = getZkConnect(); ArrayList<ACL> acls = new ArrayList<ACL>(); // zk.create("/test1","test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //所有人均可访问 Id id1 = new Id("digest", ACLUtils.getDigestUserPassword("id1:123456")); Id id2 = new Id("digest", ACLUtils.getDigestUserPassword("id2:123456")); // Id ipId = new Id("ip","");ip设置 // acls.add(new ACL(ZooDefs.Perms.ALL,id1)); acls.add(new ACL(ZooDefs.Perms.ALL,id1)); acls.add(new ACL(ZooDefs.Perms.DELETE,id2)); //注册过的用户必须通过addAuthInfo才可以操作节点 zk.addAuthInfo("digest","id1:123456".getBytes()); zk.create("/test2","test2-data".getBytes(), acls,CreateMode.PERSISTENT); }
Java多线程Thread、Future、Callable、FutureTask的使用
本文主要介绍了Java多线程Thread、Future、Callable、FutureTask的使用,文中通过示例代码介绍得非常详细,对大家的学习或者工作具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧。(2023-03-03) IDEA提示:Boolean method 'xxx' is always inverted
这篇文章主要介绍了IDEA提示:Boolean method 'xxx' is always inverted问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教。(2024-08-08) java中实现对象排序的两种方法(Comparable,Comparator)