全部課程
發(fā)布時(shí)間: 2018-01-10 11:39:28
?RDD的屬性:
1)一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。
2)一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。
3)RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。
4)一個(gè)Partitioner,即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對(duì)于于key-value的RDD,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。
5)一個(gè)列表,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。
創(chuàng)建RDD:?
1)由一個(gè)已經(jīng)存在的Scala集合創(chuàng)建。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
2)由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng),還有所有Hadoop支持的數(shù)據(jù)集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile("hdfs://hdp01:9000/words.txt")
RDD編程API
TransformationRDD中的所有轉(zhuǎn)換都是延遲加載的,也就是說(shuō),它們并不會(huì)直接計(jì)算結(jié)果。相反的,它們只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集(例如一個(gè)文件)上的轉(zhuǎn)換動(dòng)作。只有當(dāng)發(fā)生一個(gè)要求返回結(jié)果給Driver的動(dòng)作時(shí),這些轉(zhuǎn)換才會(huì)真正運(yùn)行。這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行。
常用的Transformation:
?
Action:?
WordCount中的RDD:
RDD的依賴關(guān)系:
RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
???
窄依賴:
窄依賴指的是每一個(gè)父RDD的Partition最多被子RDD的一個(gè)Partition使用
總結(jié):窄依賴我們形象的比喻為獨(dú)生子女
寬依賴:
寬依賴指的是多個(gè)子RDD的Partition會(huì)依賴同一個(gè)父RDD的Partition
總結(jié):窄依賴我們形象的比喻為超生
Lineage:
RDD只支持粗粒度轉(zhuǎn)換,即在大量記錄上執(zhí)行的單個(gè)操作。將創(chuàng)建RDD的一系列Lineage(即血統(tǒng))記錄下來(lái),以便恢復(fù)丟失的分區(qū)。RDD的Lineage會(huì)記錄RDD的元數(shù)據(jù)信息和轉(zhuǎn)換行為,當(dāng)該RDD的部分分區(qū)數(shù)據(jù)丟失時(shí),它可以根據(jù)這些信息來(lái)重新運(yùn)算和恢復(fù)丟失的數(shù)據(jù)分區(qū)。
RDD的緩存:
Spark速度非??斓脑蛑唬褪窃诓煌僮髦锌梢栽趦?nèi)存中持久化或緩存?zhèn)€數(shù)據(jù)集。當(dāng)持久化某個(gè)RDD后,每一個(gè)節(jié)點(diǎn)都將把計(jì)算的分片結(jié)果保存在內(nèi)存中,并在對(duì)此RDD或衍生出的RDD進(jìn)行的其他動(dòng)作中重用。這使得后續(xù)的動(dòng)作變得更加迅速。RDD相關(guān)的持久化和緩存,是Spark最重要的特征之一??梢哉f(shuō),緩存是Spark構(gòu)建迭代式算法和快速交互式查詢的關(guān)鍵。
RDD緩存方式:
RDD通過(guò)persist方法或cache方法可以將前面的計(jì)算結(jié)果緩存,但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存,而是觸發(fā)后面的action時(shí),該RDD將會(huì)被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用。?
通過(guò)查看源碼發(fā)現(xiàn)cache最終也是調(diào)用了persist方法,默認(rèn)的存儲(chǔ)級(jí)別都是僅在內(nèi)存存儲(chǔ)一份,Spark的存儲(chǔ)級(jí)別還有好多種,存儲(chǔ)級(jí)別在object StorageLevel中定義的。?
緩存有可能丟失,或者存儲(chǔ)存儲(chǔ)于內(nèi)存的數(shù)據(jù)由于內(nèi)存不足而被刪除,RDD的緩存容錯(cuò)機(jī)制保證了即使緩存丟失也能保證計(jì)算的正確執(zhí)行。通過(guò)基于RDD的一系列轉(zhuǎn)換,丟失的數(shù)據(jù)會(huì)被重算,由于RDD的各個(gè)Partition是相對(duì)獨(dú)立的,因此只需要計(jì)算丟失的部分即可,并不需要重算全部Partition。
DAG的生成:
DAG(Directed Acyclic Graph)叫做有向無(wú)環(huán)圖,原始的RDD通過(guò)一系列的轉(zhuǎn)換就就形成了DAG,根據(jù)RDD之間的依賴關(guān)系的不同將DAG劃分成不同的Stage,對(duì)于窄依賴,partition的轉(zhuǎn)換處理在Stage中完成計(jì)算。對(duì)于寬依賴,由于有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來(lái)的計(jì)算,因此寬依賴是劃分Stage的依據(jù)。??