Apache PySpark 中的數據框:綜合教程 [附示例]

已發表: 2020-02-27

今天,我們將學習Apache PySpark 中的 DataFrame Pyspark 是 2020 年的頂級數據科學工具之一。它在 Apache Spark 中被命名為分佈式行集合的列。 它與 Excel 表格中的表或列非常相似,也類似於關係數據庫的表。 PySpark DataFrame 也有類似 RDD 的特點,分別是:

分佈式: DataFrame和RDD的本質都是分佈式的

惰性評估:如果不執行操作,則不執行任務

不可變的本質: RDD/DataFrame 的另一個類似特性是一旦創建就無法更改。 但是可以應用轉換來轉換 RDD / DataFrame。

目錄

數據幀的優勢

1. 支持多種語言,如Java、Scala、R、Python,在API支持方面很有用。 對多種語言的 API 支持有助於使用多種編程語言。

2. DataFrame 支持多種數據源和格式,這有助於方便地使用不同的數據源和格式。

3. DataFrame 最好的部分之一是它甚至可以處理 PB 級的數據,這是處理如此海量數據的非凡能力。

4. Apache Spark 通過對 Spark DataFrame 中的觀察,快速了解 DataFrame 的 schema。 在命名列下,組織了 Spark DataFrame 的觀察。 這樣,查詢執行的計劃就被優化了。

5.海量的半結構化和結構化數據可以被快速處理,因為它被設計為DataFrames。

Apache Spark 設置

Apache Spark 應該先在機器中設置,然後才能開始用於 DataFrame 操作。 數據可以在DataFrame的支持下進行操作,因為它支持各種DataFrame操作。 在這裡,我們將討論一些常見的 DataFrame 操作。

SparkContext的創建是Apache編程的第一步。 為了在集群中執行操作,需要 SparkContext。 SparkContext 告訴如何訪問集群。 它還向 Spark 顯示獲取集群的位置。

閱讀:深度學習框架

然後建立 Apache Cluster 連接。 如果使用 Spark Shell,它的創建已經完成。 另一種方式是配置設置,可以提供、初始化和導入以創建 SparkContext。

可以使用此代碼進行創建:

從 pyspark 導入 SparkContext

sc = SparkContext()

從 CSV 文件創建 DataFrame

必須在 python 的 shell 中指定一個新庫,以便可以讀取 CSV 文件。 為此,第一步是下載最新版本的 Spark-CSV 包並在 Spark 的主目錄中提取包。 之後,我們將打開 PySpark 的 shell,包必須包含在內。

$ ./bin/pyspark –packages com.databricks:spark-csv_2.10:1.3.0

現在 DataFrame 將在從 CSV 文件中讀取數據後創建。

train = sqlContext.load(source=”com.databricks.spark.csv”, path = 'PATH/train.csv', header = True,inferSchema = True)

test = sqlContext.load(source=”com.databricks.spark.csv”, path = 'PATH/test-comb.csv', header = True,inferSchema = True)

測試 CSV 文件和訓練 CSV 文件位於名為 PATH 的文件夾位置。 如果 CSV 文件中有 Header,那麼它將顯示為 True。 要知道數據框每一列中的數據類型,我們將使用 inferSchema = True 選項。 通過使用 inferSchema = True 選項,SQL 上下文將自動檢測數據框每一列的數據類型。 如果我們不將 inferSchema 設置為 true,則所有列都將被讀取為字符串。

閱讀:用於機器學習的 Python 庫

數據幀的操作

現在在這裡我們將看到如何操作數據框:

  • 了解列的數據類型

printSchema 將用於查看列類型及其數據類型。 現在將通過應用 printSchema() 以樹格式打印模式。

train.printSchema()

輸出:

|– User_ID:整數(可為空=真)

|– Product_ID:字符串(可為空 = true)

|– 性別:字符串(可為空=真)

|– 年齡:字符串(可為空=真)

|– 職業:整數(可為空=真)

|– City_Category:字符串(可為空=真)

|– Stay_In_Current_City_Years:字符串(可為空 = true)

|– Marital_Status:整數(可為空=真)

|– Product_Category_1:整數(可為空 = true)

|– Product_Category_2:整數(可為空 = true)

|– Product_Category_3:整數(可為空 = true)

|– 購買:整數(可為空=真)

讀取csv的文件後,我們可以看到我們準確的得到了數據類型或者數據框中每一列的schema。

  • 顯示前 n 個觀察值

要查看第 n 個觀察結果,可以使用 head 操作。 Pandas 的頭部操作與 PySpark 的頭部操作相同。

火車頭(5)

輸出:

[行(User_ID=1000001, Product_ID=u'P00069042', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status= 0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),

行(User_ID=1000001, Product_ID=u'P00248942', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0 , Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),

Row(User_ID=1000001, Product_ID=u'P00087842', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0 , Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),

行(User_ID=1000001, Product_ID=u'P00085442', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0 , Product_Category_1=12, Product_Category_2=14, Product_Category_3=無, 購買=1057),

行(User_ID=1000002, Product_ID=u'P00285442', Gender=u'M', Age=u'55+', Occupation=16, City_Category=u'C', Stay_In_Current_City_Years=u'4+', Marital_Status=0 , Product_Category_1=8, Product_Category_2=None, Product_Category_3=None, Purchase=7969)]

現在我們將使用 show 操作以更好的方式查看結果,因為結果將以行的格式出現。 我們還可以使用參數 truncate = True 來截斷結果。

train.show(2,truncate=真)

輸出:

+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+

|User_ID|Product_ID|性別| 年齡|職業|城市_類別|停留_在_當前_城市_年|婚姻_狀態|產品_類別_1|產品_類別_2|產品_類別_3|購買|

+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+

|1000001| P00069042| F|0-17| 10| 一個| 2| 0| 3| 空| 空| 8370|

|1000001| P00248942| F|0-17| 10| 一個| 2| 0| 1| 6| 14| 15200|

+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+

僅顯示前 2 行

  • 計算 DataFrame 的行數

要計算數據框中的行數,我們可以使用計數的操作。 現在我們將通過應用計數操作來計算測試文件和訓練文件的行數。

train.count(),test.count()

輸出:

(233598, 550069)

我們在測試和訓練中分別有 233598、550069 行。

  • 從測試和訓練文件中獲取列數和列名

類似於 pandas 的 DataFrame 中對列的操作,我們將使用列操作來獲取列的名稱。 現在首先我們將打印編號。 列的名稱和列的名稱,來自測試文件和訓練文件。

len(train.columns),train.columns

輸出:

12 ['User_ID'、'Product_ID'、'Gender'、'Age'、'Occupation'、'City_Category'、'Stay_In_Current_City_Years'、'Marital_Status'、'Product_Category_1'、'Product_Category_2'、'Product_Category_3'、'Purchase']

現在我們正在為測試文件做類似的事情。

len(test.columns),test.columns

輸出:

13 [”, 'User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3', 'Comb ']

經過上面的輸出,我們可以看到訓練文件有12列,測試文件有13列。 從上面的輸出中,我們可以檢查我們在測試文件中有 13 列,在訓練文件中有 12 列。 “Comb”列是測試文件中唯一的單列,並且沒有“Purchase”不存在於測試文件中。 測試文件中還有一列我們可以看到沒有任何列名。

  • 在 DataFrame 的數值列中獲取匯總統計信息,例如計數、最大值、最小值、標準偏差、平均值

在 DataFrame 中,我們將使用稱為 describe 操作的操作。 我們可以通過describe操作對數值列進行計算,得到統計匯總。 所有數值列都會在 DataFrame 中計算,我們在匯總統計的計算中沒有指定列名。

train.describe().show()

輸出:

+———-+——————+——————–+——————-+——————+——————+——————+—— ————+

|總結| 用戶ID| 職業| 婚姻狀況|Product_Category_1|Product_Category_2|Product_Category_3| 購買|

+———-+——————+——————–+——————-+——————+——————+——————+—— ————+

| 計數| 550068| 550068| 550068| 550068| 376430| 166821| 550068|

| 均值|1003028.8424013031|8.076706879876669|0.40965298835780306| 5.404270017525106| 9.842329251122386|12.668243206790512| 9263.968712959126|

| 標准開發|1727.5915855308265|6.522660487341778| 0.4917701263173273|3.9362113692014082| 5.086589648693526| 4.125337631575267|5023.0653938206015|

| 分鐘| 1000001| 0| 0| 1| 2| 3| 12|

| 最大值| 1006040| 20| 1| 20| 18| 18| 23961|

+———-+——————+——————–+——————-+——————+——————+——————+—— ————+

在描述操作中,這是我們在指定字符串列名或分類列名時得到的。

train.describe('Product_ID').show()

輸出:

+——-+——-+

|總結|產品ID|

+——-+——-+

| 計數| 550068|

| 意思| 空|

| 標准開發| 空|

| 分鐘| P00000142|

| 最大值| P0099942|

+——-+——-+

基於ASCII,計算出最大值和最小值。 描述操作用於處理字符串類型列。

  • 選擇 DataFrame 的列

我們將在選擇操作中使用列的名稱來選擇列。 我們將使用逗號分隔列的名稱。 現在我們將看看如何從訓練文件中選擇“Age”和“User_ID”。

  • train.select('User_ID','Age').show(5)
  • 輸出:
  • +——-+——-+
  • |用戶ID| 年齡|
  • +——-+——-+
  • |1000001|0-17|
  • |1000001|0-17|
  • |1000001|0-17|
  • |1000001|0-17|
  • |1000002| 55+|
  • +——-+——-+
  • 尋找不同的產品編號在測試文件和訓練文件中

計算 DataFrame 的編號。 對於不同的行,我們將使用distinct操作。 現在在這裡我們將應用distinct運算來計算 no。 測試和訓練文件中的不同產品。

train.select('Product_ID').distinct().count(),test.select('Product_ID').distinct().count()

輸出:

(3633, 3492)

我們在測試和訓練文件中分別有 3492 和 3633 個不同的產品。 現在我們知道,在訓練文件中,我們比測試文件有更多不同的值,因為我們可以從輸出結果中了解到。 現在我們將使用減法運算找出訓練文件中不存在但測試文件中存在的 Product_ID 類別。 同樣的事情也可以對分類的所有特徵做。

diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))

diff_cat_in_train_test.distinct().count()# 用於不同的計數

輸出:

46

所以從上面的結果,我們可以知道有 47 個不同的類別,它們在訓練文件中不存在,但在測試文件中存在。 數據將被跳過或從測試文件中收集,該文件不存在於火車文件中。

  • 計算分類列的成對頻率?

讓我們使用can crosstab操作來計算DataFrame中列的成對頻率。 現在讓我們通過交叉表操作計算火車 DataFrame 中的“性別”和“年齡”列

train.crosstab('年齡', '性別').show()

輸出:

+——-+——+——+

|年齡_性別| F| 米|

+——-+——+——+

| 0-17| 5083| 10019|

| 46-50|13199| 32502|

| 18-25|24628| 75032|

| 36-45|27170| 82843|

| 55+| 5083| 16421|

| 51-55| 9894| 28607|

| 26-35|50752|168835|

+——-+——+——+

Gender 的不同值是列名,Age 的不同量是行名,從上面的結果中可以看出。 在表中,如果沒有發生,該對的計數將為零。

如何獲取具有唯一行的 DataFrame?

要查找唯一行而不包含重複行,我們將使用dropDuplicates操作。 它將通過刪除 DataFrame 的重複行來獲得沒有任何重複行的 Dataframe。 請在此處查看以了解如何執行 dropDuplicates 過程以獲取列的所有唯一行。

train.select('年齡','性別').dropDuplicates().show()

輸出:

+—–+——+

| 年齡|性別|

+—–+——+

|51-55| F|

|51-55| 米|

|26-35| F|

|26-35| 米|

|36-45| F|

|36-45| 米|

|46-50| F|

|46-50| 米|

| 55+| F|

| 55+| 米|

|18-25| F|

| 0-17| F|

|18-25| 米|

| 0-17| 米|

+—–+——+

  • 如何刪除行將空值?

如果要刪除所有具有空值的行,那麼我們可以使用稱為dropna操作的操作。 要從 DataFrame 中刪除行,它考慮了三個選項。

  • 子集 - 它是刪除具有空值的列的操作要考慮的所有列名稱的列表。
  • Thresh - 這有助於刪除具有小於 thresh 非空值的行。 默認情況下,這裡沒有指定任何內容。
  • 如何——它可以用於兩種類型——全部或任何。 通過使用any,如果行中的任何值為空,它將刪除該行。 通過使用 all,如果所有行的值為空,它將減少行。

現在,我們將一一使用所有這些選項,通過使用默認選項(例如子集、閾值、無用於如何、無、任何)來刪除為空的行。

train.dropna().count()

輸出:

166821

  • 如何用常量號填充 DataFrame 的空值?

用常量 no 填充空值。 我們將在這裡使用fillna操作。 fillna操作需要考慮兩個參數來填充空值。

  • 子集:在這裡,需要指定要考慮填充值的列。
  • value:這裡我們可以提一下要替換成什麼值的數量,可以是所有列中的任何數據類型,如string、float、int。

在這裡,我們將填充“-1”來代替 train DataFrame 中的空值。

train.fillna(-1).show(2)

輸出:

+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+

|User_ID|Product_ID|性別| 年齡|職業|城市_類別|停留_在_當前_城市_年|婚姻_狀態|產品_類別_1|產品_類別_2|產品_類別_3|購買|

+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+

|1000001| P00069042| F|0-17| 10| 一個| 2| 0| 3| -1| -1| 8370|

|1000001| P00248942| F|0-17| 10| 一個| 2| 0| 1| 6| 14| 15200|

+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+

僅顯示前 2 行

結論

PySpark 在人工智能和機器學習領域的發展勢頭強勁。 PySpark 用於解決現實世界的機器學習問題。 您可以從外部和現有的不同數據源創建 RDD,並對其進行所有類型的轉換。 希望本文內容豐富,能夠讓您深入了解 PySpark 數據幀。

如果您想了解 PySpark 和其他數據科學工具,請查看 IIIT-B 和 upGrad 的數據科學 PG 文憑,該文憑專為在職專業人士而設,提供 10 多個案例研究和項目、實用的實踐研討會、指導行業專家,與行業導師一對一交流,400 多個小時的學習和頂級公司的工作協助。

PySpark 是否比 Pandas 更高效?

是的,PySpark 比 Pandas 更快,它甚至在基準測試中勝過 Pandas。 基本而言,Pandas 在單台機器上執行操作,而 PySpark 在多台機器上執行操作。 如果您正在使用具有龐大數據集的機器學習應用程序,PySpark 是理想的選擇,因為它可以比 Pandas 快 100 倍地執行操作。 由於 JVM,Scala 編程語言在數據分析和處理方面比 Python 快 10 倍。 當使用 Python 編程代碼調用 Spark 庫時,結果是平庸的。

使用 Apache PySpark 有哪些缺點?

Spark 沒有自己的文件管理系統。 由於運行 Spark 所需的額外內存成本很高,內存計算的成本可能高得令人望而卻步。 在將 Apache Spark 與 Hadoop 結合使用時,開發人員會遇到壓縮文件的困難。 數據在 Spark 中分批迭代,每次迭代都是獨立計劃和處理的。 在 Apache Spark 中,數據以預定的時間間隔分成更小的批次。 因此,Apache 將不支持基於記錄的窗口條件。 相反,它提供了基於時間的窗口標準。

Datasets、DataFrame 和 RDD 有何不同?

RDD 是分散在多台計算機上的數據項的集群集合。 數據通過 RDD 表示,RDD 是 Java 或 Scala 對象的集合。 DataFrame 是結構化為命名列的數據集合,分佈在許多服務器上。 在關係數據庫中,它在概念上等同於表。 Dataset 是一個數據幀 API 擴展,它提供了 RDD API 的類型安全、面向對象的編程接口能力。 DataFrame 是數據的分佈式集合,類似於 RDD,並且不可變。 數據被結構化為命名列,類似於關係數據庫中的表,而不是 RDD。 當涉及到像分組數據這樣的簡單任務時,RDD 比 Dataframes 和 Datasets 都慢。 它有一個用於執行聚合任務的簡單 API。 它可以比 RDD 和數據集更快地聚合數據。