hadoop 远程调度(二)
[toc]
远程调度例子
//定义接口
public interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol{
long versionID = 123456;
String echo(String str);
int add(int a, int b);
}
//接口实现
public class ClientProtocolImpl implements ClientProtocol{
public String echo(String str) {
System.out.println(str);
return "echo " + str;
}
public int add(int a, int b) {
return a + b;
}
public long getProtocolVersion(String s, long l) throws IOException {
return ClientProtocol.versionID;
}
public ProtocolSignature getProtocolSignature(String protocol, long versionID, int intHashCode) throws IOException {
return new ProtocolSignature(versionID,null);
}
}
//客户端
public class RpcClient {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
//获取实例 IPC接口对象, 接口版本号, 服务器地址,配置
ClientProtocol proxy = RPC.getProxy(ClientProtocol.class,ClientProtocol.versionID,new InetSocketAddress("127.0.0.1",10240),conf);
System.out.println(proxy.echo("aaa :fasf bs"));
RPC.stopProxy(proxy);
}
}
//服务端
public class RpcServer {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//Server server2 = RPC.getServer(new ClientProtocolImpl(), "loaclhost", 10240, conf);
Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class).setInstance(new ClientProtocolImpl()).setBindAddress("127.0.0.1").setPort(10240).setNumHandlers(5).build();
//启动服务ProtocolProxy
server.start();
}
}
#客户端调用 主要涉及的类有
- VersionedProtocol
- RPC
- Server
VersionedProtocol
主要作用: 声明是一个hadoop rpc协议的父类 ###主要方法
//返回服务器的协议接口版本号
public long getProtocolVersion(String protocol,long clientVersion) throws IOException;
//返回协议签名
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,int clientMethodsHash) throws IOException;
RPC
主要成员变量
//缓存获取到的rpcEngine
private static final Map<Class<?>,RpcEngine> PROTOCOL_ENGINES = new HashMap<Class<?>,RpcEngine>();
主要方法
//获取代理对象。客户端调用时获取动态代理生成的代理对象
public static <T> T getProxy(Class<T> protocol,long clientVersion,InetSocketAddress addr, Configuration conf) throws IOException {
return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
}
// 获取代理对象的封装
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
//通过getProtocolEngine()获取代理一个代理引擎,默认是获取WritableRpcEngine 对象,getProxy()方法会调用动态代理生成需要代理对对象,并且封装成ProtocolProxy 实例
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth);
}
// 获取rpc引擎,如果缓存中有对应对象,直接返回,否则创建新的引擎对象
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
Configuration conf) {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
}
WritableRpcEngine
//客户端调用动态代理获取代理对象
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (connectionRetryPolicy != null) {
throw new UnsupportedOperationException(
"Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
}
//动态代理获取对象
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
factory, rpcTimeout, fallbackToSimpleAuth));
//封装成ProtocolProxy对象
return new ProtocolProxy<T>(protocol, proxy, true);
}
RPC.getProxy() 调度流程
- 调用RPC.getProxy() 方法
- getProxy()方法会调用RPC中getProtocolProxy()方法获取一个ProtocolProxy对象,这个对象的getProxy()方法会返回动态代理生成的代理对象实例
- getProtocolProxy()中 主要先生成一个RpcEngine 对象 默认是WritableRpcEngine类实例,这个类的getProxy方法会调用动态代理生成代理对象 ,并且把这个对象封装成ProtocolProxy 实例
服务器调度
服务器启动流程
- 创建一个Rpc.Builder()实例 ,这个实例主要用来收集构建server的参数
- 调用build()方法 ,这个方法中主要调用了Rpc.getProtocolEngine()方法,获取一个rpc代理引擎,默认获取的是WritableRpcEngine。获取引擎后调用对应的getServer()方法。下边以WritableRpcEngine为例
- WritableRpcEngine 方法会构建一个WritableRpcEngine.Server的实例
- WritableRpcEngine.Server的构造方法中首先调用父类的构造方法,然后注册需要代理的接口,和实例到protocolImplMapArray中。
- 调用server的start方法,实际上是调用Responder和Listener的start方法 关于server类的关系图
实际代码解析
Rpc.Builder.build()方法
public Server build() throws IOException, HadoopIllegalArgumentException {
//getProtocolEngine 获取协议引擎。默认获取的是WritableRpcEngine 类
//调用getServer()方法
return getProtocolEngine(this.protocol, this.conf).getServer(
this.protocol, this.instance, this.bindAddress, this.port,
this.numHandlers, this.numReaders, this.queueSizePerHandler,
this.verbose, this.conf, this.secretManager, this.portRangeConfig);
}
}
WritableRpcEngine.getServer()方法
@Override
public RPC.Server getServer(Class<?> protocolClass,
Object protocolImpl, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
//构建WritableRpcEngine的内部类 Server
return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig);
}
WritableRpcEngine.Server的构造方法
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
//调用父类的构造方法 ,父类构造方法中主要创建了Listener 和Responder实例
//listener主要用来接收端口请求
//Responder主要用来返回rpc结果数据
super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf,
classNameBase(protocolImpl.getClass().getName()), secretManager,
portRangeConfig);
this.verbose = verbose;
Class<?>[] protocols;
//如果未指定需要代理的接口,则把实例的所有实现的接口加入代理。
if (protocolClass == null) {
protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
} else {
//如果指定了代理的接口,只会把需要代理的接口 加入到代理中。
//实际存储在rpc类中
//ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray 中
registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
protocols = RPC.getProtocolInterfaces(protocolClass);
}
for (Class<?> p : protocols) {
if (!p.equals(VersionedProtocol.class)) {
registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
}
}
}
rpc server端启动
/** Starts the service. Must be called before any calls will be handled. */
public synchronized void start() {
//启动数据返回线程
responder.start();
//启动接口监听线程
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}