Введение в Apache Spark с примерами и вариантами использования
Опубликовано: 2022-03-11Впервые я услышал о Spark в конце 2013 года, когда заинтересовался Scala — языком, на котором написан Spark. Некоторое время спустя я сделал забавный проект по науке о данных, пытаясь предсказать выживание на Титанике. Это оказалось отличным способом познакомиться с концепциями и программированием Spark. Я настоятельно рекомендую его всем начинающим разработчикам Spark, которые ищут место для начала работы.
Сегодня Spark используется такими крупными игроками, как Amazon, eBay и Yahoo! Многие организации используют Spark в кластерах с тысячами узлов. Согласно Spark FAQ, самый большой из известных кластеров насчитывает более 8000 узлов. Действительно, Spark — это технология, которую стоит принять к сведению и изучить.
В этой статье представлено введение в Spark, включая варианты использования и примеры. Он содержит информацию с веб-сайта Apache Spark, а также из книги Learning Spark — Lightning-Fast Big Data Analysis.
Что такое Apache Spark? Введение
Spark — это проект Apache, рекламируемый как «молниеносные кластерные вычисления». У него процветающее сообщество разработчиков открытого исходного кода, и на данный момент это самый активный проект Apache.
Spark предоставляет более быструю и общую платформу обработки данных. Spark позволяет запускать программы до 100 раз быстрее в памяти или в 10 раз быстрее на диске, чем Hadoop. В прошлом году Spark взял верх над Hadoop, завершив конкурс Daytona GraySort на 100 ТБ в 3 раза быстрее на одной десятой от числа машин, а также стал самым быстрым движком с открытым исходным кодом для сортировки петабайт.
Spark также позволяет быстрее писать код, поскольку в вашем распоряжении более 80 высокоуровневых операторов. Чтобы продемонстрировать это, давайте посмотрим на «Hello World!» BigData: пример Word Count. Написанный на Java для MapReduce, он содержит около 50 строк кода, тогда как в Spark (и Scala) вы можете сделать это так же просто:
sparkContext.textFile("hdfs://...") .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile("hdfs://...")
Другим важным аспектом при изучении использования Apache Spark является интерактивная оболочка (REPL), которую он предоставляет в готовом виде. Используя REPL, можно протестировать результат каждой строки кода без предварительного написания кода и выполнения всего задания. Таким образом, путь к рабочему коду становится намного короче, и становится возможным специальный анализ данных.
Дополнительные ключевые функции Spark включают в себя:
- В настоящее время предоставляет API-интерфейсы на Scala, Java и Python с поддержкой других языков (например, R) на подходе.
- Хорошо интегрируется с экосистемой Hadoop и источниками данных (HDFS, Amazon S3, Hive, HBase, Cassandra и т. д.)
- Может работать в кластерах, управляемых Hadoop YARN или Apache Mesos, а также может работать автономно.
Ядро Spark дополняется набором мощных высокоуровневых библиотек, которые можно беспрепятственно использовать в одном приложении. Эти библиотеки в настоящее время включают SparkSQL, Spark Streaming, MLlib (для машинного обучения) и GraphX, каждая из которых подробно описана в этой статье. В настоящее время также разрабатываются дополнительные библиотеки и расширения Spark.
Искровое ядро
Spark Core — это базовый механизм для крупномасштабной параллельной и распределенной обработки данных. Он отвечает за:
- управление памятью и восстановление после сбоев
- планирование, распределение и мониторинг заданий в кластере
- взаимодействие с системами хранения
Spark представляет концепцию RDD (Resilient Distributed Dataset), неизменной отказоустойчивой распределенной коллекции объектов, с которыми можно работать параллельно. RDD может содержать объект любого типа и создается путем загрузки внешнего набора данных или распространения коллекции из программы-драйвера.
RDD поддерживают два типа операций:
- Преобразования — это операции (такие как сопоставление, фильтрация, объединение, объединение и т. д.), которые выполняются над СДР и создают новый СДР, содержащий результат.
- Действия — это операции (такие как уменьшение, подсчет, первый и т. д.), которые возвращают значение после выполнения вычисления в СДР.
Преобразования в Spark являются «ленивыми», что означает, что они не вычисляют свои результаты сразу. Вместо этого они просто «запоминают» операцию, которую необходимо выполнить, и набор данных (например, файл), над которым должна быть выполнена операция. Преобразования фактически вычисляются только тогда, когда вызывается действие, и результат возвращается в программу-драйвер. Такая конструкция позволяет Spark работать более эффективно. Например, если большой файл был преобразован различными способами и передан первому действию, Spark обработает и вернет результат только для первой строки, а не для всего файла.
По умолчанию каждый преобразованный RDD может пересчитываться каждый раз, когда вы запускаете над ним действие. Однако вы также можете сохранить RDD в памяти, используя метод сохранения или кэширования, и в этом случае Spark сохранит элементы в кластере для гораздо более быстрого доступа при следующем запросе.
SparkSQL
SparkSQL — это компонент Spark, который поддерживает запросы данных либо через SQL, либо через язык запросов Hive. Он возник как порт Apache Hive для работы поверх Spark (вместо MapReduce) и теперь интегрирован со стеком Spark. Помимо обеспечения поддержки различных источников данных, он позволяет объединять SQL-запросы с преобразованиями кода, в результате чего получается очень мощный инструмент. Ниже приведен пример запроса, совместимого с Hive:
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
Искра потокового
Spark Streaming поддерживает обработку потоковых данных в режиме реального времени, таких как файлы журнала производственного веб-сервера (например, Apache Flume и HDFS/S3), социальные сети, такие как Twitter, и различные очереди обмена сообщениями, такие как Kafka. Под капотом Spark Streaming получает входные потоки данных и делит данные на пакеты. Затем они обрабатываются движком Spark и генерируют окончательный поток результатов пакетами, как показано ниже.

Spark Streaming API очень похож на Spark Core, что позволяет программистам легко работать как с пакетными, так и с потоковыми данными.
MLlib
MLlib — это библиотека машинного обучения, которая предоставляет различные алгоритмы, предназначенные для масштабирования в кластере для классификации, регрессии, кластеризации, совместной фильтрации и т. д. (дополнительную информацию по этой теме см. в статье Toptal о машинном обучении). Некоторые из этих алгоритмов также работают с потоковыми данными, например, линейная регрессия с использованием обычного метода наименьших квадратов или кластеризация k-средних (и многое другое в процессе). Apache Mahout (библиотека машинного обучения для Hadoop) уже отказалась от MapReduce и объединила усилия над Spark MLlib.
ГрафикX
GraphX — это библиотека для управления графами и выполнения граф-параллельных операций. Он предоставляет единый инструмент для ETL, исследовательского анализа и итеративных вычислений графов. Помимо встроенных операций для манипулирования графами, он предоставляет библиотеку общих алгоритмов графов, таких как PageRank.
Как использовать Apache Spark: пример использования обнаружения событий
Теперь, когда мы ответили на вопрос «Что такое Apache Spark?», давайте подумаем, для решения каких проблем или задач его можно использовать наиболее эффективно.
Недавно я наткнулся на статью об эксперименте по обнаружению землетрясения путем анализа потока в Твиттере. Интересно, что было показано, что этот метод, скорее всего, сообщит вам о землетрясении в Японии быстрее, чем Японское метеорологическое агентство. Несмотря на то, что в своей статье они использовали другую технологию, я думаю, что это отличный пример того, как мы можем использовать Spark с упрощенными фрагментами кода и без связующего кода.
Во-первых, нам придется фильтровать твиты, которые кажутся релевантными, например, «землетрясение» или «тряска». Мы могли бы легко использовать Spark Streaming для этой цели следующим образом:
TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))
Затем нам нужно будет провести некоторый семантический анализ твитов, чтобы определить, относятся ли они к текущему землетрясению. Твиты вроде «Землетрясение!» или «Теперь трясет», например, будут считаться положительными совпадениями, тогда как твиты, такие как «Посещение конференции по землетрясениям» или «Вчерашнее землетрясение было страшным», — нет. Авторы статьи использовали для этой цели машину опорных векторов (SVM). Мы сделаем то же самое здесь, но также можем попробовать потоковую версию. Результирующий пример кода из MLlib будет выглядеть следующим образом:
// We would prepare some earthquake tweet data and load it in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt") // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) // Clear the default threshold. model.clearThreshold() // Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label) } // Get evaluation metrics. val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC)
Если нас устраивает скорость предсказания модели, мы можем перейти к следующему этапу и реагировать всякий раз, когда обнаруживаем землетрясение. Чтобы его обнаружить, нам нужно определенное количество (то есть плотность) положительных твитов в определенное временное окно (как описано в статье). Обратите внимание, что для твитов с включенными службами определения местоположения Twitter мы также извлекли бы местоположение землетрясения. Вооружившись этими знаниями, мы могли бы использовать SparkSQL и запросить существующую таблицу Hive (в которой хранятся пользователи, заинтересованные в получении уведомлений о землетрясениях), чтобы получить их адреса электронной почты и отправить им персонализированное электронное письмо с предупреждением, как показано ниже:
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom function sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)
Другие варианты использования Apache Spark
Конечно, потенциальные варианты использования Spark выходят далеко за рамки обнаружения землетрясений.
Вот краткий (но далеко не исчерпывающий!) пример других вариантов использования, требующих работы со скоростью, разнообразием и объемом больших данных, для которых так хорошо подходит Spark:
В игровой индустрии обработка и обнаружение шаблонов из потенциального потока внутриигровых событий в реальном времени и возможность немедленно реагировать на них — это возможность, которая может привести к прибыльному бизнесу для таких целей, как удержание игроков, таргетированная реклама, автоматизация. -регулировка уровня сложности и так далее.
В индустрии электронной коммерции информация о транзакциях в реальном времени может быть передана алгоритму потоковой кластеризации, такому как k-means, или совместной фильтрации, такой как ALS. Затем результаты можно было даже объединить с другими неструктурированными источниками данных, такими как комментарии клиентов или обзоры продуктов, и использовать для постоянного улучшения и адаптации рекомендаций с течением времени с учетом новых тенденций.
В сфере финансов или безопасности стек Spark может быть применен к системе обнаружения мошенничества или вторжений или аутентификации на основе рисков. Он может достичь первоклассных результатов, собирая огромное количество архивных журналов, объединяя их с внешними источниками данных, такими как информация об утечках данных и скомпрометированных учетных записях (см., например, https://haveibeenpwned.com/) и информация из соединения/ запрос, такой как IP-геолокация или время.
Заключение
Подводя итог, Spark помогает упростить сложную и ресурсоемкую задачу обработки больших объемов данных в режиме реального времени или архивных данных, как структурированных, так и неструктурированных, легко интегрируя соответствующие сложные возможности, такие как машинное обучение и алгоритмы графов. Spark приносит обработку больших данных в массы. Проверьте это!