Tutorial Streaming Apache Spark: Mengidentifikasi Tren Hashtag Twitter

Diterbitkan: 2022-03-11

Saat ini, data tumbuh dan terakumulasi lebih cepat dari sebelumnya. Saat ini, sekitar 90% dari semua data yang dihasilkan di dunia kita hanya dihasilkan dalam dua tahun terakhir. Karena tingkat pertumbuhan yang mengejutkan ini, platform data besar harus mengadopsi solusi radikal untuk mempertahankan volume data yang begitu besar.

Salah satu sumber utama data saat ini adalah jejaring sosial. Izinkan saya untuk mendemonstrasikan contoh kehidupan nyata: menangani, menganalisis, dan mengekstraksi wawasan dari data jejaring sosial secara real time menggunakan salah satu solusi gema data besar terpenting di luar sana—Apache Spark, dan Python.

Apache Spark Streaming dapat digunakan untuk mengekstrak wawasan dari media sosial, seperti tagar Twitter yang sedang tren

Dalam artikel ini, saya akan mengajari Anda cara membuat aplikasi sederhana yang membaca aliran online dari Twitter menggunakan Python, kemudian memproses tweet menggunakan Apache Spark Streaming untuk mengidentifikasi tagar dan, akhirnya, mengembalikan tagar trending teratas dan merepresentasikan data ini secara nyata. -dasbor waktu.

Membuat Kredensial Anda Sendiri untuk API Twitter

Untuk mendapatkan tweet dari Twitter, Anda harus mendaftar di TwitterApps dengan mengklik "Buat aplikasi baru" dan kemudian isi formulir di bawah ini, klik "Buat aplikasi Twitter Anda".

Tangkapan layar: Cara membuat aplikasi Twitter Anda.

Kedua, buka aplikasi yang baru Anda buat dan buka tab "Kunci dan Token Akses". Kemudian klik "Hasilkan token akses saya."

Tangkapan layar: Menyiapkan kredensial, kunci, dan token akses aplikasi Twitter.

Token akses baru Anda akan muncul seperti di bawah ini.

Tangkapan layar: Penyiapan token akses aplikasi Twitter.

Dan sekarang Anda siap untuk langkah selanjutnya.

Membangun Klien HTTP Twitter

Pada langkah ini, saya akan menunjukkan cara membangun klien sederhana yang akan mendapatkan tweet dari Twitter API menggunakan Python dan meneruskannya ke instance Spark Streaming. Seharusnya mudah diikuti untuk pengembang Python profesional mana pun.

Pertama, mari kita buat file bernama twitter_app.py dan kemudian kita akan menambahkan kode di dalamnya bersama-sama seperti di bawah ini.

Impor perpustakaan yang akan kita gunakan seperti di bawah ini:

 import socket import sys import requests import requests_oauthlib import json

Dan tambahkan variabel yang akan digunakan di OAuth untuk menghubungkan ke Twitter seperti di bawah ini:

 # Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Sekarang, kita akan membuat fungsi baru bernama get_tweets yang akan memanggil URL API Twitter dan mengembalikan respons untuk aliran tweet.

 def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

Kemudian, buat fungsi yang mengambil respons dari yang di atas dan mengekstrak teks tweet dari seluruh objek JSON tweet. Setelah itu, ia mengirimkan setiap tweet ke instance Spark Streaming (akan dibahas nanti) melalui koneksi TCP.

 def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)

Sekarang, kita akan membuat bagian utama yang akan membuat koneksi soket host aplikasi yang akan terhubung dengan spark. Kami akan mengonfigurasi IP di sini menjadi localhost karena semua akan berjalan pada mesin yang sama dan port 9009 . Kemudian kita akan memanggil metode get_tweets , yang kita buat di atas, untuk mendapatkan tweet dari Twitter dan meneruskan responsnya bersama dengan koneksi soket ke send_tweets_to_spark untuk mengirim tweet ke Spark.

 TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)

Menyiapkan Aplikasi Streaming Apache Spark kami

Mari buat aplikasi streaming Spark kami yang akan melakukan pemrosesan waktu nyata untuk tweet yang masuk, mengekstrak tagar darinya, dan menghitung berapa banyak tagar yang telah disebutkan.

Ilustrasi: Spark streaming memungkinkan pemrosesan tweet masuk dan ekstraksi hashtag secara real-time

Pertama, kita harus membuat instance Spark Context sc , lalu kita membuat Streaming Context ssc dari sc dengan interval batch dua detik yang akan melakukan transformasi pada semua aliran yang diterima setiap dua detik. Perhatikan bahwa kami telah menyetel level log ke ERROR untuk menonaktifkan sebagian besar log yang ditulis Spark.

Kami mendefinisikan sebuah pos pemeriksaan di sini untuk memungkinkan pos pemeriksaan RDD berkala; ini wajib digunakan di aplikasi kita, karena kita akan menggunakan transformasi stateful (akan dibahas nanti di bagian yang sama).

Kemudian kita mendefinisikan DStream dataStream utama kita yang akan terhubung ke server socket yang kita buat sebelumnya pada port 9009 dan membaca tweet dari port tersebut. Setiap record di DStream akan menjadi tweet.

 from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)

Sekarang, kita akan mendefinisikan logika transformasi kita. Pertama kita akan membagi semua tweet menjadi kata-kata dan memasukkannya ke dalam kata-kata RDD. Kemudian kami hanya akan memfilter hashtag dari semua kata dan memetakannya ke pasangan (hashtag, 1) dan menempatkannya di hashtag RDD.

Kemudian kita perlu menghitung berapa kali hashtag tersebut telah disebutkan. Kita bisa melakukannya dengan menggunakan fungsi reduceByKey . Fungsi ini akan menghitung berapa kali hashtag telah disebutkan per setiap batch, yaitu akan mengatur ulang hitungan di setiap batch.

Dalam kasus kami, kami perlu menghitung jumlah di semua batch, jadi kami akan menggunakan fungsi lain yang disebut updateStateByKey , karena fungsi ini memungkinkan Anda untuk mempertahankan status RDD saat memperbaruinya dengan data baru. Cara ini disebut Stateful Transformation .

Perhatikan bahwa untuk menggunakan updateStateByKey , Anda harus mengonfigurasi pos pemeriksaan, dan apa yang telah kita lakukan di langkah sebelumnya.

 # split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

updateStateByKey mengambil fungsi sebagai parameter yang disebut fungsi update . Ini berjalan pada setiap item dalam RDD dan melakukan logika yang diinginkan.

Dalam kasus kami, kami telah membuat fungsi pembaruan yang disebut aggregate_tags_count yang akan menjumlahkan semua new_values untuk setiap tagar dan menambahkannya ke total_sum yang merupakan jumlah di semua kumpulan dan menyimpan data ke tags_totals RDD.

 def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

Kemudian kami melakukan pemrosesan pada tags_totals RDD di setiap batch untuk mengubahnya menjadi tabel temp menggunakan Spark SQL Context dan kemudian melakukan pernyataan pilih untuk mengambil sepuluh tagar teratas dengan jumlah mereka dan memasukkannya ke dalam bingkai data hashtag_counts_df .

 def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)

Langkah terakhir dalam aplikasi Spark kita adalah mengirimkan data frame hashtag_counts_df ke aplikasi dashboard. Jadi kami akan mengubah bingkai data menjadi dua larik, satu untuk tagar dan yang lainnya untuk jumlah mereka. Kemudian kami akan mengirimkannya ke aplikasi dashboard melalui REST API.

 def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

Terakhir, berikut adalah contoh output dari Spark Streaming saat menjalankan dan mencetak hashtag_counts_df , Anda akan melihat bahwa output dicetak tepat setiap dua detik sesuai interval batch.

Contoh output streaming Twitter Spark, dicetak per pengaturan interval batch

Buat Dasbor Real-time Sederhana untuk Mewakili Data

Sekarang, kita akan membuat aplikasi dashboard sederhana yang akan diupdate secara real time oleh Spark. Kami akan membangunnya menggunakan Python, Flask, dan Charts.js.

Pertama, mari buat proyek Python dengan struktur yang terlihat di bawah ini dan unduh dan tambahkan file Chart.js ke dalam direktori statis.

Ilustrasi: Membuat proyek Python untuk digunakan dalam analisis tagar Twitter

Kemudian, di file app.py , kita akan membuat fungsi yang disebut update_data , yang akan dipanggil oleh Spark melalui URL http://localhost:5001/updateData untuk memperbarui label Global dan array nilai.

Juga, fungsi refresh_graph_data dibuat untuk dipanggil oleh permintaan AJAX untuk mengembalikan label dan nilai array yang diperbarui sebagai JSON. Fungsi get_chart_page akan merender halaman chart.html saat dipanggil.

 from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)

Sekarang, mari kita buat grafik sederhana di file chart.html untuk menampilkan data hashtag dan memperbaruinya secara real time. Seperti yang didefinisikan di bawah, kita perlu mengimpor library JavaScript Chart.js dan jquery.min.js .

Di tag body, kita harus membuat kanvas dan memberikan ID untuk mereferensikannya saat menampilkan bagan menggunakan JavaScript di langkah berikutnya.

 <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>

Sekarang, mari buat bagan menggunakan kode JavaScript di bawah ini. Pertama, kita mendapatkan elemen kanvas, dan kemudian kita membuat objek bagan baru dan meneruskan elemen kanvas ke dalamnya dan mendefinisikan objek datanya seperti di bawah ini.

Perhatikan bahwa label dan data data dibatasi dengan label dan variabel nilai yang dikembalikan saat merender halaman saat memanggil fungsi get_chart_page di file app.py

Bagian terakhir yang tersisa adalah fungsi yang dikonfigurasi untuk melakukan permintaan Ajax setiap detik dan memanggil URL /refreshData , yang akan mengeksekusi refresh_graph_data di app.py dan mengembalikan data baru yang diperbarui, lalu memperbarui char yang merender data baru.

 <script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>

Menjalankan aplikasi bersama-sama

Mari kita jalankan ketiga aplikasi tersebut dengan urutan di bawah ini: 1. Twitter App Client. 2. Aplikasi Percikan. 3. Aplikasi Web Dasbor.

Kemudian Anda dapat mengakses dasbor waktu nyata menggunakan URL <http://localhost:5001/>

Sekarang, Anda dapat melihat grafik Anda diperbarui, seperti di bawah ini:

Animasi: Grafik tagar tren Twitter waktu-nyata

Apache Streaming Kasus Penggunaan Kehidupan Nyata

Kami telah mempelajari cara melakukan analisis data sederhana pada data secara real time menggunakan Spark Streaming dan mengintegrasikannya secara langsung dengan dasbor sederhana menggunakan layanan web RESTful. Dari contoh ini, kita dapat melihat betapa hebatnya Spark, karena ia menangkap aliran data yang sangat besar, mengubahnya, dan mengekstrak wawasan berharga yang dapat digunakan dengan mudah untuk membuat keputusan dalam waktu singkat. Ada banyak kasus penggunaan bermanfaat yang dapat diterapkan dan dapat melayani berbagai industri, seperti berita atau pemasaran.

Ilustrasi: Tagar dapat digunakan untuk mengekstrak wawasan dan sentimen yang berharga, yang dapat diterapkan di berbagai industri.

Contoh industri berita

Kami dapat melacak tagar yang paling sering disebutkan untuk mengetahui topik apa yang paling banyak dibicarakan orang di media sosial. Selain itu, kami dapat melacak tagar tertentu dan tweet mereka untuk mengetahui apa yang dikatakan orang tentang topik atau peristiwa tertentu di dunia.

Contoh pemasaran

Kami dapat mengumpulkan aliran tweet dan, dengan melakukan analisis sentimen, mengkategorikan mereka dan menentukan minat orang untuk menargetkan mereka dengan penawaran yang terkait dengan minat mereka.

Juga, ada banyak kasus penggunaan yang dapat diterapkan khusus untuk analitik data besar dan dapat melayani banyak industri. Untuk lebih banyak kasus penggunaan Apache Spark secara umum, saya sarankan Anda memeriksa salah satu posting kami sebelumnya.

Saya mendorong Anda untuk membaca lebih lanjut tentang Spark Streaming dari sini untuk mengetahui lebih banyak tentang kemampuannya dan melakukan transformasi lebih lanjut pada data untuk lebih banyak wawasan secara real time menggunakannya.