Introducere în Apache Spark cu exemple și cazuri de utilizare

Publicat: 2022-03-11

Am auzit prima dată de Spark la sfârșitul anului 2013, când m-am interesat de Scala, limba în care este scris Spark. Un timp mai târziu, am făcut un proiect distractiv de știință a datelor, încercând să prezic supraviețuirea pe Titanic. Aceasta s-a dovedit a fi o modalitate excelentă de a vă prezenta în continuare conceptele și programarea Spark. Îl recomand cu căldură oricăror aspiranți dezvoltatori Spark care caută un loc pentru a începe.

Astăzi, Spark este adoptat de jucători importanți precum Amazon, eBay și Yahoo! Multe organizații rulează Spark pe clustere cu mii de noduri. Conform FAQ Spark, cel mai mare cluster cunoscut are peste 8000 de noduri. Într-adevăr, Spark este o tehnologie care merită luată în considerare și despre care merită să înveți.

Acest articol oferă o introducere în Spark, inclusiv cazuri de utilizare și exemple. Conține informații de pe site-ul web Apache Spark, precum și cartea Learning Spark - Lightning-Fast Big Data Analysis.

Ce este Apache Spark? O introducere

Spark este un proiect Apache promovat drept „computing cluster fulgerător”. Are o comunitate open-source înfloritoare și este cel mai activ proiect Apache în acest moment.

Spark oferă o platformă de procesare a datelor mai rapidă și mai generală. Spark vă permite să rulați programe de până la 100 de ori mai rapid în memorie sau de 10 ori mai rapid pe disc decât Hadoop. Anul trecut, Spark a preluat Hadoop prin finalizarea concursului Daytona GraySort de 100 TB de 3 ori mai rapid cu o zecime din numărul de mașini și a devenit, de asemenea, cel mai rapid motor open source pentru sortarea unui petabyte.

De asemenea, Spark face posibilă scrierea codului mai rapid, deoarece aveți la dispoziție peste 80 de operatori de nivel înalt. Pentru a demonstra acest lucru, să aruncăm o privire la „Hello World!” de BigData: exemplul Număr de cuvinte. Scris în Java pentru MapReduce, are aproximativ 50 de linii de cod, în timp ce în Spark (și Scala) o puteți face la fel de simplu:

 sparkContext.textFile("hdfs://...") .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile("hdfs://...")

Un alt aspect important atunci când învățați cum să utilizați Apache Spark este shell-ul interactiv (REPL) pe care îl oferă imediat. Folosind REPL, se poate testa rezultatul fiecărei linii de cod fără a fi nevoie mai întâi de a codifica și de a executa întregul job. Calea către codul de lucru este astfel mult mai scurtă și este posibilă analiza ad-hoc a datelor.

Caracteristicile cheie suplimentare ale Spark includ:

  • În prezent oferă API-uri în Scala, Java și Python, cu suport pentru alte limbi (cum ar fi R) pe drum
  • Se integrează bine cu ecosistemul Hadoop și cu sursele de date (HDFS, Amazon S3, Hive, HBase, Cassandra etc.)
  • Poate rula pe clustere gestionate de Hadoop YARN sau Apache Mesos și poate rula și independent

Nucleul Spark este completat de un set de biblioteci puternice, de nivel superior, care pot fi utilizate fără probleme în aceeași aplicație. Aceste biblioteci includ în prezent SparkSQL, Spark Streaming, MLlib (pentru învățare automată) și GraphX, fiecare dintre acestea fiind detaliat în acest articol. Biblioteci și extensii Spark suplimentare sunt, de asemenea, în curs de dezvoltare.

biblioteci și extensii spark

Spark Core

Spark Core este motorul de bază pentru procesarea datelor distribuite și paralele la scară largă. Este responsabil pentru:

  • managementul memoriei și recuperarea erorilor
  • programarea, distribuirea și monitorizarea joburilor pe un cluster
  • interacționarea cu sistemele de stocare

Spark introduce conceptul de RDD (Resilient Distributed Dataset), o colecție distribuită imuabilă, tolerantă la erori, care poate fi operată în paralel. Un RDD poate conține orice tip de obiect și este creat prin încărcarea unui set de date extern sau distribuirea unei colecții din programul driver.

RDD-urile acceptă două tipuri de operațiuni:

  • Transformările sunt operațiuni (cum ar fi hartă, filtru, unire, unire și așa mai departe) care sunt efectuate pe un RDD și care generează un nou RDD care conține rezultatul.
  • Acțiunile sunt operațiuni (cum ar fi reducere, numărare, primul și așa mai departe) care returnează o valoare după rularea unui calcul pe un RDD.

Transformările din Spark sunt „leneșe”, ceea ce înseamnă că nu își calculează rezultatele imediat. În schimb, ei doar „își amintesc” operația care trebuie efectuată și setul de date (de exemplu, fișierul) în care urmează să fie efectuată operația. Transformările sunt de fapt calculate numai atunci când o acțiune este apelată și rezultatul este returnat programului driver. Acest design permite Spark să ruleze mai eficient. De exemplu, dacă un fișier mare a fost transformat în diferite moduri și trecut la prima acțiune, Spark ar procesa și returna rezultatul doar pentru prima linie, mai degrabă decât să facă treaba pentru întreg fișierul.

În mod implicit, fiecare RDD transformat poate fi recalculat de fiecare dată când executați o acțiune asupra acestuia. Cu toate acestea, puteți, de asemenea, să păstrați un RDD în memorie folosind metoda persist sau cache, caz în care Spark va păstra elementele din jurul clusterului pentru un acces mult mai rapid data viitoare când îl interogați.

SparkSQL

SparkSQL este o componentă Spark care acceptă interogarea datelor fie prin SQL, fie prin Hive Query Language. A apărut ca portul Apache Hive pentru a rula deasupra lui Spark (în locul MapReduce) și este acum integrat cu stiva Spark. Pe lângă faptul că oferă suport pentru diverse surse de date, face posibilă împletirea interogărilor SQL cu transformări de cod, ceea ce are ca rezultat un instrument foarte puternic. Mai jos este un exemplu de interogare compatibilă Hive:

 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

Spark Streaming

Spark Streaming acceptă procesarea în timp real a datelor în flux, cum ar fi fișierele jurnal de server web de producție (de exemplu Apache Flume și HDFS/S3), rețelele sociale precum Twitter și diverse cozi de mesaje precum Kafka. Sub capotă, Spark Streaming primește fluxurile de date de intrare și împarte datele în loturi. Apoi, sunt procesate de motorul Spark și generează fluxul final de rezultate în loturi, așa cum este prezentat mai jos.

flux de scântei

API-ul Spark Streaming se potrivește îndeaproape cu cel al Spark Core, făcându-le mai ușor pentru programatori să lucreze atât în ​​lumea loturilor, cât și a datelor în flux.

MLlib

MLlib este o bibliotecă de învățare automată care oferă diverși algoritmi proiectați să extindă un cluster pentru clasificare, regresie, grupare, filtrare colaborativă și așa mai departe (consultați articolul Toptal despre învățarea automată pentru mai multe informații despre acest subiect). Unii dintre acești algoritmi funcționează și cu date în flux, cum ar fi regresia liniară folosind cele mai mici pătrate obișnuite sau gruparea k-means (și multe altele pe drum). Apache Mahout (o bibliotecă de învățare automată pentru Hadoop) a renunțat deja la MapReduce și și-a unit forțele pe Spark MLlib.

GraphX

graphx

GraphX ​​este o bibliotecă pentru manipularea graficelor și efectuarea de operații grafice-paralele. Oferă un instrument uniform pentru ETL, analiză exploratorie și calcule grafice iterative. În afară de operațiunile încorporate pentru manipularea graficelor, oferă o bibliotecă de algoritmi de grafică obișnuiți, cum ar fi PageRank.

Cum să utilizați Apache Spark: caz de utilizare pentru detectarea evenimentelor

Acum că am răspuns la întrebarea „Ce este Apache Spark?”, să ne gândim la ce fel de probleme sau provocări ar putea fi folosit cel mai eficient.

Am dat de un articol recent despre un experiment de detectare a unui cutremur prin analiza unui flux Twitter. În mod interesant, s-a demonstrat că această tehnică ar putea să vă informeze despre un cutremur în Japonia mai repede decât Agenția de Meteorologie din Japonia. Chiar dacă au folosit tehnologie diferită în articolul lor, cred că este un exemplu grozav pentru a vedea cum am putea folosi Spark cu fragmente de cod simplificate și fără codul lipici.

În primul rând, ar trebui să filtram tweet-urile care par relevante, cum ar fi „cutremur” sau „tremur”. Am putea folosi cu ușurință Spark Streaming în acest scop, după cum urmează:

 TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))

Apoi, ar trebui să facem o analiză semantică pe tweet-uri pentru a determina dacă par să se refere la o apariție curentă a unui cutremur. Tweets precum „Cutremur!” sau „Acum tremură”, de exemplu, ar fi considerate potriviri pozitive, în timp ce tweet-uri precum „Participarea la o conferință de cutremur” sau „Cutremurul de ieri a fost înfricoșător” nu ar fi. Autorii lucrării au folosit în acest scop o mașină vectorială suport (SVM). Vom face același lucru aici, dar putem încerca și o versiune de streaming. Un exemplu de cod rezultat din MLlib ar arăta astfel:

 // We would prepare some earthquake tweet data and load it in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt") // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) // Clear the default threshold. model.clearThreshold() // Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label) } // Get evaluation metrics. val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC)

Dacă suntem mulțumiți de rata de predicție a modelului, am putea trece la următoarea etapă și am putea reacționa ori de câte ori descoperim un cutremur. Pentru a detecta unul avem nevoie de un anumit număr (adică, densitatea) de tweet-uri pozitive într-o fereastră de timp definită (așa cum este descris în articol). Rețineți că, pentru tweet-urile cu serviciile de localizare Twitter activate, am extrage și locația cutremurului. Înarmați cu aceste cunoștințe, am putea folosi SparkSQL și am putea interoga un tabel Hive existent (care stochează utilizatorii interesați să primească notificări de cutremur) pentru a le prelua adresele de e-mail și a le trimite un e-mail de avertizare personalizat, după cum urmează:

 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom function sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)

Alte cazuri de utilizare Apache Spark

Cazurile de utilizare potențiale pentru Spark se extind cu mult dincolo de detectarea cutremurelor, desigur.

Iată o eșantionare rapidă (dar cu siguranță deloc exhaustivă!) a altor cazuri de utilizare care necesită abordarea vitezei, varietății și volumului Big Data, pentru care Spark este atât de potrivit:

În industria jocurilor, procesarea și descoperirea tiparelor din potențialul furtun de incendiu al evenimentelor din joc în timp real și posibilitatea de a răspunde imediat la acestea este o capacitate care ar putea genera o afacere profitabilă, în scopuri precum reținerea jucătorilor, publicitate direcționată, auto -ajustarea nivelului de complexitate și așa mai departe.

În industria comerțului electronic, informațiile despre tranzacții în timp real ar putea fi transmise unui algoritm de grupare în flux, cum ar fi k-means sau filtrarea colaborativă, cum ar fi ALS. Rezultatele ar putea fi apoi combinate cu alte surse de date nestructurate, cum ar fi comentariile clienților sau recenziile de produse, și utilizate pentru a îmbunătăți și adapta în mod constant recomandările în timp cu noile tendințe.

În industria financiară sau de securitate, stiva Spark ar putea fi aplicată unui sistem de detectare a fraudei sau intruziunilor sau a autentificării bazate pe risc. Ar putea obține rezultate de top prin colectarea unor cantități uriașe de jurnale arhivate, combinându-le cu surse de date externe, cum ar fi informații despre încălcări ale datelor și conturi compromise (a se vedea, de exemplu, https://haveibeenpwned.com/) și informații din conexiune/ cerere, cum ar fi geolocalizarea IP sau ora.

Concluzie

În concluzie, Spark ajută la simplificarea sarcinii provocatoare și intensive din punct de vedere computațional de procesare a unor volume mari de date în timp real sau arhivate, atât structurate, cât și nestructurate, integrând perfect capabilități complexe relevante, cum ar fi algoritmii de învățare automată și grafice. Spark aduce procesarea Big Data către mase. Verifică!