文档章节

使用Kettle进行数据库迁移操作

影非弦
 影非弦
发布于 2017/07/24 15:51
字数 2356
阅读 21
收藏 0

思路:先从源库读取需要迁移的表名,将表明作为变量,并在目标库创建这些表,最后表到表抽取数据

整个迁移流程包括2个job,4个transform。

具体如下:

1.总的Job:

2.获取表名流程的转换:

3.子job表数据抽取作业

3.1转换 表名变量设置

 

3.2转换 创建表结构

表输入如下,注意:这里的查询必须要查询出数据才行,不然后面会无法获取表结构来创建表

(有些人说这里加个条件where 1=2,不需要数据只需要表结构,我自己测试,发现这样无法获取到数据,后面的java脚本中可以打印这些信息,加了条件会报错,打印信息为null)

java脚本:


    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
    {
    	// First, get a row from the default input hop
    	//
    	Object[] r = getRow();
		//logBasic("r.size="+r.length);
   		org.pentaho.di.core.database.DatabaseMeta dbmeta = null;
    	java.util.List list = getTrans().getRepository().readDatabases();//3.x中获取资源库的所有数据库连接信息用getDatabases();
    	if(list != null && !list.isEmpty())
    	{
    		for(int i=0;i<list.size();i++)
    		{
    			dbmeta = (org.pentaho.di.core.database.DatabaseMeta)list.get(i);
    			//下面是目标库的数据库连接,大家可根据需要修改
    			if("testb".equalsIgnoreCase(dbmeta.getName()))
    			{
					logBasic("数据库连接名为:"+dbmeta.getName());
    				break;
    			}
    		}
    	}
    	if(dbmeta!=null)
    	{
    		org.pentaho.di.core.database.Database db=new org.pentaho.di.core.database.Database(dbmeta);
    		try
    		{
    			db.connect();
    			String tablename = getVariable("TABLENAME");
    			logBasic("开始创建表:" + tablename);
    			if(tablename!=null && tablename.trim().length()>0)
    			{
					logBasic("data.inputRowMeta="+data.inputRowMeta);
    				String sql = db.getDDL(tablename, data.inputRowMeta);//${TABLENAME}
					logBasic("sql="+sql);
    				db.execStatement(sql.replace(";", ""));
    				logBasic(sql);
    			}
    		}catch(Exception e){
    			logError("创建表出现异常",e);
    		}finally{
    			db.disconnect();
    		}
    	}
    	return false;
	} 

3.3数据抽取

运行结果:

2017/07/24 15:16:25 - Spoon - 正在开始任务...
2017/07/24 15:16:25 - 数据库迁移作业 - 开始执行任务
2017/07/24 15:16:25 - 数据库迁移作业 - 开始项[获取表名流程]
2017/07/24 15:16:25 - 源表名称获取 - 为了转换解除补丁开始  [源表名称获取]
2017/07/24 15:16:25 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:25 - 表输入.0 - 完成处理 (I=2, O=0, R=0, W=2, U=0, E=0)
2017/07/24 15:16:25 - 字段选择.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2017/07/24 15:16:25 - 复制记录到结果.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2017/07/24 15:16:25 - 数据库迁移作业 - 开始项[表数据抽取作业]
2017/07/24 15:16:25 - 表数据抽取子作业 - 开始项[表明成变量设置]
2017/07/24 15:16:25 - 将记录中的表名设置为变量 - 为了转换解除补丁开始  [将记录中的表名设置为变量]
2017/07/24 15:16:25 - 从结果获取记录.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:25 - 设置变量.0 - Setting environment variables...
2017/07/24 15:16:25 - 设置变量.0 - Set variable TABLENAME to value [TESTM]
2017/07/24 15:16:25 - 设置变量.0 - Finished after 1 rows.
2017/07/24 15:16:25 - 设置变量.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:25 - 表数据抽取子作业 - 开始项[创建表结构]
2017/07/24 15:16:25 - 创建表结构 - 为了转换解除补丁开始  [创建表结构]
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 创建入库表结构.0 - 数据库连接名为:testb
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=1, O=0, R=0, W=1, U=0, E=0)
2017/07/24 15:16:26 - 创建入库表结构.0 - 开始创建表:TESTM
2017/07/24 15:16:26 - 创建入库表结构.0 - data.inputRowMeta=[ID String(10)], [NAME String(200)], [AGE BigNumber], [ADDRESS String(200)]
2017/07/24 15:16:26 - 创建入库表结构.0 - sql=CREATE TABLE TESTM
2017/07/24 15:16:26 - 创建入库表结构.0 - (
2017/07/24 15:16:26 - 创建入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 创建入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 创建入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - )
2017/07/24 15:16:26 - 创建入库表结构.0 - ;
2017/07/24 15:16:26 - 创建入库表结构.0 - CREATE TABLE TESTM
2017/07/24 15:16:26 - 创建入库表结构.0 - (
2017/07/24 15:16:26 - 创建入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 创建入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 创建入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - )
2017/07/24 15:16:26 - 创建入库表结构.0 - ;
2017/07/24 15:16:26 - 创建入库表结构.0 - 完成处理 (I=0, O=0, R=1, W=0, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子作业 - 开始项[数据抽取]
2017/07/24 15:16:26 - 数据迁移 - 为了转换解除补丁开始  [数据迁移]
2017/07/24 15:16:26 - 表输出.0 - Connected to database [testb] (commit=1000)
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=3, O=0, R=0, W=3, U=0, E=0)
2017/07/24 15:16:26 - 表输出.0 - 完成处理 (I=0, O=3, R=3, W=3, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子作业 - 完成作业项[数据抽取] (结果=[true])
2017/07/24 15:16:26 - 表数据抽取子作业 - 完成作业项[创建表结构] (结果=[true])
2017/07/24 15:16:26 - 表数据抽取子作业 - 完成作业项[表明成变量设置] (结果=[true])
2017/07/24 15:16:26 - 表数据抽取子作业 - 开始项[表明成变量设置]
2017/07/24 15:16:26 - 将记录中的表名设置为变量 - 为了转换解除补丁开始  [将记录中的表名设置为变量]
2017/07/24 15:16:26 - 从结果获取记录.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:26 - 设置变量.0 - Setting environment variables...
2017/07/24 15:16:26 - 设置变量.0 - Set variable TABLENAME to value [TESTN]
2017/07/24 15:16:26 - 设置变量.0 - Finished after 1 rows.
2017/07/24 15:16:26 - 设置变量.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子作业 - 开始项[创建表结构]
2017/07/24 15:16:26 - 创建表结构 - 为了转换解除补丁开始  [创建表结构]
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=1, O=0, R=0, W=1, U=0, E=0)
2017/07/24 15:16:26 - 创建入库表结构.0 - 数据库连接名为:testb
2017/07/24 15:16:26 - 创建入库表结构.0 - 开始创建表:TESTN
2017/07/24 15:16:26 - 创建入库表结构.0 - data.inputRowMeta=[ID String(10)], [NAME String(200)], [AGE BigNumber], [ADDRESS String(200)]
2017/07/24 15:16:26 - 创建入库表结构.0 - sql=CREATE TABLE TESTN
2017/07/24 15:16:26 - 创建入库表结构.0 - (
2017/07/24 15:16:26 - 创建入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 创建入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 创建入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - )
2017/07/24 15:16:26 - 创建入库表结构.0 - ;
2017/07/24 15:16:26 - 创建入库表结构.0 - CREATE TABLE TESTN
2017/07/24 15:16:26 - 创建入库表结构.0 - (
2017/07/24 15:16:26 - 创建入库表结构.0 -   ID VARCHAR2(10)
2017/07/24 15:16:26 - 创建入库表结构.0 - , NAME VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - , AGE NUMBER
2017/07/24 15:16:26 - 创建入库表结构.0 - , ADDRESS VARCHAR2(200)
2017/07/24 15:16:26 - 创建入库表结构.0 - )
2017/07/24 15:16:26 - 创建入库表结构.0 - ;
2017/07/24 15:16:26 - 创建入库表结构.0 - 完成处理 (I=0, O=0, R=1, W=0, U=0, E=0)
2017/07/24 15:16:26 - 表数据抽取子作业 - 开始项[数据抽取]
2017/07/24 15:16:26 - 数据迁移 - 为了转换解除补丁开始  [数据迁移]
2017/07/24 15:16:26 - 表输出.0 - Connected to database [testb] (commit=1000)
2017/07/24 15:16:26 - 表输入.0 - Finished reading query, closing connection.
2017/07/24 15:16:26 - 表输入.0 - 完成处理 (I=3, O=0, R=0, W=3, U=0, E=0)
2017/07/24 15:16:27 - 表输出.0 - 完成处理 (I=0, O=3, R=3, W=3, U=0, E=0)
2017/07/24 15:16:27 - 表数据抽取子作业 - 完成作业项[数据抽取] (结果=[true])
2017/07/24 15:16:27 - 表数据抽取子作业 - 完成作业项[创建表结构] (结果=[true])
2017/07/24 15:16:27 - 表数据抽取子作业 - 完成作业项[表明成变量设置] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移作业 - 开始项[成功]
2017/07/24 15:16:27 - 数据库迁移作业 - 完成作业项[成功] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移作业 - 完成作业项[表数据抽取作业] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移作业 - 完成作业项[获取表名流程] (结果=[true])
2017/07/24 15:16:27 - 数据库迁移作业 - 任务执行完毕
2017/07/24 15:16:27 - Spoon - 任务已经结束.

 

© 著作权归作者所有

影非弦
粉丝 6
博文 5
码字总数 3621
作品 0
海淀
程序员
私信 提问
Kettle实现数据库迁移

需求: 做数据仓库时,需要将业务系统CRM抽取到数据仓库的缓冲层,业务系统使用的是SqlServer数据库,数据仓库的缓冲层使用的是mysql数据库,为实现数据库的迁移,即将SqlServer数据库中的所有...

Zero零_度
2016/11/16
160
0
使用Kettle导入Excel数据

使用Kettle导入Excel数据 曾静的技术博客2017-08-2022 阅读 导入数据kettleExcel ETL(Extraction, Transformation, and Loading),在日常的工作中我们经常会遇到各种数据的处理,转换,迁移...

曾静的技术博客
2017/08/20
0
0
数据迁移实战:基于Kettle的Mysql到DB2的数据迁移

一、什么是ETL ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL一词较常用在数据仓库,但其对...

十月阳光
2015/11/04
3.1K
2
使用Kettle导入数据到ADB for PostgreSQL

Kettle简介 Kettle(现也称为Pentaho Data Integration,简称PDI)是一款非常受欢迎的开源ETL工具软件,主要用于数据整合、转换和迁移。Kettle除了支持各种关系型数据库,HBase MongoDB这样的N...

阿里云云栖社区
05/07
352
0
ETL利器Kettle实战应用解析系列一【Kettle使用介绍】

一、ETL利器Kettle实战应用解析系列一【Kettle使用介绍】 二、ETL利器Kettle实战应用解析系列二 【应用场景和实战DEMO下载】 三、ETL利器Kettle实战应用解析系列三 【ETL后台进程执行配置方式...

李丁玲
2016/03/04
426
0

没有更多内容

加载失败,请刷新页面

加载更多

分布式协调服务zookeeper

ps.本文为《从Paxos到Zookeeper 分布式一致性原理与实践》笔记之一 ZooKeeper ZooKeeper曾是Apache Hadoop的一个子项目,是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它...

ls_cherish
今天
4
0
redis 学习2

网站 启动 服务端 启动redis 服务端 在redis 安装目录下 src 里面 ./redis-server & 可以指定 配置文件或者端口 客户端 在 redis 的安装目录里面的 src 里面 ./redis-cli 可以指定 指定 连接...

之渊
昨天
2
0
Spring boot 静态资源访问

0. 两个配置 spring.mvc.static-path-patternspring.resources.static-locations 1. application中需要先行的两个配置项 1.1 spring.mvc.static-path-pattern 这个配置项是告诉springboo......

moon888
昨天
4
0
hash slot(虚拟桶)

在分布式集群中,如何保证相同请求落到相同的机器上,并且后面的集群机器可以尽可能的均分请求,并且当扩容或down机的情况下能对原有集群影响最小。 round robin算法:是把数据mod后直接映射...

李朝强
昨天
4
0
Kafka 原理和实战

本文首发于 vivo互联网技术 微信公众号 https://mp.weixin.qq.com/s/bV8AhqAjQp4a_iXRfobkCQ 作者简介:郑志彬,毕业于华南理工大学计算机科学与技术(双语班)。先后从事过电子商务、开放平...

vivo互联网技术
昨天
24
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部