Apache Spark Streaming Tutorial: Identifizieren von Trend-Twitter-Hashtags

Veröffentlicht: 2022-03-11

Heutzutage wachsen und häufen sich Daten schneller an als je zuvor. Aktuell sind rund 90 % aller auf unserer Welt generierten Daten erst in den letzten zwei Jahren entstanden. Aufgrund dieser atemberaubenden Wachstumsrate mussten Big-Data-Plattformen radikale Lösungen finden, um solch riesige Datenmengen zu verwalten.

Eine der wichtigsten Datenquellen sind heute soziale Netzwerke. Erlauben Sie mir, ein Beispiel aus dem wirklichen Leben zu demonstrieren: den Umgang, die Analyse und das Extrahieren von Erkenntnissen aus Daten sozialer Netzwerke in Echtzeit mit einer der wichtigsten Big-Data-Echo-Lösungen auf dem Markt – Apache Spark und Python.

Apache Spark Streaming kann verwendet werden, um Erkenntnisse aus sozialen Medien zu extrahieren, z. B. Twitter-Hashtags im Trend

In diesem Artikel zeige ich Ihnen, wie Sie eine einfache Anwendung erstellen, die Online-Streams von Twitter mit Python liest, dann die Tweets mit Apache Spark Streaming verarbeitet, um Hashtags zu identifizieren, und schließlich die Top-Trend-Hashtags zurückgibt und diese Daten in einem echten darstellt -Zeit-Dashboard.

Erstellen Ihrer eigenen Anmeldeinformationen für Twitter-APIs

Um Tweets von Twitter zu erhalten, müssen Sie sich bei TwitterApps registrieren, indem Sie auf „Neue App erstellen“ klicken und dann das folgende Formular ausfüllen und auf „Erstellen Sie Ihre Twitter-App“ klicken.

Screenshot: So erstellen Sie Ihre Twitter-App.

Zweitens gehen Sie zu Ihrer neu erstellten App und öffnen Sie die Registerkarte „Schlüssel und Zugriffstoken“. Klicken Sie dann auf „Meinen Zugriffstoken generieren“.

Screenshot: Einrichten von Anmeldeinformationen, Schlüsseln und Zugriffstoken für die Twitter-App.

Ihre neuen Zugriffstoken werden wie unten angezeigt.

Screenshot: Einrichtung des Zugriffstokens für die Twitter-App.

Und jetzt sind Sie bereit für den nächsten Schritt.

Erstellen des Twitter-HTTP-Clients

In diesem Schritt zeige ich Ihnen, wie Sie einen einfachen Client erstellen, der die Tweets von der Twitter-API mit Python abruft und an die Spark Streaming-Instanz weiterleitet. Es sollte für jeden professionellen Python-Entwickler einfach zu befolgen sein.

Zuerst erstellen wir eine Datei namens twitter_app.py und fügen dann den Code wie unten beschrieben zusammen.

Importieren Sie die Bibliotheken, die wir wie folgt verwenden werden:

 import socket import sys import requests import requests_oauthlib import json

Und fügen Sie die Variablen hinzu, die in OAuth für die Verbindung mit Twitter verwendet werden, wie unten:

 # Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Jetzt erstellen wir eine neue Funktion namens get_tweets , die die Twitter-API-URL aufruft und die Antwort für einen Stream von Tweets zurückgibt.

 def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

Erstellen Sie dann eine Funktion, die die Antwort von der obigen übernimmt und den Text der Tweets aus dem JSON-Objekt des gesamten Tweets extrahiert. Danach sendet es jeden Tweet über eine TCP-Verbindung an die Spark Streaming-Instanz (wird später besprochen).

 def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)

Jetzt machen wir den Hauptteil, der die App-Host-Socket-Verbindungen herstellt, mit denen Spark eine Verbindung herstellt. Wir konfigurieren die IP hier als localhost , da alle auf demselben Computer und dem Port 9009 ausgeführt werden. Dann rufen wir die Methode get_tweets , die wir oben erstellt haben, um die Tweets von Twitter abzurufen, und leiten ihre Antwort zusammen mit der Socket-Verbindung an send_tweets_to_spark , um die Tweets an Spark zu senden.

 TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)

Einrichten unserer Apache Spark Streaming-Anwendung

Lassen Sie uns unsere Spark-Streaming-App aufbauen, die die eingehenden Tweets in Echtzeit verarbeitet, die Hashtags daraus extrahiert und berechnet, wie viele Hashtags erwähnt wurden.

Abbildung: Spark-Streaming ermöglicht die Echtzeitverarbeitung eingehender Tweets und die Hashtag-Extraktion

Zuerst müssen wir eine Instanz von Spark Context sc erstellen, dann haben wir den Streaming-Kontext ssc aus sc mit einem Batch-Intervall von zwei Sekunden erstellt, das die Transformation für alle alle zwei Sekunden empfangenen Streams durchführt. Beachten Sie, dass wir die Protokollebene auf ERROR gesetzt haben, um die meisten Protokolle zu deaktivieren, die Spark schreibt.

Wir haben hier einen Checkpoint definiert, um regelmäßiges RDD-Checkpointing zu ermöglichen; Dies ist zwingend erforderlich, um in unserer App verwendet zu werden, da wir zustandsbehaftete Transformationen verwenden (wird später im selben Abschnitt besprochen).

Dann definieren wir unseren Haupt-DStream-Datenstrom, der sich mit dem Socket-Server verbindet, den wir zuvor auf Port 9009 erstellt haben, und lesen die Tweets von diesem Port. Jeder Datensatz im DStream ist ein Tweet.

 from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)

Jetzt definieren wir unsere Transformationslogik. Zuerst werden wir alle Tweets in Wörter aufteilen und sie in RDD-Wörter fassen. Dann filtern wir nur Hashtags aus allen Wörtern und ordnen sie Paaren von (hashtag, 1) zu und fügen sie in Hashtags RDD ein.

Dann müssen wir berechnen, wie oft der Hashtag erwähnt wurde. Wir können dies tun, indem wir die Funktion reduceByKey . Diese Funktion berechnet, wie oft der Hashtag pro Batch erwähnt wurde, dh sie setzt die Zählungen in jedem Batch zurück.

In unserem Fall müssen wir die Zählungen für alle Batches berechnen, also verwenden wir eine andere Funktion namens updateStateByKey , da diese Funktion es Ihnen ermöglicht, den Status von RDD beizubehalten, während Sie es mit neuen Daten aktualisieren. Dieser Weg wird Stateful Transformation genannt.

Beachten Sie, dass Sie zur Verwendung von updateStateByKey einen Prüfpunkt konfigurieren müssen, und zwar das, was wir im vorherigen Schritt getan haben.

 # split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

Der updateStateByKey übernimmt eine Funktion als Parameter namens update -Funktion. Es läuft auf jedem Element in RDD und führt die gewünschte Logik aus.

In unserem Fall haben wir eine Aktualisierungsfunktion namens „ aggregate_tags_count “ erstellt, die alle new_values für jeden Hashtag summiert und zu total_sum hinzufügt, d. h. der Summe aller Batches, und die Daten in tags_totals RDD speichert.

 def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

Dann verarbeiten wir tags_totals RDD in jedem Batch, um es mit Spark SQL Context in eine temporäre Tabelle zu konvertieren, und führen dann eine select-Anweisung aus, um die Top-Ten-Hashtags mit ihren Zählungen abzurufen und sie in hashtag_counts_df -Datenrahmen einzufügen.

 def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)

Der letzte Schritt in unserer Spark-Anwendung besteht darin, den hashtag_counts_df an die Dashboard-Anwendung zu senden. Also konvertieren wir den Datenrahmen in zwei Arrays, eines für die Hashtags und das andere für ihre Anzahl. Dann senden wir sie über die REST-API an die Dashboard-Anwendung.

 def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

Hier ist schließlich eine Beispielausgabe des Spark-Streamings beim Ausführen und Drucken von hashtag_counts_df . Sie werden feststellen, dass die Ausgabe gemäß den Stapelintervallen genau alle zwei Sekunden gedruckt wird.

Ein Beispiel für die Twitter Spark-Streaming-Ausgabe, die pro Batch-Intervalleinstellungen gedruckt wird

Erstellen Sie ein einfaches Echtzeit-Dashboard zur Darstellung der Daten

Jetzt erstellen wir eine einfache Dashboard-Anwendung, die von Spark in Echtzeit aktualisiert wird. Wir erstellen es mit Python, Flask und Charts.js.

Lassen Sie uns zunächst ein Python-Projekt mit der unten gezeigten Struktur erstellen und die Datei Chart.js herunterladen und in das statische Verzeichnis einfügen.

Abbildung: Erstellen eines Python-Projekts zur Verwendung in der Twitter-Hashtag-Analyse

Dann erstellen wir in der Datei app.py eine Funktion mit dem Namen update_data , die von Spark über die URL http://localhost:5001/updateData wird, um die globalen Beschriftungen und Wertearrays zu aktualisieren.

Außerdem wird die Funktion refresh_graph_data erstellt, die von einer AJAX-Anforderung aufgerufen wird, um die neuen aktualisierten Beschriftungen und Wertearrays als JSON zurückzugeben. Die Funktion get_chart_page rendert die Seite chart.html , wenn sie aufgerufen wird.

 from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)

Lassen Sie uns nun ein einfaches Diagramm in der Datei chart.html erstellen, um die Hashtag-Daten anzuzeigen und in Echtzeit zu aktualisieren. Wie unten definiert, müssen wir die JavaScript-Bibliotheken Chart.js und jquery.min.js importieren.

Im Body-Tag müssen wir einen Canvas erstellen und ihm eine ID geben, um ihn zu referenzieren, während wir im nächsten Schritt das Diagramm mit JavaScript anzeigen.

 <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>

Lassen Sie uns nun das Diagramm mit dem folgenden JavaScript-Code erstellen. Zuerst erhalten wir das Canvas-Element, und dann erstellen wir ein neues Diagrammobjekt, übergeben das Canvas-Element daran und definieren sein Datenobjekt wie unten.

Beachten Sie, dass die Bezeichnungen und Daten der Daten mit Bezeichnungen und Wertevariablen begrenzt sind, die beim Rendern der Seite zurückgegeben werden, wenn eine get_chart_page Funktion in der Datei app.py “ aufgerufen wird.

Der letzte verbleibende Teil ist die Funktion, die so konfiguriert ist, dass sie jede Sekunde eine Ajax-Anforderung ausführt und die URL /refreshData , die refresh_graph_data in app.py und die neuen aktualisierten Daten zurückgibt und dann das Zeichen aktualisiert, das die neuen Daten rendert.

 <script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>

Ausführen der Anwendungen zusammen

Lassen Sie uns die drei Anwendungen in der folgenden Reihenfolge ausführen: 1. Twitter App Client. 2. Spark-App. 3. Dashboard-Web-App.

Anschließend können Sie über die URL <http://localhost:5001/> auf das Echtzeit-Dashboard zugreifen

Jetzt können Sie sehen, wie Ihr Diagramm wie folgt aktualisiert wird:

Animation: Echtzeit-Twitter-Hashtag-Diagramm

Apache-Streaming Anwendungsfälle aus der Praxis

Wir haben gelernt, wie man einfache Datenanalysen von Daten in Echtzeit mit Spark Streaming durchführt und sie über einen RESTful-Webdienst direkt in ein einfaches Dashboard integriert. An diesem Beispiel können wir sehen, wie leistungsfähig Spark ist, da es einen riesigen Datenstrom erfasst, transformiert und wertvolle Erkenntnisse extrahiert, die einfach verwendet werden können, um in kürzester Zeit Entscheidungen zu treffen. Es gibt viele hilfreiche Anwendungsfälle, die implementiert werden können und die verschiedenen Branchen wie Nachrichten oder Marketing dienen können.

Illustration: Hashtags können verwendet werden, um wertvolle Erkenntnisse und Stimmungen zu extrahieren, die in mehreren Branchen anwendbar sind.

Beispiel aus der Nachrichtenbranche

Wir können die am häufigsten erwähnten Hashtags verfolgen, um zu wissen, über welche Themen die Leute in den sozialen Medien am meisten sprechen. Außerdem können wir bestimmte Hashtags und ihre Tweets verfolgen, um zu erfahren, was Menschen über bestimmte Themen oder Ereignisse in der Welt sagen.

Beispiel Marketing

Wir können den Stream von Tweets sammeln und sie durch Stimmungsanalysen kategorisieren und die Interessen der Menschen bestimmen, um sie mit Angeboten zu treffen, die ihren Interessen entsprechen.

Außerdem gibt es viele Anwendungsfälle, die speziell für Big-Data-Analysen angewendet werden können und vielen Branchen dienen können. Für weitere Apache Spark-Anwendungsfälle im Allgemeinen schlage ich vor, dass Sie sich einen unserer vorherigen Posts ansehen.

Ich ermutige Sie, von hier aus mehr über Spark Streaming zu lesen, um mehr über seine Fähigkeiten zu erfahren und eine fortgeschrittenere Transformation von Daten durchzuführen, um mehr Einblicke in Echtzeit zu erhalten.