برنامج Apache Spark Streaming التعليمي: تحديد علامات التصنيف الشائعة على Twitter
نشرت: 2022-03-11في الوقت الحاضر ، تنمو البيانات وتتراكم بشكل أسرع من أي وقت مضى. حاليًا ، تم إنشاء حوالي 90٪ من جميع البيانات التي تم إنشاؤها في عالمنا فقط في العامين الماضيين. بسبب معدل النمو المذهل هذا ، كان على منصات البيانات الضخمة اعتماد حلول جذرية من أجل الحفاظ على مثل هذه الكميات الضخمة من البيانات.
الشبكات الاجتماعية هي أحد المصادر الرئيسية للبيانات اليوم. اسمح لي بعرض مثال واقعي: التعامل مع بيانات الشبكات الاجتماعية وتحليلها واستخراجها في الوقت الفعلي باستخدام أحد أهم حلول صدى البيانات الضخمة المتوفرة - Apache Spark و Python.
في هذه المقالة ، سأعلمك كيفية إنشاء تطبيق بسيط يقرأ التدفقات عبر الإنترنت من Twitter باستخدام Python ، ثم يعالج التغريدات باستخدام Apache Spark Streaming لتحديد علامات التجزئة ، وأخيرًا ، يُرجع علامات التصنيف الأكثر شيوعًا ويمثل هذه البيانات على حقيقي لوحة عدادات الوقت.
إنشاء بيانات الاعتماد الخاصة بك لواجهات برمجة تطبيقات Twitter
للحصول على تغريدات من Twitter ، تحتاج إلى التسجيل في TwitterApps من خلال النقر على "إنشاء تطبيق جديد" ثم ملء النموذج أدناه ، انقر فوق "إنشاء تطبيق Twitter الخاص بك".
ثانيًا ، انتقل إلى التطبيق الذي تم إنشاؤه حديثًا وافتح علامة التبويب "المفاتيح ورموز الوصول". ثم انقر فوق "إنشاء رمز الوصول الخاص بي".
سوف تظهر رموز الوصول الجديدة الخاصة بك على النحو التالي.
والآن أنت جاهز للخطوة التالية.
بناء عميل Twitter HTTP
في هذه الخطوة ، سأوضح لك كيفية إنشاء عميل بسيط يحصل على التغريدات من Twitter API باستخدام Python ويمررها إلى مثيل Spark Streaming. يجب أن يكون من السهل متابعته لأي مطور بايثون محترف.
أولاً ، دعنا ننشئ ملفًا يسمى twitter_app.py
ثم نضيف الكود فيه معًا على النحو التالي.
قم باستيراد المكتبات التي سنستخدمها على النحو التالي:
import socket import sys import requests import requests_oauthlib import json
وأضف المتغيرات التي سيتم استخدامها في OAuth للاتصال بتويتر على النحو التالي:
# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
الآن ، سننشئ وظيفة جديدة تسمى get_tweets
تستدعي عنوان URL الخاص بواجهة برمجة تطبيقات Twitter وتعيد الاستجابة لتيار من التغريدات.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
بعد ذلك ، أنشئ وظيفة تأخذ الاستجابة من أعلاه وتستخرج نص التغريدات من كائن JSON للتغريدات بأكمله. بعد ذلك ، يرسل كل تغريدة إلى مثيل Spark Streaming (ستتم مناقشته لاحقًا) من خلال اتصال TCP.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)
الآن ، سنصنع الجزء الرئيسي الذي سيجعل اتصالات مقبس مضيف التطبيق التي ستتصل بها شرارة. سنقوم بتهيئة عنوان IP هنا ليكون مضيفًا localhost
حيث سيتم تشغيل الكل على نفس الجهاز والمنفذ 9009
. ثم get_tweets
طريقة get_tweets ، التي قمنا بها أعلاه ، للحصول على التغريدات من Twitter وتمرير ردها جنبًا إلى جنب مع اتصال المقبس إلى send_tweets_to_spark
لإرسال التغريدات إلى Spark.
TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)
إعداد تطبيق دفق Apache Spark الخاص بنا
لنقم ببناء تطبيق Spark المتدفق الذي سيقوم بمعالجة التغريدات الواردة في الوقت الفعلي ، واستخراج علامات التجزئة منها ، وحساب عدد علامات التجزئة التي تم ذكرها.
أولاً ، يتعين علينا إنشاء مثيل لـ Spark Context sc
، ثم أنشأنا Streaming Context ssc
من sc
مع فاصل زمني للدفعات ثانيتين من شأنه إجراء التحويل على جميع التدفقات المستلمة كل ثانيتين. لاحظ أننا قمنا بتعيين مستوى السجل على ERROR
لتعطيل معظم السجلات التي يكتبها Spark.
لقد حددنا نقطة تفتيش هنا للسماح بنقاط تفتيش دورية RDD ؛ هذا أمر إلزامي لاستخدامه في تطبيقنا ، حيث سنستخدم تحويلات ذات حالة (ستتم مناقشتها لاحقًا في نفس القسم).
ثم نحدد بيانات DStream الرئيسية الخاصة بنا والتي ستتصل بخادم المقبس الذي أنشأناه من قبل على المنفذ 9009
ونقرأ التغريدات من هذا المنفذ. سيكون كل سجل في DStream تغريدة.
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)
الآن ، سنحدد منطق التحول الخاص بنا. أولاً سنقسم كل التغريدات إلى كلمات ونضعها في كلمات RDD. ثم سنقوم بتصفية علامات التصنيف فقط من جميع الكلمات ونقوم بتعيينها على زوج من (hashtag, 1)
ووضعها في علامات التجزئة RDD.
ثم نحتاج إلى حساب عدد المرات التي تم فيها ذكر علامة التصنيف. يمكننا القيام بذلك عن طريق استخدام وظيفة reduceByKey
. ستحسب هذه الوظيفة عدد المرات التي تم فيها ذكر علامة التصنيف لكل دفعة ، أي ستعيد تعيين الأعداد في كل دفعة.
في حالتنا ، نحتاج إلى حساب الأعداد عبر جميع الدُفعات ، لذلك سنستخدم وظيفة أخرى تسمى updateStateByKey
، حيث تتيح لك هذه الوظيفة الحفاظ على حالة RDD أثناء تحديثها ببيانات جديدة. هذه الطريقة تسمى Stateful Transformation
ذو الحالة.
لاحظ أنه من أجل استخدام updateStateByKey
، يجب عليك تكوين نقطة تفتيش ، وهذا ما فعلناه في الخطوة السابقة.
# split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()
تأخذ updateStateByKey
وظيفة كمعامل يسمى وظيفة update
. يتم تشغيله على كل عنصر في RDD ويقوم بالمنطق المطلوب.
في حالتنا ، أنشأنا وظيفة تحديث تسمى aggregate_tags_count
والتي ستجمع كل tags_totals
total_sum
new_values
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
ثم نقوم بالمعالجة على tags_totals
RDD في كل دفعة من أجل تحويلها إلى جدول مؤقت باستخدام Spark SQL Context ثم إجراء عبارة تحديد لاسترداد علامات التجزئة العشرة الأولى مع أعدادها ووضعها في إطار بيانات hashtag_counts_df
.
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)
تتمثل الخطوة الأخيرة في تطبيق Spark في إرسال إطار بيانات hashtag_counts_df
إلى تطبيق لوحة القيادة. لذلك سنقوم بتحويل إطار البيانات إلى مصفوفتين ، واحدة لعلامات التصنيف والأخرى لتعدادها. ثم سنرسلهم إلى تطبيق لوحة القيادة من خلال واجهة برمجة تطبيقات REST.

def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
أخيرًا ، إليك عينة من إخراج Spark Streaming أثناء تشغيل وطباعة hashtag_counts_df
، ستلاحظ أن الإخراج تتم طباعته كل ثانيتين بالضبط حسب فترات الدُفعات.
قم بإنشاء لوحة تحكم بسيطة في الوقت الفعلي لتمثيل البيانات
الآن ، سننشئ تطبيق لوحة معلومات بسيطًا سيتم تحديثه في الوقت الفعلي بواسطة Spark. سنقوم ببنائه باستخدام Python و Flask و Charts.js.
أولاً ، لنقم بإنشاء مشروع Python بالبنية الموضحة أدناه وقم بتنزيل ملف Chart.js وإضافته إلى الدليل الثابت.
بعد ذلك ، في ملف app.py
، سننشئ وظيفة تسمى update_data
، والتي سيتم استدعاؤها بواسطة Spark من خلال عنوان URL http://localhost:5001/updateData
لتحديث مصفوفات القيم والتسميات العالمية.
أيضًا ، يتم إنشاء وظيفة refresh_graph_data
ليتم استدعاؤها من خلال طلب AJAX لإرجاع مجموعات العلامات والقيم المحدثة الجديدة مثل JSON. ستعرض الوظيفة get_chart_page
صفحة chart.html
عند استدعائها.
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)
الآن ، لنقم بإنشاء مخطط بسيط في ملف chart.html
لعرض بيانات الهاشتاج وتحديثها في الوقت الفعلي. كما هو موضح أدناه ، نحتاج إلى استيراد مكتبات JavaScript و Chart.js
و jquery.min.js
.
في علامة النص الأساسي ، يتعين علينا إنشاء لوحة قماشية وإعطائها معرفًا للإشارة إليها أثناء عرض المخطط باستخدام JavaScript في الخطوة التالية.
<!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>
الآن ، دعنا نبني المخطط باستخدام كود JavaScript أدناه. أولاً ، نحصل على عنصر Canvas ، ثم نقوم بإنشاء كائن مخطط جديد ونمرر عنصر Canvas إليه ونحدد كائن البيانات الخاص به كما هو موضح أدناه.
لاحظ أن تسميات البيانات والبيانات مقيدة بمتغيرات التسميات والقيم التي يتم إرجاعها أثناء عرض الصفحة عند استدعاء دالة get_chart_page
في ملف app.py
الجزء الأخير المتبقي هو الوظيفة التي تم تكوينها للقيام بطلب Ajax كل ثانية واستدعاء عنوان URL /refreshData
، والذي سينفذ refresh_graph_data
في app.py
البيانات المحدثة الجديدة ، ثم تحديث الحرف الذي يعرض البيانات الجديدة.
<script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>
تشغيل التطبيقات معًا
لنقم بتشغيل التطبيقات الثلاثة بالترتيب التالي: 1. Twitter App Client. 2. تطبيق سبارك. 3. تطبيق ويب لوحة القيادة.
ثم يمكنك الوصول إلى لوحة التحكم في الوقت الفعلي باستخدام عنوان URL <http://localhost:5001/>
الآن ، يمكنك رؤية المخطط الخاص بك قيد التحديث ، على النحو التالي:
حالات استخدام الحياة الحقيقية المتدفقة من Apache
لقد تعلمنا كيفية إجراء تحليلات بيانات بسيطة على البيانات في الوقت الفعلي باستخدام Spark Streaming ودمجها مباشرة مع لوحة معلومات بسيطة باستخدام خدمة ويب RESTful. من هذا المثال ، يمكننا أن نرى مدى قوة Spark ، حيث إنها تلتقط دفقًا هائلاً من البيانات وتحولها وتستخرج رؤى قيمة يمكن استخدامها بسهولة لاتخاذ القرارات في أي وقت من الأوقات. هناك العديد من حالات الاستخدام المفيدة التي يمكن تنفيذها والتي يمكن أن تخدم صناعات مختلفة ، مثل الأخبار أو التسويق.
مثال صناعة الأخبار
يمكننا تتبع علامات التصنيف الأكثر تكرارًا لمعرفة الموضوعات التي يتحدث عنها الأشخاص كثيرًا على وسائل التواصل الاجتماعي. أيضًا ، يمكننا تتبع علامات تصنيف معينة وتغريداتهم من أجل معرفة ما يقوله الناس حول مواضيع أو أحداث معينة في العالم.
مثال تسويقي
يمكننا جمع دفق التغريدات ، ومن خلال تحليل المشاعر ، تصنيفها وتحديد اهتمامات الأشخاص من أجل استهدافهم بعروض تتعلق باهتماماتهم.
أيضًا ، هناك الكثير من حالات الاستخدام التي يمكن تطبيقها خصيصًا لتحليلات البيانات الضخمة ويمكن أن تخدم الكثير من الصناعات. لمزيد من حالات استخدام Apache Spark بشكل عام ، أقترح عليك مراجعة إحدى مشاركاتنا السابقة.
أشجعك على قراءة المزيد عن Spark Streaming من هنا لمعرفة المزيد عن إمكانياتها والقيام بتحويل أكثر تقدمًا على البيانات للحصول على مزيد من الأفكار في الوقت الفعلي باستخدامها.