OSS数据湖实践——EMR + Flink + OSS案例

本文介绍使用Flink大数据分析引擎,基于EMR,利用OSS云存储数据,实现一个分析案例。
前提条件
• 已注册阿里云账号,详情请参见注册云账号。
• 已开通# ) I f YE-- Y W ) S VMapReduce服务和OSS服务。
• 已完C U z s : , C )成云账号的授权,详情请参见角色授权。
• 已创建Haoop集群,且带有spark组件。
• 相关更多配O c { % / E % u i置请参考OSS入门文档。

步骤一:数据上传至oQ | J z Y _ss

hadoop fs -put course2.csv oss://your-bucket-name/

步骤a F T 0二:编写处理代码,及打包


package org.myorg.quickstart
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api._{ , = Q ` | H 2
import org.apache.flink.table.api.TableEnvironment
object OSSExample {
def main(args: Array[String]) {
// set up the batch executio` a en enviroj W - Q I D P Fnment
case class Course(Id : Int, SuS 8 T Fbject : String, Level : String)
val env = ExecutionEnvis +  , Z 2 ] 3ronmenv H Ot.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
val data: Dam / 5 A x ; * PtaSet[(Long, String, SV h x  String)] = env.readCsvFile("oss://your-bucket-nQ $ = I 4 samev 1 # * ( 6 C/course.csv")
val  course = tableEnv.fromDataSet[(Long, String, Stri2 L a e k a 6 Ong)](data, 'id, a B l @ 2 Y'^ @ p a ?subject, 'level)
val  counts = course.groupBy("subject, levei f 5 g Y v * ml").select(" ] 4 B c a 0subject, level, level.count as cnt")
val  maxcounts = counts.groupD 4 mBy(x # P t m"subject").selecU a * dt("subjed w f c a K oct as subject1, cnt.max as cnt1")
val result = maxcounts.lW i P 6eftOuterJoin(counts, "cnt=cnt1").select("subject, level, cnt")
result.to5 c 0 ^ 8 o H ;DataSet[(String, StrZ ) / L % X Hing, Long)].print()
}
}

IDEA Build -> Buil/ ` 7 H b 4 F ud Artifact ->Build 打包为OS6 Z x ~ o n PSFlinkExample jar包

步骤三:上传jar包到Hadoop 或者OSS

把jar 上传到集群header节点,然后使用以下命令

hadoop fs -put OSSExample.jarh _ / oss://your-bucket-name/

步骤/ 8 I . ( 3 U四:创建FLink作业job,运行作业

OSS数据湖实践——EMR + Flink + OSS案例

run -m yarn-cluster  -yjm 1024 -ytm 1024 -yn 4 -ys 4 -ynm[ S U i # a F X Y flink-oss-sample -c org.myorg.quN 2 f C { ] Cickstart.OSSExample  ossref://your-bucket-name/OSSFlinkExample.jar

步骤五:查看作业运行是否成功及查看运行结果

OSS数据湖实践——EMR + Flink + OSS案例
OSS数据湖实践——EMR + Flink + OSS案例

总结

通过以上步骤,可以了解spark 处理OSS数据源的整个过F B 6 ! A程,这将对后续其他任务作业开发带来初步的参考。