全部課程
發(fā)布時(shí)間: 2018-01-17 23:25:42
?
根據(jù)其官方文檔介紹,Spark Streaming有高吞吐量和容錯(cuò)能力強(qiáng)等特點(diǎn)。Spark Streaming支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數(shù)據(jù)輸入后可以用Spark的高度抽象原語如:map、reduce、join、window等進(jìn)行運(yùn)算。而結(jié)果也能保存在很多地方,如HDFS,數(shù)據(jù)庫等。另外Spark Streaming也能和MLlib(機(jī)器學(xué)習(xí))以及Graphx完美融合。
為什么要學(xué)習(xí)Spark Streaming?
1.易用
2.容錯(cuò)?
3.易整合到Spark體系?
Spark streaming與Storm的對比:
DStream??:
什么是DStream:
Discretized Stream是Spark Streaming的基礎(chǔ)抽象,代表持續(xù)性的數(shù)據(jù)流和經(jīng)過各種Spark原語操作后的結(jié)果數(shù)據(jù)流。在內(nèi)部實(shí)現(xiàn)上,DStream是一系列連續(xù)的RDD來表示。每個(gè)RDD含有一段時(shí)間間隔內(nèi)的數(shù)據(jù),如下圖:
計(jì)算過程由Spark engine來完成
DStream相關(guān)操作:
DStream上的原語與RDD的類似,分為Transformations(轉(zhuǎn)換)和Output Operations(輸出)兩種,此外轉(zhuǎn)換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各種Window相關(guān)的原語。
Transformations on DStreams:
??????
特殊的Transformations
1.UpdateStateByKey Operation
UpdateStateByKey原語用于記錄歷史記錄,上文中Word Count示例中就用到了該特性。若不用UpdateStateByKey來更新狀態(tài),那么每次數(shù)據(jù)進(jìn)來后分析完成后,結(jié)果輸出后將不在保存
2.Transform Operation
Transform原語允許DStream上執(zhí)行任意的RDD-to-RDD函數(shù)。通過該函數(shù)可以方便的擴(kuò)展Spark API。此外,MLlib(機(jī)器學(xué)習(xí))以及Graphx也是通過本函數(shù)來進(jìn)行結(jié)合的。
3.Window Operations
Window Operations有點(diǎn)類似于Storm中的State,可以設(shè)置窗口的大小和滑動(dòng)窗口的間隔來動(dòng)態(tài)的獲取當(dāng)前Steaming的允許狀態(tài)
Output Operations on DStreamsOutput Operations可以將DStream的數(shù)據(jù)輸出到外部的數(shù)據(jù)庫或文件系統(tǒng),當(dāng)某個(gè)Output Operations原語被調(diào)用時(shí)(與RDD的Action相同),streaming程序才會開始真正的計(jì)算過程。??
實(shí)戰(zhàn)用:
Spark Streaming實(shí)現(xiàn)實(shí)時(shí)WordCount
架構(gòu)圖:?
1.安裝并啟動(dòng)生成者
首先在一臺Linux(ip:192.168.10.101)上用YUM安裝nc工具
yum install -y nc
啟動(dòng)一個(gè)服務(wù)端并監(jiān)聽9999端口
nc -lk 9999
2.編寫Spark Streaming程序
package net.togogo.stream
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
object NetworkWordCount {
def main(args: Array[String]) {
//設(shè)置日志級別
LoggerLevels.setStreamingLogLevels()
//創(chuàng)建SparkConf并設(shè)置為本地模式運(yùn)行
val conf = new SparkConf().setAppName("NetworkWordCount")
//設(shè)置DStream批次時(shí)間間隔為2秒
val ssc = new StreamingContext(conf, Seconds(2))
//通過網(wǎng)絡(luò)讀取數(shù)據(jù)
val lines = ssc.socketTextStream("hdp08", 9999)
//將讀到的數(shù)據(jù)用空格切成單詞
val words = lines.flatMap(_.split(" "))
//將單詞和1組成一個(gè)pair
val pairs = words.map(word => (word, 1))
//按單詞進(jìn)行分組求相同單詞出現(xiàn)的次數(shù)
val wordCounts = pairs.reduceByKey(_ + _)
//打印結(jié)果到控制臺
wordCounts.print()
//開始計(jì)算
ssc.start()
//等待停止
ssc.awaitTermination()
}
}
/home/hadoop/apps/spark/bin/spark-submit \
--class net.togogo.sparkdemo.stream.NetworkWordCount \
--master spark://hdp08:7077 \
/home/hadoop/sparkdemo.jar
3.啟動(dòng)Spark Streaming程序
4.在Linux端命令行中輸入單詞
[hadoop@hdp08 ~]$ nc -lk 9999
hello hello spark spark
5.在控制臺中查看結(jié)果
問題:結(jié)果每次在Linux段輸入的單詞次數(shù)都被正確的統(tǒng)計(jì)出來,但是結(jié)果不能累加!如果需要累加需要使用updateStateByKey(func)來更新狀態(tài),下面給出一個(gè)例子:
package net.togogo.stream
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Seconds}
object NetworkUpdateStateWordCount {
/**
* String : 單詞 hello
* Seq[Int] :單詞在當(dāng)前批次出現(xiàn)的次數(shù)
* Option[Int] : 歷史結(jié)果
*/
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
//iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
}
def main(args: Array[String]) {
LoggerLevels.setStreamingLogLevels()
val conf = new SparkConf().setAppName("NetworkUpdateStateWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
//做checkpoint 寫入共享存儲中
ssc.checkpoint("hdfs://hdp08:9000/work/wcout")
val lines = ssc.socketTextStream("hdp08", 9999)
//reduceByKey 結(jié)果不累加
//val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
//updateStateByKey結(jié)果可以累加但是需要傳入一個(gè)自定義的累加函數(shù):updateFunc
val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
results.print()
ssc.start()
ssc.awaitTermination()
}
}
/home/hadoop/apps/spark/bin/spark-submit \
--class net.togogo.sparkdemo.stream. NetworkUpdateStateWordCount \
--master spark://hdp08:7077 \
/home/hadoop/sparkdemo.jar
/home/hadoop/apps/spark/bin/spark-submit \
--class net.togogo.stream.NetworkUpdateStateWordCount \
--master spark://hdp08:7077 \
/home/hadoop/schema.jar???
上一篇: {大數(shù)據(jù)}Kafka