文档章节

hadoop 远程调度(二)

政委007
 政委007
发布于 2017/05/03 09:39
字数 1230
阅读 339
收藏 16

hadoop 远程调度(二)

[toc]

远程调度例子

//定义接口
public interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol{
    long versionID = 123456;
    String echo(String str);
    int add(int a, int b);
}
//接口实现
public class ClientProtocolImpl  implements  ClientProtocol{
    public String echo(String str) {
        System.out.println(str);
        return "echo " + str;
    }
    public  int add(int a, int b) {
        return a + b;
    }
    public long getProtocolVersion(String s, long l) throws IOException {
        return ClientProtocol.versionID;
    }
    public ProtocolSignature getProtocolSignature(String protocol, long versionID, int intHashCode) throws IOException {
        return new ProtocolSignature(versionID,null);
    }
}
//客户端
public class RpcClient {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //获取实例 IPC接口对象, 接口版本号, 服务器地址,配置
        ClientProtocol proxy = RPC.getProxy(ClientProtocol.class,ClientProtocol.versionID,new InetSocketAddress("127.0.0.1",10240),conf);
        System.out.println(proxy.echo("aaa :fasf bs"));
        RPC.stopProxy(proxy);
    }
}
//服务端
public class RpcServer {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        //Server server2 = RPC.getServer(new ClientProtocolImpl(), "loaclhost", 10240, conf);
        Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class).setInstance(new ClientProtocolImpl()).setBindAddress("127.0.0.1").setPort(10240).setNumHandlers(5).build();
        //启动服务ProtocolProxy
        server.start();
    }
}

#客户端调用 主要涉及的类有

  1. VersionedProtocol
  2. RPC
  3. Server

VersionedProtocol

主要作用: 声明是一个hadoop rpc协议的父类 ###主要方法

//返回服务器的协议接口版本号
public long getProtocolVersion(String protocol,long clientVersion) throws IOException;
//返回协议签名
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,int clientMethodsHash) throws IOException;

RPC

主要成员变量

//缓存获取到的rpcEngine
private static final Map<Class<?>,RpcEngine> PROTOCOL_ENGINES = new HashMap<Class<?>,RpcEngine>();

主要方法

    //获取代理对象。客户端调用时获取动态代理生成的代理对象
    public static <T> T getProxy(Class<T> protocol,long clientVersion,InetSocketAddress addr, Configuration conf) throws IOException {
     return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
    }
    
    // 获取代理对象的封装
    public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                                long clientVersion,
                                InetSocketAddress addr,
                                UserGroupInformation ticket,
                                Configuration conf,
                                SocketFactory factory,
                                int rpcTimeout,
                                RetryPolicy connectionRetryPolicy,
                                AtomicBoolean fallbackToSimpleAuth)
       throws IOException {
    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    //通过getProtocolEngine()获取代理一个代理引擎,默认是获取WritableRpcEngine 对象,getProxy()方法会调用动态代理生成需要代理对对象,并且封装成ProtocolProxy  实例
    return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
        fallbackToSimpleAuth);
  }
  
    // 获取rpc引擎,如果缓存中有对应对象,直接返回,否则创建新的引擎对象
  static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
      Configuration conf) {
    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
    if (engine == null) {
      Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
                                    WritableRpcEngine.class);
      engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
      PROTOCOL_ENGINES.put(protocol, engine);
    }
    return engine;
  }

WritableRpcEngine

    //客户端调用动态代理获取代理对象
 public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                         InetSocketAddress addr, UserGroupInformation ticket,
                         Configuration conf, SocketFactory factory,
                         int rpcTimeout, RetryPolicy connectionRetryPolicy,
                         AtomicBoolean fallbackToSimpleAuth)
    throws IOException {    

    if (connectionRetryPolicy != null) {
      throw new UnsupportedOperationException(
          "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
    }
    //动态代理获取对象
    T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
            factory, rpcTimeout, fallbackToSimpleAuth));
    //封装成ProtocolProxy对象
    return new ProtocolProxy<T>(protocol, proxy, true);
  }

RPC.getProxy() 调度流程

  1. 调用RPC.getProxy() 方法
  2. getProxy()方法会调用RPC中getProtocolProxy()方法获取一个ProtocolProxy对象,这个对象的getProxy()方法会返回动态代理生成的代理对象实例
  3. getProtocolProxy()中 主要先生成一个RpcEngine 对象 默认是WritableRpcEngine类实例,这个类的getProxy方法会调用动态代理生成代理对象 ,并且把这个对象封装成ProtocolProxy 实例

服务器调度

服务器启动流程

  1. 创建一个Rpc.Builder()实例 ,这个实例主要用来收集构建server的参数
  2. 调用build()方法 ,这个方法中主要调用了Rpc.getProtocolEngine()方法,获取一个rpc代理引擎,默认获取的是WritableRpcEngine。获取引擎后调用对应的getServer()方法。下边以WritableRpcEngine为例
  3. WritableRpcEngine 方法会构建一个WritableRpcEngine.Server的实例
  4. WritableRpcEngine.Server的构造方法中首先调用父类的构造方法,然后注册需要代理的接口,和实例到protocolImplMapArray中。
  5. 调用server的start方法,实际上是调用Responder和Listener的start方法 关于server类的关系图 Server类结构图

实际代码解析

Rpc.Builder.build()方法

public Server build() throws IOException, HadoopIllegalArgumentException {
     //getProtocolEngine 获取协议引擎。默认获取的是WritableRpcEngine 类
    //调用getServer()方法
      return getProtocolEngine(this.protocol, this.conf).getServer(
          this.protocol, this.instance, this.bindAddress, this.port,
          this.numHandlers, this.numReaders, this.queueSizePerHandler,
          this.verbose, this.conf, this.secretManager, this.portRangeConfig);
    }
  }

WritableRpcEngine.getServer()方法

  @Override
  public RPC.Server getServer(Class<?> protocolClass,
                      Object protocolImpl, String bindAddress, int port,
                      int numHandlers, int numReaders, int queueSizePerHandler,
                      boolean verbose, Configuration conf,
                      SecretManager<? extends TokenIdentifier> secretManager,
                      String portRangeConfig) 
    throws IOException {
    //构建WritableRpcEngine的内部类 Server
    return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
        portRangeConfig);
  }

WritableRpcEngine.Server的构造方法

   public Server(Class<?> protocolClass, Object protocolImpl,
        Configuration conf, String bindAddress,  int port,
        int numHandlers, int numReaders, int queueSizePerHandler, 
        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig) 
        throws IOException {
        //调用父类的构造方法 ,父类构造方法中主要创建了Listener 和Responder实例
        //listener主要用来接收端口请求
        //Responder主要用来返回rpc结果数据
        super(bindAddress, port, null, numHandlers, numReaders,
          queueSizePerHandler, conf,
          classNameBase(protocolImpl.getClass().getName()), secretManager,
          portRangeConfig);
          this.verbose = verbose;
      Class<?>[] protocols;
    //如果未指定需要代理的接口,则把实例的所有实现的接口加入代理。
      if (protocolClass == null) { 
        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
      } else {
        //如果指定了代理的接口,只会把需要代理的接口 加入到代理中。 
        //实际存储在rpc类中 
        //ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray 中
        registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
        protocols = RPC.getProtocolInterfaces(protocolClass);
      }
      for (Class<?> p : protocols) {
        if (!p.equals(VersionedProtocol.class)) {
          registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
        }
      }
    }

rpc server端启动

  /** Starts the service.  Must be called before any calls will be handled. */
  public synchronized void start() {
    //启动数据返回线程
    responder.start();
    //启动接口监听线程
    listener.start();
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }

© 著作权归作者所有

政委007
粉丝 10
博文 15
码字总数 15843
作品 0
洛阳
程序员
私信 提问
大数据Hadoop需要了解哪些内容?

一、Hadoop环境搭建 1. Hadoop生态环境介绍 2. Hadoop云计算中的位置和关系 3. 国内外Hadoop应用案例介绍 4. Hadoop概念、版本、历史 5. Hadoop核心组成介绍及hdfs、mapreduce体系结构 6. H...

mo默瑶
2018/05/05
0
0
Hadoop MapReduce优化和资源调度器

Hadoop Shuffle过程 1.Hadoop MapReduce Shuffle过程 Hadoop Shuffle过程 Map Shuffle过程图2 2.Shuffle过程要点记录 每个Map Task把输出结果写到内存中的环形缓冲区。 当内存环形缓冲区写入...

溯水心生
2018/01/14
0
0
学习笔记TF065: TensorFlowOnSpark

Hadoop生态大数据系统分为Yam、 HDFS、MapReduce计算框架。TensorFlow分布式相当于MapReduce计算框架,Kubernetes相当于Yam调度系统。TensorFlowOnSpark,利用远程直接内存访问(Remote Direc...

利炳根
2017/11/13
0
0
好程序员大数据划重点 hadoop常用四大模块文件

1.core-site.xml(工具模块)。包括Hadoop常用的工具类,由原来的Hadoopcore部分更名而来。主要包括系统配置工具Configuration、远程过程调用RPC、序列化机制和Hadoop抽象文件系统FileSystem等...

好程序员IT
05/16
1
0
Eclispe远程调试sqoop

利用eclipse远程调试功能,实现sqoop 本地环境集成: 利用cygwin部署 0、将sqoop-1.4.2放在/home/Administrator/hadoop 1、sqoop中设置HOMEHOME: 修改SQOOPHOME/bin/configure-sqoop :HAD...

超人学院
2015/06/03
353
0

没有更多内容

加载失败,请刷新页面

加载更多

《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
今天
6
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
今天
7
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
今天
5
0
OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
1K
11
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
40
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部