บทช่วยสอนการสตรีม Apache Spark: การระบุ Twitter Hashtags ที่กำลังมาแรง
เผยแพร่แล้ว: 2022-03-11ปัจจุบันข้อมูลมีการเติบโตและสะสมได้เร็วกว่าที่เคย ปัจจุบันประมาณ 90% ของข้อมูลทั้งหมดที่สร้างขึ้นในโลกของเราสร้างขึ้นในช่วงสองปีที่ผ่านมาเท่านั้น เนื่องจากอัตราการเติบโตที่สั่นคลอนนี้ แพลตฟอร์มบิ๊กดาต้าจึงต้องนำโซลูชันที่รุนแรงมาใช้เพื่อรักษาปริมาณข้อมูลจำนวนมหาศาลดังกล่าว
หนึ่งในแหล่งข้อมูลหลักในปัจจุบันคือเครือข่ายสังคมออนไลน์ ให้ฉันได้สาธิตตัวอย่างในชีวิตจริง: การจัดการ วิเคราะห์ และดึงข้อมูลเชิงลึกจากข้อมูลเครือข่ายโซเชียลแบบเรียลไทม์โดยใช้หนึ่งในโซลูชันสะท้อนบิ๊กดาต้าที่สำคัญที่สุด — Apache Spark และ Python
ในบทความนี้ ฉันจะสอนวิธีสร้างแอปพลิเคชันง่ายๆ ที่อ่านสตรีมออนไลน์จาก Twitter โดยใช้ Python จากนั้นประมวลผลทวีตโดยใช้ Apache Spark Streaming เพื่อระบุแฮชแท็ก และสุดท้าย ส่งคืนแฮชแท็กยอดนิยมและแสดงข้อมูลนี้บนของจริง - แดชบอร์ดเวลา
การสร้างข้อมูลรับรองของคุณเองสำหรับ Twitter APIs
ในการรับทวีตจาก Twitter คุณต้องลงทะเบียนบน TwitterApps โดยคลิกที่ "สร้างแอปใหม่" จากนั้นกรอกแบบฟอร์มด้านล่าง คลิก "สร้างแอป Twitter ของคุณ"
ประการที่สอง ไปที่แอปที่สร้างขึ้นใหม่แล้วเปิดแท็บ "คีย์และโทเค็นการเข้าถึง" จากนั้นคลิกที่ "สร้างโทเค็นการเข้าถึงของฉัน"
โทเค็นการเข้าถึงใหม่ของคุณจะปรากฏดังนี้
และตอนนี้คุณก็พร้อมสำหรับขั้นตอนต่อไปแล้ว
การสร้างไคลเอนต์ Twitter HTTP
ในขั้นตอนนี้ ฉันจะแสดงวิธีสร้างไคลเอนต์อย่างง่ายที่จะรับทวีตจาก Twitter API โดยใช้ Python และส่งผ่านไปยังอินสแตนซ์ Spark Streaming ควรติดตามได้ง่ายสำหรับนักพัฒนา Python มืออาชีพ
ขั้นแรก มาสร้างไฟล์ชื่อ twitter_app.py
กันก่อน จากนั้นเราจะเพิ่มโค้ดลงไปตามด้านล่าง
นำเข้าไลบรารีที่เราจะใช้ดังต่อไปนี้:
import socket import sys import requests import requests_oauthlib import json
และเพิ่มตัวแปรที่จะใช้ใน OAuth เพื่อเชื่อมต่อกับ Twitter ดังนี้
# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
ตอนนี้ เราจะสร้างฟังก์ชันใหม่ที่เรียกว่า get_tweets
ซึ่งจะเรียก URL ของ Twitter API และส่งคืนการตอบกลับสำหรับสตรีมทวีต
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
จากนั้น สร้างฟังก์ชันที่รับการตอบสนองจากฟังก์ชันด้านบน และแยกข้อความของทวีตออกจากออบเจ็กต์ JSON ของทวีตทั้งหมด หลังจากนั้นจะส่งทุกทวีตไปยังอินสแตนซ์ Spark Streaming (จะกล่าวถึงในภายหลัง) ผ่านการเชื่อมต่อ TCP
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)
ตอนนี้ เราจะสร้างส่วนหลักซึ่งจะทำให้การเชื่อมต่อซ็อกเก็ตโฮสต์แอปที่จุดประกายเชื่อมต่อด้วย เราจะกำหนดค่า IP ที่นี่ให้เป็น localhost
เนื่องจากทั้งหมดจะทำงานบนเครื่องเดียวกันและพอร์ต 9009
จากนั้นเราจะเรียกเมธอด get_tweets
ซึ่งเราทำไว้ด้านบน เพื่อรับทวีตจาก Twitter และส่งการตอบกลับพร้อมกับการเชื่อมต่อซ็อกเก็ตไปยัง send_tweets_to_spark
เพื่อส่งทวีตไปยัง Spark
TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)
การตั้งค่าแอปพลิเคชันสตรีมมิ่ง Apache Spark ของเรา
มาสร้างแอปสตรีมมิ่ง Spark ของเรากันเถอะ ซึ่งจะประมวลผลแบบเรียลไทม์สำหรับทวีตที่เข้ามา แยกแฮชแท็กออกจากพวกเขา และคำนวณจำนวนแฮชแท็กที่มีการกล่าวถึง
อันดับแรก เราต้องสร้างอินสแตนซ์ของ Spark Context sc
จากนั้นเราสร้าง Streaming Context ssc
จาก sc
โดยมีช่วงแบตช์สองวินาทีที่จะทำการแปลงบนสตรีมทั้งหมดที่ได้รับทุกๆ สองวินาที ขอให้สังเกตว่าเราได้ตั้งค่าระดับบันทึกเป็น ERROR
เพื่อปิดการใช้งานบันทึกส่วนใหญ่ที่ Spark เขียน
เรากำหนดจุดตรวจที่นี่เพื่ออนุญาตให้มีจุดตรวจ RDD เป็นระยะ สิ่งนี้จำเป็นสำหรับใช้ในแอปของเรา เนื่องจากเราจะใช้การแปลงแบบเก็บสถานะ (จะกล่าวถึงในภายหลังในหัวข้อเดียวกัน)
จากนั้นเรากำหนด DStream dataStream หลักของเราที่จะเชื่อมต่อกับซ็อกเก็ตเซิร์ฟเวอร์ที่เราสร้างขึ้นก่อนหน้านี้บนพอร์ต 9009
และอ่านทวีตจากพอร์ตนั้น แต่ละบันทึกใน DStream จะเป็นทวีต
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)
ตอนนี้ เราจะกำหนดตรรกะการแปลงของเรา อันดับแรก เราจะแบ่งทวีตทั้งหมดเป็นคำและใส่เป็นคำ RDD จากนั้นเราจะกรองเฉพาะแฮชแท็กจากทุกคำและจับคู่เป็นคู่ (hashtag, 1)
และใส่ลงในแฮชแท็ก RDD
จากนั้นเราต้องคำนวณว่ามีการกล่าวถึงแฮชแท็กกี่ครั้ง เราสามารถทำได้โดยใช้ฟังก์ชัน reduceByKey
ฟังก์ชันนี้จะคำนวณจำนวนครั้งที่มีการกล่าวถึงแฮชแท็กต่อแต่ละกลุ่ม กล่าวคือจะรีเซ็ตการนับในแต่ละกลุ่ม
ในกรณีของเรา เราจำเป็นต้องคำนวณจำนวนในแบตช์ทั้งหมด ดังนั้นเราจะใช้ฟังก์ชันอื่นที่เรียกว่า updateStateByKey
เนื่องจากฟังก์ชันนี้ช่วยให้คุณสามารถรักษาสถานะของ RDD ในขณะที่อัปเดตด้วยข้อมูลใหม่ วิธีนี้เรียกว่า Stateful Transformation
โปรดทราบว่าในการใช้ updateStateByKey
คุณต้องกำหนดค่าจุดตรวจ และสิ่งที่เราได้ทำในขั้นตอนก่อนหน้านี้
# split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()
updateStateByKey
ใช้ฟังก์ชันเป็นพารามิเตอร์ที่เรียกว่าฟังก์ชัน update
มันทำงานบนแต่ละรายการใน RDD และทำตรรกะที่ต้องการ
ในกรณีของเรา เราได้สร้างฟังก์ชันอัปเดตที่เรียกว่า aggregate_tags_count
ซึ่งจะรวมค่า new_values
ทั้งหมดสำหรับแฮชแท็กแต่ละรายการ และเพิ่มลงใน total_sum
ซึ่งเป็นผลรวมของแบตช์ทั้งหมด และบันทึกข้อมูลลงใน tags_totals
RDD
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
จากนั้นเราทำการประมวลผลบน tags_totals
RDD ในทุกแบตช์เพื่อแปลงเป็นตารางชั่วคราวโดยใช้บริบท SQL ของ Spark จากนั้นดำเนินการคำสั่ง Select เพื่อดึงแฮชแท็กสิบอันดับแรกที่มีการนับและใส่ลงในกรอบข้อมูล hashtag_counts_df
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)
ขั้นตอนสุดท้ายในแอปพลิเคชัน Spark ของเราคือส่งกรอบข้อมูล hashtag_counts_df
ไปยังแอปพลิเคชันแดชบอร์ด ดังนั้น เราจะแปลง data frame เป็นสองอาร์เรย์ อันหนึ่งสำหรับแฮชแท็ก และอีกอันสำหรับการนับ จากนั้นเราจะส่งไปยังแอปพลิเคชันแดชบอร์ดผ่าน REST API

def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
สุดท้าย นี่คือตัวอย่างผลลัพธ์ของ Spark Streaming ขณะทำงานและพิมพ์ hashtag_counts_df
คุณจะสังเกตเห็นว่าเอาต์พุตถูกพิมพ์ทุกสองวินาทีตามช่วงเวลาของแบทช์
สร้างแดชบอร์ดแบบเรียลไทม์อย่างง่ายสำหรับเป็นตัวแทนของข้อมูล
ตอนนี้ เราจะสร้างแอปพลิเคชันแดชบอร์ดอย่างง่ายที่จะอัปเดตตามเวลาจริงโดย Spark เราจะสร้างโดยใช้ Python, Flask และ Charts.js
ขั้นแรก ให้สร้างโปรเจ็กต์ Python ที่มีโครงสร้างดังแสดงด้านล่าง แล้วดาวน์โหลดและเพิ่มไฟล์ Chart.js ลงในไดเร็กทอรีสแตติก
จากนั้นในไฟล์ app.py
เราจะสร้างฟังก์ชันชื่อ update_data
ซึ่ง Spark จะเรียกผ่าน URL http://localhost:5001/updateData
เพื่ออัปเดต Global label และอาร์เรย์ค่า
นอกจากนี้ ฟังก์ชัน refresh_graph_data
ยังถูกสร้างให้เรียกใช้โดยคำขอ AJAX เพื่อส่งคืนป้ายกำกับและอาร์เรย์ค่าที่อัปเดตใหม่เป็น JSON ฟังก์ชัน get_chart_page
จะแสดงหน้า chart.html
เมื่อเรียกใช้
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)
ตอนนี้ มาสร้างแผนภูมิอย่างง่ายในไฟล์ chart.html
เพื่อแสดงข้อมูลแฮชแท็กและอัปเดตตามเวลาจริง ตามที่กำหนดไว้ด้านล่าง เราจำเป็นต้องนำเข้าไลบรารี JavaScript Chart.js
และ jquery.min.js
ในแท็กเนื้อหา เราต้องสร้างแคนวาสและระบุ ID เพื่ออ้างอิงขณะแสดงแผนภูมิโดยใช้ JavaScript ในขั้นตอนต่อไป
<!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>
ตอนนี้ มาสร้างแผนภูมิโดยใช้โค้ด JavaScript ด้านล่างกัน ขั้นแรก เราได้รับองค์ประกอบผ้าใบ จากนั้นเราสร้างวัตถุแผนภูมิใหม่ และส่งองค์ประกอบผ้าใบไปยังองค์ประกอบนั้น และกำหนดวัตถุข้อมูลดังนี้
โปรดทราบว่าป้ายกำกับและข้อมูลถูกผูกไว้ด้วยตัวแปรป้ายกำกับและค่าที่ส่งคืนขณะแสดงหน้าเว็บเมื่อเรียกใช้ฟังก์ชัน get_chart_page
ในไฟล์ app.py
ส่วนสุดท้ายที่เหลือคือฟังก์ชันที่ได้รับการกำหนดค่าให้ทำคำขอ Ajax ทุก ๆ วินาทีและเรียก URL /refreshData
ซึ่งจะดำเนินการ refresh_graph_data
ใน app.py
และส่งคืนข้อมูลที่อัปเดตใหม่ จากนั้นอัปเดตถ่านที่แสดงข้อมูลใหม่
<script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>
เรียกใช้แอพพลิเคชั่นด้วยกัน
เรียกใช้สามแอปพลิเคชันตามลำดับด้านล่าง: 1. Twitter App Client 2. แอพจุดประกาย 3. เว็บแอพแดชบอร์ด
จากนั้นคุณสามารถเข้าถึงแดชบอร์ดแบบเรียลไทม์โดยใช้ URL <http://localhost:5001/>
ตอนนี้คุณสามารถดูแผนภูมิของคุณได้รับการอัปเดตดังนี้:
กรณีการใช้งานจริงสตรีมมิ่ง Apache
เราได้เรียนรู้วิธีวิเคราะห์ข้อมูลอย่างง่ายในข้อมูลแบบเรียลไทม์โดยใช้ Spark Streaming และผสานเข้ากับแดชบอร์ดอย่างง่ายโดยตรงโดยใช้บริการเว็บ RESTful จากตัวอย่างนี้ เราจะเห็นได้ว่า Spark นั้นทรงพลังเพียงใด เมื่อมันดักจับกระแสข้อมูลจำนวนมหาศาล แปลงมัน และดึงข้อมูลเชิงลึกอันมีค่าที่สามารถใช้ได้อย่างง่ายดายในการตัดสินใจในเวลาไม่นาน มีกรณีการใช้งานที่เป็นประโยชน์มากมายที่สามารถนำไปใช้ได้และสามารถให้บริการแก่อุตสาหกรรมต่างๆ เช่น ข่าวสารหรือการตลาด
ตัวอย่างวงการข่าว
เราสามารถติดตามแฮชแท็กที่กล่าวถึงบ่อยที่สุดเพื่อทราบว่าหัวข้อใดที่ผู้คนพูดถึงมากที่สุดบนโซเชียลมีเดีย นอกจากนี้ เราสามารถติดตามแฮชแท็กเฉพาะและทวีตของพวกเขา เพื่อที่จะรู้ว่าผู้คนพูดถึงหัวข้อหรือเหตุการณ์เฉพาะในโลกว่าอย่างไร
ตัวอย่างการตลาด
เราสามารถรวบรวมกระแสของทวีตและโดยการวิเคราะห์ความรู้สึก จัดหมวดหมู่และกำหนดความสนใจของผู้คนเพื่อกำหนดเป้าหมายพวกเขาด้วยข้อเสนอที่เกี่ยวข้องกับความสนใจของพวกเขา
นอกจากนี้ยังมีกรณีการใช้งานจำนวนมากที่สามารถนำไปใช้กับการวิเคราะห์ข้อมูลขนาดใหญ่โดยเฉพาะ และสามารถรองรับอุตสาหกรรมจำนวนมากได้ สำหรับกรณีการใช้งาน Apache Spark เพิ่มเติมโดยทั่วไป ฉันแนะนำให้คุณตรวจสอบหนึ่งในโพสต์ก่อนหน้าของเรา
ฉันแนะนำให้คุณอ่านเพิ่มเติมเกี่ยวกับ Spark Streaming จากที่นี่ เพื่อเรียนรู้เพิ่มเติมเกี่ยวกับความสามารถของมัน และทำการแปลงข้อมูลขั้นสูงเพิ่มเติมสำหรับข้อมูลเชิงลึกแบบเรียลไทม์โดยใช้มัน