Tutorial de transmisión de Apache Spark: identificación de hashtags de tendencia en Twitter

Publicado: 2022-03-11

Hoy en día, los datos crecen y se acumulan más rápido que nunca. Actualmente, alrededor del 90% de todos los datos generados en nuestro mundo se generaron solo en los últimos dos años. Debido a esta asombrosa tasa de crecimiento, las plataformas de big data tuvieron que adoptar soluciones radicales para mantener volúmenes de datos tan grandes.

Una de las principales fuentes de datos en la actualidad son las redes sociales. Permítame demostrar un ejemplo de la vida real: tratar, analizar y extraer información de los datos de las redes sociales en tiempo real utilizando una de las soluciones de eco de big data más importantes que existen: Apache Spark y Python.

Apache Spark Streaming se puede utilizar para extraer información de las redes sociales, como los hashtags de tendencia de Twitter.

En este artículo, le enseñaré cómo crear una aplicación simple que lea transmisiones en línea de Twitter usando Python, luego procese los tweets usando Apache Spark Streaming para identificar hashtags y, finalmente, devuelva los hashtags más populares y represente estos datos en un real. -panel de tiempo.

Creación de sus propias credenciales para las API de Twitter

Para obtener tweets de Twitter, debe registrarse en TwitterApps haciendo clic en "Crear nueva aplicación" y luego completar el formulario a continuación, haga clic en "Crear su aplicación de Twitter".

Captura de pantalla: Cómo crear tu aplicación de Twitter.

En segundo lugar, vaya a su aplicación recién creada y abra la pestaña "Claves y tokens de acceso". Luego haga clic en "Generar mi token de acceso".

Captura de pantalla: Configuración de credenciales, claves y tokens de acceso de la aplicación de Twitter.

Sus nuevos tokens de acceso aparecerán como se muestra a continuación.

Captura de pantalla: Configuración del token de acceso a la aplicación de Twitter.

Y ahora estás listo para el siguiente paso.

Creación del cliente HTTP de Twitter

En este paso, le mostraré cómo crear un cliente simple que obtendrá los tweets de la API de Twitter mediante Python y los pasará a la instancia de Spark Streaming. Debería ser fácil de seguir para cualquier desarrollador profesional de Python.

Primero, creemos un archivo llamado twitter_app.py y luego agregaremos el código como se muestra a continuación.

Importe las bibliotecas que usaremos de la siguiente manera:

 import socket import sys import requests import requests_oauthlib import json

Y agregue las variables que se usarán en OAuth para conectarse a Twitter como se muestra a continuación:

 # Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Ahora, crearemos una nueva función llamada get_tweets que llamará a la URL de la API de Twitter y devolverá la respuesta para una secuencia de tweets.

 def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

Luego, crea una función que tome la respuesta de la anterior y extraiga el texto de los tweets del objeto JSON de todos los tweets. Después de eso, envía cada tweet a la instancia de Spark Streaming (se discutirá más adelante) a través de una conexión TCP.

 def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)

Ahora, haremos la parte principal que hará las conexiones de socket del host de la aplicación con las que se conectará Spark. Configuraremos la IP aquí para que sea localhost ya que todos se ejecutarán en la misma máquina y el puerto 9009 . Luego llamaremos al método get_tweets , que hicimos anteriormente, para obtener los tweets de Twitter y pasar su respuesta junto con la conexión de socket a send_tweets_to_spark para enviar los tweets a Spark.

 TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)

Configuración de nuestra aplicación Apache Spark Streaming

Construyamos nuestra aplicación de transmisión Spark que procesará en tiempo real los tweets entrantes, extraerá los hashtags de ellos y calculará cuántos hashtags se han mencionado.

Ilustración: Spark Streaming permite el procesamiento en tiempo real de los tweets entrantes y la extracción de hashtags

Primero, tenemos que crear una instancia de Spark Context sc , luego creamos Streaming Context ssc from sc con un intervalo de lote de dos segundos que hará la transformación en todas las transmisiones recibidas cada dos segundos. Tenga en cuenta que hemos establecido el nivel de registro en ERROR para deshabilitar la mayoría de los registros que escribe Spark.

Definimos un punto de control aquí para permitir puntos de control periódicos de RDD; esto es obligatorio para ser usado en nuestra aplicación, ya que usaremos transformaciones con estado (se discutirá más adelante en la misma sección).

Luego definimos nuestro flujo de datos DStream principal que se conectará al servidor de socket que creamos antes en el puerto 9009 y leerá los tweets desde ese puerto. Cada registro en el DStream será un tweet.

 from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)

Ahora, definiremos nuestra lógica de transformación. Primero dividiremos todos los tweets en palabras y los pondremos en palabras RDD. Luego, filtraremos solo los hashtags de todas las palabras y los asignaremos a un par de (hashtag, 1) y los pondremos en hashtags RDD.

Luego, debemos calcular cuántas veces se ha mencionado el hashtag. Podemos hacerlo usando la función reduceByKey . Esta función calculará cuántas veces se ha mencionado el hashtag por cada lote, es decir, restablecerá los recuentos en cada lote.

En nuestro caso, necesitamos calcular los conteos en todos los lotes, por lo que usaremos otra función llamada updateStateByKey , ya que esta función le permite mantener el estado de RDD mientras lo actualiza con nuevos datos. Esta forma se llama Stateful Transformation .

Tenga en cuenta que para usar updateStateByKey , debe configurar un punto de control, y eso es lo que hemos hecho en el paso anterior.

 # split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

updateStateByKey toma una función como parámetro llamada función de update . Se ejecuta en cada elemento en RDD y hace la lógica deseada.

En nuestro caso, hemos creado una función de actualización llamada aggregate_tags_count que sumará todos los total_sum new_values es la suma de todos los lotes, y guardará los datos en tags_totals RDD.

 def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

Luego procesamos tags_totals RDD en cada lote para convertirlo en una tabla temporal usando Spark SQL Context y luego realizamos una declaración de selección para recuperar los diez hashtags principales con sus conteos y colocarlos en el marco de datos hashtag_counts_df .

 def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)

El último paso en nuestra aplicación Spark es enviar el marco de datos hashtag_counts_df a la aplicación del tablero. Entonces, convertiremos el marco de datos en dos matrices, una para los hashtags y la otra para sus conteos. Luego, los enviaremos a la aplicación del tablero a través de la API REST.

 def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

Finalmente, aquí hay una salida de muestra de Spark Streaming mientras ejecuta e imprime el hashtag_counts_df , notará que la salida se imprime exactamente cada dos segundos según los intervalos de lote.

Un ejemplo de salida de transmisión de Twitter Spark, impreso por configuración de intervalo de lote

Cree un tablero simple en tiempo real para representar los datos

Ahora, crearemos una aplicación de tablero simple que Spark actualizará en tiempo real. Lo construiremos usando Python, Flask y Charts.js.

Primero, creemos un proyecto de Python con la estructura que se ve a continuación y descarguemos y agreguemos el archivo Chart.js en el directorio estático.

Ilustración: creación de un proyecto de Python para su uso en el análisis de hashtags de Twitter

Luego, en el archivo app.py , crearemos una función llamada update_data , a la que Spark llamará a través de la URL http://localhost:5001/updateData para actualizar las etiquetas globales y las matrices de valores.

Además, la función refresh_graph_data se crea para que la solicite AJAX y devuelva las nuevas matrices de etiquetas y valores actualizadas como JSON. La función get_chart_page mostrará la página chart.html cuando se la llame.

 from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)

Ahora, creemos un gráfico simple en el archivo chart.html para mostrar los datos del hashtag y actualizarlos en tiempo real. Como se define a continuación, necesitamos importar las bibliotecas de JavaScript Chart.js y jquery.min.js .

En la etiqueta del cuerpo, tenemos que crear un lienzo y darle una ID para hacer referencia a él mientras se muestra el gráfico usando JavaScript en el siguiente paso.

 <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>

Ahora, construyamos el gráfico usando el código JavaScript a continuación. Primero, obtenemos el elemento de lienzo, y luego creamos un nuevo objeto de gráfico y le pasamos el elemento de lienzo y definimos su objeto de datos como se muestra a continuación.

Tenga en cuenta que las etiquetas de los datos y los datos están delimitados por etiquetas y variables de valores que se devuelven al representar la página al llamar a una función get_chart_page en el archivo app.py

La última parte restante es la función que está configurada para realizar una solicitud Ajax cada segundo y llamar a la URL /refreshData , que ejecutará refresh_graph_data en app.py y devolverá los nuevos datos actualizados, y luego actualizará el carácter que representa los nuevos datos.

 <script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>

Ejecutar las aplicaciones juntas

Ejecutemos las tres aplicaciones en el siguiente orden: 1. Cliente de aplicación de Twitter. 2. Aplicación Spark. 3. Aplicación web del panel.

Luego puede acceder al tablero en tiempo real usando la URL <http://localhost:5001/>

Ahora, puede ver que su gráfico se actualiza, como se muestra a continuación:

Animación: tabla de hashtags de tendencias de Twitter en tiempo real

Apache Streaming Casos de uso de la vida real

Aprendimos cómo realizar análisis de datos simples en tiempo real usando Spark Streaming e integrándolos directamente con un tablero simple usando un servicio web RESTful. A partir de este ejemplo, podemos ver cuán poderoso es Spark, ya que captura un flujo masivo de datos, los transforma y extrae información valiosa que se puede usar fácilmente para tomar decisiones en poco tiempo. Hay muchos casos de uso útiles que se pueden implementar y que pueden servir a diferentes industrias, como noticias o marketing.

Ilustración: los hashtags se pueden usar para extraer información y sentimientos valiosos, aplicables en múltiples industrias.

Ejemplo de la industria de las noticias

Podemos rastrear los hashtags mencionados con más frecuencia para saber de qué temas habla más la gente en las redes sociales. Además, podemos rastrear hashtags específicos y sus tweets para saber qué dice la gente sobre temas o eventos específicos en el mundo.

Ejemplo de mercadeo

Podemos recopilar el flujo de tweets y, mediante el análisis de sentimientos, categorizarlos y determinar los intereses de las personas para orientarlos con ofertas relacionadas con sus intereses.

Además, hay muchos casos de uso que se pueden aplicar específicamente para el análisis de big data y pueden servir a muchas industrias. Para obtener más casos de uso de Apache Spark en general, le sugiero que consulte una de nuestras publicaciones anteriores.

Lo animo a leer más sobre Spark Streaming desde aquí para saber más sobre sus capacidades y hacer una transformación más avanzada de los datos para obtener más información en tiempo real usándolo.