通过示例和用例介绍 Apache Spark

已发表: 2022-03-11

我第一次听说 Spark 是在 2013 年底,当时我对编写 Spark 的语言 Scala 产生了兴趣。 一段时间后,我做了一个有趣的数据科学项目,试图预测泰坦尼克号上的生存情况。 事实证明,这是进一步介绍 Spark 概念和编程的好方法。 我强烈推荐任何有抱负的 Spark 开发人员寻找入门的地方。

如今,Spark 已被亚马逊、eBay 和雅虎等主要参与者采用! 许多组织在具有数千个节点的集群上运行 Spark。 根据 Spark FAQ,已知最大的集群有超过 8000 个节点。 事实上,Spark 是一项非常值得关注和学习的技术。

本文介绍了 Spark,包括用例和示例。 它包含来自 Apache Spark 网站以及《Learning Spark - Lightning-Fast Big Data Analysis》一书的信息。

什么是 Apache Spark? 一个介绍

Spark 是一个标榜为“闪电般快速的集群计算”的 Apache 项目。 它拥有一个蓬勃发展的开源社区,是目前最活跃的 Apache 项目。

Spark 提供了更快、更通用的数据处理平台。 Spark 让您在内存中运行程序的速度比 Hadoop 快 100 倍,或者在磁盘上快 10 倍。 去年,Spark 通过在十分之一的机器上以 3 倍的速度完成 100 TB Daytona GraySort 竞赛,接管了 Hadoop,它还成为了排序 PB 的最快开源引擎。

Spark 还可以让您更快地编写代码,因为您可以使用 80 多个高级运算符。 为了证明这一点,让我们看一下“Hello World!” 大数据:字数示例。 用 Java 为 MapReduce 编写的它有大约 50 行代码,而在 Spark(和 Scala)中,您可以像这样简单地做到这一点:

 sparkContext.textFile("hdfs://...") .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile("hdfs://...")

学习如何使用 Apache Spark 的另一个重要方面是它提供的开箱即用的交互式 shell (REPL)。 使用 REPL,可以测试每一行代码的结果,而无需首先编写代码并执行整个作业。 因此,工作代码的路径要短得多,并且可以进行临时数据分析。

Spark 的其他主要功能包括:

  • 目前提供 Scala、Java 和 Python 的 API,并正在支持其他语言(例如 R)
  • 与 Hadoop 生态系统和数据源(HDFS、Amazon S3、Hive、HBase、Cassandra 等)很好地集成
  • 可以在由 Hadoop YARN 或 Apache Mesos 管理的集群上运行,也可以独立运行

Spark 核心由一组功能强大的高级库补充,这些库可以在同一应用程序中无缝使用。 这些库目前包括 SparkSQL、Spark Streaming、MLlib(用于机器学习)和 GraphX,本文将进一步详细介绍这些库。 其他 Spark 库和扩展目前也在开发中。

火花库和扩展

火花核心

Spark Core 是大规模并行和分布式数据处理的基础引擎。 它负责:

  • 内存管理和故障恢复
  • 在集群上调度、分发和监控作业
  • 与存储系统交互

Spark 引入了 RDD(弹性分布式数据集)的概念,这是一个不可变的容错分布式对象集合,可以并行操作。 RDD 可以包含任何类型的对象,它是通过加载外部数据集或从驱动程序分发集合来创建的。

RDD 支持两种类型的操作:

  • 转换是在 RDD 上执行并产生包含结果的新 RDD 的操作(例如 map、filter、join、union 等)。
  • 操作是在 RDD 上运行计算后返回值的操作(例如 reduce、count、first 等)。

Spark 中的转换是“惰性的”,这意味着它们不会立即计算结果。 相反,他们只是“记住”要执行的操作和要执行操作的数据集(例如,文件)。 只有在调用动作并将结果返回给驱动程序时才会实际计算转换。 这种设计使 Spark 能够更高效地运行。 例如,如果一个大文件以各种方式被转换并传递给第一个操作,Spark 将只处理并返回第一行的结果,而不是对整个文件进行工作。

默认情况下,每个转换后的 RDD 可能会在您每次对其运行操作时重新计算。 但是,您也可以使用 persist 或 cache 方法将 RDD 持久化到内存中,在这种情况下,Spark 会将元素保留在集群中,以便下次查询时更快地访问它。

火花SQL

SparkSQL 是一个 Spark 组件,支持通过 SQL 或 Hive 查询语言查询数据。 它最初是作为在 Spark 之上运行的 Apache Hive 端口(代替 MapReduce),现在与 Spark 堆栈集成。 除了提供对各种数据源的支持外,它还可以将 SQL 查询与代码转换结合起来,从而形成一个非常强大的工具。 以下是 Hive 兼容查询的示例:

 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

火花流

Spark Streaming 支持流数据的实时处理,例如生产网络服务器日志文件(例如 Apache Flume 和 HDFS/S3)、Twitter 等社交媒体以及 Kafka 等各种消息队列。 在底层,Spark Streaming 接收输入数据流并将数据分成批次。 接下来,它们由 Spark 引擎处理并批量生成最终结果流,如下图所示。

火花流

Spark Streaming API 与 Spark Core 的 API 非常匹配,使程序员可以轻松地在批处理和流数据的世界中工作。

MLlib

MLlib 是一个机器学习库,它提供了各种算法,旨在扩展集群以进行分类、回归、聚类、协同过滤等(有关该主题的更多信息,请查看 Toptal 关于机器学习的文章)。 其中一些算法也适用于流数据,例如使用普通最小二乘或 k-means 聚类的线性回归(以及更多的方法)。 Apache Mahout(Hadoop 的机器学习库)已经远离 MapReduce,并在 Spark MLlib 上联手。

图X

图x

GraphX 是一个用于操作图形和执行图形并行操作的库。 它为 ETL、探索性分析和迭代图计算提供了一个统一的工具。 除了用于图形操作的内置操作外,它还提供了一个常见的图形算法库,例如 PageRank。

如何使用 Apache Spark:事件检测用例

既然我们已经回答了“什么是 Apache Spark?”这个问题,让我们想想它可以最有效地用于解决什么样的问题或挑战。

我最近看到一篇关于通过分析 Twitter 流来检测地震的实验的文章。 有趣的是,这项技术可能会比日本气象厅更快地通知您日本发生地震。 尽管他们在文章中使用了不同的技术,但我认为这是一个很好的示例,可以让我们了解如何使用简化的代码片段而不使用胶水代码来使用 Spark。

首先,我们必须过滤那些看起来相关的推文,比如“地震”或“震动”。 为此,我们可以轻松地使用 Spark Streaming,如下所示:

 TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))

然后,我们必须对推文进行一些语义分析,以确定它们是否似乎引用了当前的地震事件。 诸如“地震!”之类的推文例如,“现在它在震动”会被认为是正匹配,而诸如“参加地震会议”或“昨天的地震很可怕”之类的推文则不会。 该论文的作者为此使用了支持向量机 (SVM)。 我们将在这里做同样的事情,但也可以尝试流媒体版本。 MLlib 生成的代码示例如下所示:

 // We would prepare some earthquake tweet data and load it in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt") // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) // Clear the default threshold. model.clearThreshold() // Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label) } // Get evaluation metrics. val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC)

如果我们对模型的预测率感到满意,我们可以进入下一阶段,并在发现地震时做出反应。 为了检测一个,我们需要在定义的时间窗口(如文章中所述)内一定数量(即密度)的正面推文。 请注意,对于启用了 Twitter 位置服务的推文,我们还将提取地震的位置。 有了这些知识,我们可以使用 SparkSQL 并查询现有的 Hive 表(存储有兴趣接收地震通知的用户)以检索他们的电子邮件地址并向他们发送个性化的警告电子邮件,如下所示:

 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom function sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)

其他 Apache Spark 用例

当然,Spark 的潜在用例远远超出了检测地震的范围。

以下是需要处理大数据的速度、种类和数量的其他用例的快速(但肯定远非详尽!)示例,Spark 非常适合这些用例:

在游戏行业中,处理和发现潜在的实时游戏内事件的模式并能够立即做出响应是一种可以产生利润丰厚的业务的能力,例如玩家保留、定向广告、汽车- 复杂程度的调整,等等。

在电子商务行业,实时交易信息可以传递给流式聚类算法(如 k-means)或协同过滤(如 ALS)。 结果甚至可以与其他非结构化数据源(例如客户评论或产品评论)相结合,并用于随着时间的推移不断改进和调整建议以适应新趋势。

在金融或安全行业,Spark 堆栈可应用于欺诈或入侵检测系统或基于风险的身份验证。 它可以通过收集大量存档日志,将其与外部数据源(例如有关数据泄露和受损帐户的信息(例如,参见 https://haveibeenpwned.com/)和来自连接的信息/请求,例如 IP 地理位置或时间。

结论

总而言之,Spark 有助于简化处理大量实时或存档数据(包括结构化和非结构化数据)的挑战性和计算密集型任务,无缝集成相关的复杂功能,例如机器学习和图形算法。 Spark 将大数据处理带给大众。 看看这个!