Wprowadzenie do Apache Spark z przykładami i przypadkami użycia

Opublikowany: 2022-03-11

Po raz pierwszy usłyszałem o Spark pod koniec 2013 roku, kiedy zainteresowałem się Scala, językiem, w którym napisano Spark. Jakiś czas później wykonałem zabawny projekt z zakresu analizy danych, próbując przewidzieć przetrwanie na Titanicu. Okazało się to świetnym sposobem na dalsze zapoznanie się z koncepcjami i programowaniem Sparka. Gorąco polecam wszystkim początkującym programistom Sparka szukającym miejsca, w którym można zacząć.

Obecnie Spark jest przyjmowany przez głównych graczy, takich jak Amazon, eBay i Yahoo! Wiele organizacji korzysta z platformy Spark w klastrach z tysiącami węzłów. Według FAQ Sparka, największy znany klaster ma ponad 8000 węzłów. Rzeczywiście, Spark to technologia, na którą warto zwrócić uwagę i poznać ją.

Ten artykuł zawiera wprowadzenie do platformy Spark, w tym przypadki użycia i przykłady. Zawiera informacje ze strony internetowej Apache Spark oraz książki Learning Spark - Lightning-Fast Big Data Analysis.

Co to jest Apache Spark? Wstęp

Spark to projekt Apache reklamowany jako „błyskawicznie szybkie przetwarzanie klastrowe”. Ma dobrze prosperującą społeczność open-source i jest obecnie najbardziej aktywnym projektem Apache.

Spark zapewnia szybszą i bardziej ogólną platformę przetwarzania danych. Spark pozwala uruchamiać programy do 100 razy szybciej w pamięci lub 10 razy szybciej na dysku niż Hadoop. W zeszłym roku Spark przejął Hadoop, kończąc konkurs Daytona GraySort 100 TB 3 razy szybciej na jednej dziesiątej liczby maszyn, a także stał się najszybszym silnikiem open source do sortowania petabajta.

Spark umożliwia również szybsze pisanie kodu, ponieważ masz do dyspozycji ponad 80 operatorów wysokiego poziomu. Aby to zademonstrować, spójrzmy na „Hello World!” BigData: przykład Word Count. Napisany w Javie dla MapReduce ma około 50 linii kodu, podczas gdy w Spark (i Scala) możesz to zrobić tak prosto, jak to:

 sparkContext.textFile("hdfs://...") .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile("hdfs://...")

Innym ważnym aspektem przy nauce korzystania z Apache Spark jest interaktywna powłoka (REPL), którą zapewnia on po wyjęciu z pudełka. Korzystając z REPL, można przetestować wynik każdego wiersza kodu bez konieczności wcześniejszego kodowania i wykonywania całego zadania. Ścieżka do działającego kodu jest więc znacznie krótsza i możliwa jest analiza danych ad-hoc.

Dodatkowe kluczowe funkcje Sparka obejmują:

  • Obecnie udostępnia interfejsy API w Scali, Javie i Pythonie, a po drodze obsługuje inne języki (takie jak R)
  • Dobrze integruje się z ekosystemem Hadoop i źródłami danych (HDFS, Amazon S3, Hive, HBase, Cassandra itp.)
  • Może działać w klastrach zarządzanych przez Hadoop YARN lub Apache Mesos, a także może działać samodzielnie

Rdzeń Spark jest uzupełniony zestawem potężnych bibliotek wyższego poziomu, które można bezproblemowo używać w tej samej aplikacji. Te biblioteki obejmują obecnie SparkSQL, Spark Streaming, MLlib (do uczenia maszynowego) i GraphX, z których każda jest szczegółowo opisana w tym artykule. Obecnie trwają również prace nad dodatkowymi bibliotekami i rozszerzeniami Spark.

Spark biblioteki i rozszerzenia

Rdzeń iskry

Spark Core to podstawowy aparat do równoległego i rozproszonego przetwarzania danych na dużą skalę. Odpowiada za:

  • zarządzanie pamięcią i odzyskiwanie po awarii
  • planowanie, dystrybucja i monitorowanie zadań w klastrze
  • współdziałanie z systemami pamięci masowej

Spark wprowadza koncepcję RDD (Resilient Distributed Dataset), niezmiennej, odpornej na awarie, rozproszonej kolekcji obiektów, na których można operować równolegle. RDD może zawierać dowolny typ obiektu i jest tworzony przez ładowanie zewnętrznego zestawu danych lub dystrybucję kolekcji z programu sterownika.

RDD obsługują dwa rodzaje operacji:

  • Transformacje to operacje (takie jak mapowanie, filtrowanie, łączenie, sumowanie itd.), które są wykonywane na RDD i które dają nowy RDD zawierający wynik.
  • Akcje to operacje (takie jak zmniejszenie, zliczenie, pierwsze itd.), które zwracają wartość po uruchomieniu obliczeń na RDD.

Przekształcenia w Spark są „leniwe”, co oznacza, że ​​nie obliczają swoich wyników od razu. Zamiast tego po prostu „zapamiętują” operację do wykonania i zbiór danych (np. plik), na którym operacja ma zostać wykonana. Przekształcenia są faktycznie obliczane tylko wtedy, gdy wywoływana jest akcja, a wynik jest zwracany do programu sterującego. Ten projekt umożliwia wydajniejsze działanie Sparka. Na przykład, jeśli duży plik został przekształcony na różne sposoby i przekazany do pierwszej akcji, Spark przetworzy i zwróci wynik tylko dla pierwszego wiersza, a nie wykona pracę dla całego pliku.

Domyślnie każdy przekształcony RDD może być przeliczany za każdym razem, gdy uruchamiasz na nim akcję. Można jednak również utrwalić RDD w pamięci przy użyciu metody utrwalania lub pamięci podręcznej, w którym to przypadku Spark zachowa elementy w klastrze, aby uzyskać znacznie szybszy dostęp przy następnym zapytaniu.

SparkSQL

SparkSQL to składnik Spark, który obsługuje zapytania o dane za pośrednictwem języka SQL lub języka zapytań Hive. Powstał jako port Apache Hive do uruchamiania na platformie Spark (zamiast MapReduce) i jest teraz zintegrowany ze stosem Spark. Oprócz obsługi różnych źródeł danych, umożliwia tkanie zapytań SQL z przekształceniami kodu, co daje w efekcie bardzo potężne narzędzie. Poniżej znajduje się przykład zapytania zgodnego z Hive:

 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

Spark Streaming

Spark Streaming obsługuje przetwarzanie danych przesyłanych strumieniowo w czasie rzeczywistym, takich jak pliki dziennika serwera produkcyjnego (np. Apache Flume i HDFS/S3), media społecznościowe, takie jak Twitter, oraz różne kolejki wiadomości, takie jak Kafka. Pod maską Spark Streaming odbiera strumienie danych wejściowych i dzieli je na partie. Następnie są przetwarzane przez silnik Spark i generują końcowy strumień wyników w partiach, jak pokazano poniżej.

Spark streaming

Spark Streaming API jest bardzo zbliżony do Spark Core, co ułatwia programistom pracę w światach zarówno danych wsadowych, jak i strumieniowych.

MLlib

MLlib to biblioteka uczenia maszynowego, która zapewnia różne algorytmy zaprojektowane do skalowania w klastrze w celu klasyfikacji, regresji, grupowania, filtrowania zespołowego itd. (więcej informacji na ten temat można znaleźć w artykule Toptal na temat uczenia maszynowego). Niektóre z tych algorytmów działają również z danymi strumieniowymi, takimi jak regresja liniowa wykorzystująca zwykłe grupowanie metodą najmniejszych kwadratów lub k-średnich (i więcej w drodze). Apache Mahout (biblioteka uczenia maszynowego dla Hadoop) już odszedł od MapReduce i połączył siły w Spark MLlib.

WykresX

wykresx

GraphX ​​to biblioteka do manipulowania wykresami i wykonywania operacji równoległych do wykresu. Zapewnia jednolite narzędzie do ETL, analizy eksploracyjnej i iteracyjnych obliczeń grafów. Oprócz wbudowanych operacji służących do manipulacji wykresami, udostępnia bibliotekę popularnych algorytmów wykresów, takich jak PageRank.

Jak korzystać z Apache Spark: przypadek użycia wykrywania zdarzeń

Teraz, gdy odpowiedzieliśmy na pytanie „Czym jest Apache Spark?”, zastanówmy się, do jakich problemów lub wyzwań można go najefektywniej wykorzystać.

Niedawno natknąłem się na artykuł o eksperymencie mającym na celu wykrycie trzęsienia ziemi poprzez analizę strumienia na Twitterze. Co ciekawe, wykazano, że ta technika prawdopodobnie poinformuje cię o trzęsieniu ziemi w Japonii szybciej niż Japońska Agencja Meteorologiczna. Mimo że w swoim artykule użyli innej technologii, myślę, że jest to świetny przykład, aby zobaczyć, jak możemy wykorzystać Spark z uproszczonymi fragmentami kodu i bez kodu kleju.

Najpierw musielibyśmy odfiltrować tweety, które wydają się istotne, takie jak „trzęsienie ziemi” lub „wstrząsanie”. Moglibyśmy z łatwością użyć do tego celu Spark Streaming w następujący sposób:

 TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))

Następnie musielibyśmy przeprowadzić analizę semantyczną tweetów, aby określić, czy wydają się odnosić do obecnego trzęsienia ziemi. Tweety typu „Trzęsienie ziemi!” lub na przykład „Teraz się trzęsie” można uznać za pozytywne dopasowania, podczas gdy tweety takie jak „Uczestnictwo w konferencji poświęconej trzęsieniu ziemi” lub „Wczorajsze trzęsienie ziemi było przerażające” nie. Autorzy artykułu wykorzystali w tym celu maszynę wektorów nośnych (SVM). Zrobimy to samo tutaj, ale możemy również wypróbować wersję strumieniową. Wynikowy przykład kodu z MLlib wyglądałby następująco:

 // We would prepare some earthquake tweet data and load it in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt") // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) // Clear the default threshold. model.clearThreshold() // Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label) } // Get evaluation metrics. val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC)

Jeśli jesteśmy zadowoleni z szybkości przewidywania modelu, możemy przejść do następnego etapu i zareagować za każdym razem, gdy odkryjemy trzęsienie ziemi. Aby go wykryć, potrzebujemy określonej liczby (tj. gęstości) pozytywnych tweetów w określonym oknie czasowym (jak opisano w artykule). Pamiętaj, że w przypadku tweetów z włączonymi usługami lokalizacyjnymi na Twitterze wyodrębnimy również lokalizację trzęsienia ziemi. Uzbrojeni w tę wiedzę, moglibyśmy użyć SparkSQL i wysłać zapytanie do istniejącej tabeli Hive (przechowującej użytkowników zainteresowanych otrzymywaniem powiadomień o trzęsieniach ziemi), aby pobrać ich adresy e-mail i wysłać im spersonalizowany e-mail z ostrzeżeniem w następujący sposób:

 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom function sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)

Inne przypadki użycia Apache Spark

Potencjalne przypadki użycia Sparka wykraczają oczywiście daleko poza wykrywanie trzęsień ziemi.

Oto szybkie (ale na pewno nie wyczerpujące!) próbkowanie innych przypadków użycia, które wymagają radzenia sobie z prędkością, różnorodnością i objętością Big Data, do których Spark jest tak dobrze przystosowany:

W branży gier przetwarzanie i odkrywanie wzorców z potencjalnego węża strażackiego zdarzeń w czasie rzeczywistym oraz możliwość natychmiastowej reakcji na nie jest zdolnością, która może zaowocować lukratywnym biznesem, w celach takich jak zatrzymanie graczy, ukierunkowana reklama, auto -dopasowanie poziomu złożoności i tak dalej.

W branży e-commerce informacje o transakcjach w czasie rzeczywistym mogą być przekazywane do algorytmu klastrowania strumieniowego, takiego jak k-średnie lub filtrowania grupowego, takiego jak ALS. Wyniki można następnie połączyć z innymi nieustrukturyzowanymi źródłami danych, takimi jak komentarze klientów lub recenzje produktów, i wykorzystać do ciągłego ulepszania i dostosowywania rekomendacji w miarę upływu czasu do nowych trendów.

W branży finansowej lub bezpieczeństwa stos Spark można zastosować do systemu wykrywania oszustw lub włamań lub uwierzytelniania opartego na ryzyku. Może osiągnąć najlepsze wyniki, zbierając ogromne ilości zarchiwizowanych dzienników, łącząc je z zewnętrznymi źródłami danych, takimi jak informacje o naruszeniach bezpieczeństwa danych i kontach (patrz na przykład https://haveibeenpwned.com/) oraz informacje z połączenia/ żądanie, takie jak geolokalizacja IP lub czas.

Wniosek

Podsumowując, Spark pomaga uprościć trudne i wymagające obliczeniowo zadanie przetwarzania dużych ilości danych w czasie rzeczywistym lub danych archiwalnych, zarówno ustrukturyzowanych, jak i nieustrukturyzowanych, bezproblemowo integrując odpowiednie złożone możliwości, takie jak uczenie maszynowe i algorytmy wykresów. Spark udostępnia masom przetwarzanie Big Data. Sprawdź to!