宝贝腿开大点我添添你视频男男,中文字幕熟女人妻av一区二区三区,爱色成人网,大地资源高清播放在线观看在线电影在线观看 ,777米奇影视第四色

集團(tuán)站切換校區(qū)

驗(yàn)證碼已發(fā)送,請查收短信

復(fù)制成功
微信號:togogoi
添加微信好友, 詳細(xì)了解課程
已復(fù)制成功,如果自動(dòng)跳轉(zhuǎn)微信失敗,請前往微信添加好友
打開微信
圖標(biāo)

學(xué)習(xí)文章

當(dāng)前位置:首頁 > >學(xué)習(xí)文章 > >

{大數(shù)據(jù)}Spark Streaming

發(fā)布時(shí)間: 2018-01-17 23:25:42

?

根據(jù)其官方文檔介紹,Spark Streaming有高吞吐量和容錯(cuò)能力強(qiáng)等特點(diǎn)。Spark Streaming支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數(shù)據(jù)輸入后可以用Spark的高度抽象原語如:map、reduce、join、window等進(jìn)行運(yùn)算。而結(jié)果也能保存在很多地方,如HDFS,數(shù)據(jù)庫等。另外Spark Streaming也能和MLlib(機(jī)器學(xué)習(xí))以及Graphx完美融合。


為什么要學(xué)習(xí)Spark Streaming?

1.易用


2.容錯(cuò)?        

3.易整合到Spark體系?


Spark streaming與Storm的對比:


DStream??:

    什么是DStream:

        Discretized Stream是Spark Streaming的基礎(chǔ)抽象,代表持續(xù)性的數(shù)據(jù)流和經(jīng)過各種Spark原語操作后的結(jié)果數(shù)據(jù)流。在內(nèi)部實(shí)現(xiàn)上,DStream是一系列連續(xù)的RDD來表示。每個(gè)RDD含有一段時(shí)間間隔內(nèi)的數(shù)據(jù),如下圖:

        

計(jì)算過程由Spark engine來完成

DStream相關(guān)操作:

DStream上的原語與RDD的類似,分為Transformations(轉(zhuǎn)換)和Output Operations(輸出)兩種,此外轉(zhuǎn)換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各種Window相關(guān)的原語。

Transformations on DStreams:

??????

特殊的Transformations


1.UpdateStateByKey Operation

UpdateStateByKey原語用于記錄歷史記錄,上文中Word Count示例中就用到了該特性。若不用UpdateStateByKey來更新狀態(tài),那么每次數(shù)據(jù)進(jìn)來后分析完成后,結(jié)果輸出后將不在保存


2.Transform Operation

Transform原語允許DStream上執(zhí)行任意的RDD-to-RDD函數(shù)。通過該函數(shù)可以方便的擴(kuò)展Spark API。此外,MLlib(機(jī)器學(xué)習(xí))以及Graphx也是通過本函數(shù)來進(jìn)行結(jié)合的。


3.Window Operations

Window Operations有點(diǎn)類似于Storm中的State,可以設(shè)置窗口的大小和滑動(dòng)窗口的間隔來動(dòng)態(tài)的獲取當(dāng)前Steaming的允許狀態(tài)

Output Operations on DStreamsOutput Operations可以將DStream的數(shù)據(jù)輸出到外部的數(shù)據(jù)庫或文件系統(tǒng),當(dāng)某個(gè)Output Operations原語被調(diào)用時(shí)(與RDD的Action相同),streaming程序才會開始真正的計(jì)算過程。??


實(shí)戰(zhàn)用:

    Spark Streaming實(shí)現(xiàn)實(shí)時(shí)WordCount

        架構(gòu)圖:?

            

1.安裝并啟動(dòng)生成者

首先在一臺Linux(ip:192.168.10.101)上用YUM安裝nc工具

yum install -y nc

啟動(dòng)一個(gè)服務(wù)端并監(jiān)聽9999端口

nc -lk 9999

2.編寫Spark Streaming程序

?

package net.togogo.stream


import org.apache.spark.streaming.StreamingContext

import org.apache.spark.SparkConf

import org.apache.spark.streaming.Seconds


object NetworkWordCount {

 def main(args: Array[String]) {

   //設(shè)置日志級別

   LoggerLevels.setStreamingLogLevels()

   //創(chuàng)建SparkConf并設(shè)置為本地模式運(yùn)行

   val conf = new SparkConf().setAppName("NetworkWordCount")

   //設(shè)置DStream批次時(shí)間間隔為2秒

   val ssc = new StreamingContext(conf, Seconds(2))

   //通過網(wǎng)絡(luò)讀取數(shù)據(jù)

   val lines = ssc.socketTextStream("hdp08", 9999)

   //將讀到的數(shù)據(jù)用空格切成單詞

   val words = lines.flatMap(_.split(" "))

   //將單詞和1組成一個(gè)pair

   val pairs = words.map(word => (word, 1))

   //按單詞進(jìn)行分組求相同單詞出現(xiàn)的次數(shù)

   val wordCounts = pairs.reduceByKey(_ + _)

   //打印結(jié)果到控制臺

   wordCounts.print()

   //開始計(jì)算

   ssc.start()

   //等待停止

   ssc.awaitTermination()

 }

}

/home/hadoop/apps/spark/bin/spark-submit \

--class net.togogo.sparkdemo.stream.NetworkWordCount \

--master spark://hdp08:7077 \

/home/hadoop/sparkdemo.jar


3.啟動(dòng)Spark Streaming程序


4.在Linux端命令行中輸入單詞

[hadoop@hdp08 ~]$ nc -lk 9999

hello hello spark spark

5.在控制臺中查看結(jié)果

??

問題:結(jié)果每次在Linux段輸入的單詞次數(shù)都被正確的統(tǒng)計(jì)出來,但是結(jié)果不能累加!如果需要累加需要使用updateStateByKey(func)來更新狀態(tài),下面給出一個(gè)例子:

package net.togogo.stream


import org.apache.spark.{HashPartitioner, SparkConf}

import org.apache.spark.streaming.{StreamingContext, Seconds}


object NetworkUpdateStateWordCount {

 /**

   * String : 單詞 hello

   * Seq[Int] :單詞在當(dāng)前批次出現(xiàn)的次數(shù)

   * Option[Int] : 歷史結(jié)果

   */

 val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {

   //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))

   iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}

 }


 def main(args: Array[String]) {

   LoggerLevels.setStreamingLogLevels()

   val conf = new SparkConf().setAppName("NetworkUpdateStateWordCount")

   val ssc = new StreamingContext(conf, Seconds(5))

   //做checkpoint 寫入共享存儲中

   ssc.checkpoint("hdfs://hdp08:9000/work/wcout")

   val lines = ssc.socketTextStream("hdp08", 9999)

   //reduceByKey 結(jié)果不累加

   //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

   //updateStateByKey結(jié)果可以累加但是需要傳入一個(gè)自定義的累加函數(shù):updateFunc

   val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)

   results.print()

   ssc.start()

   ssc.awaitTermination()

 }

}

/home/hadoop/apps/spark/bin/spark-submit \

--class net.togogo.sparkdemo.stream. NetworkUpdateStateWordCount \

--master spark://hdp08:7077 \

/home/hadoop/sparkdemo.jar



/home/hadoop/apps/spark/bin/spark-submit \

--class net.togogo.stream.NetworkUpdateStateWordCount \

--master spark://hdp08:7077 \

/home/hadoop/schema.jar???

上一篇: {大數(shù)據(jù)}Kafka

下一篇: {大數(shù)據(jù)}Spark SQL

十九年老品牌
微信咨詢:gz_togogo 咨詢電話:18127429208 咨詢網(wǎng)站客服:在線客服

相關(guān)課程推薦

在線咨詢 ×

您好,請問有什么可以幫您?我們將竭誠提供最優(yōu)質(zhì)服務(wù)!