全部課程
發(fā)布時(shí)間: 2018-01-09 00:33:38
目前,Spark生態(tài)系統(tǒng)已經(jīng)發(fā)展成為一個(gè)包含多個(gè)子項(xiàng)目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項(xiàng)目,Spark是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架。Spark基于內(nèi)存計(jì)算,提高了在大數(shù)據(jù)環(huán)境下數(shù)據(jù)處理的實(shí)時(shí)性,同時(shí)保證了高容錯(cuò)性和高可伸縮性,允許用戶將Spark部署在大量廉價(jià)硬件之上,形成集群。Spark得到了眾多大數(shù)據(jù)公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優(yōu)酷土豆。當(dāng)前百度的Spark已應(yīng)用于鳳巢、大搜索、直達(dá)號(hào)、百度大數(shù)據(jù)等業(yè)務(wù);阿里利用GraphX構(gòu)建了大規(guī)模的圖計(jì)算和圖挖掘系統(tǒng),實(shí)現(xiàn)了很多生產(chǎn)系統(tǒng)的推薦算法;騰訊Spark集群達(dá)到8000臺(tái)的規(guī)模,是當(dāng)前已知的世界上較大的Spark集群。?
Spark Core:
包含Spark的基本功能,包含任務(wù)調(diào)度,內(nèi)存管理,容錯(cuò)機(jī)制等
內(nèi)部定義了RDDs(彈性分布式數(shù)據(jù)集)
提供了很多APIs來(lái)創(chuàng)建和操作這些RDDs
應(yīng)用場(chǎng)景,為其他組件提供底層的服務(wù)
Spark SQL:
是Spark處理結(jié)構(gòu)化數(shù)據(jù)的庫(kù),就像Hive SQL,Mysql一樣
應(yīng)用場(chǎng)景,企業(yè)中用來(lái)做報(bào)表統(tǒng)計(jì)
Spark Streaming:
是實(shí)時(shí)數(shù)據(jù)流處理組件,類似Storm
Spark Streaming提供了API來(lái)操作實(shí)時(shí)流數(shù)據(jù)
應(yīng)用場(chǎng)景,企業(yè)中用來(lái)從Kafka接收數(shù)據(jù)做實(shí)時(shí)統(tǒng)計(jì)
MLlib:
一個(gè)包含通用機(jī)器學(xué)習(xí)功能的包,Machine learning lib
包含分類,聚類,回歸等,還包括模型評(píng)估和數(shù)據(jù)導(dǎo)入。
MLlib提供的上面這些方法,都支持集群上的橫向擴(kuò)展。
應(yīng)用場(chǎng)景,機(jī)器學(xué)習(xí)。
Graphx:
是處理圖的庫(kù)(例如,社交網(wǎng)絡(luò)圖),并進(jìn)行圖的并行計(jì)算。
像Spark Streaming,Spark SQL一樣,它也繼承了RDD API。
它提供了各種圖的操作,和常用的圖算法,例如PangeRank算法。
應(yīng)用場(chǎng)景,圖計(jì)算。
Cluster Managers:
就是集群管理,Spark自帶一個(gè)集群管理是單獨(dú)調(diào)度器。
常見(jiàn)集群管理包括Hadoop YARN,Apache Mesos
基于MapReduce的計(jì)算引擎通常會(huì)將中間結(jié)果輸出到磁盤上,進(jìn)行存儲(chǔ)和容錯(cuò)。
出于任務(wù)管道承接的,考慮,當(dāng)一些查詢翻譯到MapReduce任務(wù)時(shí),往往會(huì)產(chǎn)生多個(gè)Stage,而這些串聯(lián)的Stage又依賴于底層文件系統(tǒng)(如HDFS)來(lái)存儲(chǔ)每一個(gè)Stage的輸出結(jié)果HadoopSpark Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生態(tài)系統(tǒng),以彌補(bǔ)MapReduce的不足。?
Spark特點(diǎn):
快:與Hadoop的MapReduce相比,Spark基于內(nèi)存的運(yùn)算要快100倍以上,基于硬盤的運(yùn)算也要快10倍以上。Spark實(shí)現(xiàn)了高效的DAG執(zhí)行引擎,可以通過(guò)基于內(nèi)存來(lái)高效處理數(shù)據(jù)流 ??
易用:
Spark支持Java、Python和Scala的API,還支持超過(guò)80種高級(jí)算法,使用戶可以快速構(gòu)建不同的應(yīng)用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在這些shell中使用Spark集群來(lái)驗(yàn)證解決問(wèn)題的方法。?
通用:
Spark提供了統(tǒng)一的解決方案。Spark可以用于批處理、交互式查詢(Spark SQL)、實(shí)時(shí)流處理(Spark Streaming)、機(jī)器學(xué)習(xí)(Spark MLlib)和圖計(jì)算(GraphX)。這些不同類型的處理都可以在同一個(gè)應(yīng)用中無(wú)縫使用。Spark統(tǒng)一的解決方案非常具有吸引力,畢竟任何公司都想用統(tǒng)一的平臺(tái)去處理遇到的問(wèn)題,減少開(kāi)發(fā)和維護(hù)的人力成本和部署平臺(tái)的物力成本。?
兼容性:
Spark可以非常方便地與其他的開(kāi)源產(chǎn)品進(jìn)行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調(diào)度器,并且可以處理所有Hadoop支持的數(shù)據(jù),包括HDFS、HBase和Cassandra等。這對(duì)于已經(jīng)部署Hadoop集群的用戶特別重要,因?yàn)椴恍枰鋈魏螖?shù)據(jù)遷移就可以使用Spark的強(qiáng)大處理能力。Spark也可以不依賴于第三方的資源管理和調(diào)度器,它實(shí)現(xiàn)了Standalone作為其內(nèi)置的資源管理和調(diào)度框架,這樣進(jìn)一步降低了Spark的使用門檻,使得所有人都可以非常容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集群的工具。?
Spark集群安裝:
機(jī)器部署
準(zhǔn)備兩臺(tái)以上Linux服務(wù)器,安裝好JDK1.8
下載Spark安裝包
http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
上傳解壓安裝包
上傳spark-2.2.0-bin-hadoop2.7.tgz安裝包到Linux上
解壓安裝包到指定位置
[hadoop@hdp01 ~]$ tar zxvf spark-2.2.0-bin-hadoop2.7.tgz -C apps
[hadoop@hdp01 ~]$ cd apps
[hadoop@hdp01 apps]$ mv spark-2.2.0-bin-hadoop2.7 spark
配置Spark:
進(jìn)入到Spark安裝目錄
[hadoop@hdp01 apps]$ cd spark/
進(jìn)入conf目錄并重命名并修改spark-env.sh.template文件
[hadoop@hdp01 apps]$cd conf/
[hadoop@hdp01 apps]$mv spark-env.sh.template spark-env.sh
[hadoop@hdp01 apps]$vi spark-env.sh
在該配置文件中添加如下配置
export JAVA_HOME=/opt/jdk1.8.0_121
export SPARK_MASTER_IP=hdp08
export SPARK_MASTER_PORT=7077
保存退出
重命名并修改slaves.template文件
[hadoop@hdp01 conf]$ mv slaves.template slaves
[hadoop@hdp01 conf]$ vi slaves
在該文件中添加子節(jié)點(diǎn)所在的位置(Worker節(jié)點(diǎn))
hdp05
hdp06
hdp07
保存退出
將配置好的Spark拷貝到其他節(jié)點(diǎn)上
[hadoop@hdp01 spark]$ scp -r spark hadoop@hdp05:/home/hadoop/apps
[hadoop@hdp01 spark]$ scp -r spark hadoop@hdp06:/home/hadoop/apps
[hadoop@hdp01 spark]$ scp -r spark hadoop@hdp06:/home/hadoop/apps
Spark集群配置完畢,目前是1個(gè)Master,3個(gè)Work,在hdp08上啟動(dòng)Spark集群
[hadoop@hdp01 spark]$ sbin/start-all.sh
啟動(dòng)后執(zhí)行jps命令,主節(jié)點(diǎn)上有Master進(jìn)程,其他子節(jié)點(diǎn)上有Work進(jìn)行,登錄Spark管理界面查看集群狀態(tài)(主節(jié)點(diǎn)):http://hdp08:8080/
到此為止,Spark集群安裝完畢,但是有一個(gè)很大的問(wèn)題,那就是Master節(jié)點(diǎn)存在單點(diǎn)故障,要解決此問(wèn)題,就要借助zookeeper,并且啟動(dòng)至少兩個(gè)Master節(jié)點(diǎn)來(lái)實(shí)現(xiàn)高可靠,配置方式比較簡(jiǎn)單:
Spark集群規(guī)劃:hdp01,hdp02是Master;hdp05,hdp06,hdp07是Worker
安裝配置zk集群,并啟動(dòng)zk集群
停止spark所有服務(wù),修改配置文件spark-env.sh,在該配置文件中刪掉SPARK_MASTER_IP并添加如下配置
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp05,hdp06,hdp07 - Dspark.deploy.zookeeper.dir=/spark"
1.在hdp01節(jié)點(diǎn)上修改slaves配置文件內(nèi)容指定worker節(jié)點(diǎn)
2.在hdp01上執(zhí)行sbin/start-all.sh腳本,然后在hdp02上執(zhí)行sbin/start-master.sh啟動(dòng)第二個(gè)Master
執(zhí)行Spark程序:
spark-shell是Spark自帶的交互式Shell程序,方便用戶進(jìn)行交互式編程,用戶可以在該命令行下用scala編寫spark程序。
啟動(dòng)spark shell:
bin/spark-shell \
--master spark://hdp08:7077 \
--executor-memory 1g \
--total-executor-cores 2
參數(shù)說(shuō)明:
--master spark://node1.togogo.cn:7077 指定Master的地址
--executor-memory 512M 指定每個(gè)worker可用內(nèi)存為512M,也可以指定為2g
--total-executor-cores 2 指定整個(gè)集群使用的cup核數(shù)為2個(gè)
注意:
如果啟動(dòng)spark shell時(shí)沒(méi)有指定master地址,但是也可以正常啟動(dòng)spark shell和執(zhí)行spark shell中的程序,其實(shí)是啟動(dòng)了spark的local模式,該模式僅在本機(jī)啟動(dòng)一個(gè)進(jìn)程,沒(méi)有與集群建立聯(lián)系。
Spark Shell中已經(jīng)默認(rèn)將SparkContext類初始化為對(duì)象sc。用戶代碼如果需要用到,則直接應(yīng)用sc即可
在spark shell中編寫WordCount程序:
1.首先啟動(dòng)hdfs
2.向hdfs上傳一個(gè)文件到hdfs://hdp01:9000/ README.md
3.在spark shell中用scala語(yǔ)言編寫spark程序
sc.textFile("hdfs://hdp08:9000/spark/README.md").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hdp08:9000/spark/out")
4.使用hdfs命令查看結(jié)果
[hadoop@hdp01 spark]$ hadoop fs -ls /spark/out
說(shuō)明:
sc是SparkContext對(duì)象,該對(duì)象時(shí)提交spark程序的入口
textFile(hdfs://hdp01:9000/spark/README.md)是hdfs中讀取數(shù)據(jù)
flatMap(_.split(" "))先map在壓平
map((_,1))將單詞和1構(gòu)成元組
reduceByKey(_+_)按照key進(jìn)行reduce,并將value累加
saveAsTextFile("hdfs://hdp01:9000/spark/out")將結(jié)果寫入到hdfs中
注意一:scala中的下劃線含義
1、 作為“通配符”,類似Java中的*。如import scala.math._
2、 _*作為一個(gè)整體,告訴編譯器你希望將某個(gè)參數(shù)當(dāng)作參數(shù)序列處理!例如val s = sum(1 to 5:_*)就是將1 to 5當(dāng)作參數(shù)序列處理。
3、 指代一個(gè)集合中的每個(gè)元素。例如我們要在一個(gè)Array a中篩出偶數(shù),并乘以2,可以用以下辦法:
a.filter(_%2==0).map(2*_)。
又如要對(duì)緩沖數(shù)組ArrayBuffer b排序,可以這樣:
val bSorted = b.sorted(_
4、 在元組中,可以用方法_1, _2, _3訪問(wèn)組員。如a._2。其中句點(diǎn)可以用空格替代。
5、 使用模式匹配可以用來(lái)獲取元組的組員,例如
val (first, second, third) = t
但如果不是所有的部件都需要,那么可以在不需要的部件位置上使用_。比如上一例中val (first, second, _) = t
6、 還有一點(diǎn),下劃線_代表的是某一類型的默認(rèn)值。
對(duì)于Int來(lái)說(shuō),它是0。
對(duì)于Double來(lái)說(shuō),它是0.0
對(duì)于引用類型,它是null。
注意二:map()與flatMap()區(qū)別
aap()是將函數(shù)用于RDD中的每個(gè)元素,將返回值構(gòu)成新的RDD。
flatmap()是將函數(shù)應(yīng)用于RDD中的每個(gè)元素,將返回的迭代器的所有內(nèi)容構(gòu)成新的RDD,這樣就得到了一個(gè)由各列表中的元素組成的RDD,而不是一個(gè)列表組成的RDD。
有些拗口,看看例子就明白了。
val rdd = sc.parallelize(List("coffee panda","happy panda","happiest panda party"))
輸入
rdd.map(x=>x).collect
結(jié)果
res9: Array[String] = Array(coffee panda, happy panda, happiest panda party)
輸入
rdd.flatMap(x=>x.split(" ")).collect
結(jié)果
res8: Array[String] = Array(coffee, panda, happy, panda, happiest, panda, party)
flatMap說(shuō)明白就是先map然后再flat,再來(lái)看個(gè)例子
val rdd1 = sc.parallelize(List(1,2,3,3))
scala> rdd1.map(x=>x+1).collect
res10: Array[Int] = Array(2, 3, 4, 4)
scala> rdd1.flatMap(x=>x.to(3)).collect
res11: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)
---------------------------------------------------------------------------------------------------------------------------
var li=List(1,2,3,4)
var res =li.flatMap(x=> x match {
case 3 => List(3.1,3.2)
case _ =>List(x*2)
})
println(res)
li= List(1,2,3,4)
var res2 =li.map(x=> x match {
case 3 =>List(3.1,3.2)
case _ =>x*2
})
println(res2)
//output=>
List(2,4, 3.1,3.2, 8)
List(2,4, List(3.1,3.2), 8)
Program exited.
這個(gè)過(guò)程就像是先 map, 然后再將 map 出來(lái)的這些列表首尾相接 (flatten).
在IDEA中編寫WordCount程序:
spark shell僅在測(cè)試和驗(yàn)證我們的程序時(shí)使用的較多,在生產(chǎn)環(huán)境中,通常會(huì)在IDE中編制程序,然后打成jar包,然后提交到集群。
2.新建一個(gè)scala class,類型為Object
3.編寫spark程序
package net.togogo.scalasparkdemo
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCount {
def main(args: Array[String]) {
//創(chuàng)建SparkConf()并設(shè)置App名稱
val conf = new SparkConf().setAppName("WC")
//創(chuàng)建SparkContext,該對(duì)象是提交spark App的入口
val sc = new SparkContext(conf)
//使用sc創(chuàng)建RDD并執(zhí)行相應(yīng)的transformation和action
sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 1).sortBy(_._2, false).saveAsTextFile(args(1))
//停止sc,結(jié)束該任務(wù)
sc.stop()
}
}
?4.導(dǎo)出jar包
5.首先啟動(dòng)hdfs和Spark集群
啟動(dòng)hdfs
start-dfs.sh
啟動(dòng)spark
/sbin/start-all.sh
6.使用spark-submit命令提交Spark應(yīng)用(注意參數(shù)的順序)
/home/hadoop/apps/spark/bin/spark-submit \
--class net.togogo.sparkscalaproject.WordCountDemo \
--master spark://hdp08:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/home/hadoop/wcdemo.jar \
hdfs://hdp08:9000/work/README.md \
hdfs://hdp08:9000/work/out3
查看程序執(zhí)行結(jié)果