Samouczek przesyłania strumieniowego Apache Spark: identyfikacja popularnych hashtagów na Twitterze

Opublikowany: 2022-03-11

W dzisiejszych czasach dane rosną i gromadzą się szybciej niż kiedykolwiek wcześniej. Obecnie około 90% wszystkich danych generowanych na naszym świecie zostało wygenerowanych tylko w ciągu ostatnich dwóch lat. Ze względu na to oszałamiające tempo wzrostu platformy big data musiały przyjąć radykalne rozwiązania, aby utrzymać tak ogromne ilości danych.

Jednym z głównych źródeł danych są dziś sieci społecznościowe. Pozwolę sobie zademonstrować rzeczywisty przykład: przetwarzanie, analizowanie i wydobywanie spostrzeżeń z danych z sieci społecznościowych w czasie rzeczywistym przy użyciu jednego z najważniejszych rozwiązań echa Big Data — Apache Spark i Python.

Apache Spark Streaming może być używany do wydobywania spostrzeżeń z mediów społecznościowych, takich jak popularne hashtagi na Twitterze

W tym artykule nauczę Cię, jak zbudować prostą aplikację, która odczytuje strumienie online z Twittera za pomocą Pythona, a następnie przetwarza tweety za pomocą Apache Spark Streaming w celu identyfikacji hashtagów, a na koniec zwraca topowe hashtagi i przedstawia te dane na rzeczywistym -czas deski rozdzielczej.

Tworzenie własnych poświadczeń dla API Twittera

Aby otrzymywać tweety z Twittera, musisz zarejestrować się na TwitterApps, klikając „Utwórz nową aplikację”, a następnie wypełnić poniższy formularz, klikając „Utwórz swoją aplikację na Twitterze”.

Zrzut ekranu: Jak stworzyć swoją aplikację na Twitterze.

Po drugie, przejdź do nowo utworzonej aplikacji i otwórz zakładkę „Klucze i tokeny dostępu”. Następnie kliknij „Wygeneruj mój token dostępu”.

Zrzut ekranu: Konfigurowanie danych logowania, kluczy i tokenów dostępu do aplikacji Twittera.

Twoje nowe tokeny dostępu pojawią się poniżej.

Zrzut ekranu: Konfiguracja tokena dostępu do aplikacji Twittera.

A teraz jesteś gotowy na kolejny krok.

Budowanie klienta HTTP Twitter

W tym kroku pokażę, jak zbudować prostego klienta, który będzie pobierał tweety z Twitter API za pomocą Pythona i przekazywał je do instancji Spark Streaming. Powinien być łatwy do naśladowania dla każdego profesjonalnego programisty Pythona.

Najpierw utwórzmy plik o nazwie twitter_app.py , a następnie dodamy do niego kod, jak poniżej.

Zaimportuj biblioteki, których będziemy używać, jak poniżej:

 import socket import sys import requests import requests_oauthlib import json

I dodaj zmienne, które będą używane w OAuth do łączenia się z Twitterem, jak poniżej:

 # Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Teraz utworzymy nową funkcję o nazwie get_tweets , która wywoła adres URL API Twittera i zwróci odpowiedź na strumień tweetów.

 def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

Następnie utwórz funkcję, która pobiera odpowiedź z powyższej i wyodrębnia tekst tweeta z obiektu JSON całego tweeta. Następnie wysyła każdy tweet do instancji Spark Streaming (omówione później) przez połączenie TCP.

 def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)

Teraz zrobimy główną część, która wykona połączenia z gniazdem hosta aplikacji, z którymi będzie się łączyć Spark. Skonfigurujemy tutaj adres IP jako localhost , ponieważ wszystkie będą działały na tej samej maszynie i porcie 9009 . Następnie wywołamy metodę get_tweets , którą stworzyliśmy powyżej, aby pobrać tweety z Twittera i przekazać jej odpowiedź wraz z połączeniem z gniazdem do send_tweets_to_spark w celu wysłania tweetów do Sparka.

 TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)

Konfigurowanie naszej aplikacji do przesyłania strumieniowego Apache Spark

Zbudujmy naszą aplikację do przesyłania strumieniowego Spark, która będzie przetwarzać w czasie rzeczywistym przychodzące tweety, wyodrębniać z nich hashtagi i obliczać, ile hashtagów zostało wspomnianych.

Ilustracja: Przesyłanie strumieniowe Spark umożliwia przetwarzanie w czasie rzeczywistym przychodzących tweetów i ekstrakcję hashtagów

Najpierw musimy utworzyć wystąpienie Spark Context sc , a następnie utworzyliśmy Streaming Context ssc from sc z interwałem partii wynoszącym dwie sekundy, który wykona przekształcenie we wszystkich strumieniach odbieranych co dwie sekundy. Zauważ, że ustawiliśmy poziom dziennika na ERROR , aby wyłączyć większość dzienników, które zapisuje Spark.

Zdefiniowaliśmy tutaj punkt kontrolny, aby umożliwić okresowe sprawdzanie punktów kontrolnych RDD; jest to obowiązkowe do użycia w naszej aplikacji, ponieważ będziemy używać przekształceń stanowych (zostaniemy omówione później w tej samej sekcji).

Następnie definiujemy nasz główny strumień danych DStream, który połączy się z utworzonym wcześniej serwerem gniazd na porcie 9009 i odczyta tweety z tego portu. Każdy rekord w DStream będzie tweetem.

 from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)

Teraz zdefiniujemy naszą logikę transformacji. Najpierw podzielimy wszystkie tweety na słowa i umieścimy je w słowach RDD. Następnie odfiltrujemy tylko hashtagi ze wszystkich słów i zmapujemy je na parę (hashtag, 1) i umieścimy je w hashtagach RDD.

Następnie musimy obliczyć, ile razy hashtag został wspomniany. Możemy to zrobić za pomocą funkcji reduceByKey . Ta funkcja obliczy, ile razy hashtag został wymieniony w każdej partii, tj. zresetuje liczniki w każdej partii.

W naszym przypadku musimy obliczyć liczniki we wszystkich partiach, więc użyjemy innej funkcji o nazwie updateStateByKey , ponieważ ta funkcja pozwala zachować stan RDD podczas aktualizowania go o nowe dane. Ten sposób nazywa się Stateful Transformation .

Zauważ, że aby użyć updateStateByKey , musisz skonfigurować punkt kontrolny i to, co zrobiliśmy w poprzednim kroku.

 # split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

updateStateByKey przyjmuje funkcję jako parametr zwany funkcją update . Działa na każdym elemencie w RDD i wykonuje pożądaną logikę.

W naszym przypadku stworzyliśmy funkcję aktualizacji o nazwie aggregate_tags_count , która sumuje wszystkie new_values dla każdego hashtagu i dodaje je do total_sum , która jest sumą dla wszystkich partii, i zapisuje dane w tags_totals RDD.

 def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

Następnie przetwarzamy tags_totals RDD w każdej partii, aby przekonwertować ją na tabelę tymczasową za pomocą kontekstu Spark SQL, a następnie wykonujemy instrukcję select w celu pobrania dziesięciu najlepszych hashtagów wraz z ich liczbami i umieszczenia ich w ramce danych hashtag_counts_df .

 def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)

Ostatnim krokiem w naszej aplikacji Spark jest wysłanie ramki danych hashtag_counts_df do aplikacji dashboardu. Skonwertujemy więc ramkę danych na dwie tablice, jedną dla hashtagów, a drugą dla ich liczby. Następnie prześlemy je do aplikacji dashboardu poprzez REST API.

 def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

Na koniec oto przykładowe dane wyjściowe Spark Streaming podczas uruchamiania i drukowania hashtag_counts_df , zauważysz, że dane wyjściowe są drukowane dokładnie co dwie sekundy zgodnie z interwałami partii.

Przykład wyjścia strumieniowego Twitter Spark, drukowane według ustawień interwału partii

Utwórz prosty pulpit nawigacyjny w czasie rzeczywistym do reprezentowania danych

Teraz stworzymy prostą aplikację dashboardu, która będzie aktualizowana w czasie rzeczywistym przez Spark. Zbudujemy go za pomocą Pythona, Flask i Charts.js.

Najpierw stwórzmy projekt Pythona o strukturze widocznej poniżej i pobierzmy i dodajmy plik Chart.js do katalogu statycznego.

Ilustracja: Tworzenie projektu w Pythonie do wykorzystania w analizie hashtagów na Twitterze

Następnie w pliku app.py utworzymy funkcję o nazwie update_data , która zostanie wywołana przez Spark za pośrednictwem adresu URL http://localhost:5001/updateData w celu zaktualizowania globalnych etykiet i tablic wartości.

Ponadto tworzona jest funkcja refresh_graph_data , która ma być wywoływana przez żądanie AJAX w celu zwrócenia nowych zaktualizowanych etykiet i tablic wartości w formacie JSON. Po wywołaniu funkcja get_chart_page wyrenderuje stronę chart.html .

 from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)

Teraz stwórzmy prosty wykres w pliku chart.html , aby wyświetlić dane hashtagów i aktualizować je w czasie rzeczywistym. Jak zdefiniowano poniżej, musimy zaimportować biblioteki JavaScript Chart.js i jquery.min.js .

W tagu body musimy utworzyć płótno i nadać mu identyfikator, aby w następnym kroku odwoływać się do niego podczas wyświetlania wykresu za pomocą JavaScript.

 <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>

Teraz skonstruujmy wykres, korzystając z poniższego kodu JavaScript. Najpierw otrzymujemy element canvas, a następnie tworzymy nowy obiekt wykresu i przekazujemy do niego element canvas oraz definiujemy jego obiekt danych jak poniżej.

Należy zauważyć, że etykiety danych i dane są powiązane z etykietami i zmiennymi wartości, które są zwracane podczas renderowania strony podczas wywoływania funkcji get_chart_page w pliku app.py

Ostatnia pozostała część to funkcja, która jest skonfigurowana do wykonywania żądania Ajax co sekundę i wywoływania adresu URL /refreshData , która wykona refresh_graph_data w app.py i zwróci nowe zaktualizowane dane, a następnie zaktualizuje znak, który renderuje nowe dane.

 <script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>

Wspólne uruchamianie aplikacji

Uruchommy trzy aplikacje w następującej kolejności: 1. Klient aplikacji Twitter. 2. Aplikacja Spark. 3. Aplikacja internetowa na pulpicie.

Następnie możesz uzyskać dostęp do pulpitu nawigacyjnego w czasie rzeczywistym, korzystając z adresu URL <http://localhost:5001/>

Teraz możesz zobaczyć, jak aktualizowany jest wykres, jak poniżej:

Animacja: Wykres trendów na Twitterze w czasie rzeczywistym

Przypadki użycia usługi Apache Streaming w prawdziwym życiu

Dowiedzieliśmy się, jak wykonywać proste analizy danych w czasie rzeczywistym za pomocą Spark Streaming i integrować je bezpośrednio z prostym pulpitem nawigacyjnym przy użyciu usługi sieciowej RESTful. Na tym przykładzie widzimy, jak potężny jest Spark, ponieważ przechwytuje ogromny strumień danych, przekształca je i wydobywa cenne spostrzeżenia, które można łatwo wykorzystać do podejmowania decyzji w krótkim czasie. Istnieje wiele przydatnych przypadków użycia, które można wdrożyć i które mogą służyć różnym branżom, takim jak wiadomości lub marketing.

Ilustracja: Hashtagi mogą służyć do wydobywania cennych spostrzeżeń i sentymentów, mających zastosowanie w wielu branżach.

Przykład branży wiadomości

Możemy śledzić najczęściej wymieniane hashtagi, aby wiedzieć, o jakich tematach ludzie rozmawiają najczęściej w mediach społecznościowych. Ponadto możemy śledzić określone hashtagi i ich tweety, aby wiedzieć, co ludzie mówią na określone tematy lub wydarzenia na świecie.

Przykład marketingowy

Możemy zbierać strumienie tweetów i, przeprowadzając analizę sentymentu, kategoryzować je i określać zainteresowania ludzi, aby kierować do nich oferty związane z ich zainteresowaniami.

Ponadto istnieje wiele przypadków użycia, które można zastosować specjalnie do analizy dużych zbiorów danych i które mogą służyć wielu branżom. Aby uzyskać więcej ogólnych przypadków użycia Apache Spark, sugeruję zapoznanie się z jednym z naszych poprzednich postów.

Zachęcam Cię do przeczytania więcej na temat Spark Streaming stąd, aby dowiedzieć się więcej o jego możliwościach i wykonać bardziej zaawansowaną transformację danych, aby uzyskać więcej informacji w czasie rzeczywistym z jego wykorzystaniem.