Didacticiel Apache Spark Streaming : Identifier les hashtags Twitter tendance
Publié: 2022-03-11De nos jours, les données augmentent et s'accumulent plus rapidement que jamais. Actuellement, environ 90 % de toutes les données générées dans notre monde ne l'ont été qu'au cours des deux dernières années. En raison de ce taux de croissance fulgurant, les plateformes de big data ont dû adopter des solutions radicales afin de maintenir des volumes de données aussi importants.
L'une des principales sources de données aujourd'hui sont les réseaux sociaux. Permettez-moi de vous montrer un exemple concret : traiter, analyser et extraire des informations à partir de données de réseaux sociaux en temps réel à l'aide de l'une des plus importantes solutions d'écho de données volumineuses : Apache Spark et Python.
Dans cet article, je vais vous apprendre à créer une application simple qui lit les flux en ligne de Twitter à l'aide de Python, puis traite les tweets à l'aide d'Apache Spark Streaming pour identifier les hashtags et, enfin, renvoie les hashtags les plus populaires et représente ces données sur un vrai -tableau de bord horaire.
Création de vos propres informations d'identification pour les API Twitter
Afin d'obtenir des tweets de Twitter, vous devez vous inscrire sur TwitterApps en cliquant sur "Créer une nouvelle application", puis remplir le formulaire ci-dessous en cliquant sur "Créer votre application Twitter".
Deuxièmement, accédez à votre application nouvellement créée et ouvrez l'onglet "Clés et jetons d'accès". Cliquez ensuite sur "Générer mon jeton d'accès".
Vos nouveaux jetons d'accès apparaîtront comme ci-dessous.
Et maintenant vous êtes prêt pour la prochaine étape.
Création du client HTTP Twitter
Dans cette étape, je vais vous montrer comment créer un client simple qui obtiendra les tweets de l'API Twitter à l'aide de Python et les transmettra à l'instance Spark Streaming. Il devrait être facile à suivre pour tout développeur Python professionnel.
Commençons par créer un fichier appelé twitter_app.py
, puis nous y ajouterons le code comme ci-dessous.
Importez les bibliothèques que nous utiliserons comme ci-dessous :
import socket import sys import requests import requests_oauthlib import json
Et ajoutez les variables qui seront utilisées dans OAuth pour se connecter à Twitter comme ci-dessous :
# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Maintenant, nous allons créer une nouvelle fonction appelée get_tweets
qui appellera l'URL de l'API Twitter et renverra la réponse pour un flux de tweets.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
Ensuite, créez une fonction qui prend la réponse de celle ci-dessus et extrait le texte des tweets de l'objet JSON entier des tweets. Après cela, il envoie chaque tweet à l'instance Spark Streaming (nous en parlerons plus tard) via une connexion TCP.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)
Maintenant, nous allons créer la partie principale qui établira les connexions du socket hôte de l'application avec lesquelles Spark se connectera. Nous allons configurer l'adresse IP ici pour être localhost
car tout fonctionnera sur la même machine et le port 9009
. Ensuite, nous appellerons la méthode get_tweets
, que nous avons créée ci-dessus, pour obtenir les tweets de Twitter et transmettre sa réponse avec la connexion socket à send_tweets_to_spark
pour envoyer les tweets à Spark.
TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)
Configuration de notre application Apache Spark Streaming
Construisons notre application de streaming Spark qui traitera en temps réel les tweets entrants, en extraira les hashtags et calculera le nombre de hashtags mentionnés.
Tout d'abord, nous devons créer une instance de Spark Context sc
, puis nous avons créé le Streaming Context ssc
à partir de sc
avec un intervalle de lot de deux secondes qui effectuera la transformation sur tous les flux reçus toutes les deux secondes. Notez que nous avons défini le niveau de journalisation sur ERROR
afin de désactiver la plupart des journaux écrits par Spark.
Nous avons défini un point de contrôle ici afin de permettre un point de contrôle RDD périodique ; ceci est obligatoire pour être utilisé dans notre application, car nous utiliserons des transformations avec état (nous en parlerons plus tard dans la même section).
Ensuite, nous définissons notre flux de données DStream principal qui se connectera au serveur de socket que nous avons créé auparavant sur le port 9009
et lira les tweets de ce port. Chaque enregistrement dans le DStream sera un tweet.
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)
Maintenant, nous allons définir notre logique de transformation. Nous allons d'abord diviser tous les tweets en mots et les mettre en mots RDD. Ensuite, nous filtrerons uniquement les hashtags de tous les mots et les mapperons sur une paire de (hashtag, 1)
et les placerons dans les hashtags RDD.
Ensuite, nous devons calculer combien de fois le hashtag a été mentionné. Nous pouvons le faire en utilisant la fonction reduceByKey
. Cette fonction calculera combien de fois le hashtag a été mentionné pour chaque lot, c'est-à-dire qu'elle réinitialisera les comptes dans chaque lot.
Dans notre cas, nous devons calculer les décomptes sur tous les lots, nous allons donc utiliser une autre fonction appelée updateStateByKey
, car cette fonction vous permet de maintenir l'état de RDD tout en le mettant à jour avec de nouvelles données. Cette méthode s'appelle Stateful Transformation
.
Notez que pour utiliser updateStateByKey
, vous devez configurer un point de contrôle, et c'est ce que nous avons fait à l'étape précédente.
# split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()
Le updateStateByKey
prend une fonction comme paramètre appelé la fonction de update
. Il s'exécute sur chaque élément de RDD et effectue la logique souhaitée.
Dans notre cas, nous avons créé une fonction de mise à jour appelée " aggregate_tags_count
" qui additionnera toutes les new_values
pour chaque hashtag et les ajoutera au total_sum
qui est la somme de tous les lots et enregistrera les données dans tags_totals
RDD.
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
Ensuite, nous effectuons un traitement sur tags_totals
RDD dans chaque lot afin de le convertir en table temporaire à l'aide de Spark SQL Context, puis effectuons une instruction de sélection afin de récupérer les dix premiers hashtags avec leur nombre et de les placer dans la trame de données hashtag_counts_df
.
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)
La dernière étape de notre application Spark consiste à envoyer la trame de données hashtag_counts_df
à l'application de tableau de bord. Nous allons donc convertir la trame de données en deux tableaux, l'un pour les hashtags et l'autre pour leurs décomptes. Ensuite, nous les enverrons à l'application de tableau de bord via l'API REST.

def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
Enfin, voici un exemple de sortie du Spark Streaming lors de l'exécution et de l'impression du hashtag_counts_df
, vous remarquerez que la sortie est imprimée exactement toutes les deux secondes selon les intervalles de lot.
Créer un tableau de bord simple en temps réel pour représenter les données
Nous allons maintenant créer une application de tableau de bord simple qui sera mise à jour en temps réel par Spark. Nous allons le construire en utilisant Python, Flask et Charts.js.
Tout d'abord, créons un projet Python avec la structure ci-dessous et téléchargeons et ajoutons le fichier Chart.js dans le répertoire statique.
Ensuite, dans le fichier app.py
, nous allons créer une fonction appelée update_data
, qui sera appelée par Spark via l'URL http://localhost:5001/updateData
afin de mettre à jour les tableaux Global labels et values.
De plus, la fonction refresh_graph_data
est créée pour être appelée par une requête AJAX afin de renvoyer les nouveaux tableaux d'étiquettes et de valeurs mis à jour au format JSON. La fonction get_chart_page
affichera la page chart.html
lorsqu'elle sera appelée.
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)
Maintenant, créons un graphique simple dans le fichier chart.html
afin d'afficher les données du hashtag et de les mettre à jour en temps réel. Comme défini ci-dessous, nous devons importer les bibliothèques JavaScript Chart.js
et jquery.min.js
.
Dans la balise body, nous devons créer un canevas et lui attribuer un ID afin de le référencer lors de l'affichage du graphique à l'aide de JavaScript à l'étape suivante.
<!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>
Construisons maintenant le graphique en utilisant le code JavaScript ci-dessous. Tout d'abord, nous obtenons l'élément canvas, puis nous créons un nouvel objet graphique et lui transmettons l'élément canvas et définissons son objet de données comme ci-dessous.
Notez que les étiquettes et les données des données sont liées par des variables d'étiquettes et de valeurs qui sont renvoyées lors du rendu de la page lors de l'appel d'une fonction get_chart_page
dans le fichier app.py
La dernière partie restante est la fonction qui est configurée pour faire une requête Ajax toutes les secondes et appeler l'URL /refreshData
, qui exécutera refresh_graph_data
dans app.py
et renverra les nouvelles données mises à jour, puis mettra à jour le char qui rend les nouvelles données.
<script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>
Exécuter les applications ensemble
Exécutons les trois applications dans l'ordre ci-dessous : 1. Twitter App Client. 2. Application Spark. 3. Application Web de tableau de bord.
Ensuite, vous pouvez accéder au tableau de bord en temps réel à l'aide de l'URL <http://localhost:5001/>
Maintenant, vous pouvez voir votre graphique en cours de mise à jour, comme ci-dessous :
Apache Streaming Cas d'utilisation réels
Nous avons appris à effectuer des analyses de données simples sur des données en temps réel à l'aide de Spark Streaming et à les intégrer directement à un tableau de bord simple à l'aide d'un service Web RESTful. À partir de cet exemple, nous pouvons voir à quel point Spark est puissant, car il capture un flux massif de données, le transforme et en extrait des informations précieuses qui peuvent être utilisées facilement pour prendre des décisions en un rien de temps. Il existe de nombreux cas d'utilisation utiles qui peuvent être mis en œuvre et qui peuvent servir différentes industries, comme les nouvelles ou le marketing.
Exemple de l'industrie de l'actualité
Nous pouvons suivre les hashtags les plus fréquemment mentionnés pour savoir de quels sujets les gens parlent le plus sur les réseaux sociaux. De plus, nous pouvons suivre des hashtags spécifiques et leurs tweets afin de savoir ce que les gens disent sur des sujets ou des événements spécifiques dans le monde.
Exemple de commercialisation
Nous pouvons collecter le flux de tweets et, en faisant une analyse des sentiments, les catégoriser et déterminer les intérêts des gens afin de les cibler avec des offres liées à leurs intérêts.
En outre, de nombreux cas d'utilisation peuvent être appliqués spécifiquement à l'analyse de données volumineuses et peuvent servir de nombreux secteurs. Pour plus de cas d'utilisation d'Apache Spark en général, je vous suggère de consulter l'un de nos articles précédents.
Je vous encourage à en savoir plus sur Spark Streaming à partir d'ici afin d'en savoir plus sur ses capacités et d'effectuer une transformation plus avancée des données pour plus d'informations en temps réel en l'utilisant.