文档章节

Rabbit RPC 代码阅读(一)

o
 osc_a22drz29
发布于 2019/03/27 22:32
字数 2166
阅读 14
收藏 0

精选30+云产品,助力企业轻松上云!>>>

前言

因为想对RPC内部的机制作一个了解,特作以下阅读代码日志,以备忘。

RPC介绍

Rabbit RPC 原理可以用3点概括:

1、服务端启动并且向注册中心发送服务信息,注册中心收到后会定时监控服务状态(常见心跳检测)。

2、客户端需要开始调用服务的时候,首先去注册中心获取服务信息。

3、客户端创建远程调用连接,连接后服务端返回处理信息。

第3步又可以细分,下面说说远程过程调用的原理:

1、目标:客户端怎么调用远程机器上的公开方法

2、服务发现,向注册中心获取服务(这里需要做的有很多:拿到多个服务时需要做负载均衡,同机房过滤、版本过滤、服务路由过滤、统一网关等);

3、客户端发起调用,将需要调用的服务、方法、参数进行组装; 序列化编码组装的消息,这里可以使用json,也可以使用xml,也可以使用protobuf,也可以使用hessian,几种方案的序列化速度还有序列化后占用字节大小都是选择的重要指标,对内笔者建议使用高效的protobuf,它基于TCP/IP二进制进行序列化,体积小,速度快。

4、传输协议,可以使用传统的IO阻塞传输,也可以使用高效的nio传输(Netty);

5、服务端收到后进行反序列化,然后进行相应的处理;

6、服务端序列化response信息并且返回;

7、客户端收到response信息并且反序列化;

来源:https://www.cnblogs.com/SteveLee/p/rpc_framework_easy.html

下面将针对以上7个步骤所涉及的代码进行遂一解析,过程是痛苦的,结果是让人欣喜的,.Net Core 原来如此强大。

一、客户端怎么调用远程机器上的公开方法

废话不多说,直接开始解析代码

using doteasy.client.Clients;

namespace doteasy.client
{
    internal static class Program
    {
        private static void Main() => RpcClient.TestNoToken();
    }
}

public static void TestNoToken()
{
    using (var proxy = ClientProxy.Generate<IProxyService>(new Uri("http://127.0.0.1:8500")))
    {
        Console.WriteLine($@"{proxy.Sync(1)}");
        Console.WriteLine($@"{proxy.Async(1).Result}");
        Console.WriteLine($@"{proxy.GetDictionaryAsync().Result["key"]}");
    }

    using (var proxy = ClientProxy.Generate<IProxyCommpoundService>(new Uri("http://127.0.0.1:8500")))
    {
        Console.WriteLine($@"{JsonConvert.SerializeObject(proxy.GetCurrentObject(new CompoundObject()))}");
    }
}

核心代码:

using (var proxy = ClientProxy.Generate<IProxyService>(new Uri("http://127.0.0.1:8500")))

其作用为生成一个实现了IProxyService接口的代理类,接着调用远程服务器的Sync,Async,GetDictionaryAsync三个方法,IProxyService的定义如下:

public interface IProxyService : IDisposable
{
    Task<IDictionary<string, string>> GetDictionaryAsync();
    Task<string> Async(int id);
    string Sync(int id);
}

继续查看ClientProxy.Generate的定义:

public static T Generate<T>(Uri consulUrl)
{
    serviceCollection.AddLogging().AddClient().UseDotNettyTransport().UseConsulRouteManager(new RpcOptionsConfiguration
    {
        ConsulClientConfiguration = new ConsulClientConfiguration {Address = consulUrl}
    });

    //返回一个预编译的代理对象
    return Proxy<T>();
}

首先注入相应的接口及需要使用的中间件,接着返回一个预编译的代理对象,我们继续往下,查看Proxy<T>的定义:

private static T Proxy<T>(string accessToken = "")
{
    var serviceProvider = Builder();
#if DEBUG
    serviceProvider.GetRequiredService<ILoggerFactory>().AddConsole((c, l) => (int) l >= Loglevel);
#endif
    var serviceProxyGenerate = serviceProvider.GetRequiredService<IServiceProxyGenerater>();
    var serviceProxyFactory = serviceProvider.GetRequiredService<IServiceProxyFactory>();

    if (accessToken == "")
    {
        
        return serviceProxyFactory
            .CreateProxy<T>(serviceProxyGenerate.GenerateProxys(new[] {typeof(T)})
                .ToArray()
                .Single(typeof(T).GetTypeInfo().IsAssignableFrom));
    }

    return serviceProxyFactory
        .CreateProxy<T>(serviceProxyGenerate.GenerateProxys(new[] {typeof(T)}, accessToken)
            .ToArray()
            .Single(typeof(T).GetTypeInfo().IsAssignableFrom));
}

创建一个代理对象,构造语法树及动态编译,最后返回一个指定的代理对象类。

看来核心是构造语法树及动态编译,返回代理对象类,仔细分析,核心代码为:

serviceProxyFactory
            .CreateProxy<T>(serviceProxyGenerate.GenerateProxys(new[] {typeof(T)})
                .ToArray()
                .Single(typeof(T).GetTypeInfo().IsAssignableFrom));

我们首先查看.CreateProxy的实现代码

public static T CreateProxy<T>(this IServiceProxyFactory serviceProxyFactory, Type proxyType)
{
    return (T) serviceProxyFactory.CreateProxy(proxyType);
}

public object CreateProxy(Type proxyType)
{
    return proxyType
        .GetTypeInfo()
        .GetConstructors()
        .First()
        .Invoke(
            //ServiceProxyBase类 构造函数传参
            new object[] {_remoteInvokeService, _typeConvertibleService}
        );
}

没啥特别的,返回一个代理对象类,但问题来了,这个代理对象类是怎么生成的呢,原来核心代码在这段: serviceProxyGenerate.GenerateProxys(new[] {typeof(T)})

查看其详细代码:

public IEnumerable<Type> GenerateProxys(IEnumerable<Type> interfaceTypes)
{
    //获取程序集
    var assembles = DependencyContext.Default.RuntimeLibraries
        .SelectMany(i => i.GetDefaultAssemblyNames(DependencyContext.Default)
            .Select(z => Assembly.Load(new AssemblyName(z.Name))));

    assembles = assembles.Where(i => i.IsDynamic == false).ToArray();


    var enumerable = interfaceTypes as Type[] ?? interfaceTypes.ToArray();
    
    //构造语法树
    var trees = enumerable.Select(GenerateProxyTree).ToList();

    //编译语法树
    var stream = CompilationUnits.CompileClientProxy(trees,
        assembles
            .Select(a => MetadataReference.CreateFromFile(a.Location))
            .Concat(new[]
            {
                MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location)
            }),
        enumerable.ToArray()[0],
        _logger);

    if (stream == null)
    {
        throw new ArgumentException(@"没有生成任何客户端代码", nameof(stream));
    }

    using (stream)
    {
        var className = enumerable.ToArray()[0].Name.StartsWith("I")
            ? enumerable.ToArray()[0].Name.Substring(1)
            : enumerable.ToArray()[0].Name;
        return AppDomain.CurrentDomain.GetAssemblies().Any(x => x.FullName.Contains(className))
            ? Assembly.Load(StreamToBytes(stream)).GetExportedTypes()
            : AssemblyLoadContext.Default.LoadFromStream(stream).GetExportedTypes();
    }
}

核心代码为:

var trees = enumerable.Select(GenerateProxyTree).ToList();

var stream = CompilationUnits.CompileClientProxy(trees,
assembles
    .Select(a => MetadataReference.CreateFromFile(a.Location))
    .Concat(new[]
    {
        MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location)
    }),
enumerable.ToArray()[0],
_logger);

首先获取代码对象的语法树,再进行动态编译。 这里有个问题就比较突出了,这个GenerateProxyTree到底是啥?下面那句比较好理解,对trees对象进行编译并返回一个序列化的字节码。

查看GenerateProxyTree的代码

public SyntaxTree GenerateProxyTree(Type interfaceType)
{
    var className = interfaceType.Name.StartsWith("I") ? interfaceType.Name.Substring(1) : interfaceType.Name;
    className += "ClientProxy";

    var members = new List<MemberDeclarationSyntax>
    {
        GetConstructorDeclaration(className)
    };

    var interf = interfaceType.GetInterfaces()[0];
    var mthods = interfaceType.GetMethods();
    if (interf.FullName != null && interf.FullName.Contains("IDisposable"))
    {
        var m = interf.GetMethods()[0];
        var mm = mthods.ToList();
        mm.Add(m);
        mthods = mm.ToArray();
    }

    members.AddRange(GenerateMethodDeclarations(mthods));

    return SyntaxFactory.CompilationUnit().WithUsings(GetUsings()).WithMembers(
        SyntaxFactory.SingletonList<MemberDeclarationSyntax>(
            SyntaxFactory.NamespaceDeclaration(
                SyntaxFactory.QualifiedName(
                    SyntaxFactory.QualifiedName(
                        SyntaxFactory.IdentifierName("Rpc"),
                        SyntaxFactory.IdentifierName("Common")),
                    SyntaxFactory.IdentifierName("ClientProxys"))).WithMembers(
                SyntaxFactory.SingletonList<MemberDeclarationSyntax>(
                    SyntaxFactory.ClassDeclaration(className).WithModifiers(
                        SyntaxFactory.TokenList(SyntaxFactory.Token(SyntaxKind.PublicKeyword))).WithBaseList(
                        SyntaxFactory.BaseList(
                            SyntaxFactory.SeparatedList<BaseTypeSyntax>(new SyntaxNodeOrToken[]
                            {
                                SyntaxFactory.SimpleBaseType(
                                    SyntaxFactory.IdentifierName("ServiceProxyBase")),
                                SyntaxFactory.Token(SyntaxKind.CommaToken),
                                SyntaxFactory.SimpleBaseType(GetQualifiedNameSyntax(interfaceType))
                            }))).WithMembers(
                        SyntaxFactory.List(members)))))).NormalizeWhitespace().SyntaxTree;

}
        

这段代码都干了些啥?仔细分析,必须先搞明白两个东西

1、SyntaxFactory 类是干什么的,有什么作用?

查了下官网: A class containing factory methods for constructing syntax nodes, tokens and trivia. 构造语法树的类,这是什么东西,在网上查了查,知道了个大概 https://johnkoerner.com/csharp/creating-code-using-the-syntax-factory/

static void Main(string[] args)
{
    var console = SyntaxFactory.IdentifierName("Console");
    var writeline = SyntaxFactory.IdentifierName("WriteLine");
    var memberaccess = SyntaxFactory.MemberAccessExpression(SyntaxKind.SimpleMemberAccessExpression, console, writeline);

    var argument = SyntaxFactory.Argument(SyntaxFactory.LiteralExpression(SyntaxKind.StringLiteralExpression, SyntaxFactory.Literal("A")));
    var argumentList = SyntaxFactory.SeparatedList(new[] { argument });

    var writeLineCall =
        SyntaxFactory.ExpressionStatement(
        SyntaxFactory.InvocationExpression(memberaccess,
        SyntaxFactory.ArgumentList(argumentList)));

    var text = writeLineCall.ToFullString();

    Console.WriteLine(text);
    Console.ReadKey();
}

代码拉出来运行了下,心里已经有谱了,这不会就是像JS那样动态的在页面输出JS,JS动态执行吧。SyntaxFactory就是构造C#语法树的类,所有输入的代码都可以转化成相应的对象语法节点。

2、return的类型为SyntaxTree,这个是干什么的,有什么作用?

最终返回的是一个SyntaxTree,先查下官网的解释:

The parsed representation of a source document

再看看有哪些方法: GetText(CancellationToken) Gets the text of the source document。

构造的语法树,返回相应的源代码,好,我们把代码进行相应的改动,看看返回的是什么东西:

SyntaxTree GPT = SyntaxFactory.CompilationUnit().WithUsings(GetUsings()).WithMembers(
        SyntaxFactory.SingletonList<MemberDeclarationSyntax>(
            SyntaxFactory.NamespaceDeclaration(
                SyntaxFactory.QualifiedName(
                    SyntaxFactory.QualifiedName(
                        SyntaxFactory.IdentifierName("Rpc"),
                        SyntaxFactory.IdentifierName("Common")),
                    SyntaxFactory.IdentifierName("ClientProxys"))).WithMembers(
                SyntaxFactory.SingletonList<MemberDeclarationSyntax>(
                    SyntaxFactory.ClassDeclaration(className).WithModifiers(
                        SyntaxFactory.TokenList(SyntaxFactory.Token(SyntaxKind.PublicKeyword))).WithBaseList(
                        SyntaxFactory.BaseList(
                            SyntaxFactory.SeparatedList<BaseTypeSyntax>(new SyntaxNodeOrToken[]
                            {
                                SyntaxFactory.SimpleBaseType(
                                    SyntaxFactory.IdentifierName("ServiceProxyBase")),
                                SyntaxFactory.Token(SyntaxKind.CommaToken),
                                SyntaxFactory.SimpleBaseType(GetQualifiedNameSyntax(interfaceType))
                            }))).WithMembers(
                        SyntaxFactory.List(members)))))).NormalizeWhitespace().SyntaxTree;

    string SourceText = GPT.GetText().ToString();
    return GPT;

执行,看看SourceText是啥:

using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using DotEasy.Rpc.Core.Runtime.Communally.Convertibles;
using DotEasy.Rpc.Core.Runtime.Client;
using DotEasy.Rpc.Core.Runtime.Communally.Serialization;
using DotEasy.Rpc.Core.Proxy.Impl;

namespace Rpc.Common.ClientProxys
{
    public class ProxyServiceClientProxy : ServiceProxyBase, doteasy.rpc.interfaces.IProxyService
    {
        public ProxyServiceClientProxy(IRemoteInvokeService remoteInvokeService, ITypeConvertibleService typeConvertibleService): base (remoteInvokeService, typeConvertibleService)
        {
        }

        public async Task<IDictionary<System.String, System.String>> GetDictionaryAsync()
        {
            return await InvokeAsync<IDictionary<System.String, System.String>>(new Dictionary<string, object>{}, "doteasy.rpc.interfaces.IProxyService.GetDictionaryAsync");
        }

        public async Task<System.String> Async(System.Int32 id)
        {
            return await InvokeAsync<System.String>(new Dictionary<string, object>{{"id", id}}, "doteasy.rpc.interfaces.IProxyService.Async_id");
        }

        public System.String Sync(System.Int32 id)
        {
            return Invoke<System.String>(new Dictionary<string, object>{{"id", id}}, "doteasy.rpc.interfaces.IProxyService.Sync_id");
        }

        public void Dispose()
        {
        }
    }
}

看完后,确定SyntaxFactory及SyntaxTree的作用就是构造C#源码了。

最后返回的是一个构造好的SyntaxTree对象。

var stream = CompilationUnits.CompileClientProxy(trees,
assembles
    .Select(a => MetadataReference.CreateFromFile(a.Location))
    .Concat(new[]
    {
        MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location)
    }),
enumerable.ToArray()[0],
_logger);
        

CompilationUnits.CompileClientProxy 方法对 SyntaxTree 对象进行编译

public static MemoryStream CompileClientProxy(IEnumerable<SyntaxTree> trees, 
            IEnumerable<MetadataReference> references, Type interfaceType, 
            ILogger logger = null)
{
    references = new[]
    {
        MetadataReference.CreateFromFile(typeof(string).GetTypeInfo().Assembly.Location),
        MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location),
        MetadataReference.CreateFromFile(typeof(ServiceDescriptor).GetTypeInfo().Assembly.Location),
        MetadataReference.CreateFromFile(typeof(IRemoteInvokeService).GetTypeInfo().Assembly.Location),
        MetadataReference.CreateFromFile(typeof(IServiceProxyGenerater).GetTypeInfo().Assembly.Location)
    }.Concat(references);
    
    var className = interfaceType.Name.StartsWith("I") ? interfaceType.Name.Substring(1) : interfaceType.Name;
    return Compile(AssemblyInfo.Create($"DotEasy.Rpc.{className}Proxys"), trees, references, logger);
}

public static MemoryStream Compile(AssemblyInfo assemblyInfo, IEnumerable<SyntaxTree> trees, IEnumerable<MetadataReference> references,
            ILogger logger = null) => Compile(assemblyInfo.Title, assemblyInfo, trees, references, logger);

public static MemoryStream Compile(string assemblyName, AssemblyInfo assemblyInfo, IEnumerable<SyntaxTree> trees,
    IEnumerable<MetadataReference> references, ILogger logger = null)
{
    trees = trees.Concat(new[] {GetAssemblyInfo(assemblyInfo)});
    var compilation = CSharpCompilation.Create(assemblyName, trees, references, new CSharpCompilationOptions(OutputKind.DynamicallyLinkedLibrary));

    var stream = new MemoryStream();
    var result = compilation.Emit(stream);
    if (!result.Success && logger != null)
    {
        foreach (var message in result.Diagnostics.Select(i => i.ToString()))
        {
            logger.LogError(message);
            Console.WriteLine(message);
        }

        return null;
    }

    stream.Seek(0, SeekOrigin.Begin);
    return stream;
}
        

最终返回一个MemoryStream流,再接着往下看:

var stream = CompilationUnits.CompileClientProxy(trees,
    assembles
        .Select(a => MetadataReference.CreateFromFile(a.Location))
        .Concat(new[]
        {
            MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location)
        }),
    enumerable.ToArray()[0],
    _logger);
    
    if (stream == null)
    {
    throw new ArgumentException(@"没有生成任何客户端代码", nameof(stream));
    }
    
    using (stream)
    {
    var className = enumerable.ToArray()[0].Name.StartsWith("I")
        ? enumerable.ToArray()[0].Name.Substring(1)
        : enumerable.ToArray()[0].Name;
    return AppDomain.CurrentDomain.GetAssemblies().Any(x => x.FullName.Contains(className))
        ? Assembly.Load(StreamToBytes(stream)).GetExportedTypes()
        : AssemblyLoadContext.Default.LoadFromStream(stream).GetExportedTypes();
    }
            

最后根据 MemoryStream 生成我们所需要的代理类。

至此,客户端构造代理类的流程基本弄清楚。

接下来会分析:

protected async Task<T> InvokeAsync<T>(IDictionary<string, object> parameters, string serviceId)
{
    var message = await _remoteInvokeService.InvokeAsync(new RemoteInvokeContext
    {
        InvokeMessage = new RemoteInvokeMessage
        {
            Parameters = parameters,
            ServiceId = serviceId
        }
    });

    if (message == null) return default(T);

    var result = _typeConvertibleService.Convert(message.Result, typeof(T));

    return (T) result;
}

/// <summary>
/// 非异步远程调用
/// </summary>
/// <param name="parameters"></param>
/// <param name="serviceId"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
// ReSharper disable once UnusedMember.Global
protected T Invoke<T>(IDictionary<string, object> parameters, string serviceId)
{
    var message = _remoteInvokeService.InvokeAsync(new RemoteInvokeContext
    {
        InvokeMessage = new RemoteInvokeMessage
        {
            Parameters = parameters,
            ServiceId = serviceId
        }
    }).Result;

    if (message == null) return default(T);
    var result = _typeConvertibleService.Convert(message.Result, typeof(T));
    return (T) result;
}

具体是如何远程调用的.

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
在Solaris 10环境下安装rabbitMq后无法新建用户

在Solaris 10系统上安装 rabbitMq 3.6.1,使用的是rabbitMq官网上推荐的rabbitmq-server-generic-unix-3.6.1版本。修改了shell版本以后第一次启动创建guest用户的时候报如下错误: =INFO RE...

wo思远ow
2016/03/10
450
0
Ubuntu 18.04 LTS上安装NFS服务器和客户端

NFS是基于UDP/IP协议的应用,其实现主要是采用远程过程调用RPC机制,RPC提供了一组与机器、操作系统以及低层传送协议无关的存取远程文件的操作。RPC采用了XDR的支持。XDR是一种与机器无关的数...

osc_agn9pfb0
2019/12/10
2
0
RPC使用rabbitmq实现

两天时间重写公司架构在本地实现测试学习 双向连接客户端和服务端配置: 连接rabbitmq服务器 定义消息队列 配置发送请求的模板:交换机、消息队列。 配置监听处理:监听的队列、消息转换处理...

osc_c0g7cjrk
2018/05/12
2
0
openstack I版的搭建九--cinder

我们现在还差openstack的最后一个模块cinder模块,存储节点,那大家现在就有疑问了,虚拟机的存在哪里呢? 在结算节点上,即是: [root@linux-node2 ~]# ll /var/lib/nova/instances/ 总用量...

犀首
2014/08/24
0
0
rabbitmq启动失败,noproc

rabbitmq启动失败 [root@localhost bin]# rabbitmq-server BOOT FAILED =========== Error description: noproc Log files (may contain more information): /var/log/rabbitmq/rabbit@local......

code强迫症xr
2018/08/09
9.9K
2

没有更多内容

加载失败,请刷新页面

加载更多

dict.items()和dict.iteritems()有什么区别?

问题: Are there any applicable differences between dict.items() and dict.iteritems() ? dict.items()和dict.iteritems()之间是否有适用的区别? From the Python docs: 从Python文档中......

法国红酒甜
57分钟前
20
0
R中“ =”和“ <-”赋值运算符有什么区别?

问题: What are the differences between the assignment operators = and <- in R? R中赋值运算符=和<-之间有什么区别? I know that operators are slightly different, as this example ......

fyin1314
今天
20
0
之间的区别 和

问题: I'm learning Spring 3 and I don't seem to grasp the functionality behind <context:annotation-config> and <context:component-scan> . 我正在学习Spring 3,并且似乎不太了解<......

javail
今天
15
0
业内首款,百度工业视觉智能平台全新亮相

本文作者:y****n 业内首款全国产化工业视觉智能平台——百度工业视觉智能平台亮相中国机器视觉展(Vision China),该平台所具有的核心AI能力完全自主可控,在质检、巡检等场景中具有高效、...

百度开发者中心
昨天
7
0
我们如何制作xkcd样式图? - How can we make xkcd style graphs?

问题: Apparently, folk have figured out how to make xkcd style graphs in Mathematica and in LaTeX . 显然,民间已经想出了如何在Mathematica和LaTeX中制作xkcd风格的图形。 Can we d......

富含淀粉
今天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部