Spark On MaxCompute如何访问Phonix数据

简介:如何使用Spark On MaxCompute连接Phonix,将Hbase的数据写入MaxCompute的对应表中,目前没有对应的案例,为了满足用户的需求。本文主要讲解使用Spark连接Phonix访问Hbase的数据再写入到MaxCompute方案实践。该方案的验证是使用hbase1.1对应Phonix为4.12.0。本文从阿里云Hbac F z X e k ^ - @se版本的选择、确认VPC、vswitchID、设置白0 e . ,名单和访问方式,Phonix4.12.0的` Z H客户端安装,在客户端实现Phonix表的创建和写入,Spark代码在本地IDEA的编写以及pom文件以及vpcList的配置,打包上传jar包并进行冒烟Q [ M w D J l E测试。

一、购买Hbase1.1并设置对应资源

1.1购买hbase

hbase主要版本为2.0与1.1,这边选择对应hbase对应的版本为1.1 Hbase与Hbase2.0版本的区别 HBase1.1版本 1.1版本基于HBase社区1.1.2版本开发。 HBase2.0版本 2.0版本是基于社区2018年发布的HBase2.0.0版本开发的全2 , n 3新版本。同样,在此基础上,做了大量的改进和优化,吸收了众多阿里内部成功经验,比社区G - ~ EHBase版本具有更好的稳定性和性能。

Spark On MaxCompute如何访问Phonix数据

1.2确认VPC,Q } z NvsWitchID

确保测试联通性的可以方便可行,该hbase的VPCId,vsWitchID尽量与购买的独享集成资源组的为一致的

Spark On MaxCompute如何访问Phonix数据

1.3设置hbase白名单,其中DataWorks白名单如下,个人ECS也可添加

Spark On MaxCompute如何访问Phonix数据

根据文档链接选择对应的DataWorks的region下的白n T X o H |名单进行添加

Spark On MaxCompute如何访问Phonix数据

1.4查看hbase对应的版本和访问地址

打开数据库链接的按钮,可以查看到Hbase的主版本以及Hbase的专有网络访, P i K ;问地址,以及是否开通公网访问的方式进行连接。

Spark On MaxCompute如何访问Phonix数据

二、安装Phonix客户端,并创建表和插入数据

2.1安装客户端

登陆客户端执行命令

./bin/sqlline.py 172.16.0.= i U |13,172.16.0.15,1723 p b T F -.16.0.12:2181

Spark On MaxCompute如何访问Phonix数据

F 6 S . n /建表:

CREATE TABLE IF NOT EXISTS us^ r Uers_phonix

id INT ,

username STRING,

password STRING

) ;

插入数据:

UPSERT INTO usew N , N $ k drs (id, username, password) VALUES (1, 'admin8 y 9 3 % d t', 'Letmein');

2.2查看是否创建和插入成功

在客户端执行命令,查看? c V G N } 4当前表与数据是否上传成功

select * from users;

Spark On MaxCompute如何访问Phonix数据

三、编写对应代码逻辑

3.1编写代码逻辑

在IDEA按照对应得Pom文件进行配9 : ] 7 b置本地得开发环境,将代码涉及到得配置信息填写完整,进行编写测试,这里可以先使用Hbase得公网访问链接进行测试,代码逻[ - C 3 a M _ #辑验证D } } q ; ?成功后可调整配置参数,具体代码如下

package com.git.phonix

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.SparkSession

import org.apache.phoenix.spark._

/**

* 本实例适用于Phoenix 4.x版本

*/

object SparkOnPhoenix4xSparkSession {

def main(args: Array[String]): Unit = {

//HBase集群的ZK链接地址。

//格式为:xxx-002.hbase.rds.aliyuncs.8 P n u fcom,xxx-001.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181

val zkAddress = args(0)

//Phoenix侧的表名,需要在Phoeni0 ] e Y C 7 W ;x侧提前创建。Phoenix表创建可以参考:https://? c ) lhelp.aliyun.com/document_detail/53716.html?spm=a2c4g.11186623.4.2.4e961ff0lRqHUW

val phoenixTableName = args(1)

//Spark侧的表名。

val ODPSTableName = args(2)

val sp& E } * ^arkSession = SparkSession

.builderH + ]()

.appNameI * = C { Q g l Y("SparkSQL-on-MaxCompute")

.config("spark.sql.broadcasr ] q S n 7 s ) 4tTimeout", 20 * 60)f b U

.config("spark.sql.crossJoin.enabled", true)

.config("odps.exec.dynamic.partition.mode", "n& + konstrictU k E 5 / 0 O")

//.confi_ ? J )g("spark.master", "local[4]") // 需设置spark.master为local[N]才能直接运行,N为并发数

.config("spark.hadoop.odps.project.name", "***")

.config("spark.hadoop.odps.access.id", "C ! j ? $***")

.con& F c m o t P tfig("spark.hadoop.odps.access.key", "***")

//.config("spark.hadoop.odps.end.r A d O ; k P #p+ O q e w ( U Voint", "http://service.cn.maxcompute.alq H Ciyun.com6 k a r [/api")

.config("spark.hadoop.odps.end.point", "http://service.cn-beijingN + Z G x T ~ W ;.may E T R Y H _ Q pxcompute.aliyun-inc.com/api")# O $ !

.config("spark.sql.catalogImpleme ? / A M v mntation", "odp. f m n E r : Ns")

.getOrCrA t 5eate()

//第一种插入方1 ( Q # O ! g N

var df = sparkSession.read.format("org.apache.phoenix.spark").option("table",A I i U phoenix4 V [ y T 0 RTableName).optio] E 0 q X ~n("zkUrl",zkAddress).load()

df.show()

df.write: ( ! j.mode("overwrite").insertInto(ODPSTableName)

}

}

3.2对应Pom文件

pom文件中分为Spark依赖,与ali-phoenix-sN ~ * z &park相关的依赖,由于涉及到ODPS的jar包,会在集群中引起jar冲突,所以要将ODPS的u ~ x包排除掉

<?xml version="1.0 J 4" encoding="UTF-8"?&g^ [ = h ( # } w ct;

<!--

Licensed under the Apache License, Version 2T M |.0 (the "License");

you may not use this file except in compliance with the License.

You may obtain a copy of the License at

httl L & Z T {p://www.apache.org/licenses/LICENSE-2.0

Unless re: k : q 7 Fquif ; $ / F % Zred by applicable law or agreed to in writing, sofN ^ 5 Z . j X Y ,tware

distributed under the License is distributed on an "AS IS" BASIS,

WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

See the License for the specific language governing permissions and

limitat9 { iions under the License. See accompanying LICENSE fis $ 6 p = c h _le.

-->

<project xmlns="http:/$ D M C ^ :/maven.apache.org/POM/4.[ e 4 g0.0"

xmlns:xsi="http://www.w3.org3 # u 2 t/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/PO8 U u | J 0 .M/4.0.0 http://V I cmaven.apache.org/xsd/maven-4.0.0.xsd">

<modes P O E ~ o U GlVersion>4.0.0</modelVersion>

<properties>

<sparR } ` s : - - _ &k.ve- n X & , A f } Jrsion>2.3.0</spark.version>

<cupid.sdk.version>3.3.8-public</cupid.sS $ S } t ` ^dk. [ C Iversion>

<scala.version>2.11.8</scala.version>

<scala.binary.version>2.11</scala.binary.version>

<phoenix.version>4.12.0-HBasv B l @e-1.1</phoenb ` T g uix.version7 [ ? R + S>

</properties>

<groupId: ; 6 m>com.aliyun.odps<L 3 Y 5 u V a;/groupId>

<art2 M R L =ifactId>Spark-Phonix</artifactId>

<version>1.0.0-SNAPSHOT</version>

&lJ U qt;packagiP G a z $ @ v ` +ng>jar</packaging>

<dependencies>

<dependenc- % H i l L * X hy>

<groupId>org.jpmml</groupId>

<artifactId>pmml-model</artifactId>

<version>1.3.8</version>

</dependency>

<dependency>

<groupId>org.jpmml</groupId>

<artifactId>pmml-evaluator</artifactId>

<version>1.3.10</version>

</dependency>

<dependency>

<grou[ ] _pId>org.apache.spK u , t 6 9 N Rark</groupId>

<artifactId>spark-corK $ v d / je_${scala.binary.version}</artifactId>

<version>${spark.versionf V 3 ] w ;}</version>

<scope>provided</scope>s 4 B & d C

<exclusions&gk G 0 e r ] 6t;

<exclusion&b 0 ) Fgt;

<groupId>org.scala-lang</groupId>

<artifactId>scala-library</artifactId>

</exclusion>

<exclusion>

<groupId>org.scala-lang</groupId&gi e T c 3 Q z 1 *t;

<artifactId>sX X D $ 6 C , j Zcalap</artifactIs V I .d>

</= s p R } + Lexclusion>

</exclusions>

</dependency>

<depe% L 6 ~ P M v n tndency>

<groupId>org.apache.spark</groupId>J Y / [ B;

<artifactId>spark-sq_ p - :l_F X E${scala.binary.version}</artifactId>

<versi- P Y / * ) ^on>${spark.version}</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupIdy y I>org.apache.spark</groupId>

<artifactId>spark-mllib_${scala.binary.version}</artifactId>

<versiA k } 6 i B hon>${spark.version! f %}</version>

<scope>provided&w [ d 3 , Vlt;/scd 8 Y ] S # K _ope>

</dependency>

<dependency>

<groA ` O z h YupId>org.apache.spark</groupId>

<artifactId>spark-streaming_${scala.binary.ver# c & }sion}</artifactId>

<version>${spark.version}</version>

<scope>provided</F 6 * K l G $ ,scope>

</dependency>

<dependency>

<groupId>com.aliyun.odps</groupId>

<artifactId>cupid-sdk</artifactId>

<version>${cupid.sdk.version}</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>com.aliyun.phoenix</groupId>

<artifactId>ali-phoenix-core</artifactId>

<vers^ B _ * 2 3 ^ H 5ion>r m ; L R : 4 0;4.12.0-AliHBase-1.1-0.8</vP , jersion>

<w ) ( + f 4 : !;exclusions>

<exclusion>

<groupId>com.aliyun.odps</y G 8 + TgroupId>

<artifR a C V uactId>odps-sdk-mapred</artifactId>

</exclusion&j A V Ugt;

<exclusion>

<groupId>com.aliyun.odps</groupId>

<artifa X ~ j l { 5 t @act- ( b : m 3Id>odps-sdk-commons</E d I tartifactId>

</exclusion>

</exclusiq Q Q 6 Bons>

</dependency>

<dependency>

<gp l roupId>com.aliyun$ E x l.p$ s {hoenix</groupId>

<artifactId>ali-phoenix-spark</f t G 4 W Z A 9 /artifactId>

<version>R S S ( W;4.12.0-AliHBase-1.1-0.8</version>

<exclusions>

<exclusion>

<groupQ w * C U j HId>com.aliyun.phoenix</groupId>

<9 k . P * | AartifactId>ali-phoenix-core</artifacn i r h y BtIS p ^ K zd>

</exclusion&B / A = } _ 1gt;

</exclusions>

</depen( u = Udency>

</dependencies>

<build>

<plugins>

<plugz ) ; B 2 = *in>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-s. x 1 [ ` d 9 }hade-pluginR [ & X I G</artifactId>

<version>2.4.3</v0 l i *ersion>

<executx . N ^ Qions>

<execution>

<phase>p { . e wackage<) # ( Z 9 # w /;/phase>

<goals>

&lk * ^ g O F 4 `t;goal>shade</goal>

</goals>

<configuration&! t c : L Hgt;

<minimizeJar>false</minimizeJar>

<shadedArtifactAttached>true</shadeg E 9 b HdArtifactAttached>

<artifactSet>

&lC * P U 5 l st;incq X n 5 , I s g xludes>

<x 5 b;!-- Include here the dependencies you

want to be packed in your fat jari 6 H f . 4 3 ! d -->

<include>*:*</include>

</includes>

</artifactSet>

<filters>

<filtz ` ter&9 M N - ) = mgt;

<artifact>*:*</artifact>

<excludes>

&% a z k C + j _lt;exclude>META-INF/*.SF</exclude>

<ex# . f ] D 4clude>META-INF/*.DSA</exclude>

<exclude>META-INF/*.RSA</exclude>

<exclude>**/log4j.properties</exclude>

</excludes>

</filter>

</filters>

<transformers&go { j Z x [ a tt;

<transformer

implementation="org.apache.mavJ F ! q ` = {en.plugins.shade.resource.AppendingTransformer">

<resource>reference.conf</resource>

</transformer>

<transformer

implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">

<resource>META-INF/w - [services/org.apache.spark.sql.sources.DataSourceRegister</$ / L d 7 Y j (resource>

</transformer>

</transformers>

<B n * 5 w (/configuration>

</execution>

</executions>

</plugin>

<plugin>

<groupId>net.alchim31.maven</groupId>

&] } - 0 t 8 + zlt;artifactId>scala-maven-plugin</artifactId>

<version>3.3.2</version6 z @ M>

<exed D pcutions&g$ ` ? mt;

<execution>

<id>scala-compile-first</id>

<phase>proce@ P Ass-resources</phase>

&l$ / 1 n mt;goals>

<o B P rgoal>compile</n e r f p n Ggoal>

</goals>

</exech z R ^ s r 1 /ution>

<execution>

<id>scala-test-compile-firp n zst</id>

<phase>process-test-resources</phase>

<goals>

<goal>testCompile</goal&gV ( I 1 q M lt;

</goals>

</execution>

</executions>

</plugin>

</plugins>

</build>

</project>

四、打包上传到DataWorks进行冒烟z / q 0 ( ( } y X测试

4.1创建要传入的MaxCompute表

CREATE TAB~ . e D P n 6 J %LE IF NOT EXISTS users_phonix

id INT ,

username STRING; 2 b c / . :,

password STRING

) ;

4.2打包上传到MaxCompute

在IDEA打包要打成shaded包,将/ c [ _ )所有的依G q {赖包,打入jar包中,由于DatadWork界面方式上传jar包有50M的限制,因此[ X 0 { v采用MaxCompute客户端进行jar包

Spark On MaxCompute如何访问Phonix数据

4.3选择对应的project环境,查看上传资源,并点击添加到数据开E c o | 6 g D } /

进入DataWorks界面选择左l . R W ( v h f x侧资源图标d l l ` % C r I Y,选择对应的环境位开发换进,输入删除文件时的文件名称进行搜索,列表中展示该资源已经上传成,点击提交到数据开发

Spark On MaxCompute如何访问Phonix数据

点击提交按钮

Spark On MaxCompute如何访问Phonix数据

4.4配置对应的vpcList参数并提交任务测试

其中的配置vpcList文件的配置信息如o ; ^下,可具体根据个人hbase的链接,进行配置

{

"regionI] i u f S W ! ! Id":"o ; p w & y k zcn-beijing",

"vpcs":[

{

"vpcId":"vpc-2ze7cqx2bqodp9ri1vvvk",

"zones":[

{

"urls":[

{

"domain":"172.16.0.12",

"port":2181

},

{

"domain"L + I =:"172.16.0.13",

"port":2181

},

{

"domz W # x Cain":"172.16.0.15",

"port":2181

},

{

"domain":"172.16.0.14",

"port":218g l G1

},

{

"domain":"172.16.0.12",

"port":16000

},

{

"domain":"172.16.0.13",

"port":18 ) J | h6000

},

{

"doma? * G U kin":"172.16.0.15",

"portr j / s #":16000

},

{

"domain":"172.16.0.14",

"port":16000

},, L b C F = q

{

"domain9 { C"P : a | P 0 U Z p:"z i y e 6 ? G ; 1172.16A j u ].0.12",

"port":16020

},

{

"domain":"172.16.0.13",

"port":16020

},

{

+ V ] % V V ^ K"domain":"172.16.0.15",

"port":16020

},

{

"domain":"172.16.0.14",

"port":16020

}

]

}

]

}

]

}

Spark任务提交任务的配置参数7 e e ,主类,以及# % O I D R B .对应的参数 该参数主要为3个参数第一个为Phonix的链接,第二个为Phonix的表名称,第三个为传入的MaxCompute表

Spark On MaxCompute如何访问Phonix数据

点击冒烟测试按钮,可以看到任务执行成功

Spark On MaxCompute如何访问Phonix数据

在临时查询节点中执行查询p | k : p # C D语句,可以得到数据已经写入MaxCompute的表中

Spark On MaxCompute如何访问Phonix数据

总结:

使用m & - @ 0 I @ bSpark on MaxCompute访问Phonix的数据,并将数据写入到MaxCompute的表中经过实n q y ! [践,该方案时可行的。但F G a _ h 9 x - 在实践的时有几点注意事项:

1.结合实际使用情况选择对应的Hbase以及Phonix版本,对应的版L C X e本一致,并且所使用的客户端,以及代码依赖都会有所改变。

2.使用公网在IEAD进行本地测试,要注意: } ? sHbase白名单,不仅要设置DataF f $ $ cWorks的白名单,还需将自己本地的地址加入7 l } I E j到白名单中。

3.代码打包时需要将pom中的依赖关系进行梳理l $ . B s $ 4 ,避免ODPS所N K [ f 7 2 D- f ; { [ g -在的包在对应的依赖中,进而引起jar包冲突,并且打包: g 0 L x时打成shaded包,避免缺失遗漏对应的依赖。

作者:耿江涛

本文~ 7 6 t k为阿里云原创内容,未经允许不得转载。