大数据教程(3.8):zookeeper的java客户端API简介及监听原理

原创
2018/08/05 12:32
阅读数 677

    之前章节介绍了zookeeper集群的自动化启动脚本,本节博主将带大家简单地使用zookeeper的java客户端API,从而了解其基本用法。

    zookeeper监听器原理:

   zookeeper使用步骤:

(1)新建maven项目em-zookeeper-test

(2)解压之前在linux系统中安装集群的zookeeper安装包到E:\zookeeper-3.4.13\,打开E:\zookeeper-3.4.13\dist-maven目录查看pom文件

(3)按照以上信息在em-zookeeper-test项目的pom中加入zookeeper服务器/客户端(目前客户端和服务端是合在一起的)依赖。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<!-- Maven coordinates of the demo project; packaged as a plain jar. -->
	<groupId>com.empire.zookeeper.test</groupId>
	<artifactId>em-zookeeper</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>em-zookeeper</name>
	<url>http://maven.apache.org</url>

	<properties>
		<!-- Compile and filter sources as UTF-8 regardless of the platform default. -->
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<!-- JUnit 4, used by the @Before/@Test methods of the example class. -->
		<!-- NOTE(review): no <scope> is declared; add <scope>test</scope> only if the
		     example class lives under src/test/java - confirm the project layout. -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
		</dependency>
		<!-- ZooKeeper artifact (client and server ship in the same jar); the version
		     is the same 3.4.13 release as the installed cluster package. -->
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.13</version>
		</dependency>

	</dependencies>
	   <build>

        <plugins>
            <!-- Compile for Java 8 and surface compiler warnings. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <!-- Run the tests in one forked JVM with UTF-8 as the default file encoding. -->
            <!-- NOTE(review): forkMode is a legacy Surefire option superseded by
                 forkCount/reuseForks - confirm before upgrading the plugin. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.21.0</version>
                <inherited>true</inherited>
                <configuration>
                    <!-- <skip>true</skip> -->
                    <forkMode>once</forkMode>  
                    <argLine>-Dfile.encoding=UTF-8</argLine>   
                </configuration>
            </plugin>
            <!-- Copy the dependency jars into the lib directory during the package phase. -->
            <!-- NOTE(review): no <version> is pinned for this plugin, so the version
                 is whatever Maven resolves by default - consider pinning it. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <!-- ${project.build.directory} is the build directory; it defaults to target -->
                            <outputDirectory>
                                ${project.build.directory}/lib
                            </outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
           <!--  <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                    </archive>
                    <classesDirectory>
                    </classesDirectory>
                </configuration>
            </plugin> -->
        </plugins>
    </build>
</project>

(4)zookeeper的java客户端API使用案例

package com.empire.zookeeper.test.em_zookeeper;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SimpleZkClient {

	private static final String connectString = "centos-aaron-07:2181,centos-aaron-08:2181,centos-aaron-16:2181";
	private static final int sessionTimeout = 1000;

	private CountDownLatch connectedSemaphore = new CountDownLatch(1); 
	ZooKeeper zkClient = null;

	@Before
	public void init() throws Exception {
		
		zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
			public void process(WatchedEvent event) {
				// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
				System.out.println(event.getType() + "---" + event.getPath());
				try {
					zkClient.getChildren("/", true);
					//Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
					connectedSemaphore.countDown(); 
				} catch (Exception e) {
				}
			}
		});
		//Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted. 
		connectedSemaphore.await();
		//Thread.currentThread().sleep(10000);

	}

	/**
	 * 数据的增删改查
	 * 
	 * @throws InterruptedException
	 * @throws KeeperException
	 */

	// 创建数据节点到zk中
	@Test
	public void testCreate() throws KeeperException, InterruptedException {
		// 参数1:要创建的节点的路径 参数2:节点大数据 参数3:节点的权限 参数4:节点的类型
		String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		//上传的数据可以是任何类型,但都要转成byte[]
	}

	//判断znode是否存在
	@Test	
	public void testExist() throws Exception{
		Stat stat = zkClient.exists("/eclipse", false);
		System.out.println(stat==null?"not exist":"exist");
		
		
	}
	
	// 获取子节点
	@Test
	public void getChildren() throws Exception {
		List<String> children = zkClient.getChildren("/", true);
		for (String child : children) {
			System.out.println(child);
		}
		//Thread.sleep(Long.MAX_VALUE);
	}

	//获取znode的数据
	@Test
	public void getData() throws Exception {
		
		byte[] data = zkClient.getData("/eclipse", false, null);
		System.out.println(new String(data));
		
	}
	
	//删除znode
	@Test
	public void deleteZnode() throws Exception {
		
		//参数2:指定要删除的版本,-1表示删除所有版本
		zkClient.delete("/eclipse", -1);
		
		
	}
	//删除znode
	@Test
	public void setData() throws Exception {
		
		zkClient.setData("/app1", "imissyou angelababy".getBytes(), -1);
		
		byte[] data = zkClient.getData("/app1", false, null);
		System.out.println(new String(data));
		
	}
	
	
}

(5)日志配置(zookeeper需要打印日志)

### set log levels ###
# Root logger at debug level, writing to three appenders: stdout, D and E.
log4j.rootLogger = debug ,  stdout ,  D ,  E

### Output to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern =  %d{ABSOLUTE} %5p %c{1}:%L - %m%n

### Output to a log file ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = logs/log.log
log4j.appender.D.Append = true
## Write log entries of level DEBUG and above
log4j.appender.D.Threshold = DEBUG 
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

### Save error information to a separate file ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
## Name of the error log file
log4j.appender.E.File = logs/error.log
log4j.appender.E.Append = true
## Write only log entries of level ERROR and above!!!
log4j.appender.E.Threshold = ERROR 
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

(6)问题思考(在没加锁之前很容易报错)

org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /eclipse
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
	at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1111)
	at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1139)
	at com.empire.zookeeper.test.em_zookeeper.SimpleZkClient.testExist(SimpleZkClient.java:57)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)

该问题主要是由于zookeeper的内部实现机制产生的:在项目中创建zookeeper连接时,客户端会同时创建connect、listener两个线程(对应客户端源码中的SendThread和EventThread),connect用于数据的发送与传递,而listener用于监听zookeeper下发的通知,这两个线程都是以守护线程(daemon)的方式运行的。由于zookeeper在创建连接时会花费较多的时间和资源,故构造方法在连接尚未建立完成时就已经返回了连接对象;该连接对象需要在zookeeper向客户端发送了连接创建成功的通知(SyncConnected事件)之后才可使用,否则就会出现上面的ConnectionLoss异常。所以此处我们可以加一把锁CountDownLatch:主线程在创建连接后调用await()等待,监听器收到SyncConnected事件后调用countDown()放行,从而实现业务代码暂停等待连接就绪。

 

    最后寄语,以上是博主本次文章的全部内容,如果大家觉得博主的文章还不错,请点赞;如果您对博主其它服务器技术或者博主本人感兴趣,请关注博主博客,并且欢迎随时跟博主沟通交流。

展开阅读全文
打赏
2
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
2
分享
返回顶部
顶部