通過示例和用例介紹 Apache Spark
已發表: 2022-03-11我第一次聽說 Spark 是在 2013 年底,當時我對編寫 Spark 的語言 Scala 產生了興趣。 一段時間後,我做了一個有趣的數據科學項目,試圖預測泰坦尼克號上的生存情況。 事實證明,這是進一步介紹 Spark 概念和編程的好方法。 我強烈推薦任何有抱負的 Spark 開發人員尋找入門的地方。
如今,Spark 已被亞馬遜、eBay 和雅虎等主要參與者採用! 許多組織在具有數千個節點的集群上運行 Spark。 根據 Spark FAQ,已知最大的集群有超過 8000 個節點。 事實上,Spark 是一項非常值得關注和學習的技術。
本文介紹了 Spark,包括用例和示例。 它包含來自 Apache Spark 網站以及《Learning Spark - Lightning-Fast Big Data Analysis》一書的信息。
什麼是 Apache Spark? 一個介紹
Spark 是一個標榜為“閃電般快速的集群計算”的 Apache 項目。 它擁有一個蓬勃發展的開源社區,是目前最活躍的 Apache 項目。
Spark 提供了更快、更通用的數據處理平台。 Spark 讓您在內存中運行程序的速度比 Hadoop 快 100 倍,或者在磁盤上快 10 倍。 去年,Spark 通過在十分之一的機器上以 3 倍的速度完成 100 TB Daytona GraySort 競賽,接管了 Hadoop,它還成為了排序 PB 的最快開源引擎。
Spark 還可以讓您更快地編寫代碼,因為您可以使用 80 多個高級運算符。 為了證明這一點,讓我們看一下“Hello World!” 大數據:字數示例。 用 Java 為 MapReduce 編寫的它有大約 50 行代碼,而在 Spark(和 Scala)中,您可以像這樣簡單地做到這一點:
sparkContext.textFile("hdfs://...") .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile("hdfs://...")
學習如何使用 Apache Spark 的另一個重要方面是它提供的開箱即用的交互式 shell (REPL)。 使用 REPL,可以測試每一行代碼的結果,而無需首先編寫代碼並執行整個作業。 因此,工作代碼的路徑要短得多,並且可以進行臨時數據分析。
Spark 的其他主要功能包括:
- 目前提供 Scala、Java 和 Python 的 API,並正在支持其他語言(例如 R)
- 與 Hadoop 生態系統和數據源(HDFS、Amazon S3、Hive、HBase、Cassandra 等)很好地集成
- 可以在由 Hadoop YARN 或 Apache Mesos 管理的集群上運行,也可以獨立運行
Spark 核心由一組功能強大的高級庫補充,這些庫可以在同一應用程序中無縫使用。 這些庫目前包括 SparkSQL、Spark Streaming、MLlib(用於機器學習)和 GraphX,本文將進一步詳細介紹這些庫。 其他 Spark 庫和擴展目前也在開發中。
火花核心
Spark Core 是大規模並行和分佈式數據處理的基礎引擎。 它負責:
- 內存管理和故障恢復
- 在集群上調度、分發和監控作業
- 與存儲系統交互
Spark 引入了 RDD(彈性分佈式數據集)的概念,這是一個不可變的容錯分佈式對象集合,可以並行操作。 RDD 可以包含任何類型的對象,它是通過加載外部數據集或從驅動程序分發集合來創建的。
RDD 支持兩種類型的操作:
- 轉換是在 RDD 上執行並產生包含結果的新 RDD 的操作(例如 map、filter、join、union 等)。
- 操作是在 RDD 上運行計算後返回值的操作(例如 reduce、count、first 等)。
Spark 中的轉換是“惰性的”,這意味著它們不會立即計算結果。 相反,他們只是“記住”要執行的操作和要執行操作的數據集(例如,文件)。 只有在調用動作並將結果返回給驅動程序時才會實際計算轉換。 這種設計使 Spark 能夠更高效地運行。 例如,如果一個大文件以各種方式被轉換並傳遞給第一個操作,Spark 將只處理並返回第一行的結果,而不是對整個文件進行工作。
默認情況下,每個轉換後的 RDD 可能會在您每次對其運行操作時重新計算。 但是,您也可以使用 persist 或 cache 方法將 RDD 持久化到內存中,在這種情況下,Spark 會將元素保留在集群中,以便下次查詢時更快地訪問它。
火花SQL
SparkSQL 是一個 Spark 組件,支持通過 SQL 或 Hive 查詢語言查詢數據。 它最初是作為在 Spark 之上運行的 Apache Hive 端口(代替 MapReduce),現在與 Spark 堆棧集成。 除了提供對各種數據源的支持外,它還可以將 SQL 查詢與代碼轉換結合起來,從而形成一個非常強大的工具。 以下是 Hive 兼容查詢的示例:
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
火花流
Spark Streaming 支持流數據的實時處理,例如生產網絡服務器日誌文件(例如 Apache Flume 和 HDFS/S3)、Twitter 等社交媒體以及 Kafka 等各種消息隊列。 在底層,Spark Streaming 接收輸入數據流並將數據分成批次。 接下來,它們由 Spark 引擎處理並批量生成最終結果流,如下圖所示。

Spark Streaming API 與 Spark Core 的 API 非常匹配,使程序員可以輕鬆地在批處理和流數據的世界中工作。
MLlib
MLlib 是一個機器學習庫,它提供了各種算法,旨在擴展集群以進行分類、回歸、聚類、協同過濾等(有關該主題的更多信息,請查看 Toptal 關於機器學習的文章)。 其中一些算法也適用於流數據,例如使用普通最小二乘或 k-means 聚類的線性回歸(以及更多的方法)。 Apache Mahout(Hadoop 的機器學習庫)已經遠離 MapReduce,並在 Spark MLlib 上聯手。
圖X
GraphX 是一個用於操作圖形和執行圖形並行操作的庫。 它為 ETL、探索性分析和迭代圖計算提供了一個統一的工具。 除了用於圖形操作的內置操作外,它還提供了一個常見的圖形算法庫,例如 PageRank。
如何使用 Apache Spark:事件檢測用例
既然我們已經回答了“什麼是 Apache Spark?”這個問題,讓我們想想它可以最有效地用於解決什麼樣的問題或挑戰。
我最近看到一篇關於通過分析 Twitter 流來檢測地震的實驗的文章。 有趣的是,這項技術可能會比日本氣象廳更快地通知您日本發生地震。 儘管他們在文章中使用了不同的技術,但我認為這是一個很好的示例,可以讓我們了解如何使用簡化的代碼片段而不使用膠水代碼來使用 Spark。
首先,我們必須過濾那些看起來相關的推文,比如“地震”或“震動”。 為此,我們可以輕鬆地使用 Spark Streaming,如下所示:
TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))
然後,我們必須對推文進行一些語義分析,以確定它們是否似乎引用了當前的地震事件。 諸如“地震!”之類的推文例如,“現在它在震動”會被認為是正匹配,而諸如“參加地震會議”或“昨天的地震很可怕”之類的推文則不會。 該論文的作者為此使用了支持向量機 (SVM)。 我們將在這裡做同樣的事情,但也可以嘗試流媒體版本。 MLlib 生成的代碼示例如下所示:
// We would prepare some earthquake tweet data and load it in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt") // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) // Clear the default threshold. model.clearThreshold() // Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label) } // Get evaluation metrics. val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC)
如果我們對模型的預測率感到滿意,我們可以進入下一階段,並在發現地震時做出反應。 為了檢測一個,我們需要在定義的時間窗口(如文章中所述)內一定數量(即密度)的正面推文。 請注意,對於啟用了 Twitter 位置服務的推文,我們還將提取地震的位置。 有了這些知識,我們可以使用 SparkSQL 並查詢現有的 Hive 表(存儲有興趣接收地震通知的用戶)以檢索他們的電子郵件地址並向他們發送個性化的警告電子郵件,如下所示:
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom function sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)
其他 Apache Spark 用例
當然,Spark 的潛在用例遠遠超出了檢測地震的範圍。
以下是需要處理大數據的速度、種類和數量的其他用例的快速(但肯定遠非詳盡!)示例,Spark 非常適合這些用例:
在遊戲行業中,處理和發現潛在的實時遊戲內事件的模式並能夠立即做出響應是一種可以產生利潤豐厚的業務的能力,例如玩家保留、定向廣告、汽車- 複雜程度的調整,等等。
在電子商務行業,實時交易信息可以傳遞給流式聚類算法(如 k-means)或協同過濾(如 ALS)。 結果甚至可以與其他非結構化數據源(例如客戶評論或產品評論)相結合,並用於隨著時間的推移不斷改進和調整建議以適應新趨勢。
在金融或安全行業,Spark 堆棧可應用於欺詐或入侵檢測系統或基於風險的身份驗證。 它可以通過收集大量存檔日誌,將其與外部數據源(例如有關數據洩露和受損帳戶的信息(例如,參見 https://haveibeenpwned.com/)和來自連接的信息/請求,例如 IP 地理位置或時間。
結論
總而言之,Spark 有助於簡化處理大量實時或存檔數據(包括結構化和非結構化數據)的挑戰性和計算密集型任務,無縫集成相關的複雜功能,例如機器學習和圖形算法。 Spark 將大數據處理帶給大眾。 看看這個!