前言
上篇,分析了基于HTTP方式的RPC调用。本篇将在上篇的基础上,分析基于TCP方式的RPC调用。代码的整体思路是一致的,可以看作是在上篇功能上的扩展——即通信的方式。
代码:https://gitee.com/marvelcode/marvelcode-rpc.git
源码
上篇基于HTTP的远程服务调用,在服务消费方,提到过一个 ReferenceAgent 的抽象,可看作是远程服务的代理对象。即通过 JDK 动态代理所有接口方法,进而在 invoke 中使用 RestTemplate 发起HTTP请求并获取响应,接口最重要的方法定义如下图:
package com.menghao.rpc.consumer.handle;
import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.util.RequestIdUtils;
import java.lang.reflect.Method;
import java.util.List;
/**
* <p>Rpc框架消费方代理.<br>
* <p>执行具体调用,子类实现Http、Tcp方式的Rpc请求<p/>
* <p>需要保证线程安全(会被并发调用)</p>
*
* @author MarvelCode.
*/
public interface ReferenceAgent {
/**
* 通过rpc方式调用处理
*
* @param method 调用方法
* @param args 调用参数
* @return Object 调用结果
*/
Object invoke(Method method, Object[] args);
}
同样的,扩展 TCP 方式调用就需要从此处着手,来扩展新的实现,以发送TCP包给服务提供方。其余的,像对象代理赋值,以及ZooKeeper节点的监听,都可以复用。由此也可以看出,面向接口编程的优势:屏蔽实现,以最小化的代价切换、扩展方案。
那么就来看下子类 TcpReferenceAgent 的实现:
package com.menghao.rpc.consumer.handle.tcp;
import com.menghao.rpc.consumer.balance.LoadBalancer;
import com.menghao.rpc.consumer.balance.RandomLoadBalancer;
import com.menghao.rpc.consumer.handle.ReferenceAgent;
import com.menghao.rpc.consumer.model.ReferenceKey;
import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.exception.InvokeException;
import com.menghao.rpc.netty.TcpConnectionContainer;
import com.menghao.rpc.netty.model.TcpConnection;
import com.menghao.rpc.spring.BeansManager;
import lombok.Getter;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* <p>ReferenceAgent T方式实现.</br>
* <p>调用原始接口的任意方法会被该类的invoke方法代理:使用Netty发送请求</p>
* <p>sourceInterface/implCode:唯一标识一个服务</p>
*
* @author MarvelCode
*/
public class TcpReferenceAgent implements ReferenceAgent {
@Getter
private Class sourceInterface;
@Getter
private String implCode;
private List<String> providerHosts;
private TcpConnectionContainer tcpConnectionContainer;
private LoadBalancer defaultBalancer = new RandomLoadBalancer();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public TcpReferenceAgent(ReferenceKey referenceKey) {
this.sourceInterface = referenceKey.getSourceInterface();
this.implCode = referenceKey.getName();
this.tcpConnectionContainer = BeansManager.getInstance().getBeanByType(TcpConnectionContainer.class);
this.providerHosts = new ArrayList<>();
}
@Override
public Object invoke(Method method, Object[] args) {
// 构造请求参数
RpcRequest rpcRequest = makeParam(method, args);
// 负载均衡选取Tcp连接
TcpConnection tcpConnection = select();
// 构建调用上下文
InvocationContext invocationContext = new InvocationContext(tcpConnection, rpcRequest);
// 执行调用
invocationContext.execute();
// 阻塞获取结果
return invocationContext.get();
}
@Override
public void setProviderHosts(List<String> hosts) {
lock.writeLock().lock();
try {
// 刷新Tcp连接
refreshConnection(providerHosts, hosts);
this.providerHosts = hosts;
} finally {
lock.writeLock().unlock();
}
}
private TcpConnection select() {
lock.readLock().lock();
try {
if (providerHosts == null || providerHosts.size() == 0) {
throw new InvokeException("There are currently no service providers available");
}
// 负载均衡
String host = defaultBalancer.select(providerHosts);
String[] info = host.split(":");
return tcpConnectionContainer.get(info[0], Integer.valueOf(info[1]));
} finally {
lock.readLock().unlock();
}
}
private void refreshConnection(List<String> lastHost, List<String> nowHost) {
Set<String> commonHost = new HashSet<>(lastHost);
// 当前存活机器与上次存活机器交集
commonHost.retainAll(nowHost);
Set<String> lostHost = new HashSet<>(lastHost);
Set<String> addHost = new HashSet<>(nowHost);
// 当前存活机器与交集的差集,得出新增的机器
addHost.removeAll(commonHost);
// 上次存活机器与交集的差集,得出下线的机器
lostHost.removeAll(commonHost);
// 下线的机器,将关闭并移除Tcp连接
for (String host : lostHost) {
String[] info = host.split(":");
tcpConnectionContainer.remove(info[0], Integer.valueOf(info[1]));
}
// 新增的机器,将新建Tcp连接
for (String host : addHost) {
String[] info = host.split(":");
tcpConnectionContainer.register(info[0], Integer.valueOf(info[1]));
}
}
}
从逻辑上看,分为构造参数、挑选请求机器信息、构造调用上下文、执行,返回结果五步。其中构造参数借助了 JDK8 特性,将 RpcRequest请求实体的构造,放在了接口层实现。其余的步骤引入了几个新的模型对象:
- TcpConnection:通过 Netty 实现的,服务消费方和服务提供方的长连接。针对一台远程服务器,该连接全局唯一。
- 举个例子,服务A和服务B部署在相同的两台机器上,那么消费方将持有两条Tcp长连接(针对服务部署的机器,而非服务),调用服务A和B将复用这两条连接。
- TcpConnectionContainer:上述连接的容器,管理着连接的生命周期。服务的启停会触发连接的打开、关闭。
- InvocationContext:一次调用的上下文环境(伴随调用的生命周期)。存储了请求数据及响应结果,重要的职责在于回写结果。
介绍完 TcpReferenceAgent ,我就以连接创建——发起请求——请求处理——响应处理这四步来分析如何实现。
连接创建
首先,来看看连接的创建过程吧。我的实现采用了 Netty 作为客户端与服务端通信的基础。没Netty基础的小伙伴需要自行补下课了。
public TcpConnection(String ip, int port) throws InterruptedException {
this.close = new AtomicBoolean(true);
TcpClient tcpClient = BeansManager.getInstance().getBeanByType(TcpClient.class);
ChannelFuture future = tcpClient.connect(ip, port);
this.channel = future.channel();
this.close.set(false);
}
package com.menghao.rpc.netty;
import com.menghao.rpc.NamedThreadFactory;
import com.menghao.rpc.netty.in.TcpInboundHandler;
import com.menghao.rpc.netty.in.TcpMessageDecoder;
import com.menghao.rpc.netty.out.TcpMessageEncoder;
import com.menghao.rpc.provider.model.RpcResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.List;
/**
* <p>借助Netty实现TCP客户端.<br>
*
* @author MarvelCode.
*/
public class TcpClient {
private static EventLoopGroup workerGroup;
private Bootstrap bootstrap;
private int connectTimeout;
private int maxFrameLength;
private int readIdle;
private int writIdle;
private List<TcpMessageHandler> messageHandlers;
public TcpClient(int connectTimeout, int maxFrameLength, int readIdle, int writIdle, List<TcpMessageHandler> messageHandlers) {
this.connectTimeout = connectTimeout;
this.maxFrameLength = maxFrameLength;
this.readIdle = readIdle;
this.writIdle = writIdle;
this.messageHandlers = messageHandlers;
}
static {
// 当jvm关闭的时候执行addShutdownHook添加的钩子方法
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
workerGroup.shutdownGracefully();
}
});
}
public void initBootstrap() {
bootstrap = new Bootstrap();
workerGroup = new NioEventLoopGroup(0, new NamedThreadFactory("netty-server-io", true));
int ct = connectTimeout > 0 ? this.connectTimeout : 5000;
bootstrap.group(workerGroup)
// 连接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ct)
// 是否使用Nagle的算法以尽可能发送大块数据
.option(ChannelOption.TCP_NODELAY, true)
// 是否启动心跳保活机制(长连接)
.option(ChannelOption.SO_KEEPALIVE, true)
// 是否允许一个地址重复绑定
.option(ChannelOption.SO_REUSEADDR, true)
// 基于内存池的缓冲区重用机制
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
// 客户端需要序列化 rpcRequest、反序列化 rpcResponse
ch.pipeline().addLast(new TcpMessageEncoder())
.addLast(new TcpMessageDecoder(maxFrameLength, RpcResponse.class))
.addLast(new IdleStateHandler(readIdle, writIdle, 0))
.addLast(new TcpInboundHandler(messageHandlers));
}
});
}
public ChannelFuture connect(String ip, int port) throws InterruptedException {
return bootstrap.connect(ip, port).sync();
}
}
其中 TcpClient 的初始化时机,借助了 Spring Bean生命周期的 init-method:
@Bean(initMethod = "initBootstrap")
public TcpClient tcpClient() {
List<TcpMessageHandler> messageHandlers = new ArrayList<>(2);
messageHandlers.add(new RpcRequestMsgHandler());
messageHandlers.add(new RpcResponseMsgHandler());
return new TcpClient(tcpProperties.getConnectTimeout(),
tcpProperties.getMaxFrameLength(),
tcpProperties.getReadIdle(),
tcpProperties.getWritIdle(),
messageHandlers);
}
在进行了一些参数的设置之后,就可以调用 connect 方法来创建一条指定ip、端口的连接。最终会使用 io.netty.channel.Channel 对象来进行数据的传输通信。
其中 ChannelOption.SO_KEEPALIVE 参数设置为 true,来保持长连接。之后会往 ChannelPipeline 中添加 ChannelHandler 处理类。这里包含了信息的编解码Handler、心跳检测Handler,TcpInboundHandler 是对解码后的 RpcRequest/RpcResponse 数据进行处理。
请求创建好之后,接下来就该发起请求了。
发起请求
请求的主要工作集中在了 InvocationContext 这个类中,同时这个类还包含了响应数据的回写。利用锁机制,通过响应 result 是否为空来判断一次调用是否已完成,在调用 get 方法时,调用未完成前会一直阻塞,直到 result 结果被回写。
如果响应的 RpcResponse 是正常响应的结果,就返回;如果是异常,则抛出 :
package com.menghao.rpc.consumer.handle.tcp;
import com.menghao.rpc.consumer.model.Future;
import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.exception.InvokeException;
import com.menghao.rpc.netty.model.TcpConnection;
import com.menghao.rpc.provider.model.RpcResponse;
import com.menghao.rpc.spring.BeansManager;
import lombok.Getter;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* <p>一次调用请求上下文.<br>
* <p>存放请求及结果,伴随一次调用的生命周期</p>
*
* @author MarvelCode.
*/
public class InvocationContext implements Future {
private InvocationContextContainer invocationContextContainer;
@Getter
private RpcRequest rpcRequest;
private TcpConnection tcpConnection;
/** 异步调用锁,在结果返回时解锁 */
private Lock lock = new ReentrantLock();
private Condition doneCondition = lock.newCondition();
/** 预防调用无返回值的判断 */
private static final Object NULL_OBJECT = new Object();
private Object result;
InvocationContext(TcpConnection tcpConnection, RpcRequest rpcRequest) {
this.tcpConnection = tcpConnection;
this.rpcRequest = rpcRequest;
invocationContextContainer = BeansManager.getInstance().getBeanByType(InvocationContextContainer.class);
}
public void execute() {
invocationContextContainer.add(this);
tcpConnection.write(rpcRequest);
}
@Override
public Object get() {
if (!isDone()) {
try {
lock.lock();
if (!isDone()) {
doneCondition.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
if (result instanceof Throwable) {
throw new InvokeException((Throwable) result);
}
return result == NULL_OBJECT ? null : result;
}
@Override
public boolean isDone() {
return result != null;
}
public void notifyCompleted(RpcResponse rpcResponse) {
lock.lock();
try {
if (rpcResponse.getThrowable() != null) {
setResult(rpcResponse.getThrowable());
}
setResult(rpcResponse.getResult());
doneCondition.signalAll();
} finally {
lock.unlock();
}
}
private void setResult(Object value) {
lock.lock();
try {
result = (value == null) ? NULL_OBJECT : value;
doneCondition.signalAll();
} finally {
lock.unlock();
}
invocationContextContainer.remove(rpcRequest.getId());
}
}
这里的 execute 方法,首先将自身(InvocationContext)放置到容器中,因为通过管道写完数据流后,不会立马读取到响应流,所以需要有一个唯一标识(标识请求-响应的对应关系),这样在反序列化得到响应数据时,就可以找到对应请求的 InvocationContext 来回写数据了。
这里生成唯一标识的规则可自定义,即保证一段时间内不会出现重复的即可(并发量越大,越需要考量唯一标识生成的唯一性)。
当执行了 tcpConnection.write(rpcRequest) 这行代码时,会进而调用 channel.writeAndFlush(rpcRequest),将数据写入管道:
package com.menghao.rpc.netty.out;
import com.menghao.rpc.serialize.ObjectOutput;
import com.menghao.rpc.serialize.hessian.HessianObjectOutput;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.OutputStream;
/**
* <p>Tcp信息编码类.</br>
*
* @author MarvelCode
*/
public class TcpMessageEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
OutputStream outputStream = new ByteBufOutputStream(byteBuf);
// 预占,回写包长度
byteBuf.writeInt(0);
ObjectOutput objectOutput = new HessianObjectOutput(outputStream);
objectOutput.writeObject(o);
objectOutput.flush();
// 写包长度
byteBuf.setInt(0, byteBuf.writerIndex() - 4);
}
}
可以看到,将对象序列化写入缓冲区前,会先预写一个整形(4字节)的“0”进去,在写入数据流之后,再使用“写指针索引”减去整形的4字节,就得到了数据包的长度。
这样做的目的,就是利用“消息分割”的方式,解决了Tcp半包、粘包的问题。类似 Http协议的 content-length。
请求处理
既然有序列化,自然有反序列化的处理:
package com.menghao.rpc.netty.in;
import com.menghao.rpc.serialize.ObjectInput;
import com.menghao.rpc.serialize.hessian.HessianObjectInput;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.InputStream;
/**
* <p>Tcp信息解码类.</br>
*
* @author MarvelCode
*/
public class TcpMessageDecoder extends LengthFieldBasedFrameDecoder {
private Class<?> covertClass;
/**
* @param maxFrameLength 解码时,处理每个帧数据的最大长度
* 0 该帧数据中,存放该帧数据的长度的数据的起始位置
* 4 记录该帧数据长度的字段本身的长度
* 0 修改帧数据长度字段中定义的值,可以为负数
* 4 解析的时候需要跳过的字节数
*/
public TcpMessageDecoder(int maxFrameLength, Class<?> covertClass) {
super(maxFrameLength, 0, 4, 0, 4);
this.covertClass = covertClass;
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf byteBuf = (ByteBuf) super.decode(ctx, in);
if (byteBuf == null) {
return null;
}
try {
InputStream inputStream = new ByteBufInputStream(byteBuf);
ObjectInput objectInput = new HessianObjectInput(inputStream);
return objectInput.readObject(covertClass);
} finally {
byteBuf.release();
}
}
}
这里通过集成 Netty 提供的 LengthFieldBasedFrameDecoder ,来实现对Tcp包数据部分的截取工作。
由于这里实现的Tcp协议是我自定义的,所以我很清楚开头使用了几字节来记录数据长度,应该越过几字节才是真正数据的部分。因此配置好参数(即构造器指定的0、4、0、4),进而调用 super.decode(ctx, in),就可以获取到数据流了,然后使用 Hessian 反序列化就获取到了 RpcRequest 对象。
由于 Netty 的处理链使用的是“责任链”设计模式,上述解码类获取到的 RpcRequest 对象会作为下个处理器的入参:
package com.menghao.rpc.netty.in;
import com.menghao.rpc.netty.TcpMessageHandler;
import com.menghao.rpc.netty.model.TcpConnection;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.List;
/**
* <p>Netty上行处理Handler.</br>
* <p>该层已经可以获取到反序列化后的对象</p>
* <p>根据对象为 RpcRequest、RpcResponse,执行不同的处理逻辑</p>
*
* @author MarvelCode
*/
public class TcpInboundHandler extends SimpleChannelInboundHandler<Object> {
private List<TcpMessageHandler> messageHandlers;
public TcpInboundHandler(List<TcpMessageHandler> messageHandlers) {
this.messageHandlers = messageHandlers;
}
@SuppressWarnings("unchecked")
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
if (o == null) {
return;
}
for (TcpMessageHandler tcpMessageHandler : messageHandlers) {
Channel channel = channelHandlerContext.channel();
if (tcpMessageHandler.support(o.getClass())) {
tcpMessageHandler.handler(new TcpConnection(channel), o);
}
}
}
}
由于服务提供方、服务消费方都会共用 TcpInboundHandler ,但提供方只会处理 RpcRequest,消费方只会处理 RpcResponse,所以就定义了 TcpMessageHandler 来定制化处理。是不是能看到 SpringMVC-HttpMessageConverter<T> 的影子?
package com.menghao.rpc.netty;
import com.menghao.rpc.netty.model.TcpConnection;
import java.io.Serializable;
/**
* <p>Tcp消息处理Handler.</br>
*
* @author MarvelCode
*/
public interface TcpMessageHandler<T> {
/**
* 判断该Handler是否支持指定类型的处理
*
* @param supportClass 支持处理的类型
* @return 是否支持该处理
*/
boolean support(Class<?> supportClass);
/**
* 处理读取到的数据
*
* @param connection 使用Netty Channel封装的连接
* @param data 将要处理的数据
*/
void handler(TcpConnection connection, T data);
}
该接口的两种实现,分别对应 RpcRequest 和 RpcResponse 类型数据的处理。这里先来看下 RpcRequestMsgHandler:
package com.menghao.rpc.provider.handle.tcp;
import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.netty.TcpMessageHandler;
import com.menghao.rpc.netty.model.TcpConnection;
import com.menghao.rpc.spring.BeansManager;
/**
* <p>RpcRequest处理类.</br>
* <p>根据请求信息,定位服务,在 ExecutionExecutor中反射调用</p>
*
* @author MarvelCode
*/
public class RpcRequestMsgHandler implements TcpMessageHandler<RpcRequest> {
@Override
public boolean support(Class<?> supportClass) {
return RpcRequest.class.isAssignableFrom(supportClass);
}
@Override
public void handler(TcpConnection connection, RpcRequest data) {
ExecutionExecutor executor = BeansManager.getInstance().getBeanByType(ExecutionExecutor.class);
executor.execute(new ExecutionExecutor.ExecutionTask(
new ExecutionContext(data, connection)
));
}
}
package com.menghao.rpc.provider.handle.tcp;
import com.menghao.rpc.consumer.model.RpcRequest;
import com.menghao.rpc.NamedThreadFactory;
import com.menghao.rpc.exception.InvokeException;
import com.menghao.rpc.provider.model.ProviderKey;
import com.menghao.rpc.provider.regisiter.ProviderRepository;
import com.menghao.rpc.spring.BeansManager;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.concurrent.*;
/**
* <p>Rpc调用的执行器.</br>
* <p>线程池执行</p>
*
* @author MarvelCode
*/
public class ExecutionExecutor {
private ThreadPoolExecutor threadPoolExecutor;
private int queueLimit;
private static final String THREAD_PREFIX = "Executor-Execution-Task";
public ExecutionExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, int queueLimit) {
this.queueLimit = queueLimit;
threadPoolExecutor = new ExecutionTaskThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveTime, TimeUnit.SECONDS, new ExecutionTaskBlockQueue(),
new NamedThreadFactory(THREAD_PREFIX, true));
}
public void execute(ExecutionTask task) {
int queueSize = threadPoolExecutor.getQueue().size();
// TODO 队列溢出处理
if (queueSize > queueLimit) {
}
threadPoolExecutor.execute(task);
}
public static class ExecutionTask implements Runnable {
private ExecutionContext executionContext;
private ProviderRepository providerRepository;
ExecutionTask(ExecutionContext executionContext) {
this.executionContext = executionContext;
this.providerRepository = BeansManager.getInstance().getBeanByType(ProviderRepository.class);
}
@Override
public void run() {
RpcRequest rpcRequest = executionContext.getRequest();
ProviderKey providerKey = new ProviderKey(rpcRequest.getContract(), rpcRequest.getImplCode());
Object instance = providerRepository.getProvider(providerKey);
if (instance == null) {
// 无对应的服务单例,抛异常
executionContext.writeException(new InvokeException(
MessageFormat.format("service {0} not found", providerKey)));
return;
}
try {
Object result = invoke(instance, rpcRequest);
executionContext.writeResult(result);
} catch (Exception e) {
executionContext.writeException(new InvokeException(e));
}
}
private Object invoke(Object instance, RpcRequest rpcRequest) {
Method method = ReflectionUtils.findMethod(instance.getClass(), rpcRequest.getMethod(), rpcRequest.getArgsType());
return ReflectionUtils.invokeMethod(method, instance, rpcRequest.getArgs());
}
}
private class ExecutionTaskThreadPoolExecutor extends ThreadPoolExecutor {
ExecutionTaskThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
/**
* TODO 执行前置操作(可扩展,计数同一时刻某方法/某服务并发量)
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
}
/**
* TODO 执行后置操作(可扩展,计数同一时刻某方法/某服务并发量)
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
}
/**
* TODO 自定义任务队列
*/
private class ExecutionTaskBlockQueue extends LinkedBlockingQueue<Runnable> {
}
}
服务的调用逻辑,同上篇Http方式一样,都是使用“反射调用”的方式。不同点在于,采用了线程池执行,可以应付更高的并发请求。
这里的 ExecutionContext 是服务提供方的执行上下文,不同于服务消费方的 InvocationContext,别搞混了~
其中 executionContext.writeResult(result),同样使用了 TcpMessageEncoder 进行序列化。
响应处理
服务提供方将响应数据通过管道传输给消费方,服务消费方再借助 TcpMessageDecoder 对数据流反序列化获取到 RpcResponse,同样进入 TcpInboundHandler 处理,这些代码之前都分析过了,直接来看 RpcResponse 对应的处理类即可:
package com.menghao.rpc.consumer.handle.tcp;
import com.menghao.rpc.netty.TcpMessageHandler;
import com.menghao.rpc.netty.model.TcpConnection;
import com.menghao.rpc.provider.model.RpcResponse;
import com.menghao.rpc.spring.BeansManager;
/**
* <p>RpcResponse处理类.</br>
* <p>分析相应结果,调用 InvocationContext回写结果</p>
*
* @author MarvelCode
*/
public class RpcResponseMsgHandler implements TcpMessageHandler<RpcResponse> {
@Override
public boolean support(Class<?> supportClass) {
return RpcResponse.class.isAssignableFrom(supportClass);
}
@Override
public void handler(TcpConnection connection, RpcResponse data) {
InvocationContextContainer contextContainer = BeansManager.getInstance().getBeanByType(InvocationContextContainer.class);
InvocationContext invocationContext = contextContainer.get(data.getId());
if (invocationContext != null) {
invocationContext.notifyCompleted(data);
contextContainer.remove(data.getId());
}
}
}
嗯,没错。逻辑很简单,通过“唯一标识”找到对应的 InvocationContext,然后调用 invocationContext.notifyCompleted(data) 以通知调用完成,最后从 InvocationContextContatiner 中移除已经完成的调用上下文。
到此整个基于 Tcp 方式的远程服务调用就分析结束了。
配置
在上篇的基础上,其余配置不变,仅将 marvel.rpc.type 由 http 改为 tcp 即可。
marvel.rpc.type=tcp
总结
到此,我用两篇博文,分别对基于HTTP、TCP两种方式的远程服务调用进行了简要分析。后期会逐渐对代码进行完善,各位如果发现代码有任何的缺陷,或对代码思路有更好的建议,都可以跟我讨论。