文档章节

新老DataHub迁移手册

_夜枫
 _夜枫
发布于 2017/06/22 14:21
字数 2620
阅读 21
收藏 0

原文链接

 

DataHub服务用户迁移文档

前言

 

原Odps版内测DataHub(下文统称为老DataHub服务),于2016年11月21日起已经处于维护状态,新版DataHub届时已经开启公测,公测至今已有半年以上时间,我们决定开始逐步下线老DataHub服务,老版部分用户需要迁移至新版DataHub。

新版本具有更多的特性,性能功能都有不少提升,可以同时支持数据一份数据同步到Odps、OSS、ElasticSearch等多个不同服务中,且提供WebConsole控制台进行更简单的操作。

准备工作

本文档针对使用Logstash、Fluentd、Flume以及使用SDK写入老DataHub服务的用户,提供迁移到新服务的指引,过程中遇到任何困难可以联系我们

 

dingtalk

新版DataHub相关文档

DataHub产品使用文档

DataHub控制台

创建新datahub project

新版DataHub中存在项目空间-Project概念,与Odps中Project类似,但是不等于Odps中的Project,为了方便管理,我们建议迁移时在DataHub中创建与Odps Project同名的Project(不同名称也可以)

  • 登录DataHub官网控制台,使用阿里云账号登录;
  • 点击创建Project,输入名称及描述,点击创建(Project描述中建议携带Project用处及Owner的邮箱或联系方式)

创建新DataHub topic

新版DataHub存在主题-Topic的概念,与Odps的Table类似,但是不等于Odps的Table,通常如果是需要导入数据到Odps的话,需要为每张表创建一个Topic,且字段类型、顺序与名称需要一致,Odps中的分区字段当做普通的Topic字段处理,新版DataHub会根据该分区字段再DataHub中的数据值,将数据同步到Odps离线表中。

例如:

MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string)
对应Topic应为如下的Schema:
Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)

创建Topic可以通过以下方式:

  • 若Topic数量较少,可以再WebConsole控制台,进入Project页面后点击创建Topic按钮,选择从MaxCompute导入,输入配置信息后勾选“自动创建DataConnector”,点击“导入表结构”即可导入odps表对应的格式,确认格式无误后选择Shard数量及生命周期, Shard数量建议与老服务一样,生命周期建议3天,点击创建即可。
  • 若Topic过多,可以使用迁移工具DataHub表结构迁移工具,工具将对列表中的所有表创建对应Topic及Connector。

DataHub与MaxCompute字段类型对应表

MaxCompute表中的类型 DataHub Topic中的类型
STRING STRING
DOUBLE DOUBLE
BIGINT BIGINT
DATETIME TIMESTAMP (注:以微秒为度量单位)
BOOLEAN BOOLEAN
DECIMAL 不支持
MAP 不支持
ARRAY 不支持

映射Odps分区

老DataHub在写入数据时需要直接指定分区,如果是通过fluend或logstash等插件写入的用户是需要配置分区信息或者通过某个时间字段转为固定格式作为分区

新版DataHub在这一行为上有所改变,Odps表的分区字段再DataHub中将会变成一个普通字段,后台Connector同步任务在同步数据到Odps表时会根据分区字段比如pt具体每条记录的值写入Odps对应分区中。

例如:

MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string)
对应Topic应为如下的Schema:
Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)
数据1: ("test", "test", "0.14", "a1", "20170405")
数据2: ("test", "test", "0.14", "aa", "20170406")
则数据1将会同步到odps分区ds=a1,pt=20170405
则数据2将会同步到odps分区ds=a2,pt=20170406
  • 若使用插件导入,并且是通过字符串转换为固定格式的分区值的用户,新的插件需要使用fluentd/logstash的filter功能,对分区字段的值进行转换,具体使用方式可以参考这些开源工具的官方文档

不同类型接入方式迁移

使用Java SDK

需要换成新版本DataHub的SDK,Mvn依赖变化

原依赖

<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-core</artifactId>
    <version>0.xxx</version>
</dependency>

新依赖

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.3.0-public</version>
</dependency>

Client初始化

原Client初始化步骤

Account account = new AliyunAccount(accessId, accessKey);
odps = new Odps(account);
odps.setDefaultProject(project);
odps.setEndpoint(odpsEndpoint);
DatahubClient client = new DatahubClient(odps, project, table, datahubEndpoint);
client.loadShard(shardNumber);
client.waitForShardLoad();

新Client初始化步骤

AliyunAccount account = new AliyunAccount(accessId, accessKey);
DatahubConfiguration conf = new DatahubConfiguration(account, datahubEndpoint);
DatahubClient client = new DatahubClient(conf);

获取Shard列表

原获取Shard列表及状态方式

HashMap<Long, DatahubClient.ShardState> shardStatus = client.getShardStatus();

新方式

ListShardResult listShardResult = client.listShard(projectName, topicName);

写入数据

原写入方式

DatahubWriter writer = client.openDatahubWriter(shardId);
TableSchema schema = client.getStreamSchema();
DatahubRecordPack recordPack = new DatahubRecordPack(schema);

/* Write another 20 records recordPack into another partition */
for (int i = 0; i < 20; i++) {
    Record record = makeRecord(schema);
    recordPack.append(record);
}

partSpec = "pt='20150809'";
packId = writer.write(new PartitionSpec(partSpec), recordPack)
    .getPackId();
System.out.println("record append to the pack: " + packId);

新写入方式

List<RecordEntry> recordEntries = new ArrayList<RecordEntry>();
RecordEntry entry = new RecordEntry(schema);
entry.setString(0, "Test");
entry.setBigint(1, 5L);
entry.setShardId(shardId);
recordEntries.add(entry);
PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);
if (result.getFailedRecordCount() != 0) {
    List<ErrorEntry> errors = result.getFailedRecordError();
    // deal with result.getFailedRecords()
}

完整写入新DataHub示例代码

使用Fluentd

通过Fluend插件写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤

  • 更换,安装新插件包
  • 根据配置文件对比,修改现有配置文件
  • 使用新配置文件重新启动fluend进程

插件包更换

新版Fluentd插件使用文档

原安装语句

gem install fluent-plugin-aliyun-odps

新安装语句(也可按照新版文档提供的一键安装包安装logstash)

gem install fluent-plugin-datahub

配置对比

部分配置不需更改,更改match 部分配置即可。

老服务配置项 新服务配置项 备注
type type 需要从aliyun_odps改为dataHub
aliyun_access_id access_id 云账号accessid
aliyun_access_key access_key 云账号accesskey
aliyun_odps_hub_endpoint endpoint Datahub服务域名,需要改为新服务的域名
aliyun_odps_endpoint 不再需要
buffer_chunk_limit buffer_chunk_limit 不需要变化,但是新配置不能超过3MB
buffer_queue_limit buffer_queue_limit 不需要变化
flush_interval flush_interval 不需要变化
project project_name datahub的Project,非odps project
table topic_name datahub的topic,非odps table
fields column_names 指定需要采集的列
partition 不再需要
time_format 不再需要
shard_number 不再需要
enable_fast_crc 不再需要
retry_time retry_time 重试次数
retry_interval retry_interval 重试间隔
abandon_mode 不再需要

新增配置

新服务配置项 备注
dirty_data_continue true/false遇到增数据是否继续,若为true 遇到脏数据会重试,重试次数用完,会将脏数据写入脏数据文件
dirty_data_file 指定脏数据文件的位置
put_data_batch_size 每1000条record写一次DataHub
shard_id 指定shard_id写入,默认round-robin方式写入
shard_keys 指定用作分区key,用key值hash后作为写入shard的索引

[TODO] 能否放一个新老的diff文件example

使用Logstash

通过Logstash插件写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤

  • 更换,安装新插件包
  • 根据配置文件对比,修改现有配置文件
  • 使用新配置文件重新启动Logstash进程

插件包更换

新版Logstash插件使用文档

配置对比

input部分配置不需更改,更改output部分配置即可。

老服务配置项 新服务配置项 备注
type type 需要从aliyun_odps改为dataHub
aliyun_access_id access_id 云账号accessid
aliyun_access_key access_key 云账号accesskey
aliyun_odps_hub_endpoint endpoint Datahub服务域名,需要改为新服务的域名
aliyun_odps_endpoint 不再需要
value_field 不再需要
project project_name datahub的Project,非odps project
table topic_name datahub的topic,非odps table
partition 不再需要
partition_time_format 不再需要
shard_number 不再需要
batch_size 通过logstash启动参数设置 logstash -f <上述配置文件地址> -b 256 (256即为每次batch大小)
batch_timeout 不再需要

新增配置

新服务配置项 备注
dirty_data_continue true/false遇到增数据是否继续,若为true 遇到脏数据会重试,重试次数用完,会将脏数据写入脏数据文件
dirty_data_file 指定脏数据文件的位置
put_data_batch_size 每1000条record写一次DataHub
shard_keys 数组类型,数据落shard的字段名称,插件会根据这些字段的值计算hash将每条数据落某个shard, 注意shard_keys和shard_id都未指定,默认轮询落shard
shard_id 所有数据落指定的shard,注意shard_keys和shard_id都未指定,默认轮询落shard
retry_times 重试次数,-1为无限重试、0为不重试、>0表示需要有限次数, 默认值为-1
retry_interval 下一次重试的间隔,单位为秒,默认值为5

使用Apache Flume

通过Flume工具写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤

  • 更换,安装新Flume工具插件
  • 根据配置文件对比,修改现有配置文件
  • 使用新配置文件重新启动Flume进程

插件更新

新版Flume工具文档

配置对比

老服务配置项 新服务配置项 备注
a1.sinks.k1.type a1.sinks.k1.type 从com.aliyun.odps.flume.sink.OdpsSink改为com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.accessID a1.sinks.k1.datahub.accessID 云账号accessid
a1.sinks.k1.accessKey a1.sinks.k1.datahub.accessKey 云账号accesskey
a1.sinks.k1.odps.endPoint a1.sinks.k1.datahub.endPoint Datahub服务域名,需要改为新服务的域名
aliyun_odps_endpoint 不再需要
a1.sinks.k1.odps.project a1.sinks.k1.datahub.project datahub的Project,非odps project
a1.sinks.k1.odps.table a1.sinks.k1.datahub.topic datahub的topic,非odps table
a1.sinks.k1.odps.partition 不再需要
a1.sinks.k1.batchSize a1.sinks.k1.batchSize 批次大小
a1.sinks.k1.serializer a1.sinks.k1.serializer 无变化
a1.sinks.k1.serializer.delimiter a1.sinks.k1.serializer.delimiter 无变化
a1.sinks.k1.serializer.fieldnames a1.sinks.k1.serializer.fieldnames 无变化
a1.sinks.k1.serializer.charset a1.sinks.k1.serializer.charset 无变化
a1.sinks.k1.serializer.delimiter a1.sinks.k1.serializer.delimiter 无变化
a1.sinks.k1.shard.number 不再需要
a1.sinks.k1.shard.maxTimeOut a1.sinks.k1.shard.maxTimeOut 无变化
a1.sinks.k1.autoCreatePartition 不再需要

使用OGG

通过OGG工具写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤

  • 更换,安装新OGG工具插件
  • 根据配置文件对比,修改现有配置文件
  • 使用新配置文件重新启动OGG进程

插件更新

新版OGG工具文档

配置对比

老服务配置项 新服务配置项 备注
gg.handlerlist gg.handlerlist 不需修改,仍然为ggdatahub
gg.handler.ggdatahub.type gg.handler.ggdatahub.type 不需修改,仍然为com.aliyun.odps.ogg.handler.datahub.DatahubHandler
gg.classpath gg.classpath YOUR_DATAHUB_HANDLER_DIRECTORY/datahub_lib/*改为{YOUR_HOME}/datahub-ogg-plugin/lib/*

除以上配置外,其他DataHub相关配置均独立到configure.xml文件配置,具体含义请参看新版OGG工具文档

 

原文链接

 

本文转载自:http://click.aliyun.com/m/23857/  

_夜枫
粉丝 10
博文 506
码字总数 0
作品 0
朝阳
后端工程师
私信 提问
通过Datahub将本地的CSV文件导入Tablestore

前言 Tablestore是一款NoSQL多模型数据库,可提供海量结构化数据存储以及快速的查询和分析服务。如何将数据导入Tablestore,可以通过SDK/API、控制台、命令行工具直接写入、或者使用离线数据...

平苼
05/22
0
0
【大数据】odps数据迁移方式

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/binggetong/article/details/82762289 1、按迁移工具分类 一共分为2种:Tunnel批量数据通道、DataHub实时通道...

Elsa晓冰
2018/09/18
0
0
玩转大数据系列之一:数据采集与同步

数据的采集和同步,是先将数据从设备、或者本地数据源采集、同步到阿里云上,然后在阿里云上对数据进行分析和处理,最终完成您的业务要求。本文向您介绍阿里云各产品的数据采集和同步的操作实...

阿里云云栖社区
01/07
224
0
MaxCompute小文件问题优化方案

小文件背景知识 小文件定义 分布式文件系统按块Block存放,文件大小比块大小小的文件(默认块大小为64M),叫做小文件。 如何判断存在小文件数量多的问题 查看文件数量 判断小文件数量多的标...

云花
2018/12/20
0
0
订阅MySQL binlog 将数据插入到阿里云Datahub

使用阿里云的 DTS (https://www.aliyun.com/product/dts?spm=5176.8142029.388261.50.zjmBFA) 订阅binlog增量数据,将数据插入到 阿里云 Datahub (https://help.aliyun.com/document_detail......

ipaloma
2017/04/14
20
0

没有更多内容

加载失败,请刷新页面

加载更多

教你玩转Linux—添加批量用户

添加和删除用户对每位Linux系统管理员都是轻而易举的事,比较棘手的是如果要添加几十个、上百个甚至上千个用户时,我们不太可能还使用useradd一个一个地添加,必然要找一种简便的创建大量用户...

xiangyunyan
38分钟前
6
0
返回提示信息,如:xxx创建成功!

【服务端】在输出的方法块中,加入要输出的字段(qcm_batch_id) QCMUserType.cs: public struct QCM_Custom_Create_Batch_Out_Tag { public BASCoreType.Cmn_Out_T......

_Somuns
38分钟前
6
0
Aliyun Serverless VSCode Extension v1.12.0 发布

Aliyun Serverless VSCode Extension 是阿里云 Serverless 产品 函数计算 Function Compute 的 VSCode 插件,该插件结合了函数计算 Fun 工具以及函数计算 SDK ,是一款 VSCode 图形化开发调试...

阿里云官方博客
39分钟前
6
0
程序员如何培养解决复杂问题的能力?

今天在上网时候,突然看到了这篇文章,感觉非常的适合现在的自己去思考下,可能也适用在座的读者。程序员不仅仅是敲代码,更是一个复合能力的结合体,也不仅仅停留在技术和代码阶段。你想要成...

哥本哈根的小哥
42分钟前
8
0
市场变化驱动产品思维升级

宜信科技中心财富管理产品部负责人Bob,与大家一起聊聊个性化推荐产品功能的设计和B端产品的功能策划方式。 拓展阅读:回归架构本质,重新理解微服务 智慧金融时代,大数据和AI如何为业务赋能...

宜信技术学院
43分钟前
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部