Apache Spark 스트리밍 자습서: 인기 있는 Twitter 해시태그 식별
게시 됨: 2022-03-11오늘날 데이터는 그 어느 때보다 빠르게 증가하고 축적됩니다. 현재 전 세계에서 생성되는 모든 데이터의 약 90%는 지난 2년 동안에만 생성되었습니다. 이러한 엄청난 성장률로 인해 빅 데이터 플랫폼은 엄청난 양의 데이터를 유지하기 위해 급진적인 솔루션을 채택해야 했습니다.
오늘날 데이터의 주요 소스 중 하나는 소셜 네트워크입니다. 가장 중요한 빅 데이터 에코 솔루션 중 하나인 Apache Spark 및 Python을 사용하여 소셜 네트워크 데이터를 실시간으로 처리, 분석 및 추출하는 실제 예를 보여드리겠습니다.
이 기사에서는 Python을 사용하여 Twitter에서 온라인 스트림을 읽은 다음 Apache Spark Streaming을 사용하여 트윗을 처리하여 해시태그를 식별하고 마지막으로 인기 급상승 해시태그를 반환하고 이 데이터를 실제 - 시간 대시보드.
Twitter API에 대한 자신의 자격 증명 만들기
Twitter에서 트윗을 받으려면 "새 앱 만들기"를 클릭하여 TwitterApps에 등록한 다음 "내 Twitter 앱 만들기"를 클릭하여 아래 양식을 작성해야 합니다.
둘째, 새로 만든 앱으로 이동하여 "키 및 액세스 토큰" 탭을 엽니다. 그런 다음 "내 액세스 토큰 생성"을 클릭하십시오.
새 액세스 토큰이 아래와 같이 표시됩니다.
이제 다음 단계를 위한 준비가 되었습니다.
Twitter HTTP 클라이언트 구축
이 단계에서는 Python을 사용하여 Twitter API에서 트윗을 가져와 Spark Streaming 인스턴스에 전달하는 간단한 클라이언트를 빌드하는 방법을 보여 드리겠습니다. 전문 Python 개발자라면 누구나 쉽게 따라할 수 있어야 합니다.
먼저 twitter_app.py
라는 파일을 만들고 그 안에 아래와 같이 코드를 추가해 보겠습니다.
아래와 같이 사용할 라이브러리를 가져옵니다.
import socket import sys import requests import requests_oauthlib import json
그리고 아래와 같이 Twitter에 연결하기 위해 OAuth에서 사용할 변수를 추가합니다.
# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
이제 Twitter API URL을 호출하고 트윗 스트림에 대한 응답을 반환하는 get_tweets
라는 새 함수를 만듭니다.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
그런 다음 위의 응답을 가져와 전체 트윗의 JSON 개체에서 트윗의 텍스트를 추출하는 함수를 만듭니다. 그 후 TCP 연결을 통해 모든 트윗을 Spark Streaming 인스턴스(나중에 설명함)로 보냅니다.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)
이제 spark가 연결할 앱 호스트 소켓 연결을 만드는 주요 부분을 만들 것입니다. 여기에서 IP를 localhost
로 구성할 것입니다. 모두 동일한 머신과 포트 9009
에서 실행될 것이기 때문입니다. 그런 다음 위에서 만든 get_tweets
메서드를 호출하여 Twitter에서 트윗을 가져오고 해당 응답을 소켓 연결과 함께 소켓 연결과 함께 전달하여 트윗을 send_tweets_to_spark
로 전송합니다.
TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)
Apache Spark 스트리밍 애플리케이션 설정
들어오는 트윗을 실시간으로 처리하고, 해시태그를 추출하고, 언급된 해시태그 수를 계산하는 Spark 스트리밍 앱을 구축해 보겠습니다.
먼저 Spark Context sc
의 인스턴스를 생성한 다음 2초마다 수신되는 모든 스트림에 대해 변환을 수행하는 배치 간격 2초로 sc
에서 Streaming Context ssc
를 생성해야 합니다. Spark가 작성하는 대부분의 로그를 비활성화하기 위해 로그 수준을 ERROR
로 설정했습니다.
주기적인 RDD 체크포인트를 허용하기 위해 여기에서 체크포인트를 정의했습니다. 이것은 상태 저장 변환을 사용하므로 앱에서 반드시 사용해야 합니다(같은 섹션의 뒷부분에서 설명함).
그런 다음 포트 9009
에서 이전에 만든 소켓 서버에 연결하고 해당 포트에서 트윗을 읽을 기본 DStream dataStream을 정의합니다. DStream의 각 레코드는 트윗이 됩니다.
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)
이제 변환 논리를 정의하겠습니다. 먼저 모든 트윗을 단어로 나누고 단어 RDD에 넣습니다. 그런 다음 모든 단어에서 해시태그만 필터링하여 (hashtag, 1)
쌍에 매핑하고 해시태그 RDD에 넣습니다.
그런 다음 해시태그가 언급된 횟수를 계산해야 합니다. reduceByKey
함수를 사용하여 이를 수행할 수 있습니다. 이 기능은 각 배치당 해시태그가 언급된 횟수를 계산합니다. 즉, 각 배치의 카운트를 재설정합니다.
우리의 경우 모든 배치에 대한 카운트를 계산해야 하므로 updateStateByKey
라는 다른 함수를 사용합니다. 이 함수를 사용하면 새 데이터로 업데이트하는 동안 RDD의 상태를 유지할 수 있기 때문입니다. 이 방법을 Stateful Transformation
이라고 합니다.
updateStateByKey
를 사용하려면 체크포인트를 구성해야 하며 이전 단계에서 수행한 작업에 유의하십시오.
# split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()
updateStateByKey
는 update
함수라는 매개변수로 함수를 사용합니다. RDD의 각 항목에서 실행되고 원하는 논리를 수행합니다.
우리의 경우 각 해시태그에 대한 모든 new_values
를 합산하고 모든 배치의 합계인 total_sum
에 추가하고 데이터를 tags_totals
RDD에 저장하는 aggregate_tags_count
_태그_카운트라는 업데이트 함수를 만들었습니다.
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
그런 다음 Spark SQL 컨텍스트를 사용하여 임시 테이블로 변환하기 위해 모든 배치에서 tags_totals
RDD에 대한 처리를 수행한 다음 상위 10개의 해시태그를 검색하여 해시태그 hashtag_counts_df
데이터 프레임에 저장하기 위해 select 문을 수행합니다.
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)
Spark 애플리케이션의 마지막 단계는 hashtag_counts_df
데이터 프레임을 대시보드 애플리케이션으로 보내는 것입니다. 그래서 우리는 데이터 프레임을 두 개의 배열로 변환할 것입니다. 하나는 해시태그용이고 다른 하나는 카운트용입니다. 그런 다음 REST API를 통해 대시보드 애플리케이션으로 전송합니다.

def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
마지막으로, 다음은 hashtag_counts_df
를 실행하고 인쇄하는 동안 Spark Streaming의 샘플 출력입니다. 출력이 배치 간격에 따라 정확히 2초마다 인쇄되는 것을 알 수 있습니다.
데이터를 나타내는 간단한 실시간 대시보드 만들기
이제 Spark에 의해 실시간으로 업데이트되는 간단한 대시보드 애플리케이션을 만들 것입니다. Python, Flask 및 Charts.js를 사용하여 빌드합니다.
먼저 아래와 같은 구조로 Python 프로젝트를 생성하고 Chart.js 파일을 다운받아 정적 디렉토리에 추가해 봅시다.
그런 다음 app.py
파일에서 update_data
라는 함수를 만들 것입니다. 이 함수는 전역 레이블 및 값 배열을 업데이트하기 위해 URL http://localhost:5001/updateData
를 통해 Spark에서 호출합니다.
또한 새로 업데이트된 레이블 및 값 배열을 JSON으로 반환하기 위해 AJAX 요청에 의해 호출되도록 refresh_graph_data
함수가 생성됩니다. get_chart_page
함수는 호출될 때 chart.html
페이지를 렌더링합니다.
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)
이제 실시간으로 해시태그 데이터를 표시하고 업데이트하기 위해 chart.html
파일에 간단한 차트를 만들어 보겠습니다. 아래 정의된 대로 Chart.js
및 jquery.min.js
JavaScript 라이브러리를 가져와야 합니다.
body 태그에서 다음 단계에서 JavaScript를 사용하여 차트를 표시하는 동안 참조하기 위해 캔버스를 만들고 ID를 부여해야 합니다.
<!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>
이제 아래의 JavaScript 코드를 사용하여 차트를 구성해 보겠습니다. 먼저 캔버스 요소를 가져온 다음 새 차트 개체를 만들고 캔버스 요소를 전달하고 아래와 같이 데이터 개체를 정의합니다.
데이터의 레이블과 데이터는 app.py
파일에서 get_chart_page
함수를 호출할 때 페이지를 렌더링하는 동안 반환되는 레이블 및 값 변수로 제한됩니다.
마지막으로 남은 부분은 매초마다 Ajax 요청을 수행하고 URL /refreshData
를 호출하도록 구성된 함수로, 이 함수는 app.py
에서 refresh_graph_data
를 실행하고 업데이트된 새 데이터를 반환한 다음 새 데이터를 렌더링하는 char를 업데이트합니다.
<script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>
응용 프로그램을 함께 실행
아래 순서대로 3개의 애플리케이션을 실행해 봅시다. 1. Twitter App Client. 2. 스파크 앱. 3. 대시보드 웹 앱.
그런 다음 URL <http://localhost:5001/>
을 사용하여 실시간 대시보드에 액세스할 수 있습니다.
이제 아래와 같이 차트가 업데이트되는 것을 볼 수 있습니다.
Apache 스트리밍 실제 사용 사례
Spark Streaming을 사용하여 실시간으로 데이터에 대한 간단한 데이터 분석을 수행하고 RESTful 웹 서비스를 사용하여 간단한 대시보드와 직접 통합하는 방법을 배웠습니다. 이 예에서 우리는 Spark가 방대한 데이터 스트림을 캡처하고, 변환하고, 즉시 결정을 내리는 데 쉽게 사용할 수 있는 귀중한 통찰력을 추출하므로 얼마나 강력한지 알 수 있습니다. 구현될 수 있고 뉴스나 마케팅과 같은 다양한 산업에 서비스를 제공할 수 있는 유용한 사용 사례가 많이 있습니다.
뉴스 산업 사례
가장 자주 언급되는 해시태그를 추적하여 사람들이 소셜 미디어에서 가장 많이 이야기하는 주제를 파악할 수 있습니다. 또한 특정 해시태그와 해당 트윗을 추적하여 사람들이 전 세계의 특정 주제나 이벤트에 대해 말하는 내용을 알 수 있습니다.
마케팅 사례
우리는 트윗 스트림을 수집하고 감정 분석을 수행하여 이를 분류하고 사람들의 관심사를 파악하여 관심사와 관련된 제안을 대상으로 할 수 있습니다.
또한 빅 데이터 분석을 위해 특별히 적용할 수 있고 많은 산업 분야에 서비스를 제공할 수 있는 많은 사용 사례가 있습니다. 일반적으로 더 많은 Apache Spark 사용 사례를 보려면 이전 게시물 중 하나를 확인하는 것이 좋습니다.
여기에서 Spark Streaming의 기능에 대해 자세히 알아보고 이를 사용하여 실시간으로 더 많은 통찰력을 얻기 위해 데이터에 대한 고급 변환을 수행하는 것이 좋습니다.