文档章节

使用Kettle进行数据库迁移操作

影非弦
 影非弦
发布于 2017/07/24 15:51
字数 2356
阅读 18
收藏 0

思路:先从源库读取需要迁移的表名,将表明作为变量,并在目标库创建这些表,最后表到表抽取数据

整个迁移流程包括2个job,4个transform。

具体如下:

1.总的Job:

2.获取表名流程的转换:

3.子job表数据抽取作业

3.1转换 表名变量设置

 

3.2转换 创建表结构

表输入如下,注意:这里的查询必须要查询出数据才行,不然后面会无法获取表结构来创建表

(有些人说这里加个条件where 1=2,不需要数据只需要表结构,我自己测试,发现这样无法获取到数据,后面的java脚本中可以打印这些信息,加了条件会报错,打印信息为null)

java脚本:


    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
    {
    	// First, get a row from the default input hop
    	//
    	Object[] r = getRow();
		//logBasic("r.size="+r.length);
   		org.pentaho.di.core.database.DatabaseMeta dbmeta = null;
    	java.util.List list = getTrans().getRepository().readDatabases();//3.x中获取资源库的所有数据库连接信息用getDatabases();
    	if(list != null && !list.isEmpty())
    	{
    		for(int i=0;i<list.size();i++)
    		{
    			dbmeta = (org.pentaho.di.core.database.DatabaseMeta)list.get(i);
    			//下面是目标库的数据库连接,大家可根据需要修改
    			if("testb".equalsIgnoreCase(dbmeta.getName()))
    			{
					logBasic("数据库连接名为:"+dbmeta.getName());
    				break;
    			}
    		}
    	}
    	if(dbmeta!=null)
    	{
    		org.pentaho.di.core.database.Database db=new org.pentaho.di.core.database.Database(dbmeta);
    		try
    		{
    			db.connect();
    			String tablename = getVariable("TABLENAME");
    			logBasic("开始创建表:" + tablename);
    			if(tablename!=null && tablename.trim().length()>0)
    			{
					logBasic("data.inputRowMeta="+data.inputRowMeta);
    				String sql = db.getDDL(tablename, data.inputRowMeta);//${TABLENAME}
					logBasic("sql="+sql);
    				db.execStatement(sql.replace(";", ""));
    				logBasic(sql);
    			}
    		}catch(Exception e){
    			logError("创建表出现异常",e);
    		}finally{
    			db.disconnect();
    		}
    	}
    	return false;
	} 

3.3数据抽取

运行结果:

2017/07/24 15:16:25 - Spoon - 正在开始任务...
2017/07/24 15:16:25 - 数据库迁移作业 - 开始执行任务
2017/07/24 15:16:25 - 数据库迁移作业 - 开始项[获取表名流程]
2017/07/24 15:16:25 - 源表名称获取 - 为了转换解除补丁开始  [源表名称获取]
2017/07/24 15:16:25 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:25 - 表输入.0 - 完成处理 (I=2, O=0, R=0, W=2, U=0, E=0)
2017/07/24 15:16:25 - 字段选择.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2017/07/24 15:16:25 - 复制记录到结果.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2017/07/24 15:16:25 - 数据库迁移作业 - 开始项[表数据抽取作业]
2017/07/24 15:16:25 - 表数据抽取子作业 - 开始项[表明成变量设置]
2017/07/24 15:16:25 - 将记录中的表名设置为变量 - 为了转换解除补丁开始  [将记录中的表名设置为变量]
2017/07/24 15:16:25 - 从结果获取记录.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:25 - 设置变量.0 - Setting environment variables...
2017/07/24 15:16:25 - 设置变量.0 - Set variable TABLENAME to value [TESTM]
2017/07/24 15:16:25 - 设置变量.0 - Finished after 1 rows.
2017/07/24 15:16:25 - 设置变量.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:25 - 表数据抽取子作业 - 开始项[创建表结构]
2017/07/24 15:16:25 - 创建表结构 - 为了转换解除补丁开始  [创建表结构]
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 创建入库表结构.0 - 数据库连接名为:testb
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=1, O=0, R=0, W=1, U=0, E=0)
2017/07/24 15:16:26 - 创建入库表结构.0 - 开始创建表:TESTM
2017/07/24 15:16:26 - 创建入库表结构.0 - data.inputRowMeta=[ID String(10)], [NAME String(200)], [AGE BigNumber], [ADDRESS String(200)]
2017/07/24 15:16:26 - 创建入库表结构.0 - sql=CREATE TABLE TESTM
2017/07/24 15:16:26 - 创建入库表结构.0 - (
2017/07/24 15:16:26 - 创建入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 创建入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 创建入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - )
2017/07/24 15:16:26 - 创建入库表结构.0 - ;
2017/07/24 15:16:26 - 创建入库表结构.0 - CREATE TABLE TESTM
2017/07/24 15:16:26 - 创建入库表结构.0 - (
2017/07/24 15:16:26 - 创建入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 创建入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 创建入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - )
2017/07/24 15:16:26 - 创建入库表结构.0 - ;
2017/07/24 15:16:26 - 创建入库表结构.0 - 完成处理 (I=0, O=0, R=1, W=0, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子作业 - 开始项[数据抽取]
2017/07/24 15:16:26 - 数据迁移 - 为了转换解除补丁开始  [数据迁移]
2017/07/24 15:16:26 - 表输出.0 - Connected to database [testb] (commit=1000)
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=3, O=0, R=0, W=3, U=0, E=0)
2017/07/24 15:16:26 - 表输出.0 - 完成处理 (I=0, O=3, R=3, W=3, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子作业 - 完成作业项[数据抽取] (结果=[true])
2017/07/24 15:16:26 - 表数据抽取子作业 - 完成作业项[创建表结构] (结果=[true])
2017/07/24 15:16:26 - 表数据抽取子作业 - 完成作业项[表明成变量设置] (结果=[true])
2017/07/24 15:16:26 - 表数据抽取子作业 - 开始项[表明成变量设置]
2017/07/24 15:16:26 - 将记录中的表名设置为变量 - 为了转换解除补丁开始  [将记录中的表名设置为变量]
2017/07/24 15:16:26 - 从结果获取记录.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:26 - 设置变量.0 - Setting environment variables...
2017/07/24 15:16:26 - 设置变量.0 - Set variable TABLENAME to value [TESTN]
2017/07/24 15:16:26 - 设置变量.0 - Finished after 1 rows.
2017/07/24 15:16:26 - 设置变量.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子作业 - 开始项[创建表结构]
2017/07/24 15:16:26 - 创建表结构 - 为了转换解除补丁开始  [创建表结构]
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=1, O=0, R=0, W=1, U=0, E=0)
2017/07/24 15:16:26 - 创建入库表结构.0 - 数据库连接名为:testb
2017/07/24 15:16:26 - 创建入库表结构.0 - 开始创建表:TESTN
2017/07/24 15:16:26 - 创建入库表结构.0 - data.inputRowMeta=[ID String(10)], [NAME String(200)], [AGE BigNumber], [ADDRESS String(200)]
2017/07/24 15:16:26 - 创建入库表结构.0 - sql=CREATE TABLE TESTN
2017/07/24 15:16:26 - 创建入库表结构.0 - (
2017/07/24 15:16:26 - 创建入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 创建入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 创建入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - )
2017/07/24 15:16:26 - 创建入库表结构.0 - ;
2017/07/24 15:16:26 - 创建入库表结构.0 - CREATE TABLE TESTN
2017/07/24 15:16:26 - 创建入库表结构.0 - (
2017/07/24 15:16:26 - 创建入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 创建入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 创建入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - )
2017/07/24 15:16:26 - 创建入库表结构.0 - ;
2017/07/24 15:16:26 - 创建入库表结构.0 - 完成处理 (I=0, O=0, R=1, W=0, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子作业 - 开始项[数据抽取]
2017/07/24 15:16:26 - 数据迁移 - 为了转换解除补丁开始  [数据迁移]
2017/07/24 15:16:26 - 表输出.0 - Connected to database [testb] (commit=1000)
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=3, O=0, R=0, W=3, U=0, E=0)
2017/07/24 15:16:27 - 表输出.0 - 完成处理 (I=0, O=3, R=3, W=3, U=0, E=0)
2017/07/24 15:16:27 - 表数据抽取子作业 - 完成作业项[数据抽取] (结果=[true])
2017/07/24 15:16:27 - 表数据抽取子作业 - 完成作业项[创建表结构] (结果=[true])
2017/07/24 15:16:27 - 表数据抽取子作业 - 完成作业项[表明成变量设置] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移作业 - 开始项[成功]
2017/07/24 15:16:27 - 数据库迁移作业 - 完成作业项[成功] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移作业 - 完成作业项[表数据抽取作业] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移作业 - 完成作业项[获取表名流程] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移作业 - 任务执行完毕
2017/07/24 15:16:27 - Spoon - 任务已经结束.

 

© 著作权归作者所有

共有 人打赏支持
影非弦
粉丝 4
博文 5
码字总数 3621
作品 0
海淀
程序员
Kettle实现数据库迁移

需求: 做数据仓库时,需要将业务系统CRM抽取到数据仓库的缓冲层,业务系统使用的是SqlServer数据库,数据仓库的缓冲层使用的是mysql数据库,为实现数据库的迁移,即将SqlServer数据库中的所有...

Zero零_度
2016/11/16
40
0
数据迁移实战:基于Kettle的Mysql到DB2的数据迁移

一、什么是ETL ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL一词较常用在数据仓库,但其对...

十月阳光
2015/11/04
0
2
使用Kettle导入Excel数据

使用Kettle导入Excel数据 曾静的技术博客2017-08-2022 阅读 导入数据kettleExcel ETL(Extraction, Transformation, and Loading),在日常的工作中我们经常会遇到各种数据的处理,转换,迁移...

曾静的技术博客
2017/08/20
0
0
数据迁移,急用,请高手指教

通过kettle实现数据库的迁移后,能否使用kettle检查迁移后的数据与源数据库的数据内容相同?

robert.feng
2011/12/12
747
4
ETL利器Kettle实战应用解析系列一【Kettle使用介绍】

一、ETL利器Kettle实战应用解析系列一【Kettle使用介绍】 二、ETL利器Kettle实战应用解析系列二 【应用场景和实战DEMO下载】 三、ETL利器Kettle实战应用解析系列三 【ETL后台进程执行配置方式...

李丁玲
2016/03/04
133
0

没有更多内容

加载失败,请刷新页面

加载更多

Bash各类扩展详解

Bash各类扩展详解 Bash中主要包括大括号扩展、波浪号扩展、变量扩展、子命令扩展、文件名扩展和算数扩展。这些扩展组合在一起为Bash带来了极大的易用性。掌握这些扩展的用法和功能,能够为B...

小陶小陶
46分钟前
1
0
EventBus原理深度解析

一、问题描述 在工作中,经常会遇见使用异步的方式来发送事件,或者触发另外一个动作:经常用到的框架是MQ(分布式方式通知)。如果是同一个jvm里面通知的话,就可以使用EventBus。由于Event...

yangjianzhou
今天
5
0
OpenCV图像处理实例:libuv+cvui显示摄像头视频

#include <iostream>#include <opencv2/opencv.hpp>#define CVUI_IMPLEMENTATION#include <cvui.h>extern "C"{#include <uv.h>}using namespace std;#define WINDOW_NAM......

IOTService
今天
3
0
openJDK之JDK9的String

1.openJDK8的String 先来看下openJDK8的String的底层,如下图1.1所示: 图1.1 底层上使用的是char[],即char数组 每个char占16个bit,Character.SIZE的值是16。 2.openJDK9中的String 图2.1...

克虏伯
今天
3
0
UEFI 模式下如何安装 Ubuntu 16.04

作者:知乎用户 链接:https://www.zhihu.com/question/52092661/answer/259583475 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 针对UEFI模式下安装U...

寻知者
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部