Tutorial de streaming do Apache Spark: identificando as hashtags de tendências do Twitter

Publicados: 2022-03-11

Hoje em dia, os dados estão crescendo e se acumulando mais rápido do que nunca. Atualmente, cerca de 90% de todos os dados gerados em nosso mundo foram gerados apenas nos últimos dois anos. Devido a essa taxa de crescimento impressionante, as plataformas de big data tiveram que adotar soluções radicais para manter volumes tão grandes de dados.

Uma das principais fontes de dados hoje são as redes sociais. Permita-me demonstrar um exemplo da vida real: lidar, analisar e extrair insights de dados de redes sociais em tempo real usando uma das soluções de eco de big data mais importantes que existem – Apache Spark e Python.

O Apache Spark Streaming pode ser usado para extrair informações das mídias sociais, como hashtags de tendências do Twitter

Neste artigo, ensinarei como construir um aplicativo simples que lê streams online do Twitter usando Python, processa os tweets usando o Apache Spark Streaming para identificar hashtags e, finalmente, retorna as hashtags mais populares e representa esses dados em um real -painel de tempo.

Criando suas próprias credenciais para APIs do Twitter

Para obter tweets do Twitter, você precisa se registrar no TwitterApps clicando em “Criar novo aplicativo” e, em seguida, preencher o formulário abaixo e clicar em “Criar seu aplicativo do Twitter”.

Captura de tela: Como criar seu aplicativo do Twitter.

Em segundo lugar, vá para o seu aplicativo recém-criado e abra a guia “Chaves e tokens de acesso”. Em seguida, clique em “Gerar meu token de acesso”.

Captura de tela: Configurando credenciais, chaves e tokens de acesso do aplicativo do Twitter.

Seus novos tokens de acesso aparecerão conforme abaixo.

Captura de tela: configuração do token de acesso do aplicativo do Twitter.

E agora você está pronto para o próximo passo.

Construindo o cliente HTTP do Twitter

Nesta etapa, mostrarei como criar um cliente simples que obterá os tweets da API do Twitter usando Python e os passará para a instância do Spark Streaming. Deve ser fácil de seguir para qualquer desenvolvedor Python profissional.

Primeiro, vamos criar um arquivo chamado twitter_app.py e, em seguida, adicionaremos o código nele conforme abaixo.

Importe as bibliotecas que usaremos conforme abaixo:

 import socket import sys import requests import requests_oauthlib import json

E adicione as variáveis ​​que serão usadas no OAuth para conexão com o Twitter conforme abaixo:

 # Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Agora, criaremos uma nova função chamada get_tweets que chamará a URL da API do Twitter e retornará a resposta para um fluxo de tweets.

 def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

Em seguida, crie uma função que receba a resposta acima e extraia o texto dos tweets do objeto JSON dos tweets inteiros. Depois disso, ele envia todos os tweets para a instância do Spark Streaming (será discutido posteriormente) por meio de uma conexão TCP.

 def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)

Agora, faremos a parte principal que fará as conexões de soquete do host do aplicativo com as quais o spark se conectará. Vamos configurar o IP aqui para ser localhost pois todos rodarão na mesma máquina e na porta 9009 . Em seguida, chamaremos o método get_tweets , que fizemos acima, para obter os tweets do Twitter e passar sua resposta junto com a conexão do soquete para send_tweets_to_spark para enviar os tweets ao Spark.

 TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)

Configurando nosso aplicativo de streaming Apache Spark

Vamos construir nosso aplicativo de streaming Spark que fará o processamento em tempo real dos tweets recebidos, extrairá as hashtags deles e calculará quantas hashtags foram mencionadas.

Ilustração: O Spark streaming permite o processamento em tempo real de tweets recebidos e extração de hashtag

Primeiro, temos que criar uma instância do Spark Context sc , depois criamos o Streaming Context ssc de sc com um intervalo de lote de dois segundos que fará a transformação em todos os fluxos recebidos a cada dois segundos. Observe que definimos o nível de log como ERROR para desabilitar a maioria dos logs que o Spark grava.

Definimos um checkpoint aqui para permitir checkpoints periódicos de RDD; isso é obrigatório para ser usado em nosso aplicativo, pois usaremos transformações com estado (serão discutidas posteriormente na mesma seção).

Em seguida, definimos nosso principal DStream dataStream que se conectará ao servidor de soquete que criamos antes na porta 9009 e leremos os tweets dessa porta. Cada registro no DStream será um tweet.

 from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)

Agora, vamos definir nossa lógica de transformação. Primeiro vamos dividir todos os tweets em palavras e colocá-los em palavras RDD. Em seguida, vamos filtrar apenas hashtags de todas as palavras e mapeá-las para um par de (hashtag, 1) e colocá-las em hashtags RDD.

Em seguida, precisamos calcular quantas vezes a hashtag foi mencionada. Podemos fazer isso usando a função reduceByKey . Esta função irá calcular quantas vezes a hashtag foi mencionada por cada lote, ou seja, irá zerar as contagens em cada lote.

No nosso caso, precisamos calcular as contagens em todos os lotes, então usaremos outra função chamada updateStateByKey , pois essa função permite manter o estado do RDD enquanto o atualiza com novos dados. Essa forma é chamada de Stateful Transformation .

Observe que, para usar updateStateByKey , você precisa configurar um ponto de verificação e o que fizemos na etapa anterior.

 # split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

O updateStateByKey recebe uma função como parâmetro chamada função de update . Ele é executado em cada item no RDD e faz a lógica desejada.

No nosso caso, criamos uma função de atualização chamada aggregate_tags_count que somará todos os new_values ​​para cada hashtag e os adicionará ao total_sum que é a soma de todos os lotes e salvará os dados em tags_totals RDD.

 def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

Em seguida, fazemos o processamento em tags_totals RDD em cada lote para convertê-lo em tabela temporária usando o Spark SQL Context e, em seguida, executamos uma instrução select para recuperar as dez principais hashtags com suas contagens e colocá-las no quadro de dados hashtag_counts_df .

 def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)

A última etapa em nosso aplicativo Spark é enviar o quadro de dados hashtag_counts_df para o aplicativo do painel. Então vamos converter o quadro de dados em dois arrays, um para as hashtags e outro para suas contagens. Em seguida, os enviaremos para o aplicativo de painel por meio da API REST.

 def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

Por fim, aqui está um exemplo de saída do Spark Streaming ao executar e imprimir o hashtag_counts_df , você notará que a saída é impressa exatamente a cada dois segundos de acordo com os intervalos de lote.

Um exemplo de saída de streaming do Twitter Spark, impressa por configurações de intervalo de lote

Crie um painel simples em tempo real para representar os dados

Agora, vamos criar um aplicativo de dashboard simples que será atualizado em tempo real pelo Spark. Vamos construí-lo usando Python, Flask e Charts.js.

Primeiro, vamos criar um projeto Python com a estrutura vista abaixo e baixar e adicionar o arquivo Chart.js no diretório estático.

Ilustração: Criando um projeto Python para uso na análise de hashtag do Twitter

Em seguida, no arquivo app.py , criaremos uma função chamada update_data , que será chamada pelo Spark através da URL http://localhost:5001/updateData para atualizar as matrizes de rótulos e valores globais.

Além disso, a função refresh_graph_data é criada para ser chamada pela solicitação AJAX para retornar os novos rótulos atualizados e matrizes de valores como JSON. A função get_chart_page renderizará a página chart.html quando chamada.

 from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)

Agora, vamos criar um gráfico simples no arquivo chart.html para exibir os dados da hashtag e atualizá-los em tempo real. Conforme definido abaixo, precisamos importar as bibliotecas JavaScript Chart.js e jquery.min.js .

Na tag body, temos que criar uma tela e fornecer um ID para fazer referência a ela enquanto exibimos o gráfico usando JavaScript na próxima etapa.

 <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>

Agora, vamos construir o gráfico usando o código JavaScript abaixo. Primeiro, obtemos o elemento canvas e, em seguida, criamos um novo objeto gráfico e passamos o elemento canvas para ele e definimos seu objeto de dados como abaixo.

Observe que os rótulos e os dados dos dados são limitados a variáveis ​​de rótulos e valores que são retornadas durante a renderização da página ao chamar uma função get_chart_page no arquivo app.py

A última parte restante é a função que está configurada para fazer uma solicitação Ajax a cada segundo e chamar a URL /refreshData , que executará refresh_graph_data em app.py e retornará os novos dados atualizados e, em seguida, atualizará o char que renderiza os novos dados.

 <script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>

Executando os aplicativos juntos

Vamos executar os três aplicativos na ordem abaixo: 1. Twitter App Client. 2. Aplicativo Spark. 3. Aplicativo Web do Painel.

Então você pode acessar o painel em tempo real usando a URL <http://localhost:5001/>

Agora, você pode ver seu gráfico sendo atualizado, conforme abaixo:

Animação: gráfico de hashtag de tendências do Twitter em tempo real

Casos de uso da vida real do Apache Streaming

Aprendemos a fazer análises simples de dados em tempo real usando o Spark Streaming e integrando-o diretamente a um painel simples usando um serviço Web RESTful. A partir deste exemplo, podemos ver como o Spark é poderoso, pois captura um fluxo massivo de dados, transforma-o e extrai informações valiosas que podem ser usadas facilmente para tomar decisões rapidamente. Existem muitos casos de uso úteis que podem ser implementados e que podem atender a diferentes setores, como notícias ou marketing.

Ilustração: Hashtags podem ser usadas para extrair insights e sentimentos valiosos, aplicáveis ​​em vários setores.

Exemplo da indústria de notícias

Podemos rastrear as hashtags mencionadas com mais frequência para saber sobre quais tópicos as pessoas estão falando mais nas mídias sociais. Além disso, podemos rastrear hashtags específicas e seus tweets para saber o que as pessoas estão dizendo sobre tópicos ou eventos específicos no mundo.

Exemplo de marketing

Podemos coletar o fluxo de tweets e, através da análise de sentimentos, categorizá-los e determinar os interesses das pessoas para direcioná-los com ofertas relacionadas aos seus interesses.

Além disso, existem muitos casos de uso que podem ser aplicados especificamente para análise de big data e podem atender a muitos setores. Para mais casos de uso do Apache Spark em geral, sugiro que você confira um de nossos posts anteriores.

Convido você a ler mais sobre o Spark Streaming aqui para saber mais sobre seus recursos e fazer uma transformação mais avançada nos dados para obter mais insights em tempo real usando-o.