Spark项目案例实战和分布式部署

前面讲到Hbase的时候可以通过Java API的方式操作Hbase数据库,由于Java和Scala可以互相调用,本节使用Scala语言通过Spark平台来实H G } , N w 3分布式操作Hbase数据库,并且打Y x t D n包部署到Spark集群上] ^ iN q u Z S 1。这样我们对Sparo F d Y P Bk+Scala项目开发有一个完整的认识和实际工作场景的一个体会。

我们创建) A N Q T 4 v一个Sz - }park的Y H 3 U工程,然后创建一个HbaseJob的object类文件,项目的功能是从Hbase批量读取课程商品表数据然后存储到Hadoop的HDFSu / # 上的功能,如代码3.15所示:

【代码3.15e T s】 HbaseJob.scala

package com.chongdianli 8 & ! $eme.mail
import org.apache.hadoop.hW w 5 _base.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Get, HConnectionManagern % E o / ! c U}
import ori w T n / M { O og.apachr ` n a N ` $ Ne.hadoop.hbase.util.{ArrayUtils, Bytes}
import org.apache.spark[ ~ c 7 c._
import scopt.OptionParser
import scI d x | P @alaU 1 5 T k 3 R &.collection.mutable.ListBuffer
/**
* Created by 充电了么E G X - D [ JApp - 陈- D 8 f ~ : )敬雷
* Spark分布式操作Hbase实战
* 网站:http://ww{ [ % .w.chongdianleme.com
* 充电了么ApG 4 H $ : 8 ap - 专业上班族职业技能提升的在线教育平台
*/
object HbaseJob {
case class Params(
//输入目录的数据就是课程ID,每行记录就一个课程ID,后面根据课程ID作为rowKey从L % ~ d 7 ~ U b cHbase里查询数据
inputPath: String = "file:///D:\\chonM n 8 ) $ # gdianleme\\Hbase项目\\input",
outputPath: String = "file:///D:\\chongdianleme\\Hbase项目\\output",
table: String = "chongdianleme_kc",
minPartitions: Int = 1,
mode: Strin[ X eg = "log + F n 4cal"
)
def main(args:u B s t h P Array[SH H ` B Ntring]) {
val defaultParams = Params(H Z Z 5 2 L L)
val pa- 3 z Lrser = new OptionParser[Params]("HbaseJob")J u W l U ( {
head("HbaseJobo g M: 解析参数.")
opt[String]("inputPath"^ r 1 ] t A M)
.text(s"inputPath 输入目录, default: ${defaultParams.inputPath}}")
.actiH ? r r ton((x, c) => c.copy(inputPath = x))
opt[String]("outputPath")
.text(s"outputPath 输出目录, defaul` @ a % v $ Wt: ${defaultParams.outputPath}")= k U f t = y
.action((x, c) => c.copy(outputPath = x))
opt[m ( v E 4 q 4Int]("minPartitions")
.text(s"minPartitions , d: D x / @ ] ) Iefault: ${defaultParams.mi} V ~ S Q r ` 2nPartitions}")
.action((x, c) => c.copy(minPartitiN U X 2 = HonS L A E v `s = xR c : a ? W .))
opt[String]("table")
.text(s"Q C ) 6 & _ p 3 atable table, default: ${defaultParams.table}")
.action((x, c) => c.copy(table = x))
opt[String]("mode")
.text(s"mM b Dode 运行[ 2 @ U模式, default: ${defaultParams.mode}")
.action((x, c) => c.copy(mode = x))
note(
"""
|For example, the following command runs this app on aJ  x * R o 2 HbaseJob dataset:
""".stripMargin)
}
parser.paF U * 0 ] } 2 P qrse(args, defaultParams).map { params => {
println("参数值:" + params)
readFin H @ }lePath(params.inputPath,params.outputPath,params.ta| n ) ! . b 4 8bleq ( Q, params.minPartitions, parD A B b Qams.mode)
}
}getq ? mOrElse {
System.e$ } y Uxit(1)
}
println("充电g . ` | N w G了么App - Spark分布式批量操作Hbase实战 -- 计算完成!")
}
def re6 E h [ O D @ 9adFilePath(inputPath: String,outputPath:String,table:String,minPartitions:Int,t M ; Nmode:String) = {
val sparkConf = new SparkConf().setAppName("HbaseJob")
sparkConf.setMaste5 w gr(mode)
val sc = new SparkContext(sparkConf)
//B / o 0加载数据文件
val data = sc.textFile(inputPath,mC 5 p _ O D Y B oinPartitions)
data.mapPartitG p | 6 Z % 3 Kions(batch(_,table)).saveAsTextFile(# ! U )outputPath)
sc.stop()
}
def batch(keys: Iv J i l  O V A Qterator[String],hbaseTable:String) = {
val lineList = ListBt o { ! ] # auffer[Sg t 3 7tring]()
import sc) F , u G )ala.collection.JavaConversions._
val conf = H6 Y K ` [ 2 8 D (BaseConfiguration.create()
//每批数据创建一个Hbase连接,多条数据操作共享这个连接
va- A . c _l connection = HConnectionManager.createConnection(conf)
//获取表
valJ D ? M F ^ A table = connection.C J Q `getTable(hbaseTable)
keys.foreach(rowKey=>{
try {
//根据rowKey主键也就是课程ID查询数据b 8 K T i F Z b 4
val get = new Get(rowKey.getBytes())
//指定需要获取的列F 6 7 *蔟和列
get.addColumn("k5 * A `cname".getBytes(), "name".getBytes())
get.addColumn("saleinfoK t V [ M".getBytes(), "price".getBytes())
get.addColumn("saleinfo".getBytes(), "issale".getBytes())
val result = ta( e A . # Gble.get(get)
var nameRS= result.getValue("kH r . ccname".getBytes(} ? f | y # O)W @ z  ),"name".getBytes())
var kcName =7 R X b "";
if(nameRS != null&&nameRS.length > 0){
kcName = new String(nameRS);
}
val priceRS = result.getValue("saleinfo".getBytes, "price".getBytes)Q L g { } o & 3
var price = ""
if (priceRSa 6 N Y b D L n ^ != null && priceRS.l% : ength > 0)
price = new String(priceRS)
val issaleRS = result.getValue("saleinfo".getBytes, "issale".getBytes)
var issale = ""
if (issaleRS != null &a$ { ~ R ∓& issaleRS.length > 0)
issale = new String(iss( $ UaleRS)
lineList += rowKey+"\001"+ kcName + "\001"+ price + "\001"+issale
} catch {
case e: Exception => e.printStackTrace()
}
})
//每批数据操作完毕,别忘了关闭表和数据库连接l 3 H
table.close()
connection.cl[ $ = P z :ose()
lineList.toIterator
}
}

代码开发完成后,我们看看怎么部署到Spark集群上去运行,运行的方式和我们的T : y 5 KSp( $ Y 7 & V ]ark集群怎么部署的有关,Spark集群部署有三种方式:单独Standalone集群部署,Spark on yarn部署,local本地模式三种灵活部署方式,前两种都是分布式部署,后面的是单机方式。一C A ^ O般大数据部门都有Had4 . y k T y ;oop集群,所以推荐Spark o` 2 j O M [n Yarn部署,这样更方便服务器资7 x J 8 ~ 2源的统一管理和分配。

Spark on Yarn部署非常简单,主要是把Spark的包解压就可以用了,每台服务器上放一份,并且放在相同的目录下。步骤S j ^ s如下:

1)配置scala环境变量

#解压Scala的包,然后vim /etc/profile

export SCALA_HOME=/home/hadM ` N v D & / . ioop/software/scala-2.11.8

2)解压tar xvzf spark--bin-hadoop.tgz,L Y i c E )每台hadoop服务器L 6 q z h上放在同一个目录下

不用任何配置值即可,用spark-submit提交就行。

Spark环境部署好之后,把我们的操作Hbase项目编译打包,一个是项目本身的jar,另一个是项目依赖的jar集合,分别上传到任意一台服务器] w T T $ w T g 3就行,不要每台服务器都传,在哪台服务器运行就在哪台服务器上上传就行,依赖的jar包放在这个目录/home/hado8 _ zop/chongdianleme/chongdiM K 8 anleme-spark-task-1.0.0/lib/下,项目本身的jar包放在这里目录下/home/h4 { w r 2 w P sa. q E x 9 V Edoop/chongdianleme/,然后通过spa: u h 0 9 urk-submit提交如下脚本即可:

hE 1 Q , sadoop fs -rmr /ods/kc/dim/ods_kc_dimz g *_hbase/;

/hn : ` $ & 1 ,ome/hadoop/software/spark21/bin/sparkP B 0 * - ^-submit --jars $(echo /home/hadoop/chongdY j X yianleme/chongdianleme-spark-task-1.0.0/lib/*.v U W } t z 9 ] ,jar | tr ' ' ',') --master yarn --queue hadoop --num-ex z tecutors 1 --driver-memory 1g --executor-memory 1g --w W + u P u v Lexecut2 - . +or-cores 1 --class com.chongdianleme.mail.u j LHbaseJob /home/hadoop/chongdianleme/hbase-task.jar --inputPa: s l lth /mid/kc/dim/mid_kc_dim_kcidlist/ --outputPath /ods/kc/dim/ods_kc_dim_] y Fhbase/ --table chongdianleme_kc --minPartitions 6 --mode y1 1 xarn

其中hk b X jadoop fsO ! { R -rmr /ods/kc/dim/ods_kc_dim_hbase/;是为了下次执行这个任务避免输出目录已经存在,我们提前先把输出先删掉,U 3 D执行完之后输出目录会重新生成。

脚本参数说明:

--jars 是你程序依赖的所有jar存放的目录

--master 是指定在哪里跑,在Hadoop的Yarn上跑写Yarn,本地方式2 M 9 h M W写Local。

-- queue 如果是Yarn方式,指定分配到哪个队列的资源上。

-P L A 4 z t- num-executors 指定跑几个Task。

--driver.maxResultSize drive] F 9 ! i u qr的最大内存设置,默r q u e j认1G比较小。超过了会OOM,可以根据情况设置大一j g o些。

-- executor-memory 每个Task分配内存。

-- executor-coresA i ^ 6 V f d 2 x每个Task分配几个虚拟CPU。

-- class 你的程序的入口类,后面跟jar包,在后面是Java或Scala的main函数的业务参数。

这就是我们从编程,编译打包、部署到服务器如何分布式运行的完整过程,后面章节讲的Spark分布式机器学习也是这么来打包和部署的。

总结s w }

除了Spark项目案例实2 D - k v I 7战和分布式部署,其它深度学习框架也有不错的开源实现,比如MXNet,后面请大家关注充电了么app,课程,微信群,更多内容请看新书《分布式机器学习实战(人工智能科学与技术丛书)》
其它深度学习框架也有不错的开源实现,比如MXNet,后面请大家关注充电了么app,课程,微信群,更多内容请看新书《分布式机器学习实战(人工智能科学与技术N I * z Y ]丛书)》

【新书介绍】
《分布式机器学习实战》(人工智能科学与技术丛书)【陈敬雷编著】【清华大学出版社】
新书特色:深入浅出,逐步讲解分布式机器学习的框架及应用配套个性化推荐算法系统、人脸识别、对话机器人等实战项目

【新书介绍视频】
分布式机器学习实战(人工智= = A l N U )能科学与技术丛书)新书【陈敬雷】

视频特色:重点对新书进行介绍,最新前沿技术热点剖析,技术职业规划建议!听完此课你对人工智能领域u Y 0 V [ # 5将有一个崭新的技术视野!职业发展也将有更加清晰的认识!

【精品课程】
《分布式机器学习实战》大数据人工智能A@ @ 1 A a rI专家级精品课程

【免费体验视频】:

人工智能百万年薪成长路线/从Python到最新热点技术

从Python编程零基础小白入门到人? o 6 %工智能高级实战系列课

视频特色: 本系列专家级精品课有对应的配套书籍《分布式机器学习实战》,精品{ @ J l课和书籍可以互补式学习,彼此相互^ @ y补充,g % i 6 b M L ( `大大提高了学习效率。本系列课和书籍是以分布式机器学习为主线,并对其依赖的大数据技术做了详细介绍,之后对目前主流的分布式机器学习框架和算法进行重点讲解,本系列课和书籍侧重实战,最后讲几个+ ! 7 4 I工业级的系统实战项目给大家。 课程核心内容有互联网公司大数据和人工智能那些事、大数据算法系统架构、大数据基础、Python编程、Java编程、Scala- # y a , Y编程、Dod 0 Y b { } O w gcker容器、Mahout分布式机器学习平台、Spark分布式机器学习平台、分布式深度学习框架和神经网络算法、自然语言处理算法、工业级完整系统实战(推荐算法系统实] p r s v K战、人脸识别实战、对. q ) } a 7话机器人实战)、就业/面试技巧/职业生涯规划/职业晋升指导等内容。

【充电了么公司介绍】

充电了么App是专注上班族职业培训充电学习的在线教育平台。

专注工作职业技能提升和学习,提高工作效率,带来经济效益!今天你充X m Y , p D电了么?

充电了么官网
http://www.chongdianleme.com/

充电了么App官网下载地址
https://a.app.qq.com/o/simple.jsp?pkgname=com.charged.app

功能特色如下:

【全行业职位】 - 专注职场上班族职业技能提升

覆盖所有行业和职位,不管你是上班族,高管,还是创业都有你要学习的视频和文章。其中大数据智能AI、区块链、深度学习是互联网一线工业级的实战经验。

除了专业技能学习,还有通用职场技能,比如企业管理、股权激励和设计、职业生涯规划、社交礼仪、沟通技巧、演讲技巧、开会技巧、发邮件技巧、工作压力如何放松、人脉关系等等,全方位提高你的专业水平和整体素质。

【牛人课堂】 - 学习牛人的工作经验E I r i 3 F D + C

1.智能个性化引擎:

海量视M 7 a t 9频课9 o L ^ n n程,覆盖所有行业、所有职位,通过不同行业职位的技能词偏好挖掘分析,智能匹配你目前职位最感兴趣的技能学习课程。

2.听课全网搜索

输入关键词搜索海量视频课程,应有尽有,总有适合你的课程。

3.听课播放详情

视频播放详情,除了播放当0 T Z / u 7 F前视频,更有相关视频课程和文章阅读,S B a Z s 5对某个技? q 2 F Q | ; _知识点强化,让你轻松成为某个领域的资深专家。

【精品阅读】 - 技能文章兴趣阅读

1.个性化阅读引擎:

千万级文章阅读,覆盖所有R = M 4行业、所有职位,通过不同行业职位的技能词偏好挖掘分析,智能匹配你目前职位最感兴趣的技能学习文章。

2.阅读全网搜索

输入关键词搜索海量文章阅读,应有尽有,总有你感兴趣的技K W ` k Y l ^ H F能学习文章。

【机器人老师】 - 个人提升趣味学习

基于搜索引擎和智能深度学习训练,为您打造更懂你的机器人老师,用自然语言和机器人! d { Q F e老师聊天学习,寓教于乐,高效学习,快乐人生。

【精短课程】 - 高效学习知识

海量精短牛人课程,满足你的时间碎片化学? b = $ o y #习,b p P快速提高某个技能知识点