Apache Spark Streaming 教程:識別趨勢 Twitter Hashtags
已發表: 2022-03-11如今,數據的增長和積累速度比以往任何時候都快。 目前,我們世界上產生的所有數據中約有 90% 僅在過去兩年內產生。 由於這種驚人的增長速度,大數據平台不得不採取激進的解決方案來維護如此龐大的數據量。
當今的主要數據來源之一是社交網絡。 請允許我演示一個真實的示例:使用最重要的大數據回顯解決方案之一——Apache Spark 和 Python,實時處理、分析和提取社交網絡數據中的見解。
在本文中,我將教你如何構建一個簡單的應用程序,該應用程序使用 Python 從 Twitter 讀取在線流,然後使用 Apache Spark Streaming 處理推文以識別主題標籤,最後返回熱門主題標籤並以真實的方式呈現這些數據-時間儀表板。
為 Twitter API 創建自己的憑據
為了從 Twitter 獲取推文,您需要通過單擊“創建新應用”在 TwitterApps 上註冊,然後填寫下面的表格,單擊“創建您的 Twitter 應用”。
其次,轉到您新創建的應用程序並打開“密鑰和訪問令牌”選項卡。 然後單擊“生成我的訪問令牌”。
您的新訪問令牌將如下所示。
現在您已準備好進行下一步。
構建 Twitter HTTP 客戶端
在這一步中,我將向您展示如何構建一個簡單的客戶端,該客戶端將使用 Python 從 Twitter API 獲取推文並將它們傳遞給 Spark Streaming 實例。 對於任何專業的 Python 開發人員來說,它都應該很容易理解。
首先,讓我們創建一個名為twitter_app.py
的文件,然後我們將在其中添加代碼,如下所示。
導入我們將使用的庫,如下所示:
import socket import sys import requests import requests_oauthlib import json
並添加將在 OAuth 中用於連接 Twitter 的變量,如下所示:
# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
現在,我們將創建一個名為get_tweets
的新函數,該函數將調用 Twitter API URL 並返回推文流的響應。
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
然後,創建一個函數,從上述響應中獲取響應,並從整個推文的 JSON 對像中提取推文的文本。 之後,它通過 TCP 連接將每條推文發送到 Spark Streaming 實例(將在後面討論)。
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)
現在,我們將製作主要部分,該部分將建立 Spark 將連接的應用程序主機套接字連接。 我們將在這裡將 IP 配置為localhost
,因為所有這些都將在同一台機器和端口9009
上運行。 然後我們將調用我們在上面創建的get_tweets
方法,用於從 Twitter 獲取推文,並將其響應與套接字連接一起傳遞給send_tweets_to_spark
以將推文發送到 Spark。
TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)
設置我們的 Apache Spark 流應用程序
讓我們構建我們的 Spark 流應用程序,它將對傳入的推文進行實時處理,從中提取主題標籤,併計算提及了多少主題標籤。
首先,我們必須創建一個 Spark Context sc
的實例,然後我們從sc
創建 Streaming Context ssc
,批處理間隔為 2 秒,它將對每兩秒接收到的所有流進行轉換。 請注意,我們已將日誌級別設置為ERROR
,以禁用 Spark 寫入的大部分日誌。
我們在這裡定義了一個檢查點,以允許定期的 RDD 檢查點; 這是必須在我們的應用程序中使用的,因為我們將使用有狀態轉換(稍後將在同一部分中討論)。
然後我們定義我們的主 DStream 數據流,它將連接到我們之前在端口9009
上創建的套接字服務器,並從該端口讀取推文。 DStream 中的每條記錄都是一條推文。
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)
現在,我們將定義我們的轉換邏輯。 首先,我們將所有推文拆分為單詞並將它們放入單詞 RDD 中。 然後我們將只過濾所有單詞中的主題標籤並將它們映射到一對(hashtag, 1)
並將它們放入主題標籤 RDD 中。
然後我們需要計算該主題標籤被提及的次數。 我們可以通過使用函數reduceByKey
來做到這一點。 此函數將計算每個批次的主題標籤被提及的次數,即它將重置每個批次的計數。
在我們的例子中,我們需要計算所有批次的計數,因此我們將使用另一個名為updateStateByKey
的函數,因為該函數允許您在使用新數據更新 RDD 的同時保持其狀態。 這種方式稱為Stateful Transformation
。
請注意,為了使用updateStateByKey
,您必須配置一個檢查點,以及我們在上一步中所做的。
# split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()
updateStateByKey
將一個函數作為參數,稱為update
函數。 它在 RDD 中的每個項目上運行並執行所需的邏輯。
在我們的例子中,我們創建了一個名為aggregate_tags_count
的更新函數,它將對每個主題標籤的所有new_values
求和,並將它們添加到total_sum
中,total_sum 是所有批次的總和,並將數據保存到tags_totals
RDD 中。
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
然後我們在每批中對tags_totals
RDD 進行處理,以便使用 Spark SQL 上下文將其轉換為臨時表,然後執行 select 語句以檢索前十個主題標籤及其計數並將它們放入hashtag_counts_df
數據幀。
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)
我們 Spark 應用程序的最後一步是將hashtag_counts_df
數據幀發送到儀表板應用程序。 因此,我們將數據框轉換為兩個數組,一個用於主題標籤,另一個用於計數。 然後我們將通過 REST API 將它們發送到儀表板應用程序。

def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
最後,這裡是運行和打印hashtag_counts_df
時 Spark Streaming 的示例輸出,您會注意到根據批處理間隔每兩秒精確打印一次輸出。
創建一個簡單的實時儀表板來表示數據
現在,我們將創建一個簡單的儀表板應用程序,該應用程序將由 Spark 實時更新。 我們將使用 Python、Flask 和 Charts.js 構建它。
首先,讓我們創建一個結構如下所示的 Python 項目,然後下載 Chart.js 文件並將其添加到靜態目錄中。
然後,在app.py
文件中,我們將創建一個名為update_data
的函數,Spark 將通過 URL http://localhost:5001/updateData
該函數,以更新全局標籤和值數組。
此外,創建函數refresh_graph_data
以供 AJAX 請求調用,以將新更新的標籤和值數組作為 JSON 返回。 函數get_chart_page
將在調用時呈現chart.html
頁面。
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)
現在,讓我們在chart.html
文件中創建一個簡單的圖表,以便顯示主題標籤數據並實時更新它們。 如下定義,我們需要導入Chart.js
和jquery.min.js
JavaScript 庫。
在 body 標籤中,我們必須創建一個畫布並為其提供一個 ID,以便在下一步使用 JavaScript 顯示圖表時引用它。
<!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>
現在,讓我們使用下面的 JavaScript 代碼構建圖表。 首先,我們得到 canvas 元素,然後我們創建一個新的圖表對象並將 canvas 元素傳遞給它並定義它的數據對象,如下所示。
請注意,數據的標籤和數據與在app.py
文件中調用get_chart_page
函數時渲染頁面時返回的標籤和值變量綁定。
最後剩下的部分是配置為每秒執行一次 Ajax 請求並調用 URL /refreshData
的函數,它將執行app.py
中的refresh_graph_data
並返回新的更新數據,然後更新呈現新數據的 char。
<script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>
一起運行應用程序
讓我們按以下順序運行這三個應用程序: 1. Twitter App Client。 2. 星火應用。 3.儀表板網絡應用程序。
然後您可以使用 URL <http://localhost:5001/>
訪問實時儀表板
現在,您可以看到您的圖表正在更新,如下所示:
Apache Streaming 真實生活用例
我們已經學習瞭如何使用 Spark Streaming 實時對數據進行簡單的數據分析,並使用 RESTful Web 服務將其直接與簡單的儀表板集成。 從這個例子中,我們可以看到 Spark 的強大之處,因為它可以捕獲大量數據流,對其進行轉換,並提取有價值的見解,這些見解可以很容易地用於立即做出決策。 有許多有用的用例可以實施,可以服務於不同的行業,如新聞或營銷。
新聞行業示例
我們可以跟踪最常提及的主題標籤,以了解人們在社交媒體上談論最多的話題。 此外,我們可以跟踪特定的主題標籤及其推文,以了解人們對世界上特定主題或事件的看法。
營銷示例
我們可以收集推文流,並通過進行情緒分析,對其進行分類並確定人們的興趣,以便針對他們提供與其興趣相關的優惠。
此外,還有很多用例可以專門用於大數據分析,可以服務於很多行業。 對於更多 Apache Spark 的一般用例,我建議您查看我們之前的一篇文章。
我鼓勵您從這裡閱讀更多關於 Spark Streaming 的信息,以便更多地了解它的功能,並對數據進行更高級的轉換,以獲得更多實時使用它的見解。