hadoop 远程调度(二)

原创
2017/05/03 09:39
阅读数 934

hadoop 远程调度(二)

[toc]

远程调度例子

//定义接口
public interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol{
    long versionID = 123456;
    String echo(String str);
    int add(int a, int b);
}
//接口实现
public class ClientProtocolImpl  implements  ClientProtocol{
    public String echo(String str) {
        System.out.println(str);
        return "echo " + str;
    }
    public  int add(int a, int b) {
        return a + b;
    }
    public long getProtocolVersion(String s, long l) throws IOException {
        return ClientProtocol.versionID;
    }
    public ProtocolSignature getProtocolSignature(String protocol, long versionID, int intHashCode) throws IOException {
        return new ProtocolSignature(versionID,null);
    }
}
//客户端
public class RpcClient {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //获取实例 IPC接口对象, 接口版本号, 服务器地址,配置
        ClientProtocol proxy = RPC.getProxy(ClientProtocol.class,ClientProtocol.versionID,new InetSocketAddress("127.0.0.1",10240),conf);
        System.out.println(proxy.echo("aaa :fasf bs"));
        RPC.stopProxy(proxy);
    }
}
//服务端
public class RpcServer {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        //Server server2 = RPC.getServer(new ClientProtocolImpl(), "loaclhost", 10240, conf);
        Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class).setInstance(new ClientProtocolImpl()).setBindAddress("127.0.0.1").setPort(10240).setNumHandlers(5).build();
        //启动服务ProtocolProxy
        server.start();
    }
}

#客户端调用 主要涉及的类有

  1. VersionedProtocol
  2. RPC
  3. Server

VersionedProtocol

主要作用: 声明是一个hadoop rpc协议的父类 ###主要方法

//返回服务器的协议接口版本号
public long getProtocolVersion(String protocol,long clientVersion) throws IOException;
//返回协议签名
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,int clientMethodsHash) throws IOException;

RPC

主要成员变量

//缓存获取到的rpcEngine
private static final Map<Class<?>,RpcEngine> PROTOCOL_ENGINES = new HashMap<Class<?>,RpcEngine>();

主要方法

    //获取代理对象。客户端调用时获取动态代理生成的代理对象
    public static <T> T getProxy(Class<T> protocol,long clientVersion,InetSocketAddress addr, Configuration conf) throws IOException {
     return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
    }
    
    // 获取代理对象的封装
    public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                                long clientVersion,
                                InetSocketAddress addr,
                                UserGroupInformation ticket,
                                Configuration conf,
                                SocketFactory factory,
                                int rpcTimeout,
                                RetryPolicy connectionRetryPolicy,
                                AtomicBoolean fallbackToSimpleAuth)
       throws IOException {
    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    //通过getProtocolEngine()获取代理一个代理引擎,默认是获取WritableRpcEngine 对象,getProxy()方法会调用动态代理生成需要代理对对象,并且封装成ProtocolProxy  实例
    return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
        fallbackToSimpleAuth);
  }
  
    // 获取rpc引擎,如果缓存中有对应对象,直接返回,否则创建新的引擎对象
  static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
      Configuration conf) {
    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
    if (engine == null) {
      Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
                                    WritableRpcEngine.class);
      engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
      PROTOCOL_ENGINES.put(protocol, engine);
    }
    return engine;
  }

WritableRpcEngine

    //客户端调用动态代理获取代理对象
 public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                         InetSocketAddress addr, UserGroupInformation ticket,
                         Configuration conf, SocketFactory factory,
                         int rpcTimeout, RetryPolicy connectionRetryPolicy,
                         AtomicBoolean fallbackToSimpleAuth)
    throws IOException {    

    if (connectionRetryPolicy != null) {
      throw new UnsupportedOperationException(
          "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
    }
    //动态代理获取对象
    T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
            factory, rpcTimeout, fallbackToSimpleAuth));
    //封装成ProtocolProxy对象
    return new ProtocolProxy<T>(protocol, proxy, true);
  }

RPC.getProxy() 调度流程

  1. 调用RPC.getProxy() 方法
  2. getProxy()方法会调用RPC中getProtocolProxy()方法获取一个ProtocolProxy对象,这个对象的getProxy()方法会返回动态代理生成的代理对象实例
  3. getProtocolProxy()中 主要先生成一个RpcEngine 对象 默认是WritableRpcEngine类实例,这个类的getProxy方法会调用动态代理生成代理对象 ,并且把这个对象封装成ProtocolProxy 实例

服务器调度

服务器启动流程

  1. 创建一个Rpc.Builder()实例 ,这个实例主要用来收集构建server的参数
  2. 调用build()方法 ,这个方法中主要调用了Rpc.getProtocolEngine()方法,获取一个rpc代理引擎,默认获取的是WritableRpcEngine。获取引擎后调用对应的getServer()方法。下边以WritableRpcEngine为例
  3. WritableRpcEngine 方法会构建一个WritableRpcEngine.Server的实例
  4. WritableRpcEngine.Server的构造方法中首先调用父类的构造方法,然后注册需要代理的接口,和实例到protocolImplMapArray中。
  5. 调用server的start方法,实际上是调用Responder和Listener的start方法 关于server类的关系图 Server类结构图

实际代码解析

Rpc.Builder.build()方法

public Server build() throws IOException, HadoopIllegalArgumentException {
     //getProtocolEngine 获取协议引擎。默认获取的是WritableRpcEngine 类
    //调用getServer()方法
      return getProtocolEngine(this.protocol, this.conf).getServer(
          this.protocol, this.instance, this.bindAddress, this.port,
          this.numHandlers, this.numReaders, this.queueSizePerHandler,
          this.verbose, this.conf, this.secretManager, this.portRangeConfig);
    }
  }

WritableRpcEngine.getServer()方法

  @Override
  public RPC.Server getServer(Class<?> protocolClass,
                      Object protocolImpl, String bindAddress, int port,
                      int numHandlers, int numReaders, int queueSizePerHandler,
                      boolean verbose, Configuration conf,
                      SecretManager<? extends TokenIdentifier> secretManager,
                      String portRangeConfig) 
    throws IOException {
    //构建WritableRpcEngine的内部类 Server
    return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
        portRangeConfig);
  }

WritableRpcEngine.Server的构造方法

   public Server(Class<?> protocolClass, Object protocolImpl,
        Configuration conf, String bindAddress,  int port,
        int numHandlers, int numReaders, int queueSizePerHandler, 
        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig) 
        throws IOException {
        //调用父类的构造方法 ,父类构造方法中主要创建了Listener 和Responder实例
        //listener主要用来接收端口请求
        //Responder主要用来返回rpc结果数据
        super(bindAddress, port, null, numHandlers, numReaders,
          queueSizePerHandler, conf,
          classNameBase(protocolImpl.getClass().getName()), secretManager,
          portRangeConfig);
          this.verbose = verbose;
      Class<?>[] protocols;
    //如果未指定需要代理的接口,则把实例的所有实现的接口加入代理。
      if (protocolClass == null) { 
        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
      } else {
        //如果指定了代理的接口,只会把需要代理的接口 加入到代理中。 
        //实际存储在rpc类中 
        //ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray 中
        registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
        protocols = RPC.getProtocolInterfaces(protocolClass);
      }
      for (Class<?> p : protocols) {
        if (!p.equals(VersionedProtocol.class)) {
          registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
        }
      }
    }

rpc server端启动

  /** Starts the service.  Must be called before any calls will be handled. */
  public synchronized void start() {
    //启动数据返回线程
    responder.start();
    //启动接口监听线程
    listener.start();
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }
展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
16 收藏
1
分享
返回顶部
顶部