บทช่วยสอนการสตรีม Apache Spark: การระบุ Twitter Hashtags ที่กำลังมาแรง

เผยแพร่แล้ว: 2022-03-11

ปัจจุบันข้อมูลมีการเติบโตและสะสมได้เร็วกว่าที่เคย ปัจจุบันประมาณ 90% ของข้อมูลทั้งหมดที่สร้างขึ้นในโลกของเราสร้างขึ้นในช่วงสองปีที่ผ่านมาเท่านั้น เนื่องจากอัตราการเติบโตที่สั่นคลอนนี้ แพลตฟอร์มบิ๊กดาต้าจึงต้องนำโซลูชันที่รุนแรงมาใช้เพื่อรักษาปริมาณข้อมูลจำนวนมหาศาลดังกล่าว

หนึ่งในแหล่งข้อมูลหลักในปัจจุบันคือเครือข่ายสังคมออนไลน์ ให้ฉันได้สาธิตตัวอย่างในชีวิตจริง: การจัดการ วิเคราะห์ และดึงข้อมูลเชิงลึกจากข้อมูลเครือข่ายโซเชียลแบบเรียลไทม์โดยใช้หนึ่งในโซลูชันสะท้อนบิ๊กดาต้าที่สำคัญที่สุด — Apache Spark และ Python

Apache Spark Streaming สามารถใช้เพื่อดึงข้อมูลเชิงลึกจากโซเชียลมีเดีย เช่น แฮชแท็ก Twitter ที่กำลังเป็นที่นิยม

ในบทความนี้ ฉันจะสอนวิธีสร้างแอปพลิเคชันง่ายๆ ที่อ่านสตรีมออนไลน์จาก Twitter โดยใช้ Python จากนั้นประมวลผลทวีตโดยใช้ Apache Spark Streaming เพื่อระบุแฮชแท็ก และสุดท้าย ส่งคืนแฮชแท็กยอดนิยมและแสดงข้อมูลนี้บนของจริง - แดชบอร์ดเวลา

การสร้างข้อมูลรับรองของคุณเองสำหรับ Twitter APIs

ในการรับทวีตจาก Twitter คุณต้องลงทะเบียนบน TwitterApps โดยคลิกที่ "สร้างแอปใหม่" จากนั้นกรอกแบบฟอร์มด้านล่าง คลิก "สร้างแอป Twitter ของคุณ"

สกรีนช็อต: วิธีสร้างแอพ Twitter ของคุณ

ประการที่สอง ไปที่แอปที่สร้างขึ้นใหม่แล้วเปิดแท็บ "คีย์และโทเค็นการเข้าถึง" จากนั้นคลิกที่ "สร้างโทเค็นการเข้าถึงของฉัน"

สกรีนช็อต: การตั้งค่าข้อมูลประจำตัวของแอพ Twitter คีย์และโทเค็นการเข้าถึง

โทเค็นการเข้าถึงใหม่ของคุณจะปรากฏดังนี้

สกรีนช็อต: การตั้งค่าโทเค็นการเข้าถึงแอพ Twitter

และตอนนี้คุณก็พร้อมสำหรับขั้นตอนต่อไปแล้ว

การสร้างไคลเอนต์ Twitter HTTP

ในขั้นตอนนี้ ฉันจะแสดงวิธีสร้างไคลเอนต์อย่างง่ายที่จะรับทวีตจาก Twitter API โดยใช้ Python และส่งผ่านไปยังอินสแตนซ์ Spark Streaming ควรติดตามได้ง่ายสำหรับนักพัฒนา Python มืออาชีพ

ขั้นแรก มาสร้างไฟล์ชื่อ twitter_app.py กันก่อน จากนั้นเราจะเพิ่มโค้ดลงไปตามด้านล่าง

นำเข้าไลบรารีที่เราจะใช้ดังต่อไปนี้:

 import socket import sys import requests import requests_oauthlib import json

และเพิ่มตัวแปรที่จะใช้ใน OAuth เพื่อเชื่อมต่อกับ Twitter ดังนี้

 # Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

ตอนนี้ เราจะสร้างฟังก์ชันใหม่ที่เรียกว่า get_tweets ซึ่งจะเรียก URL ของ Twitter API และส่งคืนการตอบกลับสำหรับสตรีมทวีต

 def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

จากนั้น สร้างฟังก์ชันที่รับการตอบสนองจากฟังก์ชันด้านบน และแยกข้อความของทวีตออกจากออบเจ็กต์ JSON ของทวีตทั้งหมด หลังจากนั้นจะส่งทุกทวีตไปยังอินสแตนซ์ Spark Streaming (จะกล่าวถึงในภายหลัง) ผ่านการเชื่อมต่อ TCP

 def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)

ตอนนี้ เราจะสร้างส่วนหลักซึ่งจะทำให้การเชื่อมต่อซ็อกเก็ตโฮสต์แอปที่จุดประกายเชื่อมต่อด้วย เราจะกำหนดค่า IP ที่นี่ให้เป็น localhost เนื่องจากทั้งหมดจะทำงานบนเครื่องเดียวกันและพอร์ต 9009 จากนั้นเราจะเรียกเมธอด get_tweets ซึ่งเราทำไว้ด้านบน เพื่อรับทวีตจาก Twitter และส่งการตอบกลับพร้อมกับการเชื่อมต่อซ็อกเก็ตไปยัง send_tweets_to_spark เพื่อส่งทวีตไปยัง Spark

 TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)

การตั้งค่าแอปพลิเคชันสตรีมมิ่ง Apache Spark ของเรา

มาสร้างแอปสตรีมมิ่ง Spark ของเรากันเถอะ ซึ่งจะประมวลผลแบบเรียลไทม์สำหรับทวีตที่เข้ามา แยกแฮชแท็กออกจากพวกเขา และคำนวณจำนวนแฮชแท็กที่มีการกล่าวถึง

ภาพประกอบ: การสตรีม Spark ช่วยให้สามารถประมวลผลทวีตขาเข้าและการแยกแฮชแท็กแบบเรียลไทม์ได้

อันดับแรก เราต้องสร้างอินสแตนซ์ของ Spark Context sc จากนั้นเราสร้าง Streaming Context ssc จาก sc โดยมีช่วงแบตช์สองวินาทีที่จะทำการแปลงบนสตรีมทั้งหมดที่ได้รับทุกๆ สองวินาที ขอให้สังเกตว่าเราได้ตั้งค่าระดับบันทึกเป็น ERROR เพื่อปิดการใช้งานบันทึกส่วนใหญ่ที่ Spark เขียน

เรากำหนดจุดตรวจที่นี่เพื่ออนุญาตให้มีจุดตรวจ RDD เป็นระยะ สิ่งนี้จำเป็นสำหรับใช้ในแอปของเรา เนื่องจากเราจะใช้การแปลงแบบเก็บสถานะ (จะกล่าวถึงในภายหลังในหัวข้อเดียวกัน)

จากนั้นเรากำหนด DStream dataStream หลักของเราที่จะเชื่อมต่อกับซ็อกเก็ตเซิร์ฟเวอร์ที่เราสร้างขึ้นก่อนหน้านี้บนพอร์ต 9009 และอ่านทวีตจากพอร์ตนั้น แต่ละบันทึกใน DStream จะเป็นทวีต

 from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)

ตอนนี้ เราจะกำหนดตรรกะการแปลงของเรา อันดับแรก เราจะแบ่งทวีตทั้งหมดเป็นคำและใส่เป็นคำ RDD จากนั้นเราจะกรองเฉพาะแฮชแท็กจากทุกคำและจับคู่เป็นคู่ (hashtag, 1) และใส่ลงในแฮชแท็ก RDD

จากนั้นเราต้องคำนวณว่ามีการกล่าวถึงแฮชแท็กกี่ครั้ง เราสามารถทำได้โดยใช้ฟังก์ชัน reduceByKey ฟังก์ชันนี้จะคำนวณจำนวนครั้งที่มีการกล่าวถึงแฮชแท็กต่อแต่ละกลุ่ม กล่าวคือจะรีเซ็ตการนับในแต่ละกลุ่ม

ในกรณีของเรา เราจำเป็นต้องคำนวณจำนวนในแบตช์ทั้งหมด ดังนั้นเราจะใช้ฟังก์ชันอื่นที่เรียกว่า updateStateByKey เนื่องจากฟังก์ชันนี้ช่วยให้คุณสามารถรักษาสถานะของ RDD ในขณะที่อัปเดตด้วยข้อมูลใหม่ วิธีนี้เรียกว่า Stateful Transformation

โปรดทราบว่าในการใช้ updateStateByKey คุณต้องกำหนดค่าจุดตรวจ และสิ่งที่เราได้ทำในขั้นตอนก่อนหน้านี้

 # split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

updateStateByKey ใช้ฟังก์ชันเป็นพารามิเตอร์ที่เรียกว่าฟังก์ชัน update มันทำงานบนแต่ละรายการใน RDD และทำตรรกะที่ต้องการ

ในกรณีของเรา เราได้สร้างฟังก์ชันอัปเดตที่เรียกว่า aggregate_tags_count ซึ่งจะรวมค่า new_values ​​ทั้งหมดสำหรับแฮชแท็กแต่ละรายการ และเพิ่มลงใน total_sum ซึ่งเป็นผลรวมของแบตช์ทั้งหมด และบันทึกข้อมูลลงใน tags_totals RDD

 def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

จากนั้นเราทำการประมวลผลบน tags_totals RDD ในทุกแบตช์เพื่อแปลงเป็นตารางชั่วคราวโดยใช้บริบท SQL ของ Spark จากนั้นดำเนินการคำสั่ง Select เพื่อดึงแฮชแท็กสิบอันดับแรกที่มีการนับและใส่ลงในกรอบข้อมูล hashtag_counts_df

 def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)

ขั้นตอนสุดท้ายในแอปพลิเคชัน Spark ของเราคือส่งกรอบข้อมูล hashtag_counts_df ไปยังแอปพลิเคชันแดชบอร์ด ดังนั้น เราจะแปลง data frame เป็นสองอาร์เรย์ อันหนึ่งสำหรับแฮชแท็ก และอีกอันสำหรับการนับ จากนั้นเราจะส่งไปยังแอปพลิเคชันแดชบอร์ดผ่าน REST API

 def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

สุดท้าย นี่คือตัวอย่างผลลัพธ์ของ Spark Streaming ขณะทำงานและพิมพ์ hashtag_counts_df คุณจะสังเกตเห็นว่าเอาต์พุตถูกพิมพ์ทุกสองวินาทีตามช่วงเวลาของแบทช์

ตัวอย่างเอาต์พุตการสตรีม Twitter Spark ที่พิมพ์ตามการตั้งค่าช่วงแบตช์

สร้างแดชบอร์ดแบบเรียลไทม์อย่างง่ายสำหรับเป็นตัวแทนของข้อมูล

ตอนนี้ เราจะสร้างแอปพลิเคชันแดชบอร์ดอย่างง่ายที่จะอัปเดตตามเวลาจริงโดย Spark เราจะสร้างโดยใช้ Python, Flask และ Charts.js

ขั้นแรก ให้สร้างโปรเจ็กต์ Python ที่มีโครงสร้างดังแสดงด้านล่าง แล้วดาวน์โหลดและเพิ่มไฟล์ Chart.js ลงในไดเร็กทอรีสแตติก

ภาพประกอบ: การสร้างโครงการ Python สำหรับใช้ในการวิเคราะห์แฮชแท็ก Twitter

จากนั้นในไฟล์ app.py เราจะสร้างฟังก์ชันชื่อ update_data ซึ่ง Spark จะเรียกผ่าน URL http://localhost:5001/updateData เพื่ออัปเดต Global label และอาร์เรย์ค่า

นอกจากนี้ ฟังก์ชัน refresh_graph_data ยังถูกสร้างให้เรียกใช้โดยคำขอ AJAX เพื่อส่งคืนป้ายกำกับและอาร์เรย์ค่าที่อัปเดตใหม่เป็น JSON ฟังก์ชัน get_chart_page จะแสดงหน้า chart.html เมื่อเรียกใช้

 from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)

ตอนนี้ มาสร้างแผนภูมิอย่างง่ายในไฟล์ chart.html เพื่อแสดงข้อมูลแฮชแท็กและอัปเดตตามเวลาจริง ตามที่กำหนดไว้ด้านล่าง เราจำเป็นต้องนำเข้าไลบรารี JavaScript Chart.js และ jquery.min.js

ในแท็กเนื้อหา เราต้องสร้างแคนวาสและระบุ ID เพื่ออ้างอิงขณะแสดงแผนภูมิโดยใช้ JavaScript ในขั้นตอนต่อไป

 <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>

ตอนนี้ มาสร้างแผนภูมิโดยใช้โค้ด JavaScript ด้านล่างกัน ขั้นแรก เราได้รับองค์ประกอบผ้าใบ จากนั้นเราสร้างวัตถุแผนภูมิใหม่ และส่งองค์ประกอบผ้าใบไปยังองค์ประกอบนั้น และกำหนดวัตถุข้อมูลดังนี้

โปรดทราบว่าป้ายกำกับและข้อมูลถูกผูกไว้ด้วยตัวแปรป้ายกำกับและค่าที่ส่งคืนขณะแสดงหน้าเว็บเมื่อเรียกใช้ฟังก์ชัน get_chart_page ในไฟล์ app.py

ส่วนสุดท้ายที่เหลือคือฟังก์ชันที่ได้รับการกำหนดค่าให้ทำคำขอ Ajax ทุก ๆ วินาทีและเรียก URL /refreshData ซึ่งจะดำเนินการ refresh_graph_data ใน app.py และส่งคืนข้อมูลที่อัปเดตใหม่ จากนั้นอัปเดตถ่านที่แสดงข้อมูลใหม่

 <script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>

เรียกใช้แอพพลิเคชั่นด้วยกัน

เรียกใช้สามแอปพลิเคชันตามลำดับด้านล่าง: 1. Twitter App Client 2. แอพจุดประกาย 3. เว็บแอพแดชบอร์ด

จากนั้นคุณสามารถเข้าถึงแดชบอร์ดแบบเรียลไทม์โดยใช้ URL <http://localhost:5001/>

ตอนนี้คุณสามารถดูแผนภูมิของคุณได้รับการอัปเดตดังนี้:

แอนิเมชั่น: แผนภูมิแฮชแท็กแนวโน้ม Twitter แบบเรียลไทม์

กรณีการใช้งานจริงสตรีมมิ่ง Apache

เราได้เรียนรู้วิธีวิเคราะห์ข้อมูลอย่างง่ายในข้อมูลแบบเรียลไทม์โดยใช้ Spark Streaming และผสานเข้ากับแดชบอร์ดอย่างง่ายโดยตรงโดยใช้บริการเว็บ RESTful จากตัวอย่างนี้ เราจะเห็นได้ว่า Spark นั้นทรงพลังเพียงใด เมื่อมันดักจับกระแสข้อมูลจำนวนมหาศาล แปลงมัน และดึงข้อมูลเชิงลึกอันมีค่าที่สามารถใช้ได้อย่างง่ายดายในการตัดสินใจในเวลาไม่นาน มีกรณีการใช้งานที่เป็นประโยชน์มากมายที่สามารถนำไปใช้ได้และสามารถให้บริการแก่อุตสาหกรรมต่างๆ เช่น ข่าวสารหรือการตลาด

ภาพประกอบ: สามารถใช้แฮชแท็กเพื่อดึงข้อมูลเชิงลึกและความรู้สึกที่มีค่า ซึ่งนำไปใช้ได้ในหลายอุตสาหกรรม

ตัวอย่างวงการข่าว

เราสามารถติดตามแฮชแท็กที่กล่าวถึงบ่อยที่สุดเพื่อทราบว่าหัวข้อใดที่ผู้คนพูดถึงมากที่สุดบนโซเชียลมีเดีย นอกจากนี้ เราสามารถติดตามแฮชแท็กเฉพาะและทวีตของพวกเขา เพื่อที่จะรู้ว่าผู้คนพูดถึงหัวข้อหรือเหตุการณ์เฉพาะในโลกว่าอย่างไร

ตัวอย่างการตลาด

เราสามารถรวบรวมกระแสของทวีตและโดยการวิเคราะห์ความรู้สึก จัดหมวดหมู่และกำหนดความสนใจของผู้คนเพื่อกำหนดเป้าหมายพวกเขาด้วยข้อเสนอที่เกี่ยวข้องกับความสนใจของพวกเขา

นอกจากนี้ยังมีกรณีการใช้งานจำนวนมากที่สามารถนำไปใช้กับการวิเคราะห์ข้อมูลขนาดใหญ่โดยเฉพาะ และสามารถรองรับอุตสาหกรรมจำนวนมากได้ สำหรับกรณีการใช้งาน Apache Spark เพิ่มเติมโดยทั่วไป ฉันแนะนำให้คุณตรวจสอบหนึ่งในโพสต์ก่อนหน้าของเรา

ฉันแนะนำให้คุณอ่านเพิ่มเติมเกี่ยวกับ Spark Streaming จากที่นี่ เพื่อเรียนรู้เพิ่มเติมเกี่ยวกับความสามารถของมัน และทำการแปลงข้อมูลขั้นสูงเพิ่มเติมสำหรับข้อมูลเชิงลึกแบบเรียลไทม์โดยใช้มัน