例と使用例を含むApacheSparkの概要

公開: 2022-03-11

Sparkのことを最初に聞いたのは、Sparkが書かれている言語であるScalaに興味を持った2013年後半です。 しばらくして、私はタイタニック号での生存を予測しようとする楽しいデータサイエンスプロジェクトを行いました。 これは、Sparkの概念とプログラミングをさらに紹介するための優れた方法であることがわかりました。 始める場所を探している意欲的なSpark開発者にはこれを強くお勧めします。

今日、SparkはAmazon、eBay、Yahoo!などの主要なプレーヤーに採用されています。 多くの組織は、数千のノードを持つクラスターでSparkを実行しています。 Spark FAQによると、既知の最大のクラスターには8000を超えるノードがあります。 確かに、Sparkは、注意して学ぶ価値のあるテクノロジーです。

この記事では、ユースケースと例を含むSparkの概要を説明します。 これには、Apache Spark Webサイトの情報と、Learning Spark-Lightning-Fast BigDataAnalysisという本が含まれています。

Apache Sparkとは何ですか? はじめに

Sparkは、「超高速クラスターコンピューティング」として宣伝されているApacheプロジェクトです。 活発なオープンソースコミュニティがあり、現時点で最も活発なApacheプロジェクトです。

Sparkは、より高速でより一般的なデータ処理プラットフォームを提供します。 Sparkを使用すると、Hadoopよりもメモリ内で最大100倍、ディスク上で最大10倍高速にプログラムを実行できます。 昨年、Sparkは100 TB Daytona GraySortコンテストを10分の1のマシン数で3倍速く完了することで、Hadoopを引き継ぎ、ペタバイトをソートするための最速のオープンソースエンジンにもなりました。

また、Sparkを使用すると、80を超える高レベルのオペレーターを自由に使用できるため、コードをより迅速に作成できます。 これを実証するために、「HelloWorld!」を見てみましょう。 ビッグデータの例:単語数の例。 MapReduce用にJavaで記述されているため、約50行のコードが含まれていますが、Spark(およびScala)では、次のように簡単に実行できます。

 sparkContext.textFile("hdfs://...") .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile("hdfs://...")

Apache Sparkの使用方法を学ぶ際のもう1つの重要な側面は、すぐに使用できるインタラクティブシェル(REPL)です。 REPLを使用すると、最初にコードを記述してジョブ全体を実行しなくても、コードの各行の結果をテストできます。 したがって、動作するコードへのパスははるかに短くなり、アドホックなデータ分析が可能になります。

Sparkのその他の主な機能は次のとおりです。

  • 現在、Scala、Java、PythonでAPIを提供しており、途中で他の言語(Rなど)をサポートしています。
  • Hadoopエコシステムおよびデータソース(HDFS、Amazon S3、Hive、HBase、Cassandraなど)とうまく統合します
  • HadoopYARNまたはApacheMesosによって管理されているクラスターで実行でき、スタンドアロンで実行することもできます

Sparkコアは、同じアプリケーションでシームレスに使用できる強力な高レベルのライブラリのセットによって補完されます。 これらのライブラリには現在、SparkSQL、Spark Streaming、MLlib(機械学習用)、およびGraphXが含まれています。これらのそれぞれについて、この記事で詳しく説明します。 追加のSparkライブラリと拡張機能も現在開発中です。

Sparkライブラリと拡張機能

スパークコア

Spark Coreは、大規模な並列および分散データ処理のベースエンジンです。 それは責任があります:

  • メモリ管理と障害回復
  • クラスターでのジョブのスケジューリング、分散、および監視
  • ストレージシステムとの相互作用

Sparkは、並列で操作できるオブジェクトの不変のフォールトトレラントな分散コレクションであるRDD(Resilient Distributed Dataset)の概念を導入しています。 RDDには任意のタイプのオブジェクトを含めることができ、外部データセットをロードするか、ドライバープログラムからコレクションを配布することによって作成されます。

RDDは、次の2種類の操作をサポートします。

  • 変換は、RDDで実行され、結果を含む新しいRDDを生成する操作(マップ、フィルター、結合、結合など)です。
  • アクションは、RDDで計算を実行した後に値を返す操作(reduce、count、firstなど)です。

Sparkの変換は「遅延」です。つまり、結果をすぐに計算しません。 代わりに、実行する操作と、操作を実行するデータセット(ファイルなど)を「記憶」するだけです。 変換は、アクションが呼び出され、結果がドライバープログラムに返されるときにのみ実際に計算されます。 この設計により、Sparkをより効率的に実行できます。 たとえば、大きなファイルがさまざまな方法で変換されて最初のアクションに渡された場合、Sparkはファイル全体に対して作業を行うのではなく、最初の行の結果のみを処理して返します。

デフォルトでは、変換された各RDDは、アクションを実行するたびに再計算される場合があります。 ただし、persistまたはcacheメソッドを使用してRDDをメモリに永続化することもできます。その場合、Sparkは要素をクラスター上に保持し、次にクエリを実行するときにアクセスを大幅に高速化します。

SparkSQL

SparkSQLは、SQLまたはHiveクエリ言語を介したデータのクエリをサポートするSparkコンポーネントです。 これは、(MapReduceの代わりに)Spark上で実行されるApache Hiveポートとして始まり、現在はSparkスタックと統合されています。 さまざまなデータソースのサポートを提供することに加えて、SQLクエリをコード変換で織り込むことが可能になり、非常に強力なツールになります。 以下は、Hive互換クエリの例です。

 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

スパークストリーミング

Spark Streamingは、本番Webサーバーのログファイル(ApacheFlumeやHDFS/ S3など)、Twitterなどのソーシャルメディア、Kafkaなどのさまざまなメッセージングキューなどのストリーミングデータのリアルタイム処理をサポートします。 内部的には、Spark Streamingは入力データストリームを受信し、データをバッチに分割します。 次に、それらはSparkエンジンによって処理され、以下に示すように、結果の最終ストリームをバッチで生成します。

スパークストリーミング

Spark StreamingAPIはSparkCoreのAPIと厳密に一致しているため、プログラマーはバッチデータとストリーミングデータの両方の世界で簡単に作業できます。

MLlib

MLlibは、分類、回帰、クラスタリング、協調フィルタリングなどのためにクラスター上でスケールアウトするように設計されたさまざまなアルゴリズムを提供する機械学習ライブラリです(このトピックの詳細については、機械学習に関するToptalの記事をご覧ください)。 これらのアルゴリズムの一部は、通常の最小二乗法やk-meansクラスタリング(およびその他の方法)を使用した線形回帰など、ストリーミングデータでも機能します。 Apache Mahout(Hadoop用の機械学習ライブラリ)はすでにMapReduceから離れ、SparkMLlibに参加しています。

GraphX

グラフx

GraphXは、グラフを操作し、グラフ並列操作を実行するためのライブラリです。 これは、ETL、探索的分析、および反復グラフ計算のための統一されたツールを提供します。 グラフ操作の組み込み操作とは別に、PageRankなどの一般的なグラフアルゴリズムのライブラリを提供します。

Apache Sparkの使用方法:イベント検出のユースケース

「ApacheSparkとは」という質問に答えたので、どのような問題や課題を最も効果的に使用できるかを考えてみましょう。

最近、ツイッターのストリームを分析して地震を検出する実験についての記事に出くわしました。 興味深いことに、この手法は気象庁よりも早く日本の地震を知らせる可能性が高いことが示されました。 彼らは記事でさまざまなテクノロジーを使用していましたが、単純化されたコードスニペットを使用してグルーコードを使用せずにSparkを使用する方法を確認するのは良い例だと思います。

まず、「地震」や「揺れ」などの関連性があると思われるツイートをフィルタリングする必要があります。 その目的のために、次のようにSparkStreamingを簡単に使用できます。

 TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))

次に、ツイートに対してセマンティック分析を実行して、それらが現在の地震の発生を参照しているように見えるかどうかを判断する必要があります。 「地震!」のようなツイートたとえば、「今は揺れている」はポジティブマッチと見なされますが、「地震会議に参加する」や「昨日の地震は怖かった」などのツイートはそうではありません。 この論文の著者は、この目的のためにサポートベクターマシン(SVM)を使用しました。 ここでも同じことを行いますが、ストリーミングバージョンを試すこともできます。 MLlibから得られるコード例は、次のようになります。

 // We would prepare some earthquake tweet data and load it in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt") // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) // Clear the default threshold. model.clearThreshold() // Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label) } // Get evaluation metrics. val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC)

モデルの予測率に満足できれば、次の段階に進み、地震を発見するたびに対応することができます。 1つを検出するには、(記事で説明されているように)定義された時間枠内に特定の数(つまり密度)の肯定的なツイートが必要です。 Twitterの位置情報サービスが有効になっているツイートの場合、地震の位置も抽出することに注意してください。 この知識があれば、SparkSQLを使用して、既存のHiveテーブル(地震通知の受信に関心のあるユーザーを格納する)にクエリを実行して、次のように電子メールアドレスを取得し、パーソナライズされた警告電子メールを送信できます。

 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom function sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)

その他のApacheSparkのユースケース

Sparkの潜在的な使用例は、もちろん地震の検出をはるかに超えています。

これは、ビッグデータの速度、多様性、および量を処理する必要がある他のユースケースの簡単な(しかし確かに網羅的ではありません!)サンプリングです。Sparkは非常に適しています。

ゲーム業界では、リアルタイムのゲーム内イベントの潜在的な消防ホースからパターンを処理して発見し、それらに即座に対応できることは、プレーヤーの維持、ターゲットを絞った広告、自動車などの目的で、収益性の高いビジネスを生み出す可能性のある機能です。 -複雑さのレベルの調整など。

電子商取引業界では、リアルタイムのトランザクション情報をk-meansのようなストリーミングクラスタリングアルゴリズムやALSのような協調フィルタリングに渡すことができます。 結果は、顧客のコメントや製品レビューなどの他の非構造化データソースと組み合わせることもでき、新しいトレンドに合わせて推奨事項を継続的に改善および適応させるために使用できます。

金融またはセキュリティ業界では、Sparkスタックを詐欺または侵入検知システムまたはリスクベース認証に適用できます。 大量のアーカイブログを収集し、データ侵害や侵害されたアカウントに関する情報(https://haveibeenpwned.com/などを参照)や接続からの情報などの外部データソースと組み合わせることで、一流の結果を達成できます。 IPジオロケーションや時間などのリクエスト。

結論

要約すると、Sparkは、機械学習やグラフアルゴリズムなどの関連する複雑な機能をシームレスに統合し、構造化データと非構造化データの両方で、大量のリアルタイムデータまたはアーカイブデータを処理するという困難で計算量の多いタスクを簡素化するのに役立ちます。 Sparkはビッグデータ処理を大衆にもたらします。 見てみな!