Tutorial de streaming Apache Spark: Identificarea hashtag-urilor Twitter în tendințe

Publicat: 2022-03-11

În zilele noastre, datele cresc și se acumulează mai repede decât oricând. În prezent, aproximativ 90% din toate datele generate în lumea noastră au fost generate doar în ultimii doi ani. Datorită acestui ritm uluitor de creștere, platformele de date mari au fost nevoite să adopte soluții radicale pentru a menține astfel de volume uriașe de date.

Una dintre principalele surse de date astăzi sunt rețelele sociale. Permiteți-mi să vă demonstrez un exemplu din viața reală: gestionarea, analizarea și extragerea de informații din datele rețelelor sociale în timp real, folosind una dintre cele mai importante soluții de ecouri de date mari existente — Apache Spark și Python.

Apache Spark Streaming poate fi folosit pentru a extrage informații din rețelele sociale, cum ar fi hashtag-urile Twitter în tendințe

În acest articol, vă voi învăța cum să construiți o aplicație simplă care citește fluxurile online de pe Twitter folosind Python, apoi procesează tweet-urile folosind Apache Spark Streaming pentru a identifica hashtag-uri și, în cele din urmă, returnează hashtag-urile de top și reprezintă aceste date într-un mod real. -tabloul de bord timp.

Crearea propriilor acreditări pentru API-urile Twitter

Pentru a obține tweet-uri de la Twitter, trebuie să vă înregistrați pe TwitterApps făcând clic pe „Creați o nouă aplicație”, apoi completați formularul de mai jos, faceți clic pe „Creați-vă aplicația Twitter”.

Captură de ecran: Cum să vă creați aplicația Twitter.

În al doilea rând, accesați aplicația nou creată și deschideți fila „Chei și jetoane de acces”. Apoi faceți clic pe „Generează-mi simbolul de acces”.

Captură de ecran: Configurarea acreditărilor pentru aplicația Twitter, a cheilor și a simbolurilor de acces.

Noile dvs. jetoane de acces vor apărea ca mai jos.

Captură de ecran: configurarea jetonului de acces la aplicația Twitter.

Și acum ești gata pentru următorul pas.

Construirea clientului HTTP Twitter

În acest pas, vă voi arăta cum să construiți un client simplu care va primi tweet-urile de la Twitter API folosind Python și le va transmite instanței Spark Streaming. Ar trebui să fie ușor de urmărit pentru orice dezvoltator profesionist Python.

Mai întâi, să creăm un fișier numit twitter_app.py și apoi vom adăuga codul în el, ca mai jos.

Importați bibliotecile pe care le vom folosi după cum urmează:

 import socket import sys import requests import requests_oauthlib import json

Și adăugați variabilele care vor fi utilizate în OAuth pentru conectarea la Twitter, după cum urmează:

 # Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Acum, vom crea o nouă funcție numită get_tweets care va apela URL-ul API-ului Twitter și va returna răspunsul pentru un flux de tweet-uri.

 def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

Apoi, creați o funcție care preia răspunsul de la cel de mai sus și extrage textul tweet-urilor din obiectul JSON al întregului tweet. După aceea, trimite fiecare tweet către instanța Spark Streaming (va fi discutată mai târziu) printr-o conexiune TCP.

 def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)

Acum, vom realiza partea principală care va face conexiunile prizei gazdă a aplicației la care se va conecta scânteia. Vom configura IP-ul aici să fie localhost , deoarece toate vor rula pe aceeași mașină și pe portul 9009 . Apoi vom apela metoda get_tweets , pe care am făcut-o mai sus, pentru a obține tweet-urile de pe Twitter și vom transmite răspunsul acestuia împreună cu conexiunea socket la send_tweets_to_spark pentru a trimite tweet-urile către Spark.

 TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)

Configurarea aplicației noastre de streaming Apache Spark

Să creăm aplicația noastră de streaming Spark care va procesa în timp real tweet-urile primite, să extragem hashtag-urile din ele și să calculăm câte hashtag-uri au fost menționate.

Ilustrație: Spark streaming permite procesarea în timp real a tweet-urilor primite și extragerea hashtagurilor

În primul rând, trebuie să creăm o instanță de Spark Context sc , apoi am creat Streaming Context ssc din sc cu un interval de două secunde care va face transformarea pe toate fluxurile primite la fiecare două secunde. Observați că am setat nivelul de jurnal la ERROR pentru a dezactiva majoritatea jurnalelor pe care le scrie Spark.

Am definit aici un punct de control pentru a permite controlul periodic RDD; acest lucru este obligatoriu pentru a fi utilizat în aplicația noastră, deoarece vom folosi transformări stateful (vor fi discutate mai târziu în aceeași secțiune).

Apoi definim dataStream-ul nostru principal DStream care se va conecta la serverul socket creat anterior pe portul 9009 și se va citi tweet-urile de pe acel port. Fiecare înregistrare din DStream va fi un tweet.

 from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)

Acum, vom defini logica noastră de transformare. Mai întâi vom împărți toate tweet-urile în cuvinte și le vom pune în cuvinte RDD. Apoi vom filtra numai hashtag-urile din toate cuvintele și le vom mapa la pereche de (hashtag, 1) și le vom pune în hashtag-uri RDD.

Apoi trebuie să calculăm de câte ori a fost menționat hashtagul. Putem face asta folosind funcția reduceByKey . Această funcție va calcula de câte ori a fost menționat hashtag-ul pentru fiecare lot, adică va reseta contorizarea în fiecare lot.

În cazul nostru, trebuie să calculăm numărătoarea pentru toate loturile, așa că vom folosi o altă funcție numită updateStateByKey , deoarece această funcție vă permite să mențineți starea RDD în timp ce o actualizați cu date noi. Acest mod se numește Stateful Transformation .

Rețineți că, pentru a utiliza updateStateByKey , trebuie să configurați un punct de control și asta am făcut în pasul anterior.

 # split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

updateStateByKey ia o funcție ca parametru numită funcție de update . Se rulează pe fiecare articol din RDD și face logica dorită.

În cazul nostru, am creat o funcție de actualizare numită aggregate_tags_count care va însuma toate new_values ​​pentru fiecare hashtag și le va adăuga la total_sum care este suma tuturor loturii și va salva datele în tags_totals RDD.

 def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

Apoi procesăm tags_totals RDD în fiecare lot pentru a-l converti într-un tabel temporar utilizând Spark SQL Context și apoi efectuăm o instrucțiune select pentru a prelua primele zece hashtag-uri cu numărul lor și le punem în cadrul de date hashtag_counts_df .

 def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)

Ultimul pas în aplicația noastră Spark este să trimitem cadrul de date hashtag_counts_df către aplicația tablou de bord. Deci vom converti cadrul de date în două matrice, unul pentru hashtag-uri și celălalt pentru numărarea acestora. Apoi le vom trimite la aplicația tablou de bord prin API-ul REST.

 def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

În cele din urmă, iată un exemplu de ieșire a Spark Streaming în timp ce rulați și imprimați hashtag_counts_df , veți observa că rezultatul este tipărit exact la fiecare două secunde conform intervalelor de lot.

Un exemplu de ieșire în flux Twitter Spark, tipărită pe setări de interval de lot

Creați un tablou de bord simplu în timp real pentru reprezentarea datelor

Acum, vom crea o aplicație simplă de tablou de bord care va fi actualizată în timp real de către Spark. Îl vom construi folosind Python, Flask și Charts.js.

Mai întâi, să creăm un proiect Python cu structura văzută mai jos și să descarcăm și să adăugăm fișierul Chart.js în directorul static.

Ilustrație: Crearea unui proiect Python pentru a fi utilizat în analiza hashtag-urilor Twitter

Apoi, în fișierul app.py , vom crea o funcție numită update_data , care va fi apelată de Spark prin adresa URL http://localhost:5001/updateData pentru a actualiza etichetele globale și matricele de valori.

De asemenea, funcția refresh_graph_data este creată pentru a fi apelată prin cererea AJAX pentru a returna noile etichete și matrice de valori actualizate ca JSON. Funcția get_chart_page va reda pagina chart.html atunci când este apelată.

 from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)

Acum, să creăm o diagramă simplă în fișierul chart.html pentru a afișa datele hashtag-urilor și a le actualiza în timp real. După cum este definit mai jos, trebuie să importam bibliotecile JavaScript Chart.js și jquery.min.js .

În eticheta body, trebuie să creăm o pânză și să-i dăm un ID pentru a-l face referire în timp ce afișăm diagrama folosind JavaScript în pasul următor.

 <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>

Acum, să construim diagrama folosind codul JavaScript de mai jos. În primul rând, obținem elementul canvas, apoi creăm un nou obiect diagramă și îi transmitem elementul canvas și definim obiectul de date ca mai jos.

Rețineți că etichetele și datele datelor sunt delimitate cu etichete și variabile de valori care sunt returnate în timpul redării paginii atunci când apelați o funcție get_chart_page în fișierul app.py

Ultima parte rămasă este funcția care este configurată să facă o solicitare Ajax în fiecare secundă și să apeleze URL-ul /refreshData , care va executa refresh_graph_data în app.py și va returna noile date actualizate, apoi va actualiza caracterul care redă noile date.

 <script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>

Rularea aplicațiilor împreună

Să rulăm cele trei aplicații în ordinea de mai jos: 1. Twitter App Client. 2. Aplicația Spark. 3. Aplicația Web Dashboard.

Apoi puteți accesa tabloul de bord în timp real folosind adresa URL <http://localhost:5001/>

Acum, puteți vedea diagrama dvs. fiind actualizată, după cum urmează:

Animație: diagramă de hashtag-uri în tendințe Twitter în timp real

Apache Streaming Real Life Use Cases

Am învățat cum să facem analize simple ale datelor în timp real folosind Spark Streaming și cum o integrăm direct cu un tablou de bord simplu folosind un serviciu web RESTful. Din acest exemplu, putem vedea cât de puternic este Spark, deoarece captează un flux masiv de date, îl transformă și extrage informații valoroase care pot fi utilizate cu ușurință pentru a lua decizii în cel mai scurt timp. Există multe cazuri de utilizare utile care pot fi implementate și care pot servi diferite industrii, cum ar fi știrile sau marketingul.

Ilustrație: Hashtag-urile pot fi folosite pentru a extrage informații și sentimente valoroase, aplicabile în mai multe industrii.

Exemplu în industria știrilor

Putem urmări hashtag-urile cele mai frecvent menționate pentru a ști despre ce subiecte vorbesc oamenii cel mai mult pe rețelele sociale. De asemenea, putem urmări anumite hashtag-uri și tweet-urile acestora pentru a ști ce spun oamenii despre anumite subiecte sau evenimente din lume.

Exemplu de marketing

Putem colecta fluxul de tweet-uri și, făcând o analiză de sentiment, le putem clasifica și determina interesele oamenilor pentru a le viza cu oferte legate de interesele lor.

De asemenea, există o mulțime de cazuri de utilizare care pot fi aplicate în mod specific pentru analiza datelor mari și pot deservi o mulțime de industrii. Pentru mai multe cazuri de utilizare Apache Spark în general, vă sugerez să consultați una dintre postările noastre anterioare.

Vă încurajez să citiți mai multe despre Spark Streaming de aici pentru a afla mai multe despre capacitățile sale și pentru a face o transformare mai avansată a datelor pentru mai multe informații în timp real folosindu-le.