文档章节

kettle增量抽取模型

马进举
 马进举
发布于 2018/07/14 18:58
字数 1614
阅读 1185
收藏 2

前言

在日常数据流转中,最常见的场景就是增量数据抽取,一个简单好用得增量抽取模型可以事半功倍。

为此我根据实际工作经验,设计出了一个比较通用的增量抽取模型,基于此模型进行增量抽取将只需要关注核心业务需求,通用的增量标记记录读取、日志记录等都被封装在公用作业中,一般不需要关心这些通用部分,有问题也可以做到修改一个地方全局都使用。


资源库规划

资源库文件夹要规划好,杂乱的目录结构让人心烦,最关键是如果目录过多将导致作业转换加载变慢,严重影响后续作业的运行。

以下就是我初创的资源库目录。

资源库目录

  1. test:测试的作业转换目录。
  2. common:公用作业转换,如写日志的转换等。
  3. template:一些模板级的作业转换,一些常用的场景都在此目录做一个样例供参考。
  4. other:一些无法归类或比较少出现的作业转换放在此目录。

增量作业体验

接到一个增量抽取需求后,我们只需要将如下作业和转换另存一份,稍加修改即可。

增量作业

增量转换

另存的作业,一般情况我们只需要修改作业名称、描述、目录即可。

另存的转换,我们需要参考模板更换数据库连接、sql、增量字段、目标表字段映射即可。

如上一个增量抽取就完成,但还没讲公用的作业,所有很不太理解,请往下看。该模型的样例在0.4.0的急速体验版中已经存在,可以参考。

增量公用作业

如下就是增量公用作业的流程图。

增量公用作业

接下来,我们依次介绍作业各组件的作用。

START

开始控件是每个作业必须要有的,但这里开始控件没有定时作用,因为作业的定时我们采用在此作业外在包一层作业实现。

作业初始化:

如下就是作业初始化转换流程图。

作业初始化

获取系统信息

如下使用获取系统信息控件主要用于获取作业开始运行的时间,用于记录作业日志使用。

获取系统信息

js代码

js代码

js代码主要获取一些变量,为后续增量转换、日志记录做准备。

//Script here
//使用我的一个工具类方法获取根作业。
var rootJob=Packages.cn.benma666.kettle.mytuils.KettleUtils.getRootJob(_step_);
//获取根作业的id
var ID_JOB=rootJob.getObjectId().getId();
//获取根作业的名称
var JOB_NAME=rootJob.getObjectName();
//获取增量转换的路径
var ZLZH_PATH = getVariable("ZLZH_PATH",null);
//如果增量转换路径的变量为空,则自动设置根作业的路径为增量转换的路径
if(ZLZH_PATH==null||ZLZH_PATH==""){
	ZLZH_PATH = rootJob.getRepositoryDirectory().getPath();
}

变量设置

变量设置

获取增量时间戳

获取增量时间戳

获取增量时间戳参数

获取增量时间戳参数

如下是在公用作业中引入该转换时设置的,这很重要,不然父级控件无法覆盖子控件的变量。

表输入

从资源库的R_JOB表读取该作业的增量时间戳,这个字段是我添加,我认为放在此处最合适,后续结合我的kettle-manager调度平台一起使用,可以在调度平台中修改此值,便于自行调整增量抽取的起点。

获取变量

js代码

//Script here
//如果作业设置的开始时间戳小于默认开始时间戳则使用默认开始时间戳
if(RUN_START<DEF_RUN_START){
	RUN_START=DEF_RUN_START;
}
//如果增量时间戳小于作业设置的开始时间戳则使用作业设置的开始时间戳
if(ZLSJC<RUN_START){
	ZLSJC = RUN_START;
}

设置变量

增量转换

在公用作业中引用该转换时需要设置如下参数,已达到覆盖子控件相关变量的目的。

增量转换需要参考如下设置,根据需要设置,用于后续数据量统计。

表输入

在表输入中直接使用前面步骤设置的变量ZLSJ(增量时间戳),即可实现增量读取数据的目的。

select t.*,last_update ZLCQZD FROM r_job t where t.last_update<to_char(sysdate,'yyyymmddhh24miss') and t.last_update>'${ZLSJC}'

获取最大时间戳

将增量查询出来的数据中的增量时间戳的最大值设置到NEW_ZLSJC变量中,用于记录日志,下次从此时间开始读取数据。

if(ZLCQZD>getVariable("NEW_ZLSJC","")){
	setVariable("NEW_ZLSJC",LAST_UPDATE,"r");
}

表输出

这里样例是表输出,你可以是txt输出、excel输出等,将增量查询出来的数据入到你想要使用的目的地。

统计数据量

获取上一步骤读取的数据量等信息,用于记录日志,便于日后统计作业运行情况。

//获取数据量
var input = previous_result.getNrLinesInput();
var written = previous_result.getNrLinesWritten();
var output = previous_result.getNrLinesOutput();
var updated = previous_result.getNrLinesUpdated();

//设置变量
parent_job.setVariable('input',input);
parent_job.setVariable('written',written);
parent_job.setVariable('output',output);
parent_job.setVariable('updated',updated);
parent_job.setVariable('result','success');

//正常结束
true;

写日志

公用作业引用写日志转换时做如下设置,用于覆盖子控件的变量。

获取系统信息

获取作业运行结束时间

获取变量

获取前面步骤设置的各种变量

java代码过滤

过滤掉空转的日志,即如果这次没有读取到数据则只打印日志,不更新日志表。

写日志

打印日志

日志处理

时间格式化等

//Script here
//时间格式化
END_TIME = date2str(END_TIME,"yyyyMMddHHmmss");
START_TIME = date2str(START_TIME,"yyyyMMddHHmmss");
//如果新的增量时间戳为空则使用上次的时间戳
if(NEW_ZLSJC==null||NEW_ZLSJC==""){
	NEW_ZLSJC = ZLSJC;
}

更新

更新最新的时间戳到R_JOB表中,便于下次读取该值进行增量抽取。

表输出

输出本次运行日志到业务日志表中,详细记录了本次作业运行的开始结束时间、抽取数据量等信息。

成功

这个是作业结束的标志。

请回头重新看增量作业体验。

© 著作权归作者所有

共有 人打赏支持
下一篇: KettleEasyExpand
马进举
粉丝 36
博文 6
码字总数 7987
作品 1
遂宁
私信 提问
加载中

评论(3)

w
wt25
您好,请问有kettle-manager 0.4.0的源码吗?预览转换图和作业图怎么实现的,可否帮助一下。
马进举
马进举

引用来自“wstwinner”的评论

能否分享下源码,获得Job名称的 方法封装成jar包了吗,单独运行那个转换不识别。

@wstwinner 新版调度中有这个模型
w
wstwinner
能否分享下源码,获得Job名称的 方法封装成jar包了吗,单独运行那个转换不识别。
用Kettle增量抽取数据,在不改变源表结构的情况下,有没有比较快的方案?

用Kettle增量抽取数据,在不改变源表结构的情况下,有没有比较快的方案? 源表和目标表上加时间戳的方法不是很适合现在要做的东西,尝试过每次都将目标表清空再插入的方法,效率的确快了很多...

程序员YB
2013/01/31
5.9K
3
kettle下转移mongo中数据到mysql中

版权声明:本文为博主原创文章,未经博主允许不得转载。请注明博客地址(http://blog.csdn.net/gsying1474) https://blog.csdn.net/gsying1474/article/details/54140108 Kettle是一款国外开...

刘迎光-萤火虫工作室
2017/01/06
0
0
kettle 数据抽取

问题是这样的 : 有一张表 source_tab ,有时间字段,我要抽取张表里的数据,程序有定时任务调用kettle 定时抽取这张表,要求抽取过的数据就不能再次被抽取了,前提是source_tab表不能改变表...

南北024
2011/10/20
4.3K
9
如何通过TASKCTL工具实现kettle作业的多次循环执行?

我有一个需求需要从其他系统定时抽取数据,而且这个数据需要抽取多次,我ETL用的kettle,调度用的免费版的taskctl,有谁知道taskctl怎么配置循环调用kettle吗?

老衲大海
2017/08/14
191
1
Kettle 8.1 RC 发布,开源 ETL 工具

Kettle 8.1 RC 发布了,下载地址: https://github.com/pentaho/pentaho-kettle/releases/tag/8.1.0.0-RC 改进记录请看: https://github.com/pentaho/pentaho-kettle/compare/8.1.0.0-RC........

红薯
2017/12/14
4.6K
8

没有更多内容

加载失败,请刷新页面

加载更多

Linux Wireshark普通用户启动使用方案

当系统安装好Wireshark后请正常启动是否可以进行正常使用,如果不行请参考下列指导 向系统添加一个用户组 sudo groupadd wireshark //如提示此组存在可跳过 将指定用户添加到这个组中 sudo...

CHONGCHEN
今天
1
0
CSS 选择器参考手册

CSS 选择器参考手册 选择器 描述 [attribute] 用于选取带有指定属性的元素。 [attribute=value] 用于选取带有指定属性和值的元素。 [attribute~=value] 用于选取属性值中包含指定词汇的元素。...

Jack088
今天
2
0
数据库篇一

数据库篇 第1章 数据库介绍 1.1 数据库概述  什么是数据库(DB:DataBase) 数据库就是存储数据的仓库,其本质是一个文件系统,数据按照特定的格式将数据存储起来,用户可以对数据库中的数据...

stars永恒
今天
4
0
Intellij IDEA中设置了jsp页面,但是在访问页面时却提示404

在Intellij IDEA中设置了spring boot的jsp页面,但是在访问时,却出现404,Not Found,经过查找资料后解决,步骤如下: 在Run/Debug Configurations面板中设置该程序的Working Directory选项...

uknow8692
昨天
4
0
day24:文档第五行增内容|每月1号压缩/etc/目录|过滤文本重复次数多的10个单词|人员分组|

1、在文本文档1.txt里第五行下面增加如下内容;两个方法; # This is a test file.# Test insert line into this file. 分析:给文档后增加内容,可以用sed 来搞定;也可以用while do done...

芬野de博客
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部