Apache PySpark 中的数据框:综合教程 [附示例]
已发表: 2020-02-27今天,我们将学习Apache PySpark 中的 DataFrame 。 Pyspark 是 2020 年的顶级数据科学工具之一。它在 Apache Spark 中被命名为分布式行集合的列。 它与 Excel 表格中的表或列非常相似,也类似于关系数据库的表。 PySpark DataFrame 也有类似 RDD 的特点,分别是:
分布式: DataFrame和RDD的本质都是分布式的
惰性评估:如果不执行操作,则不执行任务
不可变的本质: RDD/DataFrame 的另一个类似特性是一旦创建就无法更改。 但是可以应用转换来转换 RDD / DataFrame。
目录
数据帧的优势
1. 支持多种语言,如Java、Scala、R、Python,在API支持方面很有用。 对多种语言的 API 支持有助于使用多种编程语言。
2. DataFrame 支持多种数据源和格式,这有助于方便地使用不同的数据源和格式。
3. DataFrame 最好的部分之一是它甚至可以处理 PB 级的数据,这是处理如此海量数据的非凡能力。
4. Apache Spark 通过对 Spark DataFrame 中的观察,快速了解 DataFrame 的 schema。 在命名列下,组织了 Spark DataFrame 的观察。 这样,查询执行的计划就被优化了。
5.海量的半结构化和结构化数据可以被快速处理,因为它被设计为DataFrames。
Apache Spark 设置
Apache Spark 应该先在机器中设置,然后才能开始用于 DataFrame 操作。 数据可以在DataFrame的支持下进行操作,因为它支持各种DataFrame操作。 在这里,我们将讨论一些常见的 DataFrame 操作。
SparkContext的创建是Apache编程的第一步。 为了在集群中执行操作,需要 SparkContext。 SparkContext 告诉如何访问集群。 它还向 Spark 显示获取集群的位置。
阅读:深度学习框架
然后建立 Apache Cluster 连接。 如果使用 Spark Shell,它的创建已经完成。 另一种方式是配置设置,可以提供、初始化和导入以创建 SparkContext。
可以使用此代码进行创建:
从 pyspark 导入 SparkContext
sc = SparkContext()
从 CSV 文件创建 DataFrame
必须在 python 的 shell 中指定一个新库,以便可以读取 CSV 文件。 为此,第一步是下载最新版本的 Spark-CSV 包并在 Spark 的主目录中提取包。 之后,我们将打开 PySpark 的 shell,包必须包含在内。
$ ./bin/pyspark –packages com.databricks:spark-csv_2.10:1.3.0
现在 DataFrame 将在从 CSV 文件中读取数据后创建。
train = sqlContext.load(source=”com.databricks.spark.csv”, path = 'PATH/train.csv', header = True,inferSchema = True)
test = sqlContext.load(source=”com.databricks.spark.csv”, path = 'PATH/test-comb.csv', header = True,inferSchema = True)
测试 CSV 文件和训练 CSV 文件位于名为 PATH 的文件夹位置。 如果 CSV 文件中有 Header,那么它将显示为 True。 要知道数据框每一列中的数据类型,我们将使用 inferSchema = True 选项。 通过使用 inferSchema = True 选项,SQL 上下文将自动检测数据框每一列的数据类型。 如果我们不将 inferSchema 设置为 true,则所有列都将被读取为字符串。
阅读:用于机器学习的 Python 库
数据帧的操作
现在在这里我们将看到如何操作数据框:
- 了解列的数据类型
printSchema 将用于查看列类型及其数据类型。 现在将通过应用 printSchema() 以树格式打印模式。
train.printSchema()
输出:
根
|– User_ID:整数(可为空=真)
|– Product_ID:字符串(可为空 = true)
|– 性别:字符串(可为空=真)
|– 年龄:字符串(可为空=真)
|– 职业:整数(可为空=真)
|– City_Category:字符串(可为空=真)
|– Stay_In_Current_City_Years:字符串(可为空 = true)
|– Marital_Status:整数(可为空=真)
|– Product_Category_1:整数(可为空 = true)
|– Product_Category_2:整数(可为空 = true)
|– Product_Category_3:整数(可为空 = true)
|– 购买:整数(可为空=真)
读取csv的文件后,我们可以看到我们准确的得到了数据类型或者数据框中每一列的schema。
- 显示前 n 个观察值
要查看第 n 个观察结果,可以使用 head 操作。 Pandas 的头部操作与 PySpark 的头部操作相同。
火车头(5)
输出:
[行(User_ID=1000001, Product_ID=u'P00069042', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status= 0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
行(User_ID=1000001, Product_ID=u'P00248942', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0 , Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),
Row(User_ID=1000001, Product_ID=u'P00087842', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0 , Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),
行(User_ID=1000001, Product_ID=u'P00085442', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0 , Product_Category_1=12, Product_Category_2=14, Product_Category_3=无, 购买=1057),
行(User_ID=1000002, Product_ID=u'P00285442', Gender=u'M', Age=u'55+', Occupation=16, City_Category=u'C', Stay_In_Current_City_Years=u'4+', Marital_Status=0 , Product_Category_1=8, Product_Category_2=None, Product_Category_3=None, Purchase=7969)]
现在我们将使用 show 操作以更好的方式查看结果,因为结果将以行的格式出现。 我们还可以使用参数 truncate = True 来截断结果。
train.show(2,truncate=真)
输出:
+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+
|User_ID|Product_ID|性别| 年龄|职业|城市_类别|停留_在_当前_城市_年|婚姻_状态|产品_类别_1|产品_类别_2|产品_类别_3|购买|
+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+
|1000001| P00069042| F|0-17| 10| 一个| 2| 0| 3| 空| 空| 8370|
|1000001| P00248942| F|0-17| 10| 一个| 2| 0| 1| 6| 14| 15200|
+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+
仅显示前 2 行
- 计算 DataFrame 的行数
要计算数据框中的行数,我们可以使用计数的操作。 现在我们将通过应用计数操作来计算测试文件和训练文件的行数。
train.count(),test.count()
输出:
(233598, 550069)
我们在测试和训练中分别有 233598、550069 行。
- 从测试和训练文件中获取列数和列名
类似于 pandas 的 DataFrame 中对列的操作,我们将使用列操作来获取列的名称。 现在首先我们将打印编号。 列的名称和列的名称,来自测试文件和训练文件。
len(train.columns),train.columns
输出:
12 ['User_ID'、'Product_ID'、'Gender'、'Age'、'Occupation'、'City_Category'、'Stay_In_Current_City_Years'、'Marital_Status'、'Product_Category_1'、'Product_Category_2'、'Product_Category_3'、'Purchase']
现在我们正在为测试文件做类似的事情。
len(test.columns),test.columns
输出:
13 [”, 'User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3', 'Comb ']
经过上面的输出,我们可以看到训练文件有12列,测试文件有13列。 从上面的输出中,我们可以检查我们在测试文件中有 13 列,在训练文件中有 12 列。 “Comb”列是测试文件中唯一的单列,并且没有“Purchase”不存在于测试文件中。 测试文件中还有一列我们可以看到没有任何列名。
- 在 DataFrame 的数值列中获取汇总统计信息,例如计数、最大值、最小值、标准偏差、平均值
在 DataFrame 中,我们将使用称为 describe 操作的操作。 我们可以通过describe操作对数值列进行计算,得到统计汇总。 所有数值列都会在 DataFrame 中计算,我们在汇总统计的计算中没有指定列名。
train.describe().show()
输出:
+———-+——————+——————–+——————-+——————+——————+——————+—— ————+
|总结| 用户ID| 职业| 婚姻状况|Product_Category_1|Product_Category_2|Product_Category_3| 购买|

+———-+——————+——————–+——————-+——————+——————+——————+—— ————+
| 计数| 550068| 550068| 550068| 550068| 376430| 166821| 550068|
| 均值|1003028.8424013031|8.076706879876669|0.40965298835780306| 5.404270017525106| 9.842329251122386|12.668243206790512| 9263.968712959126|
| 标准开发|1727.5915855308265|6.522660487341778| 0.4917701263173273|3.9362113692014082| 5.086589648693526| 4.125337631575267|5023.0653938206015|
| 分钟| 1000001| 0| 0| 1| 2| 3| 12|
| 最大值| 1006040| 20| 1| 20| 18| 18| 23961|
+———-+——————+——————–+——————-+——————+——————+——————+—— ————+
在描述操作中,这是我们在指定字符串列名或分类列名时得到的。
train.describe('Product_ID').show()
输出:
+——-+——-+
|总结|产品ID|
+——-+——-+
| 计数| 550068|
| 意思| 空|
| 标准开发| 空|
| 分钟| P00000142|
| 最大值| P0099942|
+——-+——-+
基于ASCII,计算出最大值和最小值。 描述操作用于处理字符串类型列。
- 选择 DataFrame 的列
我们将在选择操作中使用列的名称来选择列。 我们将使用逗号分隔列的名称。 现在我们将看看如何从训练文件中选择“Age”和“User_ID”。
- train.select('User_ID','Age').show(5)
- 输出:
- +——-+——-+
- |用户ID| 年龄|
- +——-+——-+
- |1000001|0-17|
- |1000001|0-17|
- |1000001|0-17|
- |1000001|0-17|
- |1000002| 55+|
- +——-+——-+
- 寻找不同的产品编号在测试文件和训练文件中
计算 DataFrame 的编号。 对于不同的行,我们将使用distinct操作。 现在在这里我们将应用不同的操作来计算no。 测试和训练文件中的不同产品。
train.select('Product_ID').distinct().count(),test.select('Product_ID').distinct().count()
输出:
(3633, 3492)
我们在测试和训练文件中分别有 3492 和 3633 个不同的产品。 现在我们知道,在训练文件中,我们比测试文件有更多不同的值,因为我们可以从输出结果中了解到。 现在我们将使用减法运算找出训练文件中不存在但测试文件中存在的 Product_ID 类别。 同样的事情也可以对分类的所有特征做。
diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))
diff_cat_in_train_test.distinct().count()# 用于不同的计数
输出:
46
所以从上面的结果,我们可以知道有 47 个不同的类别,它们在训练文件中不存在,但在测试文件中存在。 数据将被跳过或从测试文件中收集,该文件不存在于火车文件中。
- 计算分类列的成对频率?
让我们使用can crosstab操作来计算DataFrame中列的成对频率。 现在让我们通过交叉表操作计算火车 DataFrame 中的“性别”和“年龄”列。
train.crosstab('年龄', '性别').show()
输出:
+——-+——+——+
|年龄_性别| F| 米|
+——-+——+——+
| 0-17| 5083| 10019|
| 46-50|13199| 32502|
| 18-25|24628| 75032|
| 36-45|27170| 82843|
| 55+| 5083| 16421|
| 51-55| 9894| 28607|
| 26-35|50752|168835|
+——-+——+——+
Gender 的不同值是列名,Age 的不同量是行名,从上面的结果中可以看出。 在表中,如果没有发生,该对的计数将为零。
如何获取具有唯一行的 DataFrame?
要查找唯一行而不包含重复行,我们将使用dropDuplicates操作。 它将通过删除 DataFrame 的重复行来获得没有任何重复行的 Dataframe。 请在此处查看以了解如何执行 dropDuplicates 过程以获取列的所有唯一行。
train.select('年龄','性别').dropDuplicates().show()
输出:
+—–+——+
| 年龄|性别|
+—–+——+
|51-55| F|
|51-55| 米|
|26-35| F|
|26-35| 米|
|36-45| F|
|36-45| 米|
|46-50| F|
|46-50| 米|
| 55+| F|
| 55+| 米|
|18-25| F|
| 0-17| F|
|18-25| 米|
| 0-17| 米|
+—–+——+
- 如何删除行将空值?
如果要删除所有具有空值的行,那么我们可以使用称为dropna操作的操作。 要从 DataFrame 中删除行,它考虑了三个选项。
- 子集 - 它是删除具有空值的列的操作要考虑的所有列名称的列表。
- Thresh - 这有助于删除具有小于 thresh 非空值的行。 默认情况下,这里没有指定任何内容。
- 如何——它可以用于两种类型——全部或任何。 通过使用any,如果行中的任何值为空,它将删除该行。 通过使用 all,如果所有行的值为空,它将减少行。
现在,我们将一一使用所有这些选项,通过使用默认选项(例如子集、阈值、无用于如何、无、任何)来删除为空的行。
train.dropna().count()
输出:
166821
- 如何用常量号填充 DataFrame 的空值?
用常量 no 填充空值。 我们将在这里使用fillna操作。 fillna操作需要考虑两个参数来填充空值。
- 子集:在这里,需要指定要考虑填充值的列。
- value:这里我们可以提一下要替换成什么值的数量,可以是所有列中的任何数据类型,如string、float、int。
在这里,我们将填充“-1”来代替 train DataFrame 中的空值。
train.fillna(-1).show(2)
输出:
+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+
|User_ID|Product_ID|性别| 年龄|职业|城市_类别|停留_在_当前_城市_年|婚姻_状态|产品_类别_1|产品_类别_2|产品_类别_3|购买|
+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+
|1000001| P00069042| F|0-17| 10| 一个| 2| 0| 3| -1| -1| 8370|
|1000001| P00248942| F|0-17| 10| 一个| 2| 0| 1| 6| 14| 15200|
+——-+——-+——+——+——-+————-+——————–+————–+————+ ——————+——————+——–+
仅显示前 2 行
结论
PySpark 在人工智能和机器学习领域的发展势头强劲。 PySpark 用于解决现实世界的机器学习问题。 您可以从外部和现有的不同数据源创建 RDD,并对其进行所有类型的转换。 希望本文内容丰富,能够让您深入了解 PySpark 数据帧。
如果您想了解 PySpark 和其他数据科学工具,请查看 IIIT-B 和 upGrad 的数据科学 PG 文凭,该文凭专为在职专业人士而设,提供 10 多个案例研究和项目、实用的实践研讨会、指导行业专家,与行业导师一对一交流,400 多个小时的学习和顶级公司的工作协助。
PySpark 是否比 Pandas 更高效?
是的,PySpark 比 Pandas 更快,它甚至在基准测试中胜过 Pandas。 基本而言,Pandas 在单台机器上执行操作,而 PySpark 在多台机器上执行操作。 如果您正在使用具有庞大数据集的机器学习应用程序,PySpark 是理想的选择,因为它可以比 Pandas 快 100 倍地执行操作。 由于 JVM,Scala 编程语言在数据分析和处理方面比 Python 快 10 倍。 当使用 Python 编程代码调用 Spark 库时,结果是平庸的。
使用 Apache PySpark 有哪些缺点?
Spark 没有自己的文件管理系统。 由于运行 Spark 所需的额外内存成本很高,内存计算的成本可能高得令人望而却步。 在将 Apache Spark 与 Hadoop 结合使用时,开发人员会遇到压缩文件的困难。 数据在 Spark 中分批迭代,每次迭代都是独立计划和处理的。 在 Apache Spark 中,数据以预定的时间间隔分成更小的批次。 因此,Apache 将不支持基于记录的窗口条件。 相反,它提供了基于时间的窗口标准。
Datasets、DataFrame 和 RDD 有何不同?
RDD 是分散在多台计算机上的数据项的集群集合。 数据通过 RDD 表示,RDD 是 Java 或 Scala 对象的集合。 DataFrame 是结构化为命名列的数据集合,分布在许多服务器上。 在关系数据库中,它在概念上等同于表。 Dataset 是一个数据帧 API 扩展,它提供了 RDD API 的类型安全、面向对象的编程接口能力。 DataFrame 是数据的分布式集合,类似于 RDD,并且不可变。 数据被结构化为命名列,类似于关系数据库中的表,而不是 RDD。 当涉及到像分组数据这样的简单任务时,RDD 比 Dataframes 和 Datasets 都慢。 它有一个用于执行聚合任务的简单 API。 它可以比 RDD 和数据集更快地聚合数据。