文档章节

Zookeeper 之二、实现Rmi服务的高可用

captainliu
 captainliu
发布于 2016/09/12 23:37
字数 1250
阅读 20
收藏 1

1、先看不适用zookeeper Rmi服务的实现

首先定义三个Rmi服务端代码类:

HelloService  --》接口,继承remote ;其中有为实现的方法
HelloServiceImpl  --》HelloService  的实现类;实现  HelloService  中的方法
RmiServer   ---》将 HelloServiceImpl 注册为Rmi远程调用方法。

代码: 

package com.bjsxt.server;

import java.rmi.Remote;
import java.rmi.RemoteException;

public interface HelloService extends Remote{
	String sayHello(String name) throws RemoteException;
}
package com.bjsxt.server;

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class HelloServiceImpl extends UnicastRemoteObject implements HelloService  {

	protected HelloServiceImpl() throws RemoteException {
		super();
	}

	@Override
	public String sayHello(String name) throws RemoteException {
		return String.format("hello  %s",name);
	}

}
package com.bjsxt.server;

import java.rmi.Naming;
import java.rmi.registry.LocateRegistry;

public class RmiServer {
	public static void main(String[] args) throws  Exception {
		int port = 8888;
		String url = "rmi://localhost:8888/demo.zookeeper.remoting.server.HelloServiceImpl";
		LocateRegistry.createRegistry(port);
		Naming.rebind(url	, new HelloServiceImpl());
	}
}

Rmi客户端调用:

客户端比较简单,只需要根据服务端的url地址调用就可以了

代码:

package com.bjsxt.client;

import java.rmi.Naming;
import java.rmi.RemoteException;

import com.bjsxt.server.HelloService;

public class RmiClient {
	
	public static void main(String[] args) throws Exception {
//服务端对应的url
		String url ="rmi://localhost:8888/demo.zookeeper.remoting.server.HelloServiceImpl";
		HelloService helloService = (HelloService) Naming.lookup(url);
	
		while(true){
			String result = helloService.sayHello("haijing");
			System.out.println(result);
		}
	
	}

}

 

2、由于现实生活中可能存在Rmi服务的不稳定性,所以要启动多个Rmi服务共客户端调用,但是这心Rmi由谁来管理调用呢,这时zookeeper就出现了,zookeeper可以实现Rmi服务url的管理。(注:这里要先启动zookeeper集群服务)

1)、看流程图:

这就是zookeeper管理rmi服务的流程图。

2)服务端代码:

在实现代码之前要导入相应的jar包

这里zookeeper.jar是必须的,其他的可以不用

代码:

Constant  -->静态常量接口

ServiceProvider  -->服务的提供类,相关方法的实现

Server  --->注册服务的server

package com.bjsxt.util;

public interface Constant {
	
	 String ZK_CONNECTION_STRING = "192.168.47.21:2181,192.168.47.22:2181,192.168.47.23:2181";
	    int ZK_SESSION_TIMEOUT = 5000;
	    String ZK_REGISTRY_PATH = "/registry";
	    String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider";

}
package com.bjsxt.server;

import java.io.IOException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.bjsxt.util.Constant;

/**
 * 本类的作用是 注册rmi服务,并将服务地址放在zookeeper上 ;消费者在消费时
 * @author root
 *
 */
public class ServiceProvider {
	
	  private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);
	  
	  //CountDownLatch 中配置的参数是当调用countDown 方法时 参数 -1 当参数变为 0 时 就可以 跳过 await方法 否则 就会阻塞,线程停止运行;
	 CountDownLatch  latch = new CountDownLatch(1);
	
	 /**
	  * 注册服务并建立zookeeper临时节点的方法,入口
	  * @param remote
	  * @param host
	  * @param port
	  */
	public void  publish(Remote remote, String host, int port) {
		 String url = publishService(remote,host,port);
		 if(url!=null){
			   ZooKeeper  zk = connectServer();
			   if(zk !=null){
				   createNode(zk ,url);
			   }
		 }
	}



	/**
	 * 发布rmi服务
	 * @param remote
	 * @param host
	 * @param port
	 * @return
	 */
	private String publishService(Remote remote, String host, int port) {
		String url =null;
		
		try {
			url = String.format("rmi://%s:%d/%s", host,port,remote.getClass().getName());
			LocateRegistry.createRegistry(port);
			 Naming.rebind(url, remote);
		} catch (RemoteException e) {
				e.printStackTrace();
		} catch (MalformedURLException e) {
				e.printStackTrace();
		}
		
		return url;
	}



	/**
	 * 连接zookeeper服务器
	 * @return
	 */
	private ZooKeeper connectServer(){
		ZooKeeper zk = null;
		  try {
			zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher(){

				@Override
				public void process(WatchedEvent event) {
					if(event.getState() == Event.KeeperState.SyncConnected){
						latch.countDown();
						
					}
					
				}
				  
			  });
	
		  latch.await();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		  
		  return zk ;
		
	}
	
	/**
	 * 在zookeeper上创建节点
	 * @param zk
	 * @param url
	 */
	private void createNode(ZooKeeper zk, String url) {
		try {
		 
			byte[] data = url.getBytes();
//			ZooDefs.Ids.OPEN_ACL_UNSAFE  这个是最大权限,所有人都可以来修改
			String path = zk.create(Constant.ZK_PROVIDER_PATH, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			  LOGGER.debug("create zookeeper node ({} => {})", path, url);
		} catch (KeeperException | InterruptedException e) {
			 
			e.printStackTrace();
		}
		
	}
	
}

 

package com.bjsxt.server;

import java.rmi.RemoteException;

public class Server {
	
	public static void main(String[] args) throws Exception {
		String host = "192.168.1.148";
		int port = Integer.parseInt("11233");
		ServiceProvider serviceProvider = new ServiceProvider();
		
		HelloServiceImpl service = new HelloServiceImpl();
		serviceProvider.publish(service,host,port);
		Thread.sleep(Long.MAX_VALUE);
	}

}

执行server中的main方法会报错说  registry 目录不存在,需要手动创建

手动创建

此时再启动 Server 就不报错了;观察zookeeper中的变化

 

rmi服务已经注册在zookeeper中了。

客户端代码

Client  -->启动客户端的入口

ServiceConsumer  -->相关方法的实现

package com.bjsxt.client;

import java.io.IOException;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.bjsxt.server.HelloService;
import com.bjsxt.util.Constant;

public class ServiceConsumer {
	
	 
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class);
 
    
    // 用于等待 SyncConnected 事件触发后继续执行当前线程
    private CountDownLatch latch = new CountDownLatch(1);
 
//    放置url的容器
    private volatile List<String> urlList = new ArrayList<>(); 

    public  ServiceConsumer() throws KeeperException, InterruptedException {
		ZooKeeper zk = connectServer();
		if(zk!=null){
			watchNode(zk);
		}
	}
    
    /**
     * 观察节点是否有变化,并将url放入容器中
     * @param zk
     */
	private void watchNode(final ZooKeeper zk)   {
		try {
			List<String> nodelist = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher(){

				@Override
				public void process(WatchedEvent event) {
					if(event.getType()==Event.EventType.NodeChildrenChanged){
						watchNode(zk);
					}
					
				}
				
			});
			
			ArrayList<String> datalist = new ArrayList<String>();
			for (String node : nodelist) {
				byte[] data = zk.getData(Constant.ZK_REGISTRY_PATH+"/"+node, false, null);
				datalist.add(new String(data));
			}
			
			urlList =datalist;// 更新最新的 RMI 地址
		} catch (KeeperException | InterruptedException e) {
			e.printStackTrace();
		}
		
	}
	public <T extends Remote> T lookup() {
		T service = null;
		int size = urlList.size();
		if (size>0) {
			String url;
			if (size==1) {
				url = urlList.get(0);
				
			}else {
				url = urlList.get(ThreadLocalRandom.current().nextInt(size));
			}
			service = lookupService(url);
		}
		return service;
	}
	
	
	/**
	 * 查找rmi远程服务对象
	 * @param url
	 * @return
	 */
    @SuppressWarnings("unchecked")
	private <T> T lookupService(String url) {
    	
    	T remote = null;
    	try {
			remote = (T) Naming.lookup(url);
		} catch (MalformedURLException | RemoteException | NotBoundException e) {
				if ( e instanceof ConnectException) {
					if (urlList.size() !=0) {
						url = urlList.get(0);
						return lookupService(url);
					}
				}
				LOGGER.error("", e);
		}
		return remote;
	}

	// 连接 ZooKeeper 服务器
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown(); // 唤醒当前正在执行的线程
                    }
                }
            });
            latch.await(); // 使当前线程处于等待状态
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }
}

 

package com.bjsxt.client;

import java.rmi.RemoteException;

import org.apache.zookeeper.KeeperException;

import com.bjsxt.server.HelloService;

public class Client {
	
	public static void main(String[] args) throws RemoteException, InterruptedException, KeeperException {
		ServiceConsumer consumer = new ServiceConsumer();
//		zookeeper测试
		
		while (true) {
			HelloService helloService = consumer.lookup();
			String sayHello = helloService.sayHello("haijing");
			System.out.println(sayHello);
			Thread.sleep(3000);
			
		}
	}

}

 

启动client就可以了  ,这里的server可以更换端口启用多个

© 著作权归作者所有

captainliu
粉丝 11
博文 106
码字总数 83678
作品 0
昌平
程序员
私信 提问
使用 RMI + ZooKeeper 实现远程调用框架

在 Java 世界里,有一种技术可以实现“跨虚拟机”的调用,它就是 (Remote Method Invocation,远程方法调用)。例如,服务A 在 JVM1 中运行,服务B 在 JVM2 中运行,服务A 与 服务B 可相互进...

黄勇
2014/11/15
0
48
基于zookeeper的远程方法调用(RMI)的实现

采用zookeeper的命令服务,采用不同的目录结构存储不同模块不同服务的rmi的url,使用key来对应不同的服务。同时采用zookeeper解决了单点问题。 当有两个相同的服务注册时,因为采用的是临时有...

蔡少东
2015/04/02
0
0
zookeeper、dubbo、kafka随笔

1 zookeeper如何实现高可用 1 zookeeper 多台构成集群实现高可用,有三种角色群首(leader),追随者(follower),观察者(observer)。 Leader作为整个ZooKeeper集群的主节点,负责响应所有...

独一无二zz
2018/06/28
0
0
分布式锁与实现(二)基于ZooKeeper实现

引言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包...

rechardchensir
2018/10/08
0
0
ZooKeeper教程资源收集(简介/原理/示例/解决方案)

菩提树下的杨过: ZooKeeper 笔记(1) 安装部署及hello world ZooKeeper 笔记(2) 监听数据变化 ZooKeeper 笔记(3) 实战应用之【统一配置管理】 ZooKeeper 笔记(4) 实战应用之【消除单点故障】...

easonjim
2017/09/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

数组中有一个数字出现的次数超过数组长度的一半,请找出这个数字由于数字2在数组中出现了5次,超过数组长度的一半,因此输出2。如果不存在则输出0。

import java.util.Arrays; public class Solution { public int MoreThanHalfNum_Solution(int [] array) { Arrays.sort(array); int count=0; for(int i=0;i<array.le......

南桥北木
17分钟前
0
0
关于FLAG_ACTIVITY_NEW_TASK的使用

参考文章: https://blog.csdn.net/u010389391/article/details/78558475 Context调用startActivity, 有部分情况会报出如下错误: Caused by: android.util.AndroidRuntimeException: Calli......

Gemini-Lin
33分钟前
0
0
Python开发工具:Webware for Python

原文来之:https://www.oschina.net/p/webware+for+python 前言 Webware for Python 是一组 Python 包和工具用来开发面向对象的 Web 应用。良好的设计模式,包含一个快速的应用服务器、Servl...

A_裙232550246
41分钟前
0
0
高并发场景下的缓存有哪些常见的问题?

一、缓存一致性问题 当数据时效性要求很高时,需要保证缓存中的数据与数据库中的保持一致,而且需要保证缓存节点和副本中的数据也保持一致,不能出现差异现象。 这就比较依赖缓存的过期和更新...

别打我会飞
56分钟前
3
0
List list = new ArrayList()为何父类引用指向子类对象(多态)

态:要有继承,方法的重写,父类引用指向子类对象 疑问一:父类引用指向子类对象 与指向父类对象 Animal cat = new Cat(); //向上转型。 父类引用指向子类对象,该引用不能再访问子类新增加的...

architect刘源源
57分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部