大数据搬站step by step

IDC / ECS自建和云数据库之间的数据搬站

1 IDC -> MaxCompute / EMR

【方案】:使用“独享数据集成资源组”,绑定可以连通用户IDC的用户VPC,然后提工单,由阿里云数据集成开发人员在独享数据集成资源组上配置路由,使独享数据集成资源组可以访问IDC内数据源。

1.1 IDC Hadoop(HDFS / Hive)-> EMR

【HDFS读取】:https://help.aliyun.com/knowledge_detail/137721.html
【HDFS写入】https://help.aliyun.com/knowledge_detail/137759.html

1.12  前期准备

  • 创建好EMR集群,详细文档请参见:E-MapReduce集群创建
  • 现在自建Hadoop迁移到E-MapReduce可以通过OSS进行过度,或者使用阿里云高速通道产品建立线下IDC和线上E-MapReduce所在VPC网络的连通。
  • 配置DataWorks独享数据集成资源组,详细文档请参见:独享资源模式

本文以在华北2(北京)区域创建项目为例,同时启动Dataworks的相关服务。

2.12  数据准备

IDC集群测试数据准备,本例中HIVE建表语句如下:

CREATE TABLE IF NOT EXISTS test(
id INT,
name STRING,
hobby STRING,
region STRING
)
PARTITIONED BY(pt string)
row format delimited
fields terminated by ','
lines terminated by 'n'
;

插入测试数据:

insert into
test PARTITION(pt =1) values(1,'xiaoming','book','beijing'),(2,'lilei','TV','nanjing'),(3,'lihua','music','hebei'),(4,'xiaoma','draw','henan'),(5,'laoli','piano','heinan');

2.13 IDC自建Hadoop数据源连接
Dataworks数据集成-数据源管理-新增数据源-HDFS
大数据搬站step by step
   
使用“独享数据集成资源组”,绑定可以连通用户IDC的用户VPC,然后提工单,由阿里云数据集成开发人员在独 享数据集成资源组上配置路由,使独享数据集成资源组可以访问IDC内数据源。

EMR的Hadoop数据源连接
Dataworks数据集成-数据源管理-新增数据源-HDFS
大数据搬站step by step

大数据搬站step by step

2.14 在EMR的Hadoop上创建目录来存放ECS集群同步过来的数据

hadoop fs -mkdir /emr_test

2.15 在DataWorks中创建数据同步节点
    配置数据同步节点并点击转换为脚本模式,配置MaxCompute Reader 和 HDFS Writer 脚本可参考官方文档:配置HDFS Reader 和 配置HDFS Writer
本文档中配置如下:

{
"type": "job",
"steps": [
{
"stepType": "hdfs",
"parameter": {
"path": "/user/hive/warehouse/gitdatabase.db/test/",
"datasource": "IDC_EMR",
"column": [
"*"
],
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileType": "text"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "hdfs",
"parameter": {
"path": "/emr_test/",
"fileName": "emr_test",
"datasource": "IDC_EMR",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "hobby",
"type": "string"
},
{
"name": "region",
"type": "string"
},
{
"name": "col5",
"type": "date"
}
],
"writeMode": "append",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileType": "text"
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"concurrent": 2,
"throttle": true,
"mbps": "10"
}
}
}

配置独享资源组:
大数据搬站step by step

2.16 脚本按需求配置并保存,运行节点:
大数据搬站step by step

查看EMR集群hdfs数据是否同步成功:hadoop fs -cat /emr_test/*
大数据搬站step by step

1.2 IDC Kafka -> EMR Kafka

【Kafka读取】:https://help.aliyun.com/knowledge_detail/137745.html
【Kafka写入】https://help.aliyun.com/knowledge_detail/145510.html

1.21  环境准备

  • 已完成阿里云EMR服务自动化搭建Kafka集群,详细文档请参见:E-MapReduce
  • 数据工场DataWorks,详细文档请参见:DataWorks
  • 购买独享数据集成资源,使独享资源组可以访问您的云资源,详细文档请参见:独享资源组模式
    IDC自建需要与线上E-MapReduce所在网络的连通。

为保证您可以顺利登录EMR集群Header主机,及DataWorks可以顺利和EMR集群Header主机通信,请您首 先配置EMR集群Header主机安全组,放行TCP 22及TCP 9092端口。
本文中在华北2(北京)区域创建项目,同时启动DataWorks相关服务。通过独享集成资源组绑定到与Kafka对应的VPC,点击专有网络绑定,选择与Kafka对应的交换机进行连接。

1.22  准备IDC kafka测试数据
创建测试Topic
执行如下命令创建测试所使用的Topic testkafka。

[root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka  --create
Created topic "testkafka".

执行如下命令查看已创建的Topic。

[root@emr-header-1 ~]# kafka-topics.sh  --list --zookeeper emr-header-1:2181/kafka-1.0.1
__consumer_offsets
_emr-client-metrics
_schemas
connect-configs
connect-offsets
connect-status
testkafka

写入测试数据
您可以执行如下命令,模拟生产者向Topic testkafka中写入数据。由于Kafka用于处理流式数据,您可以持续不断的向其中写入数据。为保证测试结果,建议您写入10条以上的数据。

[root@emr-header-1 ~]# kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka
>123
>abc
>

为验证写入数据生效,您可以同时再打开一个SSH窗口,执行如下命令,模拟消费者验证数据是否已成功写入Kafka。当数据写入成功时,您可以看到已写入的数据。

[root@emr-header-1 ~]# kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning
123
abc

1.23  在EMR kafka创建topic来放同步过来的数据:

[root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic emrkafka  --create
Created topic "testkafka".

 
1.24  数据同步
       a. 在您的业务流程中右键单击数据集成,选择新建 > 数据集成 > 离线同步。
       b. 新建数据同步节点后,您需要选择数据来源的数据源为kafka,数据去向的数据源为kafka,完成上述配置后,请单击下图框中的按钮,转换为脚本模式
       c. 完成脚本配置后,请首先切换任务资源组为您创建的独享资源组,然后单击运行。

1.25  确认数据导入    
       a. 完成运行后,您可以在运行日志中查看运行结果,如下为成功运行的日志。
       b. 查看EMR Kafka数据是否成功写入。

1.3 IDC MySQL -> MaxCompute

【MySQL读取】
https://help.aliyun.com/knowledge_detail/137725.html  
【MaxCompute写入】
https://help.aliyun.com/knowledge_detail/137466.html

1.31  环境准备

  • 大数据计算服务MaxCompute,详细文档请参见:MaxCompute
  • 数据工场DataWorks,详细文档请参见:DataWorks
  • 购买独享数据集成资源,使独享数据资源可以访问您IDC的数据源,详细文档请参见:独享资源模式

1.32  数据采集
本文通过开通数据网关资源,使用DMS来管理IDC的MySQL。通过数据集成-数据源-新增数据源MySQL JDBC Url连接。

1.33 在MaxCompute中创建数据表存放同步过来的数据。本实例建表:idc_my。

1.34  数据同步
配置MySQL数据同步节点
大数据搬站step by step

配置数据集成任务时,将默认资源组配置为需要的独享数据集成资源。
通过向导模式配置任务时,在通道控制 > 任务资源组下拉框中,选择相应的独享数据集成资源。
大数据搬站step by step

大数据搬站step by step

确认当前节点的配置无误后,单击左上角的保存。
关闭当前任务,返回业务流程配置面板。

1.35 运行业务流程
右键单击rds_数据同步节点,选择查看日志。
当日志中出现如下字样,表示同步节点运行成功,并成功同步数据。
大数据搬站step by step
然后在数据开发页面,单击左侧导航栏中的临时查询,进入临时查询面板,再确认数据是否已经导入MaxCompute中。

2 云上ECS自建 -> MaxCompute / EMR

【方案】:使用“独享数据集成资源组”,绑定可以自建数据源所在用户VPC,然后提工单,由阿里云数据集成开发人员在独享数据集成资源组上配置路由,使独享数据集成资源组可以访问ESC内数据源。

2.1 ECS Hadoop(HDFS / Hive)-> EMR

  • 【HDFS读取】:https://help.aliyun.com/knowledge_detail/137721.html
  • 【HDFS写入】https://help.aliyun.com/knowledge_detail/137759.html

2.11  前提条件

  • 配置DataWorks独享数据集成资源组,详细文档请参见:独享资源模式
  • 创建好EMR集群。
  • 由于 VPC 实现用户专有网络之间的逻辑隔离,E-MapReduce 建议使用 VPC 网络。
  • 经典网络与 VPC 网络打通
    如果 ECS 自建 Hadoop,需要通过 ECS 的classiclink的方式将经典网络和 VPC 网络打通,参见建立 ClassicLink 连接。
  • VPC 网络之间连通
    数据迁移一般需要较高的网络带宽连通,建议新旧集群尽量处在同一个区域的同一个可用区内。

本文以在华北2(北京)区域创建项目为例,同时启动Dataworks的相关服务。

2.12  数据准备
ECS集群测试数据准备
本例中HIVE建表语句如下:

CREATE TABLE IF NOT EXISTS test(
id INT,
name STRING,
hobby STRING,
region STRING
)
PARTITIONED BY(pt string)
row format delimited
fields terminated by ','
lines terminated by 'n'
;

插入测试数据:

insert into
test PARTITION(pt =1) values(1,'xiaoming','book','beijing'),(2,'lilei','TV','nanjing'),(3,'lihua','music','hebei'),(4,'xiaoma','draw','henan'),(5,'laoli','piano','heinan');

2.13 ECS自建Hadoop数据源连接
Dataworks数据集成-数据源管理-新增数据源-HDFS
大数据搬站step by step

使用“独享数据集成资源组”,绑定可以自建数据源所在用户VPC,然后提工单,由阿里云数据集成开发人员在独享数据集成资源组上配置路由,使独享数据集成资源组可以访问ESC内数据源。

EMR的Hadoop数据源连接
Dataworks数据集成-数据源管理-新增数据源-HDFS
大数据搬站step by step
大数据搬站step by step

2.14 在EMR的Hadoop上创建目录来存放ECS集群同步过来的数据

hadoop fs -mkdir /emr_test

2.15 在DataWorks中创建数据同步节点
配置数据同步节点并点击转换为脚本模式, 配置MaxCompute Reader 和 HDFS Writer 脚本可参考官方文档:配置HDFS Reader 和 配置HDFS Writer
本文档中配置如下:

{
"type": "job",
"steps": [
{
"stepType": "hdfs",
"parameter": {
"path": "/user/hive/warehouse/gitdatabase.db/test/",
"datasource": "IDC_EMR",
"column": [
"*"
],
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileType": "text"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "hdfs",
"parameter": {
"path": "/emr_test/",
"fileName": "emr_test",
"datasource": "IDC_EMR",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "hobby",
"type": "string"
},
{
"name": "region",
"type": "string"
},
{
"name": "col5",
"type": "date"
}
],
"writeMode": "append",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileType": "text"
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"concurrent": 2,
"throttle": true,
"mbps": "10"
}
}
}

配置独享资源组:
大数据搬站step by step

2.16 脚本按需求配置并保存,运行节点:
大数据搬站step by step

查看EMR集群hdfs数据是否同步成功:hadoop fs -cat /emr_test/*
大数据搬站step by step

2.2 ECS MySQL -> MaxCompute

【MySQL读取】
https://help.aliyun.com/knowledge_detail/137725.html
【MaxCompute写入】
https://help.aliyun.com/knowledge_detail/137466.html

2.21 环境准备

  • 大数据计算服务MaxCompute,详细文档请参见:MaxCompute
  • 数据工场DataWorks,详细文档请参见:DataWorks
  • 购买独享数据集成资源,使Dataworks可以访问您的云资源,详细文档请参见:独享资源组模式
    本文以在华北2(北京)区域创建项目为例,同时启动Dataworks的相关服务。经典网络ECS上自建的数据源和DataWorks在同已区域为例。

2.22 数据采集
(1)以项目管理员身份登录DataWorks控制台,单击相应工作空间后的进入数据集成。
(2)单击左侧导航栏中的数据源,即可跳转至工作空间管理 > 数据源管理页面。
(3)单击数据源管理页面右上角的新增数据源。
(4)在新增数据源对话框中,选择数据源类型为MySQL。
(5)填写MySQL数据源的各配置项。 
MySQL数据源类型需要用连接串模式。
大数据搬站step by step
(6)单击测试连通性。
(7)连通性测试通过后,单击完成。

2.23 在MaxCompute中建立数据表,存放数据同步过来的数据。本次同步建表:test_tab。

2.24 新建数据同步任务
在DataWorks上新建数据同步节点,详情请参见数据同步节点。
大数据搬站step by step

大数据搬站step by step

配置数据集成任务时,将默认资源组配置为需要的独享数据集成资源。
大数据搬站step by step

完成上述配置后,单击运行即可。运行成功日志示例如下所示。
大数据搬站step by step

2.25  确认数据是否成功导入MaxCompute
大数据搬站step by step

3 云上DB  -> MaxCompute / EMR

【方案】直接创建云上DB数据源,然后向导模式配置同步任务即可

3.1 RDS / Mongo -> MaxCompute

【创建RDS数据源】:https://help.aliyun.com/knowledge_detail/137690.html
【创建MongoDB数据源】:https://help.aliyun.com/knowledge_detail/137675.html
【MaxCompute写入】
https://help.aliyun.com/knowledge_detail/137466.html

3.11  环境准备

  • 大数据计算服务MaxCompute,详细文档请参见:MaxCompute
  • 数据工场DataWorks,详细文档请参见:DataWorks
    3.12 数据采集

新增MySQL数据源
(6) 以项目管理员身份登录DataWorks控制台,单击相应工作空间后的进入数据集成。
(7) 单击左侧导航栏中的数据源,即可跳转至工作空间管理 > 数据源管理页面。
(8) 单击数据源管理页面右上角的新增数据源。
(9) 在新增数据源对话框中,选择数据源类型为MySQL。
(10) 填写MySQL数据源的各配置项。
MySQL数据源类型包括阿里云实例模式和连接串模式。
以新增MySQL > 阿里云实例模式类型的数据源为例。
大数据搬站step by step
大数据搬站step by step

(6)  单击测试连通性。
(7)  连通性测试通过后,单击完成。

新增MongoDB数据源
(1) 单击左侧导航栏中的数据源,即可跳转至工作空间管理 > 数据源管理页面。
(2) 单击数据源管理页面右上角的新增数据源。
(3) 在新增数据源对话框中,选择数据源类型为MongoDB。
(4) 填写MongoDB数据源的各配置项。

MongoDB数据源类型包括实例模式(阿里云数据源)和连接串模式。
实例模式(阿里云数据源):通常使用经典网络类型,同地区的经典网络可以连通,跨地区的经典网络不保证可以连通。
连接串模式:通常使用公网类型,可能产生一定的费用。
以新增MongDB > 实例模式(阿里云数据源)类型的数据源,经典网络为例。
大数据搬站step by step
大数据搬站step by step

(5) 单击测试连通性。
(6)测试连通性通过后,单击完成。

3.13 登录MongoDB的DMS控制台,本示例使用的数据库为admin,集合为abc。
大数据搬站step by step

3.14 新建表
(1)在数据开发页面打开新建的业务流程,右键单击MaxCompute,选择新建 > 表。
(2)在新建表对话框中,输入表名,单击提交。
此处需要创建2张表(msl_maxc和mon_maxc),分别存储同步过来的MySQL日志数据和MongoDB日志数据。

3.15 新建数据同步任务
在DataWorks上新建数据同步节点,详情请参见数据同步节点。
大数据搬站step by step

配置MySQL数据同步节点
大数据搬站step by step

配置MongoDB的数据同步节点。(不支持向导模式,此例为脚本模式)

  {
"type": "job",
"steps": [
{
"stepType": "mongodb",
"parameter": {
"datasource": "mon_maxc",
"column": [
{
"name": "store.bicycle.color",
"type": "document.string"
}
],
"collectionName": "abc"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"tableType": null,
"partition": "",
"truncate": true,
"datasource": "odps_first",
"column": [
"*"
],
"guid": null,
"emptyAsNull": false,
"table": "mon_maxc"
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"concurrent": 2,
"throttle": false
}
}
}

3.15 完成上述配置后,单击运行即可。运行成功日志示例如下所示。
大数据搬站step by step

确认数据是否成功导入MaxCompute
大数据搬站step by step