Flink实战,实时流量统计 TOPN访问URL荐

跟 https://blog.51cto.com/mapengfei/2580330 类似场景,来从Nginx、Apache等web服务器的日志中读取数据,实时统计出来访问热度最高的TOPN访问URL,并且要确保数据乱序的处理,lag等情况下,还要确认数据的准确性

目标:

    从log文件中读取数据(也可以参考上一篇从kafka中读取),取http的method为get的请求,并且把静态文件访问过滤掉,进行实时统计
实现:
1、读取文件
2、做过滤,method=get,url不为静态信息
3、生成一个滑动窗口,大小10分钟,每次滑动5s,watermark 5s(为了保险允许数据延迟,allowedLateness 1分钟)
4、进行聚合统计分析排序输出

准备日志文件:

在resource目录下生成一个nginx.log里面内容:
1.1.1.1 - - 23/03/2020:05:06:03 GET /mapengfei/2580330
1.1.1.1 - - 23/03/2020:05:06:05 GET /mapengfei/2572888
1.1.1.3 - - 24/03/2020:05:06:05 GET /mapengfei/2572888

代码:

新建一个HotUrlAnalysis.scala的object文件

/*
*
* @author mafei
* @date 2021/1/3
*/
package com.mafei.hotUrlAnalysis
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer
/**
 * One parsed access-log record.
 *
 * @param clientIp first field of the log line (client address)
 * @param userId   second field of the log line
 * @param ts       event time of the request as epoch milliseconds
 * @param method   HTTP method, e.g. "GET"
 * @param url      requested path
 */
final case class NginxLog(clientIp: String, userId: String, ts: Long, method: String, url: String)
/**
 * Aggregated hit count of one URL within one window.
 *
 * @param url       the URL that was counted
 * @param windowEnd end timestamp of the window (epoch milliseconds)
 * @param count     number of matching requests seen in that window
 */
final case class UrlViewCount(url: String, windowEnd: Long, count: Long)
/**
 * Job entry point (ListState-based variant).
 *
 * Reads access-log lines from a text file, keeps GET requests whose URL is not a
 * `.css`/`.js` resource, counts hits per URL in a 10-minute window sliding every
 * 5 seconds, and prints the top 10 URLs of every window.
 */
object HotUrlAnalysis {
  // Input file; one access-log record per line.
  private val LogPath =
    "/opt/java2020_study/UserBehaviorAnalysis/HotUrlAnalysis/src/main/resources/nginx.log"

  // Matches only URLs that do NOT end in ".css" or ".js". Compiled once, not per record.
  private val NonStaticUrl = "^((?!\\.(css|js)$).)*$".r

  /**
   * Parses one space-separated log line into a [[NginxLog]].
   *
   * Fields used: 0 = client ip, 1 = user id, 3 = timestamp (`dd/MM/yyyy:HH:mm:ss`),
   * 4 = HTTP method, 5 = URL.
   */
  private def parseLine(line: String): NginxLog = {
    val fields = line.split(" ")
    // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
    val format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
    // The windowing below needs epoch milliseconds, not the textual timestamp.
    val ts = format.parse(fields(3)).getTime
    NginxLog(fields(0), fields(1), ts, fields(4), fields(5))
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // window on event time
    env.setParallelism(1) // single task so records are not reordered by parallel readers

    // One tag instance shared by the window operator and the side-output lookup.
    val lateTag = new OutputTag[NginxLog]("late")

    // 1. Read the file and turn every non-blank line into a timestamped record.
    val dataStream = env
      .readTextFile(LogPath)
      .filter(line => line.trim.nonEmpty) // a trailing blank line would otherwise crash parsing
      .map(line => parseLine(line))
      .assignTimestampsAndWatermarks(
        // The watermark trails the largest seen timestamp by 5 seconds.
        new BoundedOutOfOrdernessTimestampExtractor[NginxLog](Time.seconds(5)) {
          override def extractTimestamp(t: NginxLog): Long = t.ts
        })

    // 2. Filter, window and pre-aggregate per URL.
    val aggStream = dataStream
      .filter(_.method == "GET") // only GET requests
      .filter(log => NonStaticUrl.findFirstIn(log.url).nonEmpty) // drop css/js requests
      // .keyBy("url") would yield a Tuple key; the lambda form keeps the key a String.
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      // Fire at the watermark, but keep the window open one more minute for late records.
      .allowedLateness(Time.minutes(1))
      // Records arriving even later than that go to the side output instead of being lost.
      .sideOutputLateData(lateTag)
      .aggregate(new PageCountAgg(), new PageViewCountResult())

    // 3. Group the per-URL counts by window end and rank them.
    val resultStream = aggStream
      .keyBy(_.windowEnd)
      .process(new TopUrl(10))

    resultStream.print()
    aggStream.getSideOutput(lateTag).print("这是1分钟之后的延迟数据。。。。")
    env.execute("执行热门url访问统计")
  }
}
/**
 * Incremental counter: the accumulator is the number of records added so far.
 * The record content is ignored; every record contributes exactly 1.
 */
class PageCountAgg() extends AggregateFunction[NginxLog, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(in: NginxLog, acc: Long): Long = acc + 1

  override def getResult(acc: Long): Long = acc

  // Partial counts are combined by summing them.
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}
/**
 * Wraps the pre-aggregated count of a window into a [[UrlViewCount]] that also
 * carries the key (URL) and the window end timestamp.
 */
class PageViewCountResult() extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
  override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[Long],
      out: Collector[UrlViewCount]): Unit = {
    // Only the first element of `input` is read; it is used as the window's count.
    out.collect(UrlViewCount(key, window.getEnd, input.iterator.next()))
  }
}
/**
 * Builds a textual top-N URL report for every window.
 *
 * Type arguments of the underlying `KeyedProcessFunction`:
 *  - K (`Long`): the key the stream is grouped by, here the window end timestamp.
 *  - I (`UrlViewCount`): the per-URL counts emitted by `PageViewCountResult`.
 *  - O (`String`): the formatted report, which the job simply prints.
 *
 * The buffered counts are cleared every time the timer fires. A count that arrives
 * for the same window afterwards is therefore reported on its own in a second report.
 *
 * @param topN maximum number of URLs listed per report
 */
class TopUrl(topN: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {
  // Counts collected for the current key (window end) until its timer fires.
  lazy val urlViewCountListState: ListState[UrlViewCount] = getRuntimeContext.getListState(
    new ListStateDescriptor[UrlViewCount]("urlViewCountList", classOf[UrlViewCount]))

  override def processElement(
      i: UrlViewCount,
      context: KeyedProcessFunction[Long, UrlViewCount, String]#Context,
      collector: Collector[String]): Unit = {
    // Buffer every count so the whole window can be sorted at once.
    urlViewCountListState.add(i)
    // Fire once event time reaches the end of the window this count belongs to.
    context.timerService().registerEventTimeTimer(i.windowEnd)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    import scala.collection.JavaConverters._

    // Copy the state out, then clear it so the next firing starts from scratch.
    val buffered = urlViewCountListState.get().asScala.toList
    urlViewCountListState.clear()

    // Highest count first, keep at most topN entries.
    val ranked = buffered.sortBy(_.count)(Ordering.Long.reverse).take(topN)

    val result = new StringBuilder
    result.append("当前窗口的结束时间:\t").append(new Timestamp(timestamp)).append("\n")
    // One line per ranked URL.
    ranked.zipWithIndex.foreach { case (entry, idx) =>
      result.append("Top").append(idx + 1).append("\t")
        .append("URL = ").append(entry.url).append("\t")
        .append("访问量: ").append(entry.count).append("\n")
    }
    result.append("-" * 39).append("\n\n\n")

    // Demo pacing only: slows the console output down. This blocks the task thread.
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}

代码结构及输出效果:

Flink实战,实时流量统计 TOPN访问URL荐

问题点

在数据乱序的情况下,上面的代码虽然能全部输出,但有2个问题点:
一个是在TopUrl中保存数据用的是list,在滑动窗口先到达,延迟数据过会儿到达的时候,数据会重复输出,也就是url会出现2次
第二个问题是在第二次延迟输出的时候,本来应该加上之前的数据,但是没有,而是重新从0开始计算
最终效果:

URL 出现次数 出现原因
/a 3 在5秒内统计数据输出的
/a 1 allowedLateness延迟数据达到产生的

解决办法:

从list改为map,并且因为之前每次都会清空list,可以改为等真正的窗口结束后再清空就可以了

主要改动的地方:
processElement 和onTimer这2个方法

/*
*
* @author mafei
* @date 2021/1/3
*/
package com.mafei.hotUrlAnalysis
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer
/**
 * One parsed access-log record (MapState-based variant of the job).
 *
 * @param clientIp first field of the log line (client address)
 * @param userId   second field of the log line
 * @param ts       event time of the request as epoch milliseconds
 * @param method   HTTP method, e.g. "GET"
 * @param url      requested path
 */
final case class NginxLog2(clientIp: String, userId: String, ts: Long, method: String, url: String)
/**
 * Aggregated hit count of one URL within one window.
 *
 * @param url       the URL that was counted
 * @param windowEnd end timestamp of the window (epoch milliseconds)
 * @param count     number of matching requests seen in that window
 */
final case class UrlViewCount2(url: String, windowEnd: Long, count: Long)
/**
 * Job entry point (MapState-based variant).
 *
 * Reads access-log lines from a text file, keeps GET requests whose URL is not a
 * `.css`/`.js` resource, counts hits per URL in a 10-minute window sliding every
 * 5 seconds, and prints the top 10 URLs of every window.
 */
object HotUrlAnalysis2 {
  // Input file; one access-log record per line.
  private val LogPath =
    "/opt/java2020_study/UserBehaviorAnalysis/HotUrlAnalysis/src/main/resources/nginx.log"

  // Matches only URLs that do NOT end in ".css" or ".js". Compiled once, not per record.
  private val NonStaticUrl = "^((?!\\.(css|js)$).)*$".r

  /**
   * Parses one space-separated log line into a [[NginxLog2]].
   *
   * Fields used: 0 = client ip, 1 = user id, 3 = timestamp (`dd/MM/yyyy:HH:mm:ss`),
   * 4 = HTTP method, 5 = URL.
   */
  private def parseLine(line: String): NginxLog2 = {
    val fields = line.split(" ")
    // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
    val format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
    // The windowing below needs epoch milliseconds, not the textual timestamp.
    val ts = format.parse(fields(3)).getTime
    NginxLog2(fields(0), fields(1), ts, fields(4), fields(5))
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // window on event time
    env.setParallelism(1) // single task so records are not reordered by parallel readers

    // One tag instance shared by the window operator and the side-output lookup.
    val lateTag = new OutputTag[NginxLog2]("late")

    // 1. Read the file and turn every non-blank line into a timestamped record.
    val dataStream = env
      .readTextFile(LogPath)
      .filter(line => line.trim.nonEmpty) // a trailing blank line would otherwise crash parsing
      .map(line => parseLine(line))
      .assignTimestampsAndWatermarks(
        // The watermark trails the largest seen timestamp by 5 seconds.
        new BoundedOutOfOrdernessTimestampExtractor[NginxLog2](Time.seconds(5)) {
          override def extractTimestamp(t: NginxLog2): Long = t.ts
        })

    // 2. Filter, window and pre-aggregate per URL.
    val aggStream = dataStream
      .filter(_.method == "GET") // only GET requests
      .filter(log => NonStaticUrl.findFirstIn(log.url).nonEmpty) // drop css/js requests
      // .keyBy("url") would yield a Tuple key; the lambda form keeps the key a String.
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      // Fire at the watermark, but keep the window open one more minute for late records.
      .allowedLateness(Time.minutes(1))
      // Records arriving even later than that go to the side output instead of being lost.
      .sideOutputLateData(lateTag)
      .aggregate(new PageCountAgg2(), new PageViewCountResult2())

    // 3. Group the per-URL counts by window end and rank them.
    val resultStream = aggStream
      .keyBy(_.windowEnd)
      .process(new TopUrl2(10))

    resultStream.print()
    aggStream.getSideOutput(lateTag).print("这是1分钟之后的延迟数据。。。。")
    env.execute("执行热门url访问统计")
  }
}
/**
 * Incremental counter: the accumulator is the number of records added so far.
 * The record content is ignored; every record contributes exactly 1.
 */
class PageCountAgg2() extends AggregateFunction[NginxLog2, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(in: NginxLog2, acc: Long): Long = acc + 1

  override def getResult(acc: Long): Long = acc

  // Partial counts are combined by summing them.
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}
/**
 * Wraps the pre-aggregated count of a window into a [[UrlViewCount2]] that also
 * carries the key (URL) and the window end timestamp.
 */
class PageViewCountResult2() extends WindowFunction[Long, UrlViewCount2, String, TimeWindow] {
  override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[Long],
      out: Collector[UrlViewCount2]): Unit = {
    // Only the first element of `input` is read; it is used as the window's count.
    out.collect(UrlViewCount2(key, window.getEnd, input.iterator.next()))
  }
}
/**
 * Builds a textual top-N URL report for every window, keeping the latest count
 * per URL in a map so that an updated count replaces the earlier one.
 *
 * Type arguments of the underlying `KeyedProcessFunction`:
 *  - K (`Long`): the key the stream is grouped by, here the window end timestamp.
 *  - I (`UrlViewCount2`): the per-URL counts emitted by `PageViewCountResult2`.
 *  - O (`String`): the formatted report, which the job simply prints.
 *
 * @param topN           maximum number of URLs listed per report
 * @param cleanupDelayMs how long after the window end the state is kept before it is
 *                       cleared; should equal the window's allowed lateness
 *                       (default 60000 ms = 1 minute)
 */
class TopUrl2(topN: Int, cleanupDelayMs: Long = 60000L)
    extends KeyedProcessFunction[Long, UrlViewCount2, String] {
  // With a zero delay the report timer and the cleanup timer would be the same timer.
  require(cleanupDelayMs > 0, "cleanupDelayMs must be positive")

  // url -> latest count for the current key (window end).
  lazy val UrlViewCount2MapState: MapState[String, Long] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, Long]("UrlViewCount2Map", classOf[String], classOf[Long]))

  override def processElement(
      i: UrlViewCount2,
      context: KeyedProcessFunction[Long, UrlViewCount2, String]#Context,
      collector: Collector[String]): Unit = {
    // A later count for the same URL overwrites the earlier one instead of duplicating it.
    UrlViewCount2MapState.put(i.url, i.count)
    // Report timer: fires once event time reaches the window end.
    context.timerService().registerEventTimeTimer(i.windowEnd)
    // Cleanup timer: by then no further results are expected for this window,
    // so the state can be dropped.
    context.timerService().registerEventTimeTimer(i.windowEnd + cleanupDelayMs)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, UrlViewCount2, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    if (timestamp == ctx.getCurrentKey + cleanupDelayMs) {
      // Cleanup timer: release the state and emit nothing.
      UrlViewCount2MapState.clear()
    } else {
      import scala.collection.JavaConverters._

      // Highest count first, keep at most topN entries. State is kept for later updates.
      val ranked = UrlViewCount2MapState.entries().asScala
        .map(entry => (entry.getKey, entry.getValue))
        .toList
        .sortBy(_._2)(Ordering.Long.reverse)
        .take(topN)

      val result = new StringBuilder
      result.append("当前窗口的结束时间:\t").append(new Timestamp(timestamp)).append("\n")
      // One line per ranked URL.
      ranked.zipWithIndex.foreach { case ((url, count), idx) =>
        result.append("Top").append(idx + 1).append("\t")
          .append("URL = ").append(url).append("\t")
          .append("访问量: ").append(count).append("\n")
      }
      result.append("-" * 39).append("\n\n\n")

      // Demo pacing only: slows the console output down. This blocks the task thread.
      Thread.sleep(1000)
      out.collect(result.toString())
    }
  }
}