Importing CSV files into HBase with DataX: thoughts prompted by comparing the CSV size before the import with the HDFS size after it

The project needs offline data synchronization between systems, and since the real-time requirements are modest, Alibaba's DataX was chosen for the job. Before synchronizing, the data is exported to CSV files. Because I have to estimate how much disk space the HDFS cluster backing HBase will eventually need, I ran a few tests with the following goals:
1. Estimate the HDFS disk space the data will occupy.
2. Estimate how long the synchronization will take.
3. Identify the main bottleneck in the whole process.
4. Tune system parameters to optimize the synchronization.
Some preparation is assumed: Hadoop and HBase are up and running, and Hive has already been integrated with HBase.

Use Python to generate some temporary test data:

# -*- coding=UTF-8 -*-
'''
Created on Nov 11, 2018
Generate temporary test data for the load test
@author: hadoop
'''
import pandas as pd
import numpy as np
import datetime
def pt(comments):
    print('===Step:' + comments + '|' + datetime.datetime.now().strftime(
        '%Y-%m-%d %H:%M:%S') + '===========================================')
def difference(left, right, on):
    """
    difference of two dataframes
    :param left: left dataframe
    :param right: right dataframe
    :param on: join key
    :return: difference dataframe
    """
    df = pd.merge(left, right, how='left', on=on)
    left_columns = left.columns
    col_y = df.columns[left_columns.size]
    df = df[df[col_y].isnull()]
    df = df.iloc[:, 0:left_columns.size]  # .ix is deprecated/removed in newer pandas; use .iloc
    df.columns = left_columns
    return df
pt('start')
#50-->25M
#60-->25M
#70-->25M
arrlength = 10000000  # adjust the size of the test data set here
# lambda x: 0.0 if pd.isnull(x) else x
rng = pd.date_range('1980-11-22', periods=arrlength, freq='S')
yehaibo = pd.date_range(start='1980-1-1', periods=arrlength, freq='D')
dfpcs = pd.DataFrame({'personid': np.random.choice(['normal', 'pending', 'die'], arrlength),
                      'pcscode': np.random.randint(43000000, 43999900, arrlength),
                      'age': np.random.randint(1, 100, size=arrlength),
                      'createtime': rng,
                      'partition': np.random.randint(1, 18, arrlength)
                      })
# xjcode
pt('lambda')
lambdaxj = lambda x: str(x)[0:6] + '00'  # 4:length
lambdasj = lambda x: str(x)[0:4] + '0000'  # 4:length
dfxjcode = pd.DataFrame({'fxjcode': dfpcs['pcscode'].map(lambdaxj)})
dfsjcode = pd.DataFrame({'sjcode': dfpcs['pcscode'].map(lambdasj)})
dfconcat = pd.concat([dfpcs, dfxjcode, dfsjcode], axis=1)
# print dfconcat
# pt('shape')
# print dfconcat.shape
# pt('')
# print dfconcat.ix[1:3, ['pcscode', 'age', 'sjcode']]
# pt('')
# print dfconcat[dfconcat['personid'] == 'pending']
# pt('not')
# print dfconcat[~(dfconcat['personid'] == 'pending')]
# pt('not in (-~)')
# print dfconcat[-(dfconcat['personid'].isin(['pending'])) & (dfconcat['age'] > 20)]
# pt('pl/sql  like')
# # print dfconcat[dfconcat['personid'].str.contains("^endd+$")]
# print dfconcat[dfconcat['personid'].str.contains("end")]
# pt('pl/sql substr')
# lambdasubstr = lambda x: str(x)[0:4] + '00'
# print dfconcat[dfconcat['personid'].map(lambdasubstr) == '430500']
# pt('')
# # /home/hadoop/pandas/eclipse-workspace/TestPandas/yehaibo
# dfgroupby = dfconcat.groupby(['personid', 'fxjcode']).count()
# print dfgroupby
# pt('pl/sql ')
# dfindex = dfconcat.set_index('createtime')
# print dfindex
# # print dfconcat[dfconcat.truncate(before='2017-11-22 00:01:38')]
# s = pd.Series(np.random.randn(10).cumsum(), index=np.arange(0, 100, 10))
# s.plot()
# df = pd.DataFrame(np.random.randn(10, 4).cumsum(0), columns=['A', 'B', 'C', 'D'], index=np.arange(0, 100, 10))
# df.plot()
#
# # print  df2.iloc[:,0:1]
# # print '439932'.map(lambdaxj)
# # pt('1')
# # print dfpcs
# # pt('2')
print(dfconcat)
# DataX will later read this CSV file directly and import it into HBase
dfconcat.to_csv('/home/hadoop/PycharmProjects/GenerateCSVFilesForPressureTest/pcs' + str(arrlength) + '.csv', index=True, header=False)
# dfgroupby.to_csv('groupby.csv', index=True, header=True)
pt('end')
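
Since the CSV is written with index=True and header=False, column 0 of every line is the DataFrame index (which the DataX job below uses as the HBase rowkey), and the order of the remaining columns is whatever order pandas stores them in (older pandas versions sort the dict keys). It is worth printing the first line before filling in the DataX column indices; a minimal check under that assumption (splitting on ',' is safe here because no field contains a comma):

# Peek at the first line of the generated CSV so each position can be mapped
# by hand to a DataX "index" value (column 0 is the DataFrame index).
path = '/home/hadoop/PycharmProjects/GenerateCSVFilesForPressureTest/pcs10000000.csv'
with open(path) as f:
    first_line = f.readline().rstrip('\n')
print(first_line)
print('fields per line:', len(first_line.split(',')))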

Before kicking off the import, record the size of the CSV file and of its gzip-compressed archive, so that the HDFS usage can be compared against them later:

[hadoop@master GenerateCSVFilesForPressureTest]$ tar czf pcs.tar.gz pcs10000000.csv
[hadoop@master GenerateCSVFilesForPressureTest]$ ls -alh
total 805M
drwxrwxr-x.  3 hadoop hadoop  147 Sep 25 14:20 .
drwxrwxr-x. 19 hadoop hadoop 4.0K Sep 24 15:10 ..
lrwxrwxrwx.  1 hadoop hadoop   29 Sep 24 15:21 datax -> /home/hadoop/Datax3/datax/bin
-rw-rw-r--.  1 hadoop hadoop 3.2K Sep 24 15:18 generatedata.py
-rw-rw-r--.  1 hadoop hadoop 3.5K Sep 25 10:51 g.py
drwxrwxr-x.  2 hadoop hadoop  105 Sep 25 11:13 .idea
-rw-rw-r--.  1 hadoop hadoop 636M Sep 25 10:53 pcs10000000.csv            <- HDFS ends up ~4.5x this size after settling, ~13.5x at the peak
-rw-rw-r--.  1 hadoop hadoop 6.2M Sep 25 10:30 pcs100000.csv
-rw-rw-r--.  1 hadoop hadoop 6.1K Sep 24 15:18 pcs100.csv
-rw-rw-r--.  1 hadoop hadoop 163M Sep 25 14:21 pcs.tar.gz                 <- HDFS ends up ~17x this size after settling, ~49.3x at the peak

The DataX job that does the conversion:

[hadoop@master datax]$ cat t2h10000000.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 5
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": "/home/hadoop/PycharmProjects/GenerateCSVFilesForPressureTest/pcs10000000.csv",
                        "charset": "UTF-8",
                        "column": [
                            {"index": 0, "type": "String"},
                            {"index": 1, "type": "String"},
                            {"index": 2, "type": "string"},
                            {"index": 3, "type": "string"},
                            {"index": 4, "type": "string"},
                            {"index": 5, "type": "string"},
                            {"index": 6, "type": "string"}
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "hbase11xwriter",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.rootdir": "hdfs://master:9000/hbase_db",
                            "hbase.cluster.distributed": "true",
                            "hbase.zookeeper.quorum": "master,Slave1,Slave2"
                        },
                        "table": "rkxx",
                        "mode": "normal",
                        "rowkeyColumn": [
                            {"index": 0, "type": "string"}
                        ],
                        "column": [
                            {"index": 1, "name": "rkxx:age", "type": "string"},
                            {"index": 2, "name": "rkxx:createday", "type": "string"},
                            {"index": 3, "name": "rkxx:pcscode", "type": "string"},
                            {"index": 4, "name": "rkxx:status", "type": "string"},
                            {"index": 5, "name": "rkxx:fxjcode", "type": "string"},
                            {"index": 6, "name": "rkxx:sjcode", "type": "string"}
                        ],
                        "versionColumn": {
                            "index": -1,
                            "value": "123456789"
                        },
                        "encoding": "utf-8"
                    }
                }
            }
        ]
    }
}
[hadoop@master datax]$
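
Before launching the job it is cheap to double-check that the reader and writer column indices in the JSON line up the way you expect. A small sketch, assuming the job file lives under the datax symlink shown earlier (/home/hadoop/Datax3/datax/bin); it only parses the JSON above and prints the mapping:

import json

# Path is an assumption based on the "datax -> /home/hadoop/Datax3/datax/bin" symlink above.
job_path = '/home/hadoop/Datax3/datax/bin/t2h10000000.json'

with open(job_path) as f:
    job = json.load(f)

reader = job['job']['content'][0]['reader']['parameter']
writer = job['job']['content'][0]['writer']['parameter']

print('channels             :', job['job']['setting']['speed']['channel'])
print('columns read         :', len(reader['column']))
print('rowkey <- CSV column :', writer['rowkeyColumn'][0]['index'])
for col in writer['column']:
    print('%-15s <- CSV column %d' % (col['name'], col['index']))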

The stock datax.py targets Python 2, while the default python on this box points at Anaconda's Python 3.6, so the job is launched explicitly with the system python2.7:

[hadoop@master bin]$ whereis python
python: /usr/bin/python /usr/bin/python2.7 /usr/lib/python2.7 /usr/lib64/python2.7 /etc/python /usr/include/python2.7 /home/hadoop/anaconda3/bin/python /home/hadoop/anaconda3/bin/python3.6-config /home/hadoop/anaconda3/bin/python3.6m /home/hadoop/anaconda3/bin/python3.6 /home/hadoop/anaconda3/bin/python3.6m-config /usr/share/man/man1/python.1.gz
[hadoop@master bin]$ /usr/bin/python2.7 datax.py ./t2h10000000.json

2019-09-25 11:11:13.776 [job-0] INFO JobContainer -
任务启动时刻 (start time)         : 2019-09-25 10:57:13
任务结束时刻 (end time)           : 2019-09-25 11:11:13
任务总计耗时 (total elapsed)      : 840s
任务平均流量 (average traffic)    : 588.29KB/s
记录写入速度 (write speed)        : 11904rec/s   <- roughly 10,000 rows per second
读出记录总数 (records read)       : 10000000
读写失败总数 (read/write failures): 0

During the run, CPU and disk I/O were the bottleneck.
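
The reported figures are easy to sanity-check from the record count and the elapsed time; a quick back-of-the-envelope calculation (the byte rate is approximate, since DataX presumably measures the bytes of the records it transfers rather than the on-disk CSV size):

# Reproduce the job-summary numbers from the log above.
records = 10000000            # records read
elapsed_s = 840               # total elapsed time in seconds
csv_bytes = 636 * 1024 ** 2   # size of the source CSV (636 MB)

print('records per second :', records / float(elapsed_s))            # ~11904 rec/s, matches the log
print('raw CSV KB/s       :', csv_bytes / float(elapsed_s) / 1024)   # ~775 KB/s of source file per second
# DataX itself reports 588.29KB/s, a bit lower than the raw-file rate, which is
# consistent with it counting transferred field bytes rather than file bytes.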

[hadoop@master aftp]$ hadoop fs -du -h /
8.1 G /hbase_db
54 /hdfs
0 /rkxx
4.6 M /spark
368.3 M /tmp
968.5 M /user
2.3 K /yehaibo

About an hour later the usage has shrunk on its own; the drop is most likely HBase cleaning up after itself (flushing memstores, discarding write-ahead logs and compacting store files) rather than Hadoop "optimizing" anything:

[hadoop@master aftp]$ hadoop fs -du -h /
2.8 G /hbase_db
54 /hdfs
0 /rkxx
4.6 M /spark
368.6 M /tmp
968.5 M /user
2.3 K /yehaibo

Create a Hive external table mapped onto the HBase table:

CREATE EXTERNAL TABLE hisrkxx(key String, age String,createday String,fxjcode String,pcscode String,sjcode String,status String)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,rkxx:age,rkxx:createday,rkxx:fxjcode,rkxx:pcscode,rkxx:sjcode,rkxx:status","hbase.table.default.storage.type"="binary")
TBLPROPERTIES("hbase.table.name" = "rkxx");
hive> select count(key) from hisrkxx;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = hadoop_20190925111602_22d0f64f-30cf-4b15-a163-a1fd7c41c743
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1569374479979_0006, Tracking URL = http://Master:8088/proxy/application_1569374479979_0006/
Kill Command = /home/hadoop/svr/hadoop/bin/hadoop job  -kill job_1569374479979_0006
Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 1
2019-09-25 11:16:27,343 Stage-1 map = 0%,  reduce = 0%
2019-09-25 11:17:11,870 Stage-1 map = 33%,  reduce = 0%, Cumulative CPU 41.85 sec
2019-09-25 11:17:21,138 Stage-1 map = 67%,  reduce = 0%, Cumulative CPU 45.19 sec
2019-09-25 11:17:33,837 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 52.0 sec
2019-09-25 11:17:41,332 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 54.59 sec
MapReduce Total cumulative CPU time: 54 seconds 590 msec
Ended Job = job_1569374479979_0006
MapReduce Jobs Launched:
Stage-Stage-1: Map: 3  Reduce: 1   Cumulative CPU: 54.59 sec   HDFS Read: 18489 HDFS Write: 108 SUCCESS
Total MapReduce CPU Time Spent: 54 seconds 590 msec
OK
10000000                                                                  <- 10 million rows
Time taken: 100.433 seconds, Fetched: 1 row(s)
hive>

The comparison shows:
10 million rows produce a 636 MB CSV file (163 MB after gzip compression).
After importing into HBase, HDFS usage is 8.1 GB at first; about an hour later, once HBase has cleaned up after itself, it settles at 2.8 GB.

Ratio of HDFS usage to the original files:
relative to the uncompressed CSV (636 MB): about 4.5x once HDFS usage has settled, about 13.5x at the peak;
relative to the gzip archive (163 MB): about 17x once it has settled, about 49.3x at the peak.
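
These ratios are easy to reproduce from the measured sizes, and they also answer goal 1: for a future export, the required HDFS space can be projected by multiplying the CSV size by the measured expansion factor. A minimal sketch using the figures from this test (the factors are specific to this schema, HBase defaults without compression, and this cluster's replication factor, so treat them as rough planning numbers; the 50 GB export below is a made-up example):

# Expansion factors measured in this test.
csv_mb          = 636.0   # uncompressed CSV, 10 million rows
gzip_mb         = 163.0   # the same data gzip-compressed
hdfs_peak_gb    = 8.1     # HDFS usage right after the DataX import
hdfs_settled_gb = 2.8     # HDFS usage about an hour later

peak_vs_csv     = hdfs_peak_gb * 1024 / csv_mb      # ~13x
settled_vs_csv  = hdfs_settled_gb * 1024 / csv_mb   # ~4.5x
peak_vs_gzip    = hdfs_peak_gb * 1024 / gzip_mb     # ~51x
settled_vs_gzip = hdfs_settled_gb * 1024 / gzip_mb  # ~17.6x
print('vs CSV : %.1fx settled, %.1fx at peak' % (settled_vs_csv, peak_vs_csv))
print('vs gzip: %.1fx settled, %.1fx at peak' % (settled_vs_gzip, peak_vs_gzip))

# Projection for a hypothetical future 50 GB CSV export:
future_csv_gb = 50.0
print('expected HDFS usage: ~%.0f GB settled, ~%.0f GB during the import'
      % (future_csv_gb * settled_vs_csv, future_csv_gb * peak_vs_csv))

The small differences from the 13.5x and 49.3x figures quoted above come from rounding in the measured sizes.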

Bottleneck of the whole process:
Disk I/O and CPU are the dominant bottlenecks; the job is essentially limited by these two factors.

Tuning ideas:
1. Tune the Hadoop file system (HDFS) itself.
2. Run the conversion on machines with fast disks and CPUs, since those are the bottleneck.
3. Tune HBase parameters.
4. Tune the DataX job configuration (for example the channel count in the speed setting).
5. If the tuned batch job still cannot meet the business requirements, switch to consuming a message queue (Kafka, MQ) at the moment the business event happens and writing directly into HBase; a rough sketch of such a consumer follows below.
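
For reference, option 5 could look roughly like the sketch below. It assumes the kafka-python and happybase packages, a hypothetical Kafka topic rkxx_events carrying the same comma-separated fields the DataX job reads, and an HBase Thrift server running on master; none of these are part of the setup described above.

# Hypothetical streaming alternative to the batch DataX job: consume change
# events from Kafka and write them straight into the rkxx table in HBase.
import happybase
from kafka import KafkaConsumer

consumer = KafkaConsumer('rkxx_events',                    # hypothetical topic name
                         bootstrap_servers='master:9092',  # hypothetical broker address
                         auto_offset_reset='earliest')
connection = happybase.Connection('master')                # requires the HBase Thrift server
table = connection.table('rkxx')

# Batch the puts so we do not pay one RPC per row.
with table.batch(batch_size=1000) as batch:
    for msg in consumer:
        # Same field order the DataX job maps: rowkey,age,createday,pcscode,status,fxjcode,sjcode
        rowkey, age, createday, pcscode, status, fxjcode, sjcode = msg.value.split(b',')
        batch.put(rowkey, {
            b'rkxx:age': age,
            b'rkxx:createday': createday,
            b'rkxx:pcscode': pcscode,
            b'rkxx:status': status,
            b'rkxx:fxjcode': fxjcode,
            b'rkxx:sjcode': sjcode,
        })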