蚂蚁调度AntJob-分布式任务调度系统

01/01 10:17
阅读数 301

分布式任务调度系统,纯NET打造的重量级大数据实时计算平台,万亿级调度经验积累!面向中小企业大数据分析场景。

开源地址:https://github.com/NewLifeX/AntJob

使用教程:https://www.yuque.com/smartstone/blood/antjob

体验地址:http://ant.newlifex.com

功能特点

AntJob的核心是蚂蚁算法把任意大数据拆分成为小块,采用蚂蚁搬家策略计算每一块!

(蚂蚁搬家,一个馒头掉在地上,众多小蚂蚁会把馒头掰成小块小块往家里般!)

该算法设计于2008年,最开始用于处理基金公司的短信/邮件/传真群发(每批两百万)和电话话费分析(上百种国际长途计费规则),数据量不算大,但是有一定复杂度,并且要求支持持续处理(实时计算)以及出错重试。

2016年在中通快递某产品项目中使用该算法进行大数据实时计算,成功挑战每日1200万的订单。并进一步发展衍生成为重量级实时计算平台,集分布式计算、集群调度、配置中心、负载均衡、故障转移、跨机房冗余、作业监控告警、百亿级数据清洗、超大Redis缓存(>2T)于一身,于2019年达到每年万亿级计算量(2019年双十一日订单量破亿)。

AntJob是开源简化版,仅提供分布式计算和集中调度能力,支持百亿级调度(需要改造)。

AntJob主要功能点:

  1. 作业处理器。每一个最小业务模块实现一个处理器类,用于处理这一类作业。例如同步数据表时,每张表写一个处理器类,并在调度中心注册一个作业,调度中心按照作业时间切片得到任务,然后把任务(主要包含时间区间)分派给各个计算节点上的处理器类执行。又如,每天汇总计算是一个作业,而每月汇总计算又是另一个作业;

  2. 任务上下文。作业处理器类实例化以后,将反复向调度中心申请任务来执行,每个任务的上下文核心数据是时间区间(数据调度)、时间点(定时调度)、消息体(消息调度)。调度中心记录任务处理结果;

  3. 数据切片。支持按照时间区间(如5秒)把大数据切分为小片,也即是数据调度,处理过最大单表60亿行;

  4. 定时调度。支持定时执行(秒级)指定业务逻辑,每个执行时间点得到一个任务;

  5. 任务重试。每个任务完整记录处理结果,失败任务在延迟一段时间后将会自动重新分派(可能由原节点或其它节点执行);

  6. 任务重置。支持批量重置已执行完成的任务,让其再次执行处理;

  7. 作业面板。在Web控制台上可查看每个应用所有作业的运行状态,或修改参数;

  8. 作业重置。调整作业参数,让其再次处理某段时间的任务数据,例如重算过去一个月的数据;

其它细节功能将穿插在以下各主要功能点中进行讲解。

定时调度

以下源码位于 https://github.com/NewLifeX/AntJob/tree/master/Samples/HisAgent 

新建项目

新建.net core 3.1项目,从nuget引用 AntJob。实例化一个调度器Scheduler,配置网络提供者。

using System;
using AntJob;
using AntJob.Providers;
using NewLife.Log;
namespace HisAgent
{
    class Program
    {
        static void Main(string[] args)
        {
            XTrace.UseConsole();
            var set = AntSetting.Current;
            // 实例化调度器
            var sc = new Scheduler();
            // 使用分布式调度引擎替换默认的本地文件调度
            sc.Provider = new NetworkJobProvider
            {
                Server = set.Server,
                AppID = set.AppID,
                Secret = set.Secret,
            };
            // 添加作业处理器
            sc.Handlers.Add(new HelloJob());
            // 启动调度引擎,调度器内部多线程处理
            sc.Start();
            Console.WriteLine("OK!");
            Console.ReadKey();
        }
    }
}

然后添加第一个定时调度的作业处理器

using System;
using AntJob;
namespace HisAgent
{
    internal class HelloJob : Handler
    {
        public HelloJob()
        {
            // 今天零点开始,每10秒一次
            var job = Job;
            job.Start = DateTime.Today;
            job.Step = 10;
        }
        protected override Int32 Execute(JobContext ctx)
        {
            // 当前任务时间
            var time = ctx.Task.Start;
            WriteLog("新生命蚂蚁调度系统!当前任务时间:{0}", time);
            // 成功处理数据量
            return 1;
        }
    }
}

作业处理器必须继承自Handler,并且重写Execute实现业务逻辑。

我们这里的业务逻辑就是输出一行日志,其中的ctx.Task就是切分得到的任务上下文,Start是时间点。

构造函数中设定的开始时间和步进Step,仅用于首次注册作业到调度中心,后面就没有用处了。

为了编译观察,修改项目输出目录,在项目文件上点右键选“编辑项目文件”

<PropertyGroup>
  <OutputType>Exe</OutputType>
  <TargetFramework>netcoreapp3.1</TargetFramework>
  <AssemblyVersion>1.0.*</AssemblyVersion>
  <Deterministic>false</Deterministic>
  <OutputPath>..\..\Bin\HisAgent</OutputPath>
  <AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath>
</PropertyGroup>

编译执行

代码能编译通过,先跑起来看看

可以看到,调度器首先连接 tcp://127.0.0.1:9999,其次 tcp://ant.newlifex.com:9999 ,而上面代码中并没有提及这两个地址。其实这就是调度中心地址,默认本地用于调试,如果链接失败再连接公开版调度中心,位于配置文件中:

/// <summary>蚂蚁配置。主要用于网络型调度系统</summary>
[Config("Ant")]
public class AntSetting : Config<AntSetting>
{
    #region 属性
    /// <summary>调试开关。默认false</summary>
    [Description("调试开关。默认false")]
    public Boolean Debug { get; set; }
    /// <summary>调度中心。逗号分隔多地址,主备架构</summary>
    [Description("调度中心。逗号分隔多地址,主备架构")]
    public String Server { get; set; } = "tcp://127.0.0.1:9999,tcp://ant.newlifex.com:9999";
    /// <summary>应用标识。调度中心以此隔离应用,默认当前应用</summary>
    [Description("应用标识。调度中心以此隔离应用,默认当前应用")]
    public String AppID { get; set; }
    /// <summary>应用密钥。</summary>
    [Description("应用密钥。")]
    public String Secret { get; set; }
    #endregion
    #region 方法
    /// <summary>重载</summary>
    protected override void OnLoaded()
    {
        if (AppID.IsNullOrEmpty())
        {
            var asm = Assembly.GetEntryAssembly();
            if (asm != null) AppID = asm.GetName().Name;
        }
        base.OnLoaded();
    }
    #endregion
}

其实上面Main函数中已经看到从配置文件里面读取Server+AppID+Secret,该配置类读取的配置文件在这:

AppID默认取本应用名,Secret由调度中心生成并下发。

调度中心默认打开自动注册AutoRegistry,任意应用登录时自动注册,省去人工配置应用账号的麻烦。

企业内部正式场景使用时,为安全期间,建议关闭自动注册。

再来看看前面跑起来的日志

21:33:08.470  1 N - 启动任务调度引擎[AntJob.Providers.NetworkJobProvider],作业[1]项,定时5秒
21:33:08.471  1 N - HelloJob 开始工作 False 区间(2020-04-09 00:00:00, 0001-01-01 00:00:00) Offset=15 Step=10 MaxTask=8
21:33:08.587  5 Y Job HelloJob 停止工作
21:33:09.467  7 Y T [180.174.185.180:53926]上线!X3

启动了调度引擎,带有一个作业;

作业HelloJob,就是我们通过 sc.Handlers.Add(new HelloJob()) 添加进去的作业处理器实例;

HelloJob状态False,处于停止工作状态,那是因为作业注册后,默认都是停止状态,需要去web控制台配置参数后手工开启;

最后一个xxx上线,这是蚂蚁调度的Peers功能,可以探测得到当前应用下所有已连接节点的状态。当HisAgent部署于多个服务器时,每个进程都可以通过Peers得知其它节点的存在;

作业管理

不用关闭HistAgent客户端窗口,我们去线上web控制台看看 http://ant.newlifex.com/

可以看到应用节点在线,点击应用名进去作业面板

这就是我们的HelloJob作业,对应HisAgent中的HelloJob作业处理器。

它处于停用状态,下一次执行时间是 00:00:00 ,也就是今天零点,加上10秒步进,也远小于当前时间,因此,只要启用该作业,调度中心将会马上开始切分任务,并分派给客户去执行。

我们来点击红色叉叉,让它改变为启用状态

几秒后,客户端HisAgent欢快地跑起来!它正在以10秒间隔不断切分并执行任务。

刷新作业面板,可以看到,开始时间已经变为当前附近的时间,右边也有了执行次数。

点击作业名HelloJob,进去查看任务明细

任务切分后,插入作业任务表,此时状态为“就绪”,等待分发给客户端执行。

客户端执行后,向调度中心报告执行结果,可能“完成”,可能“错误”。

错误的任务,会在1分钟后,重新执行,最多连续错误10次。

随系统自动启动

至今我们仍然使用控制台来跑调度程序,怎么样实现稳定可靠的自动化处理呢?

那就必须解决随系统自动启动,以及进程守护(包括Windows和Linux)的问题。

这里推荐 NewLife.Agent,可以把调度程序包装成为一个 Windows服务,或者Linux守护进程,支持看门狗守护。

此处为语雀文档,点击链接查看:https://www.yuque.com/smartstone/nx/agent

多节点部署时,推荐 星尘Stardust 中的星尘代理 StarAgent,调度程序无需修改继续使用控制台,由StarAgent负责拉起进程并守护,同时Stardust支持远程多节点部署以及集中监控。

此处为语雀文档,点击链接查看:https://www.yuque.com/smartstone/blood/stardust

双跑,沸腾吧,分布式计算

再开两个HisAgent进程,查看应用在线表,可以看到有三个节点在线。

HisAgent控制台中,可以看到各自都有机会分配了任务,每个任务有且仅有一个节点执行。

刷新作业HelloJob的任务列表,可以看到不同客户端执行了不同的任务。

调度中心

公开版调度中心 http://ant.newlifex.com 仅用于开发测试,不建议用于生产场景。各企业内部应该自己部署调度中心。

获取AntJob源码 https://github.com/NewLifeX/AntJob ,编译 AntJob.Server,然后跑起来 AntServer.exe

这是一个标准的NewLife.Agent应用,可以选择2安装为Windows服务(需要管理员权限),或者Linux守护进程(需要root权限)。这里仅为了测试,选择5循环调试,直接跑起来核心业务:

可以看到AntServer在tcp/udp/ipv6上监听了9999接口,下方是它所使用的RPC接口。除了前三个内置接口意外,AntJob的接口也就7个,非常简单!

ApiServer的具体内容可参考

此处为语雀文档,点击链接查看:https://www.yuque.com/smartstone/nx/api_server

再启动一个HisAgent

可以看到,它自动连接了本机这个调度中心,因为配置文件Server里面,127写在第一位!

AntJob客户端支持调度中心的故障转移,配置多个服务端,其中一个断开后,自动选择下一个。

配置文件 Config\AntJob.config 很简单,只有端口和自动注册开关。

<?xml version="1.0" encoding="utf-8"?>
<Setting>
  <!--调试开关。默认true-->
  <Debug>true</Debug>
  <!--端口-->
  <Port>9999</Port>
  <!--自动注册。任意应用登录时自动注册,省去人工配置应用账号的麻烦,默认true-->
  <AutoRegistry>true</AutoRegistry>
</Setting>

多年使用经验来看,还没遇到过需要关闭自动注册的情况,毕竟都是在企业内网。

推荐部署两套调度中心,一套Web控制台,共用MySql数据库!

如果服务器足够多,或者为了跨机房,部署4套8套也是可以的。

Web控制台

为了查看作业任务状态,以及调整参数,控制作业启停,需要借助控制台。

获取源码 https://github.com/NewLifeX/AntJob ,编译 AntJob.Web,执行 AntWeb.exe。也可以访问公开版AntJob控制台 http://ant.newlifex.com/ 。

首先可以看到应用管理,点击应用名进去应用面板,管理该应用底下的作业。

双击应用所在行空白处,可查看修改应用信息

应用在线记录每个应用实例(应用可以多跑)的实时状态,应用历史记录操作历史。

任务处理过程中,如果抛出异常,将会上报给调度中心,标记任务为“错误”状态,同时把错误信息记录到作业错误中来。也可以通过应用或作业的快捷方式链接进来。

应用消息用于消息调度,消息生产者把消息推送给调度中心时,就是存储在“应用消息”数据表中,消费的时候取出来,创建消息型任务,并从“应用消息”表中删除。

数据调度

数据调度时AntJob毫无疑问的首席角色,它的使用占比超过70%,可见其重要程度。

为了方便处理大数据,我们需要新建一些辅助项目,数据结构来自某医院。

新建数据集项目

新建 .netstandard2.0 类库项目,nuget引用 NewLife.XCode,准备医院的模型文件:

<?xml version="1.0" encoding="utf-8"?>
<Tables Version="9.16.7398.1902" xmlns:xs="http://www.w3.org/2001/XMLSchema-instance" xs:schemaLocation="http://www.newlifex.com http://www.newlifex.com/Model2020.xsd" NameSpace="HisData" ConnName="His" Output="Entity" BaseClass="Entity" IgnoreNameCase="True" xmlns="http://www.newlifex.com/Model2020.xsd">
  <Table Name="ZYBH0" Description="病人基本信息" IgnoreNameCase="False">
    <Columns>
      <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" />
      <Column Name="Bhid" ColumnName="BHID" DataType="Int32" Master="True" Description="病人ID" />
      <Column Name="XM" DataType="String" Description="姓名" />
      <Column Name="Ryrq" ColumnName="RYRQ" DataType="Int32" Description="入院日期" />
      <Column Name="Cyrq" ColumnName="CYRQ" DataType="Int32" Description="出院日期" />
      <Column Name="Sfzh" ColumnName="SFZH" DataType="String" Description="身份证号" />
      <Column Name="FB" DataType="String" Description="费用类别" />
      <Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" />
      <Column Name="Flag" ColumnName="FLAG" DataType="Int32" Description="标记" />
      <Column Name="Remark" DataType="String" Length="500" Description="内容" />
      <Column Name="CreateUser" DataType="String" Description="创建者" />
      <Column Name="CreateUserID" DataType="Int32" Description="创建者" />
      <Column Name="CreateTime" DataType="DateTime" Description="创建时间" />
      <Column Name="CreateIP" DataType="String" Description="创建地址" />
      <Column Name="UpdateUser" DataType="String" Description="更新者" />
      <Column Name="UpdateUserID" DataType="Int32" Description="更新者" />
      <Column Name="UpdateTime" DataType="DateTime" Description="更新时间" />
      <Column Name="UpdateIP" DataType="String" Description="更新地址" />
    </Columns>
    <Indexes>
      <Index Columns="BHID" Unique="True" />
    </Indexes>
  </Table>
  <Table Name="ZYBHYZ0" Description="病人医嘱信息" IgnoreNameCase="False">
    <Columns>
      <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" />
      <Column Name="Bhid" ColumnName="BHID" DataType="Int32" Description="病人ID" />
      <Column Name="Mgroupid" ColumnName="MGROUPID" DataType="Int32" Master="True" Description="医嘱组号" />
      <Column Name="Kyzrq" ColumnName="KYZRQ" DataType="Int32" Description="开医嘱日期" />
      <Column Name="Tyzrq" ColumnName="TYZRQ" DataType="Int32" Description="停医嘱日期" />
      <Column Name="Kyzys" ColumnName="KYZYS" DataType="String" Description="开医嘱医生" />
      <Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" />
      <Column Name="CreateUser" DataType="String" Description="创建者" />
      <Column Name="CreateUserID" DataType="Int32" Description="创建者" />
      <Column Name="CreateTime" DataType="DateTime" Description="创建时间" />
      <Column Name="CreateIP" DataType="String" Description="创建地址" />
      <Column Name="UpdateUser" DataType="String" Description="更新者" />
      <Column Name="UpdateUserID" DataType="Int32" Description="更新者" />
      <Column Name="UpdateTime" DataType="DateTime" Description="更新时间" />
      <Column Name="UpdateIP" DataType="String" Description="更新地址" />
    </Columns>
    <Indexes>
      <Index Columns="BHID,MGROUPID" Unique="True" />
      <Index Columns="BHID" />
    </Indexes>
  </Table>
  <Table Name="ZYBHYZ1" Description="病人医嘱明细信息" IgnoreNameCase="False">
    <Columns>
      <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" />
      <Column Name="Dgroupid" ColumnName="DGROUPID" DataType="Int32" Master="True" Description="医嘱组号" />
      <Column Name="Yzbm" ColumnName="YZBM" DataType="String" Description="医嘱编码" />
      <Column Name="Yzmc" ColumnName="YZMC" DataType="String" Description="医嘱名称" />
      <Column Name="DJ" DataType="Decimal" Description="单价" />
      <Column Name="SL" DataType="Double" Description="数量" />
      <Column Name="FY" DataType="Decimal" Description="费用" />
      <Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" />
      <Column Name="CreateUser" DataType="String" Description="创建者" />
      <Column Name="CreateUserID" DataType="Int32" Description="创建者" />
      <Column Name="CreateTime" DataType="DateTime" Description="创建时间" />
      <Column Name="CreateIP" DataType="String" Description="创建地址" />
      <Column Name="UpdateUser" DataType="String" Description="更新者" />
      <Column Name="UpdateUserID" DataType="Int32" Description="更新者" />
      <Column Name="UpdateTime" DataType="DateTime" Description="更新时间" />
      <Column Name="UpdateIP" DataType="String" Description="更新地址" />
    </Columns>
    <Indexes>
      <Index Columns="DGROUPID,YZBM" Unique="True" />
      <Index Columns="DGROUPID" />
    </Indexes>
  </Table>
  <Table Name="ZYYFQLD" Description="病人药房请领单分月表202001" IgnoreNameCase="False">
    <Columns>
      <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" />
      <Column Name="Qlrq" ColumnName="QLRQ" DataType="Int32" Description="请领日期" />
      <Column Name="Qlsj" ColumnName="QLSJ" DataType="Int32" Description="请领时间" />
      <Column Name="Ksbm" ColumnName="KSBM" DataType="String" Description="请领科室" />
      <Column Name="Yzgroupid" ColumnName="YZGROUPID" DataType="Int32" Description="医嘱ID" />
      <Column Name="Bhid" ColumnName="BHID" DataType="Int32" Description="病人ID" />
      <Column Name="Yzbm" ColumnName="YZBM" DataType="String" Description="药品编码" />
      <Column Name="DJ" DataType="Decimal" Description="单价" />
      <Column Name="SL" DataType="Double" Description="请领数量" />
      <Column Name="Yfbm" ColumnName="YFBM" DataType="String" Description="发药药房" />
      <Column Name="Fyrq" ColumnName="FYRQ" DataType="Int32" Description="发药日期" />
      <Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" />
      <Column Name="Remark" DataType="String" Length="500" Description="内容" />
      <Column Name="CreateUser" DataType="String" Description="创建者" />
      <Column Name="CreateUserID" DataType="Int32" Description="创建者" />
      <Column Name="CreateTime" DataType="DateTime" Description="创建时间" />
      <Column Name="CreateIP" DataType="String" Description="创建地址" />
      <Column Name="UpdateUser" DataType="String" Description="更新者" />
      <Column Name="UpdateUserID" DataType="Int32" Description="更新者" />
      <Column Name="UpdateTime" DataType="DateTime" Description="更新时间" />
      <Column Name="UpdateIP" DataType="String" Description="更新地址" />
    </Columns>
    <Indexes>
      <Index Columns="BHID" />
    </Indexes>
  </Table>
  <Table Name="ZDSF" Description="收费字典" IgnoreNameCase="False">
    <Columns>
      <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" />
      <Column Name="BM" DataType="String" Master="True" Nullable="False" Description="编码" />
      <Column Name="DH" DataType="String" Description="拼音码" />
      <Column Name="MC" DataType="String" Description="名称" />
      <Column Name="DJ" DataType="Decimal" Description="单价" />
      <Column Name="DW" DataType="String" Description="单位" />
      <Column Name="Mzyflb" ColumnName="MZYFLB" DataType="Int32" Description="门诊费用类别" />
      <Column Name="Zyfylb" ColumnName="ZYFYLB" DataType="Int32" Description="住院费用类别" />
      <Column Name="Zfbl" ColumnName="ZFBL" DataType="Double" Description="自费比例" />
      <Column Name="CreateUser" DataType="String" Description="创建者" />
      <Column Name="CreateUserID" DataType="Int32" Description="创建者" />
      <Column Name="CreateTime" DataType="DateTime" Description="创建时间" />
      <Column Name="CreateIP" DataType="String" Description="创建地址" />
      <Column Name="UpdateUser" DataType="String" Description="更新者" />
      <Column Name="UpdateUserID" DataType="Int32" Description="更新者" />
      <Column Name="UpdateTime" DataType="DateTime" Description="更新时间" />
      <Column Name="UpdateIP" DataType="String" Description="更新地址" />
    </Columns>
    <Indexes>
      <Index Columns="BM" Unique="True" />
    </Indexes>
  </Table>
</Tables>

再去找一个 build_netcore.tt 的T4模板,可以这里下载 http://x.newlifex.com/XCode_BuildModel.zip 。也可以从AntJob.Web项目中拷贝一个。

记得把build_netcore.tt在vs文件属性的自定义工具设置为TextTemplatingFileGenerator。

由于netstandard项目输出目录中不包括XCode.dll等引用程序集,因此build.tt需要改一下

<#@ template language="C#" hostSpecific="true" debug="true" #>
<#@ assembly name="netstandard" #>
<#@ assembly name="$(ProjectDir)\..\..\DLL\NewLife.Core.dll" #>
<#@ assembly name="$(ProjectDir)\..\..\DLL\XCode.dll" #>
<#@ import namespace="System.Diagnostics" #>
<#@ import namespace="System.IO" #>
<#@ import namespace="XCode.Code" #>
<#@ output extension=".log" #>
<#
    // 设置当前工作目录
    PathHelper.BasePath = Host.ResolvePath(".");
    // 导入模型文件并生成实体类,模型文件、输出目录、命名空间、连接名、中文文件名、表名字段名大小写
    //EntityBuilder.Build(String xmlFile = null, String output = null, String nameSpace = null, String connName = null, Boolean? chineseFileName = true,Boolean? nameIgnoreCase = null);
    EntityBuilder.Build();
    //var tables = DAL.ImportFrom("Company.Project.xml");
    //EntityBuilder.Build(tables);
#>

我们从AntJob.Web中拷贝两个文件 NewLife.Core.dll 和 XCode.dll 到外面的DLL目录中,供build.tt调用。如果实际目录不同,可以修改build.tt文件的指向。

在build.tt文件上右键,执行自定义工具,即可生成一批实体类。

编译通过

新建Web项目

新建 .netcore3.1 的web项目,Nuget引用 NewLife.Cube.Core,并引用项目HisData。

该Web项目主要用于查看和管理那些数据表的数据。

修改Main函数,增加 XTrace.UseConsole,用于拦截所有日志

public class Program
{
    public static void Main(string[] args)
    {
        XTrace.UseConsole();
        CreateHostBuilder(args).Build().Run();
    }
    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureWebHostDefaults(webBuilder =>
            {
                webBuilder.UseStartup<Startup>();
            });
}

Startup.cs 中引用魔方,services.AddCube()/app.UseCube()

编译运行,浏览器访问 http://localhost:5000/Admin

可以看到魔方跑起来了,但是还没有我们需要的数据页面。

新建His控制器区域

区域文件内容:

using System;
using System.ComponentModel;
using NewLife.Cube;
namespace HisWeb.Areas.His
{
    [DisplayName("医院管理")]
    public class HisArea : AreaBase
    {
        public HisArea() : base(nameof(HisArea).TrimEnd("Area")) { }
        static HisArea() => RegisterArea<HisArea>();
    }
}

为每个实体类新建一个控制器,如下

using HisData;
using NewLife.Cube;
namespace HisWeb.Areas.His.Controllers
{
    [HisArea]
    public class ZYBH0Controller : EntityController<ZYBH0>
    {
        static ZYBH0Controller() => MenuOrder = 100;
    }
}

编译项目,执行 HisWeb.exe,刷新浏览器页面,即可看到每张数据表对应了一个页面。

生成病人数据

在引用项目HisData。

新建一个作业处理器 BuildPatient 用于随机生成病人

using System;
using System.Collections.Generic;
using AntJob;
using HisData;
using NewLife.Security;
using XCode;
namespace HisAgent
{
    internal class BuildPatient : Handler
    {
        public BuildPatient()
        {
            var job = Job;
            job.Start = DateTime.Today;
            job.Step = 15;
        }
        protected override Int32 Execute(JobContext ctx)
        {
            // 随机造几个病人
            var count = Rand.Next(1, 9);
            var list = new List<ZYBH0>();
            for (var i = 0; i < count; i++)
            {
                var time = DateTime.Now.AddSeconds(Rand.Next(-30 * 24 * 3600, 0));
                var time2 = time.AddSeconds(Rand.Next(3600, 10 * 24 * 3600));
                var pi = new ZYBH0
                {
                    Bhid = Rand.Next(999999),
                    XM = Rand.NextString(8),
                    Ryrq = time,
                    Cyrq = time2,
                    Sfzh = Rand.NextString(18),
                    FB = Rand.NextString(6),
                    State = Rand.Next(8),
                    Flag = Rand.Next(2),
                };
                list.Add(pi);
            }
            list.Insert(true);
            // 成功处理数据量
            return count;
        }
    }
}

主函数中把该处理器添加到调度器

编译运行,HisAgent将在控制台新增一个作业,把它启用

很快,作业处理器就跑起来了

每次定时任务所添加病人数时随机的,我们通过Execute返回,记录着控制台作业任务表的“成功”字段。

去HisWeb中看看数据

很不幸,啥也没有……

原来,我们并没有配置数据库连接字符串,各个应用就会默认使用SQLite数据库,位于自己目录中,HisAgent生成的数据,HisWeb自然就无法访问了。

修改HisWeb输出目录,让它跟HisAgent并排

  <PropertyGroup>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <AssemblyVersion>1.0.*</AssemblyVersion>
    <Deterministic>false</Deterministic>
    <OutputPath>..\..\Bin\HisWeb</OutputPath>
    <AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath>
  </PropertyGroup>

修改配置文件appsettings.json给的链接字符串

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "ConnectionStrings": {
    "His": {
      "connectionString": "Data Source=..\\Hisagent\\Data\\His.db",
      "providerName": "SQLite"
    }
  }
}

重新跑起来后,成功看到病人数据

清洗病人数据

由于HisAgent需要使用数据调度,除了AntJob,我们还需要从nuget引用AntJob.Extensions。

这一次,我们来实时消费病人数据,为其生成医嘱,高仿数据清洗过程。

新增生成医嘱的作业处理器 BuildWill

using System;
using AntJob;
using HisData;
using NewLife.Security;
using XCode;
namespace HisAgent
{
    class BuildWill : DataHandler
    {
        public BuildWill()
        {
            var job = Job;
            job.Start = DateTime.Today;
            job.Step = 30;
        }
        public override Boolean Start()
        {
            // 指定要抽取数据的实体类以及时间字段
            Factory = ZYBH0.Meta.Factory;
            Field = ZYBH0._.CreateTime;
            return base.Start();
        }
        protected override Boolean ProcessItem(JobContext ctx, IEntity entity)
        {
            var pi = entity as ZYBH0;
            // 创建医嘱信息
            var will = new ZYBHYZ0
            {
                Bhid = pi.Bhid,
                Mgroupid = Rand.Next(9999),
                Kyzrq = pi.Ryrq.AddHours(1),
                Tyzrq = pi.Cyrq.AddHours(-3),
                Kyzys = Rand.NextString(8),
                State = pi.State,
            };
            will.Insert();
            return true;
        }
    }
}

数据调度的处理器基类是DataHandler,并且需要在Start之前指定实体工厂以及时间字段。调度系统将会从该表抽取数据,根据调度中心分派的时间区间(StartTime+EndTime),对时间字段进行查询。

处理函数ProcessItem就是业务核心代码了,也可以重写Execute,实现批量处理。

从今天零点开始消费处理数据,步进30秒,也就是每次抽取30秒的数据来分析处理。

不要忘了在Main中把该处理器加入调度器。

跑起来,去控制台启用作业

可以看到,在(00:30:00, 00:30:30)区间内,得到4个病人,创建了4个医嘱。

运行HisWeb查看数据

消息调度

设计概要

计算型应用(继承Handler)

系统架构

调度中心主从架构

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部