文档章节

kettle增量抽取模型

马进举
 马进举
发布于 07/14 18:58
字数 1614
阅读 687
收藏 0

前言

在日常数据流转中,最常见的场景就是增量数据抽取,一个简单好用得增量抽取模型可以事半功倍。

为此我根据实际工作经验,设计出了一个比较通用的增量抽取模型,基于此模型进行增量抽取将只需要关注核心业务需求,通用的增量标记记录读取、日志记录等都被封装在公用作业中,一般不需要关心这些通用部分,有问题也可以做到修改一个地方全局都使用。


资源库规划

资源库文件夹要规划好,杂乱的目录结构让人心烦,最关键是如果目录过多将导致作业转换加载变慢,严重影响后续作业的运行。

以下就是我初创的资源库目录。

资源库目录

  1. test:测试的作业转换目录。
  2. common:公用作业转换,如写日志的转换等。
  3. template:一些模板级的作业转换,一些常用的场景都在此目录做一个样例供参考。
  4. other:一些无法归类或比较少出现的作业转换放在此目录。

增量作业体验

接到一个增量抽取需求后,我们只需要将如下作业和转换另存一份,稍加修改即可。

增量作业

增量转换

另存的作业,一般情况我们只需要修改作业名称、描述、目录即可。

另存的转换,我们需要参考模板更换数据库连接、sql、增量字段、目标表字段映射即可。

如上一个增量抽取就完成,但还没讲公用的作业,所有很不太理解,请往下看。该模型的样例在0.4.0的急速体验版中已经存在,可以参考。

增量公用作业

如下就是增量公用作业的流程图。

增量公用作业

接下来,我们依次介绍作业各组件的作用。

START

开始控件是每个作业必须要有的,但这里开始控件没有定时作用,因为作业的定时我们采用在此作业外在包一层作业实现。

作业初始化:

如下就是作业初始化转换流程图。

作业初始化

获取系统信息

如下使用获取系统信息控件主要用于获取作业开始运行的时间,用于记录作业日志使用。

获取系统信息

js代码

js代码

js代码主要获取一些变量,为后续增量转换、日志记录做准备。

//Script here
//使用我的一个工具类方法获取根作业。
var rootJob=Packages.cn.benma666.kettle.mytuils.KettleUtils.getRootJob(_step_);
//获取根作业的id
var ID_JOB=rootJob.getObjectId().getId();
//获取根作业的名称
var JOB_NAME=rootJob.getObjectName();
//获取增量转换的路径
var ZLZH_PATH = getVariable("ZLZH_PATH",null);
//如果增量转换路径的变量为空,则自动设置根作业的路径为增量转换的路径
if(ZLZH_PATH==null||ZLZH_PATH==""){
	ZLZH_PATH = rootJob.getRepositoryDirectory().getPath();
}

变量设置

变量设置

获取增量时间戳

获取增量时间戳

获取增量时间戳参数

获取增量时间戳参数

如下是在公用作业中引入该转换时设置的,这很重要,不然父级控件无法覆盖子控件的变量。

表输入

从资源库的R_JOB表读取该作业的增量时间戳,这个字段是我添加,我认为放在此处最合适,后续结合我的kettle-manager调度平台一起使用,可以在调度平台中修改此值,便于自行调整增量抽取的起点。

获取变量

js代码

//Script here
//如果作业设置的开始时间戳小于默认开始时间戳则使用默认开始时间戳
if(RUN_START<DEF_RUN_START){
	RUN_START=DEF_RUN_START;
}
//如果增量时间戳小于作业设置的开始时间戳则使用作业设置的开始时间戳
if(ZLSJC<RUN_START){
	ZLSJC = RUN_START;
}

设置变量

增量转换

在公用作业中引用该转换时需要设置如下参数,已达到覆盖子控件相关变量的目的。

增量转换需要参考如下设置,根据需要设置,用于后续数据量统计。

表输入

在表输入中直接使用前面步骤设置的变量ZLSJ(增量时间戳),即可实现增量读取数据的目的。

select t.*,last_update ZLCQZD FROM r_job t where t.last_update<to_char(sysdate,'yyyymmddhh24miss') and t.last_update>'${ZLSJC}'

获取最大时间戳

将增量查询出来的数据中的增量时间戳的最大值设置到NEW_ZLSJC变量中,用于记录日志,下次从此时间开始读取数据。

if(ZLCQZD>getVariable("NEW_ZLSJC","")){
	setVariable("NEW_ZLSJC",LAST_UPDATE,"r");
}

表输出

这里样例是表输出,你可以是txt输出、excel输出等,将增量查询出来的数据入到你想要使用的目的地。

统计数据量

获取上一步骤读取的数据量等信息,用于记录日志,便于日后统计作业运行情况。

//获取数据量
var input = previous_result.getNrLinesInput();
var written = previous_result.getNrLinesWritten();
var output = previous_result.getNrLinesOutput();
var updated = previous_result.getNrLinesUpdated();

//设置变量
parent_job.setVariable('input',input);
parent_job.setVariable('written',written);
parent_job.setVariable('output',output);
parent_job.setVariable('updated',updated);
parent_job.setVariable('result','success');

//正常结束
true;

写日志

公用作业引用写日志转换时做如下设置,用于覆盖子控件的变量。

获取系统信息

获取作业运行结束时间

获取变量

获取前面步骤设置的各种变量

java代码过滤

过滤掉空转的日志,即如果这次没有读取到数据则只打印日志,不更新日志表。

写日志

打印日志

日志处理

时间格式化等

//Script here
//时间格式化
END_TIME = date2str(END_TIME,"yyyyMMddHHmmss");
START_TIME = date2str(START_TIME,"yyyyMMddHHmmss");
//如果新的增量时间戳为空则使用上次的时间戳
if(NEW_ZLSJC==null||NEW_ZLSJC==""){
	NEW_ZLSJC = ZLSJC;
}

更新

更新最新的时间戳到R_JOB表中,便于下次读取该值进行增量抽取。

表输出

输出本次运行日志到业务日志表中,详细记录了本次作业运行的开始结束时间、抽取数据量等信息。

成功

这个是作业结束的标志。

请回头重新看增量作业体验。

© 著作权归作者所有

共有 人打赏支持
下一篇: KettleEasyExpand
马进举
粉丝 34
博文 6
码字总数 7987
作品 1
遂宁
私信 提问
加载中

评论(2)

马进举
马进举

引用来自“wstwinner”的评论

能否分享下源码,获得Job名称的 方法封装成jar包了吗,单独运行那个转换不识别。

@wstwinner 新版调度中有这个模型
w
wstwinner
能否分享下源码,获得Job名称的 方法封装成jar包了吗,单独运行那个转换不识别。
用Kettle增量抽取数据,在不改变源表结构的情况下,有没有比较快的方案?

用Kettle增量抽取数据,在不改变源表结构的情况下,有没有比较快的方案? 源表和目标表上加时间戳的方法不是很适合现在要做的东西,尝试过每次都将目标表清空再插入的方法,效率的确快了很多...

程序员YB
2013/01/31
5.3K
3
kettle下转移mongo中数据到mysql中

版权声明:本文为博主原创文章,未经博主允许不得转载。请注明博客地址(http://blog.csdn.net/gsying1474) https://blog.csdn.net/gsying1474/article/details/54140108 Kettle是一款国外开...

刘迎光-萤火虫工作室
2017/01/06
0
0
kettle 数据抽取

问题是这样的 : 有一张表 sourcetab ,有时间字段,我要抽取张表里的数据,程序有定时任务调用kettle 定时抽取这张表,要求抽取过的数据就不能再次被抽取了,前提是sourcetab表不能改变表结...

南北024
2011/10/20
3.9K
9
Kettle 8.1 RC 发布,开源 ETL 工具

Kettle 8.1 RC 发布了,下载地址: src="https://github.com/pentaho/pentaho-kettle/releases/tag/8.1.0.0-RC">https://github.com/pentaho/pentaho-kettle/releases/tag/8.1.0.0-RC 改进记......

红薯
2017/12/14
1K
7
Kettle实现数据库迁移

需求: 做数据仓库时,需要将业务系统CRM抽取到数据仓库的缓冲层,业务系统使用的是SqlServer数据库,数据仓库的缓冲层使用的是mysql数据库,为实现数据库的迁移,即将SqlServer数据库中的所有...

Zero零_度
2016/11/16
40
0

没有更多内容

加载失败,请刷新页面

加载更多

聊聊storm的ICommitterTridentSpout

序 本文主要研究一下storm的ICommitterTridentSpout ICommitterTridentSpout storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/ICommitterTridentSpout.java public interface......

go4it
29分钟前
1
0
Ubuntu常用操作

查看端口号 netstat -anp |grep 端口号 查看已使用端口情况 netstat -nultp(此处不用加端口号) netstat -anp |grep 82查看82端口的使用情况 查找被占用的端口: netstat -tln netstat -tl...

hc321
昨天
1
0
网站cdn的静态资源突然访问变的缓慢,问题排查流程

1.首先我查看了一下是否自己的网络问题,通过对比其他资源的访问速度和下载速度,确认不是 2.通过ping 和 tracert 判断cdn域名能否正常访问,(最后回想感觉这一步可以省略,因为每次最终能访...

小海bug
昨天
3
0
Mybatis 学习笔记四 MyBatis-Plus插件

Mybatis 学习笔记四 MyBatis-Plus插件 maven依赖 <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus</artifactId> <ve......

晨猫
昨天
5
0
小白带你认识netty(二)之netty服务端启动(下)

承接上一篇小白带你认识netty(二)之netty服务端启动(上),还剩下两步骤:3、注册Selector:将Channel注册到Selector上 和 4、端口的绑定:服务端端口的监听。 3、注册Selector:将Chann...

天空小小
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部