文档章节

hadoop远程过程调度(1)

政委007
 政委007
发布于 2017/05/03 09:37
字数 2398
阅读 42
收藏 2

#hadoop远程过程调度(1) [toc]

RPC基础知识

rpc 原理

rpc就是允许程序调度位于其他机器上的过程。 大致调度步骤:

  1. client通过动态代理获取接口对象
  2. client调用接口的实例对象的方法
  3. 通过socket把需要执行的类,方法,参数传递到服务器
  4. 服务器接受到参数后,反射获取实体类,执行方法,并返回执行结果

rpc 有两种模式: 1. 同步调用 :只能处理一个请求,多请求需要排队,上边介绍的就是同步调用流程 2. 异步调用 : 可以处理多个请求

java远程调度例子

java有完整的远程调度 接口:

package com.bonc.rpc;

import java.rmi.Remote;
import java.rmi.RemoteException;
//远程调度接口必须是public,必须继承Remote类
public interface RMIQuery extends Remote{
	//远程接口必须将java.rmi.RemoteException声明在throws语句中
	public String echo(String message) throws RemoteException;
	public String hello(String word) throws RemoteException;
}

实现,和服务端

package com.bonc.rpc;

import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.server.UnicastRemoteObject;
//继承了UnicastRemoteObject 对象,这个对象中封装了构建远程服务器的一些东西
public class RMIQueryImpl extends UnicastRemoteObject implements RMIQuery{

	public static final String RMI_URL="rmi://localhost:12909/query";
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	protected RMIQueryImpl() throws RemoteException {
		super();
		// TODO Auto-generated constructor stub
	}

	public String echo(String message) throws RemoteException {
		// TODO Auto-generated method stub
		System.out.println(message);
		return "remote get message : " + message;
	}

	public String hello(String word) throws RemoteException {
		// TODO Auto-generated method stub
		System.out.println(word);
		return "hello ," + word + " ! I am a remote server";
	}

	public static void main(String[] args) throws RemoteException, MalformedURLException {
		//创建对象实例,实际上初始化了UnicastRemoteObject的实例,创建服务端
		RMIQueryImpl impl = new RMIQueryImpl();
		//打开12909端口
		LocateRegistry.createRegistry(12909);
		//吧对象绑定到url上
		Naming.rebind(RMI_URL, impl);
		System.out.println("service has ready!");
		
	}
}

客户端

package com.bonc.rpc;

import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;

public class RMIQueryClient {
	public static void main(String[] args) throws MalformedURLException, RemoteException, NotBoundException {
		
		RMIQuery query = (RMIQuery) Naming.lookup(RMIQueryImpl.RMI_URL);
		//调用远程方法
		System.out.println(query.hello("宇宙"));
		System.out.println(query.echo("我要征服世界!"));
		
	}
}

JAVA动态代理

####什么是代理?
一种常用的设计模式,其目的就是为其他对象提供一个代理以控制对某个对象的访问。 代理类负责为委托类预处理消息,过滤消息并转发消息,以及进行消息被委托类执行后的后 续处理

简单代理实现例子(网上例子,能说明问题就行)

// 抽象角色:
abstract public class Subject {
    abstract public void  request();
}
// 真实角色:实现了Subject的request()方法
public class  RealSubject  extends  Subject  {
  public  RealSubject()  { }
  public void  request()  {
     System.out.println( " From real subject. " );
    }
}
// 代理角色:
public class  ProxySubject  extends  Subject  {
  // 以真实角色作为代理角色的属性
  private  Subject realSubject;

  public  ProxySubject(Subject realSubject)  {this.realSubject = realSubject }

  // 该方法封装了真实对象的request方法
  public void  request()  {
     preRequest();
     realSubject.request();  // 此处执行真实对象的request方法
     postRequest();
  }
  ...
}

// 客户端调用:
RealSubject real = new RealSubject();
Subject sub = new  ProxySubject(real);
Sub.request();

动态代理

动态代理中,代理类并不是在Java代码中实现,而是在运行时期生成,相比静态代理,动态代理可以很方便的对委托类的方法进行统一处理,如添加方法调用次数、添加日志功能等等,动态代理分为jdk动态代理和cglib动态代理,下面通过一个例子看看如何实现jdk动态代理。 网上例子

// 抽象角色(之前是抽象类,此处应改为接口):
public  interface Subject {
  abstract  public  void request();
}

// 具体角色RealSubject:
public  class RealSubject implements Subject {
  public RealSubject() {}
  public  void request() {
    System.out.println( " From real subject. " );
 }
}

// 代理处理器:
import java.lang.reflect.Method;
import java.lang.reflect.InvocationHandler;
//实现InvocationHandler接口,可以用来处理多种需要相同代理的接口
public  class DynamicSubject implements InvocationHandler {
  private Object sub;
  public DynamicSubject() {}

  public DynamicSubject(Object obj) {
    sub = obj;
  }
    //实先invoke 方法 
    /**
    * proxy : 需要代理的对象
    * method : 需要执行的方法
    * args : 执行方法需要的参数
    **/
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    
    System.out.println( " before calling "  + method);
    Object obj = method.invoke(sub,args);

    System.out.println( " after calling "  + method);
    return  obj ;
  }
}
// 客户端:
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;

public class Client {

  static public void main(String[] args) throws Throwable {
   RealSubject rs = new RealSubject(); // 在这里指定被代理类
   InvocationHandler ds = new DynamicSubject(rs);
   Class cls = rs.getClass();

   // 我们给这个代理对象提供了一组什么接口,那么我这个代理对象就会实现了这组接口,这个时候我们当然可以将这个代理对象强制类型转化为这组接口中的任意一个,因为这里的接口是Subject类型,所以就可以将其转化为Subject类型了。动态生成了一个类,实现了传递的所有接口,对应的接口方法,则调用统一实现,封装最后调用invoke方法
   Subject subject = (Subject) Proxy.newProxyInstance(cls.getClassLoader(), cls.getInterfaces(),ds );
   subject.request();
  }
}

// 程序运行结果:
before calling public abstract void Subject.request()
From real subject.
after calling public abstract void Subject.request()

这块的东西比较深,有兴趣可以单独拿出来研究一下

NIO了解

首先,看看阻塞IO的使用

package com.bonc.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Random;

/**
 * Created by hadoop on 2017/3/10.
 * 客户端
 */
public class BIOClient {
    public static void main(String[] args) throws IOException {

//Close

        for( int i =0 ; i < 1000 ; i++)
        {
            Socket client = new Socket("127.0.0.1",7777);
            //读写
            PrintWriter pw = new PrintWriter(client.getOutputStream());

            pw.write(i+" \n");
            pw.close();
        }

    }
}
//服务器端
package com.bonc.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Created by hadoop on 2017/3/10.
 */
public class BIOServer {
    public static void main(String[] args) throws IOException {
        //Bind,Listen
        Socket socket;
        ServerSocket ss = new ServerSocket(7777);
        while (true) {
            //Accept
            socket = ss.accept();
            //一般新建一个线程执行读写
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            System.out.println("you input is : " + br.readLine());
           br.close();
        }
    }
}

非阻塞IO的使用

package com.bonc.io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOServer {  
    //通道管理器  
    private Selector selector;  
  
    /** 
     * 获得一个ServerSocket通道,并对该通道做一些初始化的工作 
     * @param port  绑定的端口号 
     * @throws IOException 
     */  
    public void initServer(int port) throws IOException {  
        // 获得一个ServerSocket通道  
        ServerSocketChannel serverChannel = ServerSocketChannel.open();  
        // 设置通道为非阻塞  
        serverChannel.configureBlocking(false);  
        // 将该通道对应的ServerSocket绑定到port端口  
        serverChannel.socket().bind(new InetSocketAddress(port));  
        // 获得一个通道管理器  
        this.selector = Selector.open();  
        //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,  
        //当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。  
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);  
    }  
  
    /** 
     * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 
     * @throws IOException 
     */  
    @SuppressWarnings("unchecked")  
    public void listen() throws IOException {  
        System.out.println("服务端启动成功!");  
        // 轮询访问selector  
        while (true) {  
            //当注册的事件到达时,方法返回;否则,该方法会一直阻塞  
            selector.select();  
            // 获得selector中选中的项的迭代器,选中的项为注册的事件  
            Iterator ite = this.selector.selectedKeys().iterator();  
            while (ite.hasNext()) {  
                SelectionKey key = (SelectionKey) ite.next();  
                // 删除已选的key,以防重复处理  
                ite.remove();  
                // 客户端请求连接事件  
                if (key.isAcceptable()) {  
                    ServerSocketChannel server = (ServerSocketChannel) key  
                            .channel();  
                    // 获得和客户端连接的通道  
                    SocketChannel channel = server.accept();  
                    // 设置成非阻塞  
                    channel.configureBlocking(false);  
  
                    //在这里可以给客户端发送信息哦  
                    channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes()));  
                    //在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。  
                    channel.register(this.selector, SelectionKey.OP_READ);  
                      
                    // 获得了可读的事件  
                } else if (key.isReadable()) {  
                        read(key);  
                }  
  
            }  
  
        }  
    }  
    /** 
     * 处理读取客户端发来的信息 的事件 
     * @param key 
     * @throws IOException  
     */  
    public void read(SelectionKey key) throws IOException{  
        // 服务器可读取消息:得到事件发生的Socket通道  
        SocketChannel channel = (SocketChannel) key.channel();  
        // 创建读取的缓冲区  
        ByteBuffer buffer = ByteBuffer.allocate(10);  
        channel.read(buffer);  
        byte[] data = buffer.array();  
        String msg = new String(data).trim();  
        System.out.println("服务端收到信息:"+msg);  
        ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());  
        channel.write(outBuffer);// 将消息回送给客户端  
    }  
      
    /** 
     * 启动服务端测试 
     * @throws IOException  
     */  
    public static void main(String[] args) throws IOException {  
        NIOServer server = new NIOServer();  
        server.initServer(8000);  
        server.listen();  
    }  
  
}
// 客户端
package com.bonc.io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
public class NIOClient {  
    //通道管理器  
    private Selector selector;  
  
    /** 
     * 获得一个Socket通道,并对该通道做一些初始化的工作 
     * @param ip 连接的服务器的ip 
     * @param port  连接的服务器的端口号          
     * @throws IOException 
     */  
    public void initClient(String ip,int port) throws IOException {  
        // 获得一个Socket通道  
        SocketChannel channel = SocketChannel.open();  
        // 设置通道为非阻塞  
        channel.configureBlocking(false);  
        // 获得一个通道管理器  
        this.selector = Selector.open();  
          
        // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调  
        //用channel.finishConnect();才能完成连接  
        channel.connect(new InetSocketAddress(ip,port));  
        //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。  
        channel.register(selector, SelectionKey.OP_CONNECT);  
    }  
  
    /** 
     * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 
     * @throws IOException 
     */  
    @SuppressWarnings("unchecked")  
    public void listen() throws IOException {  
        // 轮询访问selector  
        while (true) {  
            selector.select();  
            // 获得selector中选中的项的迭代器  
            Iterator ite = this.selector.selectedKeys().iterator();  
            while (ite.hasNext()) {  
                SelectionKey key = (SelectionKey) ite.next();  
                // 删除已选的key,以防重复处理  
                ite.remove();  
                // 连接事件发生  
                if (key.isConnectable()) {  
                    SocketChannel channel = (SocketChannel) key  
                            .channel();  
                    // 如果正在连接,则完成连接  
                    if(channel.isConnectionPending()){  
                        channel.finishConnect();  
                          
                    }  
                    // 设置成非阻塞  
                    channel.configureBlocking(false);  
  
                    //在这里可以给服务端发送信息哦  
                    channel.write(ByteBuffer.wrap(new String("send a message to service ").getBytes()));  
                    //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。  
                    channel.register(this.selector, SelectionKey.OP_READ);  
                      
                    // 获得了可读的事件  
                } else if (key.isReadable()) {  
                        read(key);  
                }  
  
            }  
  
        }  
    }  
    /** 
     * 处理读取服务端发来的信息 的事件 
     * @param key 
     * @throws IOException  
     */  
    public void read(SelectionKey key) throws IOException{  
    	// 服务器可读取消息:得到事件发生的Socket通道  
        SocketChannel channel = (SocketChannel) key.channel();  
        // 创建读取的缓冲区  
        ByteBuffer buffer = ByteBuffer.allocate(1000);  
        channel.read(buffer);  
        byte[] data = buffer.array();  
        String msg = new String(data,"UTF-8").trim();  
        System.out.println("客户端收到信息:"+msg);  
        ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());  
        //channel.write(outBuffer);// 将消息回送给客户端  
    }  
      
      
    /** 
     * 启动客户端测试 
     * @throws IOException  
     */  
    public static void main(String[] args) throws IOException {  
        NIOClient client = new NIOClient();  
        client.initClient("localhost",8000);  
        client.listen();  
    }  
  
}

和阻塞io相比,NIO多出了一个通道管理器,用了监听端口事件,如果接受到事件,唤醒执行select()方法,继续后处理

© 著作权归作者所有

政委007
粉丝 10
博文 15
码字总数 15843
作品 0
洛阳
程序员
私信 提问
Hadoop MapReduce优化和资源调度器

Hadoop Shuffle过程 1.Hadoop MapReduce Shuffle过程 Hadoop Shuffle过程 Map Shuffle过程图2 2.Shuffle过程要点记录 每个Map Task把输出结果写到内存中的环形缓冲区。 当内存环形缓冲区写入...

溯水心生
2018/01/14
0
0
Hadoop Mapreduce架构--Hadoop技术内幕读书笔记

1、Client 用户编写的MapReduce程序通过Client提交到JobTracker端;同时,用户可通过Client提供的一些接口查看作业运行状态。在Hadoop内部用“作业”Job表示MapReduce程序。一个MapReduce程序...

fmz
2014/04/01
355
0
第6章-MapReduce的工作机制-笔记

作业的提交 可以只用一行代码来运行一个MapReduce作业: JobClient.runJob(conf)。 作业的调度 Hadoop作业调度演进 1、早期版本的Hadoop使用FIFO调度算法来运行作业 早期版本的Hadoop使用一种...

hiqj
2014/10/16
35
0
好程序员大数据划重点 hadoop常用四大模块文件

1.core-site.xml(工具模块)。包括Hadoop常用的工具类,由原来的Hadoopcore部分更名而来。主要包括系统配置工具Configuration、远程过程调用RPC、序列化机制和Hadoop抽象文件系统FileSystem等...

好程序员IT
05/16
1
0
大数据开发 | MapReduce介绍

1. MapReduce 介绍 1.1MapReduce的作用 假设有一个计算文件中单词个数的需求,文件比较多也比较大,在单击运行的时候机器的内存受限,磁盘受限,运算能力受限,而一旦将单机版程序扩展到集群...

嘿你好夏天
2018/04/18
0
0

没有更多内容

加载失败,请刷新页面

加载更多

CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
昨天
5
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
昨天
8
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
昨天
10
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
昨天
6
0
PHP+Ajax微信手机端九宫格抽奖实例

PHP+Ajax结合lottery.js制作的一款微信手机端九宫格抽奖实例,抽奖完成后有收货地址添加表单出现。支持可以设置中奖概率等。 奖品列表 <div class="lottery_list clearfix" id="lottery"> ......

ymkjs1990
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部