Apache Spark Streaming 教程:识别趋势 Twitter Hashtags

已发表: 2022-03-11

如今,数据的增长和积累速度比以往任何时候都快。 目前,我们世界上产生的所有数据中约有 90% 仅在过去两年内产生。 由于这种惊人的增长速度,大数据平台不得不采取激进的解决方案来维护如此庞大的数据量。

当今的主要数据来源之一是社交网络。 请允许我演示一个真实的示例:使用最重要的大数据回显解决方案之一——Apache Spark 和 Python,实时处理、分析和提取社交网络数据中的见解。

Apache Spark Streaming 可用于从社交媒体中提取见解,例如趋势 Twitter 主题标签

在本文中,我将教你如何构建一个简单的应用程序,该应用程序使用 Python 从 Twitter 读取在线流,然后使用 Apache Spark Streaming 处理推文以识别主题标签,最后返回热门主题标签并在真实的-时间仪表板。

为 Twitter API 创建自己的凭据

为了从 Twitter 获取推文,您需要通过单击“创建新应用”在 TwitterApps 上注册,然后填写下面的表格,单击“创建您的 Twitter 应用”。

屏幕截图:如何创建您的 Twitter 应用程序。

其次,转到您新创建的应用程序并打开“密钥和访问令牌”选项卡。 然后单击“生成我的访问令牌”。

屏幕截图:设置 Twitter 应用凭据、密钥和访问令牌。

您的新访问令牌将如下所示。

屏幕截图:Twitter 应用访问令牌设置。

现在您已准备好进行下一步。

构建 Twitter HTTP 客户端

在这一步中,我将向您展示如何构建一个简单的客户端,该客户端将使用 Python 从 Twitter API 获取推文并将它们传递给 Spark Streaming 实例。 对于任何专业的 Python 开发人员来说,它都应该很容易理解。

首先,让我们创建一个名为twitter_app.py的文件,然后我们将在其中添加代码,如下所示。

导入我们将使用的库,如下所示:

 import socket import sys import requests import requests_oauthlib import json

并添加将在 OAuth 中用于连接 Twitter 的变量,如下所示:

 # Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

现在,我们将创建一个名为get_tweets的新函数,该函数将调用 Twitter API URL 并返回推文流的响应。

 def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

然后,创建一个函数,从上述响应中获取响应,并从整个推文的 JSON 对象中提取推文的文本。 之后,它通过 TCP 连接将每条推文发送到 Spark Streaming 实例(将在后面讨论)。

 def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)

现在,我们将制作主要部分,该部分将建立 Spark 将连接的应用程序主机套接字连接。 我们将在这里将 IP 配置为localhost ,因为所有这些都将在同一台机器和端口9009上运行。 然后我们将调用我们在上面创建的get_tweets方法,用于从 Twitter 获取推文,并将其响应与套接字连接一起传递给send_tweets_to_spark以将推文发送到 Spark。

 TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)

设置我们的 Apache Spark 流应用程序

让我们构建我们的 Spark 流应用程序,它将对传入的推文进行实时处理,从中提取主题标签,并计算提及了多少主题标签。

插图:Spark 流允许实时处理传入的推文和主题标签提取

首先,我们必须创建一个 Spark Context sc的实例,然后我们从sc创建 Streaming Context ssc ,批处理间隔为 2 秒,它将对每两秒接收到的所有流进行转换。 请注意,我们已将日志级别设置为ERROR ,以禁用 Spark 写入的大部分日志。

我们在这里定义了一个检查点,以允许定期的 RDD 检查点; 这是必须在我们的应用程序中使用的,因为我们将使用有状态转换(稍后将在同一部分中讨论)。

然后我们定义我们的主 DStream 数据流,它将连接到我们之前在端口9009上创建的套接字服务器,并从该端口读取推文。 DStream 中的每条记录都是一条推文。

 from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)

现在,我们将定义我们的转换逻辑。 首先,我们将所有推文拆分为单词并将它们放入单词 RDD 中。 然后我们将只过滤所有单词中的主题标签并将它们映射到一对(hashtag, 1)并将它们放入主题标签 RDD 中。

然后我们需要计算该主题标签被提及的次数。 我们可以通过使用函数reduceByKey来做到这一点。 此函数将计算每个批次的主题标签被提及的次数,即它将重置每个批次的计数。

在我们的例子中,我们需要计算所有批次的计数,因此我们将使用另一个名为updateStateByKey的函数,因为该函数允许您在使用新数据更新 RDD 的同时保持其状态。 这种方式称为Stateful Transformation

请注意,为了使用updateStateByKey ,您必须配置一个检查点,以及我们在上一步中所做的。

 # split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

updateStateByKey将一个函数作为参数,称为update函数。 它在 RDD 中的每个项目上运行并执行所需的逻辑。

在我们的例子中,我们创建了一个名为aggregate_tags_count的更新函数,它将对每个主题标签的所有new_values求和,并将它们添加到total_sum中,total_sum 是所有批次的总和,并将数据保存到tags_totals RDD 中。

 def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

然后我们在每个批次中对tags_totals RDD 进行处理,以便使用 Spark SQL 上下文将其转换为临时表,然后执行 select 语句以检索前十个主题标签及其计数并将它们放入hashtag_counts_df数据帧。

 def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)

我们 Spark 应用程序的最后一步是将hashtag_counts_df数据帧发送到仪表板应用程序。 因此,我们将数据框转换为两个数组,一个用于主题标签,另一个用于计数。 然后我们将通过 REST API 将它们发送到仪表板应用程序。

 def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

最后,这里是运行和打印hashtag_counts_df时 Spark Streaming 的示例输出,您会注意到根据批处理间隔每两秒精确打印一次输出。

Twitter Spark 流输出示例,按批次间隔设置打印

创建一个简单的实时仪表板来表示数据

现在,我们将创建一个简单的仪表板应用程序,该应用程序将由 Spark 实时更新。 我们将使用 Python、Flask 和 Charts.js 构建它。

首先,让我们创建一个结构如下所示的 Python 项目,然后下载 Chart.js 文件并将其添加到静态目录中。

插图:创建用于 Twitter 标签分析的 Python 项目

然后,在app.py文件中,我们将创建一个名为update_data的函数,Spark 将通过 URL http://localhost:5001/updateData该函数,以更新全局标签和值数组。

此外,创建函数refresh_graph_data以供 AJAX 请求调用,以将新更新的标签和值数组作为 JSON 返回。 函数get_chart_page将在调用时呈现chart.html页面。

 from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)

现在,让我们在chart.html文件中创建一个简单的图表,以便显示主题标签数据并实时更新它们。 如下定义,我们需要导入Chart.jsjquery.min.js JavaScript 库。

在 body 标签中,我们必须创建一个画布并为其提供一个 ID,以便在下一步使用 JavaScript 显示图表时引用它。

 <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>

现在,让我们使用下面的 JavaScript 代码构建图表。 首先,我们得到 canvas 元素,然后我们创建一个新的图表对象并将 canvas 元素传递给它并定义它的数据对象,如下所示。

请注意,数据的标签和数据与在app.py文件中调用get_chart_page函数时渲染页面时返回的标签和值变量绑定。

最后剩下的部分是配置为每秒执行一次 Ajax 请求并调用 URL /refreshData的函数,它将执行app.py中的refresh_graph_data并返回新的更新数据,然后更新呈现新数据的 char。

 <script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>

一起运行应用程序

让我们按以下顺序运行这三个应用程序: 1. Twitter App Client。 2. 星火应用。 3.仪表板网络应用程序。

然后您可以使用 URL <http://localhost:5001/>访问实时仪表板

现在,您可以看到您的图表正在更新,如下所示:

动画:实时 Twitter 趋势标签图表

Apache Streaming 真实生活用例

我们已经学习了如何使用 Spark Streaming 实时对数据进行简单的数据分析,并使用 RESTful Web 服务将其直接与简单的仪表板集成。 从这个例子中,我们可以看到 Spark 的强大之处,因为它可以捕获大量数据流,对其进行转换,并提取有价值的见解,这些见解可以很容易地用于立即做出决策。 有许多有用的用例可以实施,可以服务于不同的行业,如新闻或营销。

说明:标签可用于提取有价值的见解和情感,适用于多个行业。

新闻行业示例

我们可以跟踪最常提及的主题标签,以了解人们在社交媒体上谈论最多的话题。 此外,我们可以跟踪特定的主题标签及其推文,以了解人们对世界上特定主题或事件的看法。

营销示例

我们可以收集推文流,并通过进行情绪分析,对其进行分类并确定人们的兴趣,以便针对他们提供与其兴趣相关的优惠。

此外,还有很多用例可以专门用于大数据分析,可以服务于很多行业。 对于更多 Apache Spark 的一般用例,我建议您查看我们之前的一篇文章。

我鼓励您从这里阅读更多关于 Spark Streaming 的信息,以便更多地了解它的功能,并对数据进行更高级的转换,以获得更多实时使用它的见解。