Tutorial di streaming di Apache Spark: Identificazione degli hashtag di Twitter di tendenza
Pubblicato: 2022-03-11Al giorno d'oggi, i dati crescono e si accumulano più velocemente che mai. Attualmente, circa il 90% di tutti i dati generati nel nostro mondo è stato generato solo negli ultimi due anni. A causa di questo incredibile tasso di crescita, le piattaforme di big data hanno dovuto adottare soluzioni radicali per mantenere enormi volumi di dati.
Una delle principali fonti di dati oggi sono i social network. Consentitemi di illustrare un esempio di vita reale: trattare, analizzare ed estrarre informazioni dai dati dei social network in tempo reale utilizzando una delle più importanti soluzioni di eco per i big data in circolazione: Apache Spark e Python.
In questo articolo, ti insegnerò come costruire una semplice applicazione che legge i flussi online da Twitter utilizzando Python, quindi elabora i tweet utilizzando Apache Spark Streaming per identificare gli hashtag e, infine, restituisce gli hashtag più di tendenza e rappresenta questi dati su un formato reale - dashboard del tempo.
Creare le tue credenziali per le API di Twitter
Per ricevere tweet da Twitter, devi registrarti su TwitterApps facendo clic su "Crea nuova app" e quindi compilare il modulo sottostante e fare clic su "Crea la tua app Twitter".
In secondo luogo, vai all'app appena creata e apri la scheda "Chiavi e token di accesso". Quindi fare clic su "Genera il mio token di accesso".
I tuoi nuovi token di accesso appariranno come di seguito.
E ora sei pronto per il passaggio successivo.
Creazione del client HTTP di Twitter
In questo passaggio, ti mostrerò come creare un semplice client che otterrà i tweet dall'API di Twitter utilizzando Python e li passerà all'istanza Spark Streaming. Dovrebbe essere facile da seguire per qualsiasi sviluppatore Python professionista.
Per prima cosa, creiamo un file chiamato twitter_app.py
e poi aggiungeremo il codice insieme come di seguito.
Importa le librerie che useremo come di seguito:
import socket import sys import requests import requests_oauthlib import json
E aggiungi le variabili che verranno utilizzate in OAuth per la connessione a Twitter come di seguito:
# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Ora creeremo una nuova funzione chiamata get_tweets
che chiamerà l'URL dell'API di Twitter e restituirà la risposta per un flusso di tweet.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
Quindi, crea una funzione che prende la risposta da quella sopra ed estrae il testo dei tweet dall'intero oggetto JSON dei tweet. Successivamente, invia ogni tweet all'istanza Spark Streaming (sarà discussa in seguito) tramite una connessione TCP.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)
Ora creeremo la parte principale che renderà le connessioni socket host dell'app con cui si collegherà Spark. Configurare qui l'IP in modo che sia localhost
poiché tutti verranno eseguiti sulla stessa macchina e sulla porta 9009
. Quindi chiameremo il metodo get_tweets
, che abbiamo creato sopra, per ottenere i tweet da Twitter e passeremo la sua risposta insieme alla connessione socket a send_tweets_to_spark
per inviare i tweet a Spark.
TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)
Configurazione della nostra applicazione di streaming Apache Spark
Costruiamo la nostra app di streaming Spark che eseguirà l'elaborazione in tempo reale per i tweet in arrivo, estrarrà gli hashtag da essi e calcolerà quanti hashtag sono stati menzionati.
Innanzitutto, dobbiamo creare un'istanza di Spark Context sc
, quindi abbiamo creato Streaming Context ssc
da sc
con un intervallo batch di due secondi che eseguirà la trasformazione su tutti i flussi ricevuti ogni due secondi. Si noti che abbiamo impostato il livello di registro su ERROR
per disabilitare la maggior parte dei registri scritti da Spark.
Abbiamo definito qui un checkpoint per consentire il checkpoint periodico RDD; questo è obbligatorio per essere utilizzato nella nostra app, poiché utilizzeremo le trasformazioni stateful (saranno discusse più avanti nella stessa sezione).
Quindi definiamo il nostro dataStream DStream principale che si collegherà al server socket che abbiamo creato in precedenza sulla porta 9009
e leggerà i tweet da quella porta. Ogni record nel DStream sarà un tweet.
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)
Ora definiremo la nostra logica di trasformazione. Per prima cosa divideremo tutti i tweet in parole e li metteremo in parole RDD. Quindi filtreremo solo gli hashtag da tutte le parole e li mapperemo su una coppia di (hashtag, 1)
e li inseriremo negli hashtag RDD.
Quindi dobbiamo calcolare quante volte è stato menzionato l'hashtag. Possiamo farlo usando la funzione reduceByKey
. Questa funzione calcolerà quante volte l'hashtag è stato menzionato per ogni batch, cioè azzererà i conteggi in ogni batch.
Nel nostro caso, dobbiamo calcolare i conteggi su tutti i batch, quindi utilizzeremo un'altra funzione chiamata updateStateByKey
, poiché questa funzione consente di mantenere lo stato di RDD mentre lo si aggiorna con nuovi dati. Questo modo è chiamato Stateful Transformation
.
Nota che per utilizzare updateStateByKey
, devi configurare un checkpoint e quello che abbiamo fatto nel passaggio precedente.
# split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()
updateStateByKey
accetta una funzione come parametro chiamata funzione di update
. Funziona su ogni elemento in RDD e esegue la logica desiderata.
Nel nostro caso, abbiamo creato una funzione di aggiornamento chiamata aggregate_tags_count
che sommerà tutti i new_values
per ogni hashtag e li aggiungerà al total_sum
che è la somma di tutti i batch e salverà i dati in tags_totals
RDD.
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
Quindi eseguiamo l'elaborazione su tags_totals
RDD in ogni batch per convertirlo in una tabella temporanea utilizzando Spark SQL Context e quindi eseguire un'istruzione select per recuperare i primi dieci hashtag con i loro conteggi e inserirli nel frame di dati hashtag_counts_df
.
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)
L'ultimo passaggio nella nostra applicazione Spark è inviare il frame di dati hashtag_counts_df
all'applicazione dashboard. Quindi convertiremo il frame di dati in due array, uno per gli hashtag e l'altro per i loro conteggi. Quindi li invieremo all'applicazione dashboard tramite l'API REST.

def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
Infine, ecco un esempio di output di Spark Streaming durante l'esecuzione e la stampa di hashtag_counts_df
, noterai che l'output viene stampato esattamente ogni due secondi secondo gli intervalli batch.
Crea un semplice dashboard in tempo reale per rappresentare i dati
Ora creeremo una semplice applicazione dashboard che verrà aggiornata in tempo reale da Spark. Lo costruiremo usando Python, Flask e Charts.js.
Per prima cosa, creiamo un progetto Python con la struttura vista di seguito e scarichiamo e aggiungiamo il file Chart.js nella directory statica.
Quindi, nel file app.py
, creeremo una funzione chiamata update_data
, che verrà chiamata da Spark tramite l'URL http://localhost:5001/updateData
per aggiornare le etichette globali e gli array di valori.
Inoltre, la funzione refresh_graph_data
viene creata per essere chiamata dalla richiesta AJAX per restituire le nuove etichette e gli array di valori aggiornati come JSON. La funzione get_chart_page
renderà la pagina chart.html
quando chiamata.
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)
Ora creiamo un semplice grafico nel file chart.html
per visualizzare i dati dell'hashtag e aggiornarli in tempo reale. Come definito di seguito, è necessario importare le librerie JavaScript Chart.js
e jquery.min.js
.
Nel tag body, dobbiamo creare un canvas e dargli un ID per fare riferimento mentre visualizziamo il grafico usando JavaScript nel passaggio successivo.
<!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>
Ora costruiamo il grafico usando il codice JavaScript di seguito. Innanzitutto, otteniamo l'elemento canvas, quindi creiamo un nuovo oggetto grafico e gli passiamo l'elemento canvas e definiamo il suo oggetto dati come di seguito.
Si noti che le etichette e i dati dei dati sono delimitati da etichette e valori variabili restituiti durante il rendering della pagina quando si chiama una funzione get_chart_page
nel file app.py
L'ultima parte rimanente è la funzione che è configurata per eseguire una richiesta Ajax ogni secondo e chiamare l'URL /refreshData
, che eseguirà refresh_graph_data
in app.py
e restituirà i nuovi dati aggiornati, quindi aggiornerà il carattere che esegue il rendering dei nuovi dati.
<script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>
Esecuzione delle applicazioni insieme
Eseguiamo le tre applicazioni nell'ordine seguente: 1. Twitter App Client. 2. App Spark. 3. App Web dashboard.
Quindi puoi accedere alla dashboard in tempo reale utilizzando l'URL <http://localhost:5001/>
Ora puoi vedere il tuo grafico aggiornato, come di seguito:
Casi d'uso reali in streaming di Apache
Abbiamo imparato come eseguire semplici analisi dei dati sui dati in tempo reale utilizzando Spark Streaming e integrandolo direttamente con un semplice dashboard utilizzando un servizio Web RESTful. Da questo esempio, possiamo vedere quanto sia potente Spark, poiché acquisisce un enorme flusso di dati, lo trasforma ed estrae informazioni preziose che possono essere utilizzate facilmente per prendere decisioni in pochissimo tempo. Esistono molti casi d'uso utili che possono essere implementati e che possono servire diversi settori, come le notizie o il marketing.
Esempio del settore delle notizie
Possiamo tenere traccia degli hashtag citati più di frequente per sapere di quali argomenti le persone parlano di più sui social media. Inoltre, possiamo tenere traccia di hashtag specifici e dei loro tweet per sapere cosa dicono le persone su argomenti o eventi specifici nel mondo.
Esempio di marketing
Possiamo raccogliere il flusso di tweet e, facendo un'analisi del sentiment, classificarli e determinare gli interessi delle persone al fine di indirizzarli con offerte correlate ai loro interessi.
Inoltre, ci sono molti casi d'uso che possono essere applicati specificamente per l'analisi dei big data e possono servire molti settori. Per altri casi d'uso di Apache Spark in generale, ti suggerisco di controllare uno dei nostri post precedenti.
Ti incoraggio a leggere di più su Spark Streaming da qui per saperne di più sulle sue capacità e fare una trasformazione più avanzata dei dati per ulteriori approfondimenti in tempo reale utilizzandolo.