Apache Spark Akış Eğitimi: Trend Olan Twitter Hashtag'lerini Belirleme
Yayınlanan: 2022-03-11Günümüzde veriler her zamankinden daha hızlı büyüyor ve birikiyor. Şu anda dünyamızda üretilen tüm verilerin yaklaşık %90'ı yalnızca son iki yılda üretildi. Bu şaşırtıcı büyüme hızı nedeniyle, büyük veri platformları bu kadar büyük miktarda veriyi korumak için radikal çözümler benimsemek zorunda kaldı.
Günümüzün ana veri kaynaklarından biri sosyal ağlardır. Gerçek hayattan bir örnek göstermeme izin verin: En önemli büyük veri yankı çözümlerinden biri olan Apache Spark ve Python'u kullanarak sosyal ağ verileriyle gerçek zamanlı olarak ilgilenme, analiz etme ve içgörü çıkarma.
Bu makalede, Python kullanarak Twitter'dan çevrimiçi akışları okuyan, ardından hashtag'leri belirlemek için tweet'leri Apache Spark Streaming kullanarak işleyen ve son olarak en popüler hashtag'leri döndüren ve bu verileri gerçek bir -zaman kontrol paneli.
Twitter API'leri için Kendi Kimlik Bilgilerinizi Oluşturma
Twitter'dan tweet almak için "Yeni uygulama oluştur" seçeneğine tıklayarak TwitterApps'e kaydolmanız ve ardından aşağıdaki formu "Twitter uygulamanızı oluşturun" seçeneğine tıklayarak doldurmanız gerekir.
İkinci olarak, yeni oluşturduğunuz uygulamaya gidin ve “Anahtarlar ve Erişim Simgeleri” sekmesini açın. Ardından “Erişim jetonumu oluştur” seçeneğine tıklayın.
Yeni erişim jetonlarınız aşağıdaki gibi görünecektir.
Ve şimdi bir sonraki adım için hazırsınız.
Twitter HTTP İstemcisi Oluşturma
Bu adımda, Python kullanarak Twitter API'sinden tweetleri alacak ve bunları Spark Streaming örneğine iletecek basit bir istemcinin nasıl oluşturulacağını göstereceğim. Herhangi bir profesyonel Python geliştiricisi için takip etmesi kolay olmalıdır.
İlk olarak twitter_app.py
adında bir dosya oluşturalım ve ardından içindeki kodu aşağıdaki gibi birlikte ekleyeceğiz.
Kullanacağımız kitaplıkları aşağıdaki gibi içe aktarın:
import socket import sys import requests import requests_oauthlib import json
Ve OAuth'ta Twitter'a bağlanmak için kullanılacak değişkenleri aşağıdaki gibi ekleyin:
# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Şimdi, Twitter API URL'sini çağıracak ve bir tweet akışı için yanıtı döndürecek olan get_tweets
adlı yeni bir işlev oluşturacağız.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
Ardından, yukarıdaki yanıtı alan ve tweet'lerin metnini tüm tweet'lerin JSON nesnesinden çıkaran bir işlev oluşturun. Bundan sonra, her tweet'i bir TCP bağlantısı üzerinden Spark Streaming örneğine (daha sonra tartışılacaktır) gönderir.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)
Şimdi, Spark'ın bağlanacağı uygulama ana bilgisayar soket bağlantılarını yapacak ana kısmı yapacağız. Hepsi aynı makinede ve 9009
numaralı bağlantı noktasında çalışacağından, IP'yi burada localhost
olacak şekilde yapılandıracağız. Ardından tweetleri Twitter'dan almak için yukarıda yaptığımız get_tweets
yöntemini çağıracağız ve yanıtını soket bağlantısıyla birlikte tweet'leri send_tweets_to_spark
göndermek için send_tweets_to_spark'a ileteceğiz.
TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)
Apache Spark Akış Uygulamamızı Kurma
Gelen tweet'ler için gerçek zamanlı işlem yapacak, onlardan hashtag'leri çıkaracak ve kaç hashtag'den bahsedildiğini hesaplayacak Spark akış uygulamamızı oluşturalım.
İlk olarak, bir Spark Context sc
örneği oluşturmalıyız, ardından, her iki saniyede bir alınan tüm akışlarda dönüşümü yapacak olan, iki saniyelik bir toplu aralıkla sc
Streaming Context ssc
oluşturduk. Spark'ın yazdığı günlüklerin çoğunu devre dışı bırakmak için günlük düzeyini ERROR
olarak ayarladığımıza dikkat edin.
Periyodik RDD kontrol noktalarına izin vermek için burada bir kontrol noktası tanımladık; durumsal dönüşümleri kullanacağımızdan (aynı bölümde daha sonra tartışılacaktır) bunun uygulamamızda kullanılması zorunludur.
Daha sonra 9009
portunda daha önce oluşturduğumuz soket sunucusuna bağlanacak ana DStream dataStream'imizi tanımlıyoruz ve o porttan gelen tweetleri okuyoruz. DStream'deki her kayıt bir tweet olacaktır.
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)
Şimdi dönüşüm mantığımızı tanımlayacağız. İlk önce tüm tweetleri kelimelere ayıracağız ve onları RDD kelimelerine koyacağız. Ardından, tüm kelimelerden yalnızca hashtag'leri filtreleyeceğiz ve bunları çift (hashtag, 1)
ile eşleştireceğiz ve bunları RDD hashtag'lerine koyacağız.
Ardından hashtag'den kaç kez bahsedildiğini hesaplamamız gerekiyor. Bunu reduceByKey
işlevini kullanarak yapabiliriz. Bu fonksiyon, hashtag'in her parti için kaç kez bahsedildiğini hesaplayacaktır, yani her partideki sayıları sıfırlayacaktır.
Bizim durumumuzda, tüm gruplardaki sayıları hesaplamamız gerekiyor, bu nedenle updateStateByKey
adlı başka bir işlev kullanacağız, çünkü bu işlev RDD'yi yeni verilerle güncellerken durumunu korumanıza izin verir. Bu yola Stateful Transformation
denir.
updateStateByKey
kullanmak için bir kontrol noktası yapılandırmanız gerektiğini ve önceki adımda yaptığımızı unutmayın.
# split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()
updateStateByKey
, update
işlevi adı verilen bir parametre olarak bir işlevi alır. RDD'deki her öğe üzerinde çalışır ve istenen mantığı yapar.
Bizim durumumuzda, her hashtag için tüm new_values
ve bunları tüm gruplardaki aggregate_tags_count
olan total_sum
ekleyecek ve verileri tags_totals
kaydedecek olan cluster_tags_count adlı bir güncelleme fonksiyonu oluşturduk.
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
Daha sonra Spark SQL Context kullanarak geçici tabloya dönüştürmek için her partide tags_totals
RDD üzerinde işlem yapıyoruz ve ardından sayıları ile ilk on hashtag'i almak ve hashtag_counts_df
veri çerçevesine koymak için bir select ifadesi yapıyoruz.
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)
Spark uygulamamızdaki son adım, hashtag_counts_df
veri çerçevesini pano uygulamasına göndermektir. Böylece veri çerçevesini, biri hashtag'ler ve diğeri sayıları için olmak üzere iki diziye dönüştüreceğiz. Ardından bunları REST API aracılığıyla gösterge tablosu uygulamasına göndereceğiz.

def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
Son olarak, hashtag_counts_df
çalıştırılırken ve yazdırılırken Spark Streaming'in örnek bir çıktısı burada, çıktının toplu aralıklarla tam olarak iki saniyede bir yazdırıldığını fark edeceksiniz.
Verileri Temsil Etmek İçin Basit Bir Gerçek Zamanlı Gösterge Tablosu Oluşturun
Şimdi, Spark tarafından gerçek zamanlı olarak güncellenecek basit bir pano uygulaması oluşturacağız. Python, Flask ve Charts.js kullanarak oluşturacağız.
Öncelikle aşağıda görülen yapısı ile bir Python projesi oluşturalım ve Chart.js dosyasını indirip statik dizine ekleyelim.
Ardından, app.py
dosyasında, Global etiketler ve değerler dizilerini güncellemek için http://localhost:5001/updateData
URL'si aracılığıyla Spark tarafından çağrılacak update_data
adlı bir işlev oluşturacağız.
Ayrıca, yeni güncellenen etiketleri ve değer dizilerini JSON olarak döndürmek için AJAX isteği tarafından çağrılacak şekilde refresh_graph_data
işlevi oluşturulur. get_chart_page
işlevi, çağrıldığında chart.html
sayfasını oluşturacaktır.
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)
Şimdi hashtag verilerini görüntülemek ve gerçek zamanlı olarak güncellemek için chart.html
dosyasında basit bir grafik oluşturalım. Aşağıda tanımlandığı gibi Chart.js
ve jquery.min.js
JavaScript kitaplıklarını içe aktarmamız gerekiyor.
Gövde etiketinde, bir sonraki adımda JavaScript kullanarak grafiği görüntülerken referans olması için bir tuval oluşturmalı ve ona bir kimlik vermeliyiz.
<!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>
Şimdi aşağıdaki JavaScript kodunu kullanarak grafiği oluşturalım. İlk olarak canvas elementini alıyoruz ve ardından yeni bir grafik nesnesi oluşturup buna canvas elementini iletiyoruz ve data nesnesini aşağıdaki gibi tanımlıyoruz.
Verilerin etiketlerinin ve verilerinin, app.py
dosyasında bir get_chart_page
işlevi çağrılırken sayfa oluşturulurken döndürülen etiketler ve değer değişkenleriyle sınırlandırıldığını unutmayın.
Kalan son kısım, her saniye bir Ajax isteği yapacak ve URL'yi çağıracak şekilde yapılandırılmış /refreshData
, bu, refresh_graph_data
app.py
ve yeni güncellenmiş verileri döndürecek ve ardından yeni verileri işleyen karakteri güncelleyecektir.
<script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>
Uygulamaları birlikte çalıştırma
Üç uygulamayı aşağıdaki sırayla çalıştıralım: 1. Twitter Uygulama İstemcisi. 2. Kıvılcım Uygulaması. 3. Pano Web Uygulaması.
Ardından, <http://localhost:5001/>
URL'sini kullanarak gerçek zamanlı gösterge tablosuna erişebilirsiniz.
Şimdi, grafiğinizin aşağıdaki gibi güncellendiğini görebilirsiniz:
Apache Akışı Gerçek Hayat Kullanım Örnekleri
Spark Streaming'i kullanarak gerçek zamanlı olarak veriler üzerinde basit veri analitiği yapmayı ve RESTful web hizmetini kullanarak basit bir pano ile doğrudan tümleştirmeyi öğrendik. Bu örnekten, büyük bir veri akışını yakaladığı, dönüştürdüğü ve anında karar vermek için kolayca kullanılabilecek değerli içgörüler çıkardığı için Spark'ın ne kadar güçlü olduğunu görebiliriz. Uygulanabilecek ve haberler veya pazarlama gibi farklı sektörlere hizmet edebilecek birçok yararlı kullanım örneği vardır.
Haber sektörü örneği
İnsanların sosyal medyada en çok hangi konuları konuştuğunu öğrenmek için en sık bahsedilen hashtag'leri takip edebiliriz. Ayrıca, dünyadaki belirli konular veya olaylar hakkında insanların ne söylediğini bilmek için belirli hashtag'leri ve tweet'lerini takip edebiliriz.
Pazarlama örneği
Tweet akışlarını toplayabilir ve duygu analizi yaparak kategorilere ayırabilir ve ilgi alanlarıyla ilgili tekliflerle onları hedeflemek için kişilerin ilgi alanlarını belirleyebiliriz.
Ayrıca, özellikle büyük veri analitiği için uygulanabilecek ve birçok sektöre hizmet edebilecek birçok kullanım durumu vardır. Genel olarak daha fazla Apache Spark kullanım durumu için önceki gönderilerimizden birine göz atmanızı öneririm.
Yetenekleri hakkında daha fazla bilgi edinmek ve onu kullanarak gerçek zamanlı olarak daha fazla içgörü elde etmek için veriler üzerinde daha gelişmiş dönüşüm yapmak için Spark Streaming hakkında buradan daha fazla bilgi edinmenizi tavsiye ederim.