Örnekler ve Kullanım Örnekleriyle Apache Spark'a Giriş
Yayınlanan: 2022-03-11Spark'ı ilk kez 2013'ün sonlarında Spark'ın yazıldığı dil olan Scala ile ilgilenmeye başladığımda duydum. Bir süre sonra, Titanik'te hayatta kalmayı tahmin etmeye çalışan eğlenceli bir veri bilimi projesi yaptım. Bu, Spark konseptlerini ve programlamasını daha fazla tanıtmanın harika bir yolu oldu. Başlamak için bir yer arayan tüm hevesli Spark geliştiricilerine şiddetle tavsiye ederim.
Bugün Spark, Amazon, eBay ve Yahoo gibi büyük oyuncular tarafından benimseniyor! Birçok kuruluş Spark'ı binlerce düğüm içeren kümeler üzerinde çalıştırır. Spark SSS'ye göre, bilinen en büyük kümenin 8000'den fazla düğümü vardır. Gerçekten de Spark, dikkate alınmaya ve öğrenilmeye değer bir teknolojidir.
Bu makale, kullanım örnekleri ve örnekler dahil olmak üzere Spark'a bir giriş sağlar. Apache Spark web sitesinden ve Learning Spark - Lightning-Fast Big Data Analysis kitabından bilgiler içerir.
Apache Spark nedir? Giriş
Spark, “yıldırım hızında küme hesaplama” olarak tanıtılan bir Apache projesidir. Gelişen bir açık kaynak topluluğuna sahiptir ve şu anda en aktif Apache projesidir.
Spark, daha hızlı ve daha genel bir veri işleme platformu sağlar. Spark, programları Hadoop'a göre bellekte 100 kata kadar veya diskte 10 kata kadar daha hızlı çalıştırmanıza olanak tanır. Geçen yıl Spark, 100 TB Daytona GraySort yarışmasını makine sayısının onda birinde 3 kat daha hızlı tamamlayarak Hadoop'u devraldı ve aynı zamanda bir petabaytı sıralamak için en hızlı açık kaynaklı motor oldu.
Spark ayrıca, emrinizde 80'den fazla üst düzey operatöre sahip olduğunuz için daha hızlı kod yazmayı mümkün kılar. Bunu göstermek için “Merhaba Dünya!”ya bir göz atalım. BigData: Kelime Sayısı örneği. Java'da MapReduce için yazılmıştır, yaklaşık 50 satır kod içerir, oysa Spark'ta (ve Scala'da) bunu bu kadar basit bir şekilde yapabilirsiniz:
sparkContext.textFile("hdfs://...") .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile("hdfs://...")
Apache Spark'ın nasıl kullanılacağını öğrenirken bir diğer önemli husus, kullanıma hazır sağladığı etkileşimli kabuktur (REPL). REPL kullanarak, önce tüm işi kodlamaya ve yürütmeye gerek kalmadan her bir kod satırının sonucu test edilebilir. Böylece çalışma koduna giden yol çok daha kısadır ve geçici veri analizi mümkün hale gelir.
Spark'ın ek temel özellikleri şunları içerir:
- Şu anda diğer dilleri (R gibi) destekleyen Scala, Java ve Python'da API'ler sağlıyor.
- Hadoop ekosistemi ve veri kaynaklarıyla (HDFS, Amazon S3, Hive, HBase, Cassandra, vb.)
- Hadoop YARN veya Apache Mesos tarafından yönetilen kümelerde çalışabilir ve bağımsız olarak da çalışabilir
Spark çekirdeği, aynı uygulamada sorunsuz bir şekilde kullanılabilen bir dizi güçlü, üst düzey kitaplıkla tamamlanır. Bu kitaplıklar şu anda her biri bu makalede ayrıntılı olarak açıklanan SparkSQL, Spark Streaming, MLlib (makine öğrenimi için) ve GraphX'i içermektedir. Ek Spark kitaplıkları ve uzantıları da şu anda geliştirme aşamasındadır.
Kıvılcım Çekirdeği
Spark Core, büyük ölçekli paralel ve dağıtılmış veri işleme için temel motordur. Şunlardan sorumludur:
- bellek yönetimi ve hata kurtarma
- bir kümedeki işleri zamanlama, dağıtma ve izleme
- depolama sistemleri ile etkileşim
Spark, paralel olarak üzerinde çalıştırılabilen değişmez hataya dayanıklı, dağıtılmış nesneler koleksiyonu olan bir RDD (Dayanıklı Dağıtılmış Veri Kümesi) kavramını sunar. Bir RDD, herhangi bir nesne türünü içerebilir ve harici bir veri kümesi yüklenerek veya sürücü programından bir koleksiyon dağıtılarak oluşturulur.
RDD'ler iki tür işlemi destekler:
- Dönüşümler, bir RDD üzerinde gerçekleştirilen ve sonucu içeren yeni bir RDD veren işlemlerdir (harita, filtre, birleştirme, birleştirme vb.).
- Eylemler, bir RDD üzerinde bir hesaplama çalıştırdıktan sonra bir değer döndüren işlemlerdir (azaltma, sayma, ilk vb.).
Spark'taki dönüşümler "tembeldir", yani sonuçlarını hemen hesaplamazlar. Bunun yerine, yalnızca gerçekleştirilecek işlemi ve işlemin gerçekleştirileceği veri kümesini (örn. dosya) “hatırlarlar”. Dönüşümler, yalnızca bir eylem çağrıldığında ve sonuç sürücü programına döndürüldüğünde hesaplanır. Bu tasarım, Spark'ın daha verimli çalışmasını sağlar. Örneğin, büyük bir dosya çeşitli şekillerde dönüştürülmüş ve ilk eyleme geçirilmişse, Spark tüm dosya için işi yapmak yerine yalnızca ilk satır için sonucu işleyecek ve döndürecektir.
Varsayılan olarak, dönüştürülen her RDD, üzerinde her eylem çalıştırdığınızda yeniden hesaplanabilir. Bununla birlikte, kalıcı veya önbellek yöntemini kullanarak bellekte bir RDD'yi kalıcı hale getirebilirsiniz; bu durumda Spark, bir sonraki sorgunuzda çok daha hızlı erişim için öğeleri kümede tutacaktır.
SparkSQL
SparkSQL, verileri SQL veya Hive Query Language aracılığıyla sorgulamayı destekleyen bir Spark bileşenidir. Spark'ın üzerinde (MapReduce yerine) çalışmak üzere Apache Hive bağlantı noktası olarak ortaya çıktı ve şimdi Spark yığınıyla entegre edildi. Çeşitli veri kaynakları için destek sağlamanın yanı sıra, çok güçlü bir araçla sonuçlanan kod dönüşümleri ile SQL sorgularını dokumayı mümkün kılar. Aşağıda Hive uyumlu bir sorgu örneği verilmiştir:
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
Kıvılcım Akışı
Spark Streaming, üretim web sunucusu günlük dosyaları (örn. Apache Flume ve HDFS/S3), Twitter gibi sosyal medya ve Kafka gibi çeşitli mesajlaşma kuyrukları gibi akış verilerinin gerçek zamanlı işlenmesini destekler. Başlık altında, Spark Streaming, girdi veri akışlarını alır ve verileri yığınlara böler. Ardından, Spark motoru tarafından işlenirler ve aşağıda gösterildiği gibi toplu olarak nihai sonuç akışını oluştururlar.

Spark Streaming API, Spark Core ile yakından eşleşir ve programcıların hem toplu hem de akış verisi dünyalarında çalışmasını kolaylaştırır.
MLlib
MLlib, sınıflandırma, regresyon, kümeleme, işbirlikçi filtreleme vb. için bir küme üzerinde ölçeği genişletmek üzere tasarlanmış çeşitli algoritmalar sağlayan bir makine öğrenimi kitaplığıdır (bu konu hakkında daha fazla bilgi için Toptal'ın makine öğrenimi hakkındaki makalesine bakın). Bu algoritmalardan bazıları, sıradan en küçük kareler kullanan doğrusal regresyon veya k-ortalama kümeleme (ve daha fazlası) gibi akış verileriyle de çalışır. Apache Mahout (Hadoop için bir makine öğrenimi kitaplığı) şimdiden MapReduce'dan uzaklaştı ve Spark MLlib'de güçlerini birleştirdi.
GrafikX
GraphX, grafikleri işlemek ve grafik-paralel işlemler gerçekleştirmek için bir kitaplıktır. ETL, keşif analizi ve yinelemeli grafik hesaplamaları için tek tip bir araç sağlar. Grafik işleme için yerleşik işlemlerin yanı sıra, PageRank gibi ortak grafik algoritmalarından oluşan bir kitaplık sağlar.
Apache Spark Nasıl Kullanılır: Olay Algılama Kullanım Örneği
Şimdi “Apache Spark nedir?” sorusunun cevabını verdiğimize göre, ne tür problemler veya zorluklar için en etkili şekilde kullanılabileceğini düşünelim.
Geçenlerde bir Twitter akışını analiz ederek bir depremi tespit etmeye yönelik bir deneyle ilgili bir makaleye rastladım. İlginç bir şekilde, bu tekniğin sizi Japonya'daki bir deprem hakkında Japonya Meteoroloji Ajansı'ndan daha hızlı bilgilendireceği gösterildi. Makalelerinde farklı teknolojiler kullanmalarına rağmen, Spark'ı basitleştirilmiş kod parçacıklarıyla ve tutkal kodu olmadan nasıl kullanabileceğimizi görmek için harika bir örnek olduğunu düşünüyorum.
İlk olarak, “deprem” veya “sallanma” gibi alakalı görünen tweetleri filtrelememiz gerekir. Spark Streaming'i bu amaç için aşağıdaki şekilde kolayca kullanabiliriz:
TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))
Ardından, mevcut bir deprem oluşumuna gönderme yapıp yapmadıklarını belirlemek için tweet'ler üzerinde bazı anlamsal analizler yapmamız gerekir. “Deprem!” Gibi Tweetler veya “Şimdi sallanıyor”, örneğin, olumlu eşleşmeler olarak kabul edilirken, “Deprem Konferansına Katılmak” veya “Dün deprem korkutucuydu” gibi tweetler sayılmaz. Makalenin yazarları bu amaç için bir destek vektör makinesi (SVM) kullanmışlardır. Aynısını burada yapacağız, ancak bir akış sürümünü de deneyebiliriz. MLlib'den elde edilen bir kod örneği aşağıdaki gibi görünecektir:
// We would prepare some earthquake tweet data and load it in LIBSVM format. val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt") // Split data into training (60%) and test (40%). val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) // Clear the default threshold. model.clearThreshold() // Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label) } // Get evaluation metrics. val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC)
Modelin tahmin oranından memnunsak, bir sonraki aşamaya geçebilir ve bir deprem keşfettiğimizde tepki verebiliriz. Birini tespit etmek için belirli bir zaman aralığında (makalede açıklandığı gibi) belirli sayıda (yani yoğunluk) pozitif tweet'e ihtiyacımız var. Twitter konum servislerinin etkinleştirildiği tweetler için depremin yerini de çıkaracağımızı unutmayın. Bu bilgiyle donanmış olarak, SparkSQL'i kullanabilir ve e-posta adreslerini almak ve onlara kişiselleştirilmiş bir uyarı e-postası göndermek için mevcut bir Hive tablosunu (deprem bildirimleri almakla ilgilenen kullanıcıları depolar) sorgulayabiliriz:
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom function sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)
Diğer Apache Spark Kullanım Örnekleri
Spark için potansiyel kullanım durumları, elbette depremlerin tespit edilmesinin çok ötesine uzanır.
İşte Spark'ın çok uygun olduğu Büyük Veri'nin hızı, çeşitliliği ve hacmi ile uğraşmayı gerektiren diğer kullanım durumlarının hızlı (ama kesinlikle hiçbir yerde ayrıntılı değil!) bir örneği:
Oyun endüstrisinde, gerçek zamanlı oyun içi olayların potansiyel yangın hortumundan kalıpları işlemek ve keşfetmek ve bunlara anında yanıt verebilmek, oyuncu tutma, hedefli reklamcılık, otomobil gibi amaçlar için kazançlı bir iş sağlayabilecek bir yetenektir. -karmaşıklık seviyesinin ayarlanması vb.
E-ticaret endüstrisinde, gerçek zamanlı işlem bilgileri, k-means gibi bir akış kümeleme algoritmasına veya ALS gibi işbirlikçi filtrelemeye iletilebilir. Sonuçlar daha sonra müşteri yorumları veya ürün incelemeleri gibi diğer yapılandırılmamış veri kaynaklarıyla bile birleştirilebilir ve önerileri sürekli olarak iyileştirmek ve yeni trendlerle zaman içinde uyarlamak için kullanılabilir.
Finans veya güvenlik endüstrisinde, Spark yığını bir dolandırıcılık veya izinsiz giriş tespit sistemine veya risk tabanlı kimlik doğrulamaya uygulanabilir. Büyük miktarlarda arşivlenmiş günlükleri toplayarak, bunları veri ihlalleri ve güvenliği ihlal edilmiş hesaplar hakkındaki bilgiler (bkz. IP coğrafi konum veya zaman gibi istekler.
Çözüm
Özetlemek gerekirse, Spark, hem yapılandırılmış hem de yapılandırılmamış yüksek hacimli gerçek zamanlı veya arşivlenmiş verilerin işlenmesi gibi zorlu ve hesaplama açısından yoğun bir görevi basitleştirmeye yardımcı olur ve makine öğrenimi ve grafik algoritmaları gibi ilgili karmaşık yetenekleri sorunsuz bir şekilde entegre eder. Spark, Büyük Veri işlemeyi kitlelere taşıyor. Buna bir bak!