Apache Sparkストリーミングチュートリアル:トレンドのTwitterハッシュタグの特定

公開: 2022-03-11

今日、データはかつてないほど急速に成長し、蓄積されています。 現在、私たちの世界で生成されたすべてのデータの約90%は、過去2年間にのみ生成されました。 この驚異的な成長率のために、ビッグデータプラットフォームは、そのような膨大な量のデータを維持するために根本的なソリューションを採用する必要がありました。

今日の主なデータソースの1つは、ソーシャルネットワークです。 実際の例を示しましょう。最も重要なビッグデータエコーソリューションの1つであるApacheSparkとPythonを使用して、ソーシャルネットワークデータからリアルタイムで洞察を処理、分析、抽出します。

Apache Spark Streamingを使用して、トレンドのTwitterハッシュタグなどのソーシャルメディアから洞察を抽出できます。

この記事では、Pythonを使用してTwitterからオンラインストリームを読み取り、Apache Spark Streamingを使用してツイートを処理してハッシュタグを識別し、最後にトレンドのハッシュタグを返し、このデータを実際に表す簡単なアプリケーションを構築する方法を説明します。 -タイムダッシュボード。

TwitterAPI用の独自のクレデンシャルを作成する

Twitterからツイートを取得するには、[新しいアプリの作成]をクリックしてTwitterAppsに登録し、以下のフォームに入力して[Twitterアプリの作成]をクリックする必要があります。

スクリーンショット:Twitterアプリの作成方法。

次に、新しく作成したアプリに移動し、[キーとアクセストークン]タブを開きます。 次に、「アクセストークンを生成する」をクリックします。

スクリーンショット:Twitterアプリのクレデンシャル、キー、アクセストークンを設定します。

新しいアクセストークンは次のように表示されます。

スクリーンショット:Twitterアプリのアクセストークンの設定。

これで、次のステップの準備が整いました。

TwitterHTTPクライアントの構築

このステップでは、Pythonを使用してTwitter APIからツイートを取得し、それらをSparkStreamingインスタンスに渡す単純なクライアントを構築する方法を示します。 プロのPython開発者にとっては簡単に理解できるはずです。

まず、 twitter_app.pyというファイルを作成してから、以下のようにコードを追加します。

以下のように使用するライブラリをインポートします。

 import socket import sys import requests import requests_oauthlib import json

そして、以下のように、Twitterに接続するためにOAuthで使用される変数を追加します。

 # Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

次に、Twitter API URLを呼び出し、ツイートのストリームに対する応答を返すget_tweetsという新しい関数を作成します。

 def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

次に、上記の応答を受け取り、ツイート全体のJSONオブジェクトからツイートのテキストを抽出する関数を作成します。 その後、TCP接続を介してすべてのツイートをSpark Streamingインスタンス(後で説明します)に送信します。

 def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)

次に、sparkが接続するアプリホストソケット接続を作成する主要部分を作成します。 ここでは、すべてが同じマシンとポート9009で実行されるため、IPをlocalhostとして構成します。 次に、上記で作成したget_tweetsメソッドを呼び出して、Twitterからツイートを取得し、その応答をソケット接続とともにsend_tweets_to_sparkに渡して、ツイートをSparkに送信します。

 TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)

ApacheSparkストリーミングアプリケーションのセットアップ

着信ツイートをリアルタイムで処理し、それらからハッシュタグを抽出して、言及されているハッシュタグの数を計算するSparkストリーミングアプリを構築しましょう。

イラスト:Sparkストリーミングにより、着信ツイートのリアルタイム処理とハッシュタグ抽出が可能になります

まず、Spark Context scのインスタンスを作成する必要があります。次に、2秒ごとに受信したすべてのストリームで変換を行う2秒のバッチ間隔でscからストリーミングコンテキストsscを作成しました。 Sparkが書き込むほとんどのログを無効にするために、ログレベルをERRORに設定していることに注意してください。

定期的なRDDチェックポイントを可能にするために、ここでチェックポイントを定義しました。 ステートフル変換を使用するため、これはアプリで使用する必要があります(同じセクションで後述します)。

次に、ポート9009で以前に作成したソケットサーバーに接続し、そのポートからツイートを読み取るメインのDStreamdataStreamを定義します。 DStreamの各レコードはツイートになります。

 from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)

次に、変換ロジックを定義します。 まず、すべてのツイートを単語に分割し、単語RDDに入れます。 次に、すべての単語からハッシュタグのみをフィルタリングし、それらを(hashtag, 1)のペアにマップして、ハッシュタグRDDに配置します。

次に、ハッシュタグが言及された回数を計算する必要があります。 これは、関数reduceByKeyを使用して行うことができます。 この関数は、各バッチごとにハッシュタグが言及された回数を計算します。つまり、各バッチのカウントをリセットします。

この場合、すべてのバッチのカウントを計算する必要があるため、 updateStateByKeyという別の関数を使用します。この関数を使用すると、RDDの状態を維持しながら、新しいデータで更新できます。 この方法はStateful Transformationと呼ばれます。

updateStateByKeyを使用するには、チェックポイントを構成する必要があり、前の手順で行ったことに注意してください。

 # split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

updateStateByKeyは、関数をupdate関数と呼ばれるパラメーターとして受け取ります。 RDDの各アイテムで実行され、必要なロジックを実行します。

この例では、 aggregate_tags_countという更新関数を作成しました。この関数は、各ハッシュタグのすべてのnew_valuesを合計し、すべてのバッチの合計であるtotal_sumに追加して、データをtags_totalsに保存します。

 def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

次に、すべてのバッチでtags_totals RDDを処理して、Spark SQL Contextを使用して一時テーブルに変換し、selectステートメントを実行して、上位10個のハッシュタグとそのカウントを取得し、 hashtag_counts_dfデータフレームに配置します。

 def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)

Sparkアプリケーションの最後のステップは、 hashtag_counts_dfデータフレームをダッシュ​​ボードアプリケーションに送信することです。 したがって、データフレームを2つの配列に変換します。1つはハッシュタグ用で、もう1つはそれらのカウント用です。 次に、RESTAPIを介してダッシュボードアプリケーションに送信します。

 def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

最後に、 hashtag_counts_dfの実行および印刷中のSpark Streamingの出力例を示します。出力は、バッチ間隔に従って正確に2秒ごとに出力されます。

バッチ間隔設定ごとに印刷されたTwitterSparkストリーミング出力の例

データを表現するためのシンプルなリアルタイムダッシュボードを作成する

次に、Sparkによってリアルタイムで更新されるシンプルなダッシュボードアプリケーションを作成します。 Python、Flask、Charts.jsを使用してビルドします。

まず、以下のような構造のPythonプロジェクトを作成し、Chart.jsファイルをダウンロードして静的ディレクトリに追加しましょう。

イラスト:Twitterハッシュタグ分析で使用するPythonプロジェクトの作成

次に、 app.pyファイルにupdate_dataという関数を作成します。この関数は、グローバルラベルと値の配列を更新するためにURL http://localhost:5001/updateDataを介してSparkによって呼び出されます。

また、関数refresh_graph_dataは、AJAXリクエストによって呼び出されるように作成され、新しく更新されたラベルと値の配列をJSONとして返します。 関数get_chart_pageは、呼び出されたときにchart.htmlページをレンダリングします。

 from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)

それでは、ハッシュタグデータを表示してリアルタイムで更新するために、 chart.htmlファイルに簡単なグラフを作成しましょう。 以下に定義するように、 Chart.jsおよびjquery.min.jsライブラリをインポートする必要があります。

bodyタグでは、次のステップでJavaScriptを使用してグラフを表示するときに参照するために、キャンバスを作成してIDを指定する必要があります。

 <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>

それでは、以下のJavaScriptコードを使用してチャートを作成しましょう。 まず、canvas要素を取得し、次に新しいグラフオブジェクトを作成してcanvas要素を渡し、以下のようにそのデータオブジェクトを定義します。

データのラベルとデータは、 app.pyファイルのget_chart_page関数を呼び出すときにページのレンダリング中に返されるラベルと値の変数で囲まれていることに注意してください。

最後の残りの部分は、毎秒Ajaxリクエストを実行し、URL /refreshDataを呼び出すように構成された関数です。これにより、 refresh_graph_dataapp.pyが実行され、新しい更新データが返され、新しいデータをレンダリングするcharが更新されます。

 <script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>

アプリケーションを一緒に実行する

以下の順序で3つのアプリケーションを実行してみましょう。1。Twitterアプリクライアント。 2.スパークアプリ。 3.ダッシュボードWebアプリ。

次に、URL <http://localhost:5001/>を使用してリアルタイムダッシュボードにアクセスできます。

これで、以下のようにチャートが更新されているのを確認できます。

アニメーション:リアルタイムのTwitterトレンドハッシュタグチャート

Apacheストリーミングの実際のユースケース

Spark Streamingを使用してリアルタイムでデータの簡単なデータ分析を行い、RESTfulWebサービスを使用して簡単なダッシュボードと直接統合する方法を学びました。 この例から、大量のデータストリームをキャプチャして変換し、すぐに意思決定を行うために簡単に使用できる貴重な洞察を抽出するため、Sparkがいかに強力であるかがわかります。 実装でき、ニュースやマーケティングなどのさまざまな業界に役立つ多くの役立つユースケースがあります。

イラスト:ハッシュタグは、複数の業界に適用できる貴重な洞察と感情を抽出するために使用できます。

ニュース業界の例

最も頻繁に言及されるハッシュタグを追跡して、ソーシャルメディアで人々が最も話題にしているトピックを知ることができます。 また、特定のハッシュタグとそのツイートを追跡して、世界中の特定のトピックやイベントについて人々が何を言っているかを知ることができます。

マーケティングの例

ツイートのストリームを収集し、感情分析を行うことで、それらを分類し、人々の興味を判断して、彼らの興味に関連するオファーでそれらをターゲットにすることができます。

また、ビッグデータ分析に特に適用でき、多くの業界に役立つユースケースがたくさんあります。 一般的なApacheSparkのユースケースの詳細については、以前の投稿の1つを確認することをお勧めします。

Spark Streamingの機能について詳しく知り、データをより高度に変換してリアルタイムでより多くの洞察を得るために、ここからSparkStreamingの詳細を読むことをお勧めします。