Учебное пособие по потоковой передаче Apache Spark: определение популярных хэштегов Twitter

Опубликовано: 2022-03-11

В настоящее время данные растут и накапливаются быстрее, чем когда-либо прежде. В настоящее время около 90% всех данных, генерируемых в нашем мире, были сгенерированы только за последние два года. Из-за таких ошеломляющих темпов роста платформам больших данных пришлось принять радикальные решения, чтобы поддерживать такие огромные объемы данных.

Одним из основных источников данных сегодня являются социальные сети. Позвольте мне продемонстрировать пример из реальной жизни: обработка, анализ и извлечение информации из данных социальных сетей в режиме реального времени с использованием одного из самых важных решений эха для больших данных — Apache Spark и Python.

Apache Spark Streaming можно использовать для извлечения информации из социальных сетей, таких как популярные хэштеги Twitter.

В этой статье я научу вас, как создать простое приложение, которое читает онлайн-потоки из Twitter с помощью Python, затем обрабатывает твиты с помощью Apache Spark Streaming для определения хэштегов и, наконец, возвращает самые популярные хэштеги и представляет эти данные на реальном приборная панель времени.

Создание собственных учетных данных для API Twitter

Чтобы получать твиты из Twitter, вам необходимо зарегистрироваться в TwitterApps, нажав «Создать новое приложение», а затем заполнить форму ниже, нажав «Создать приложение Twitter».

Скриншот: Как создать приложение Twitter.

Во-вторых, перейдите в только что созданное приложение и откройте вкладку «Ключи и токены доступа». Затем нажмите «Создать мой токен доступа».

Снимок экрана: настройка учетных данных, ключей и токенов доступа для приложения Twitter.

Ваши новые токены доступа будут выглядеть, как показано ниже.

Скриншот: настройка токена доступа к приложению Twitter.

И теперь вы готовы к следующему шагу.

Создание HTTP-клиента Twitter

На этом этапе я покажу вам, как создать простой клиент, который будет получать твиты из Twitter API с помощью Python и передавать их экземпляру Spark Streaming. Этому должно быть легко следовать любому профессиональному разработчику Python.

Сначала давайте создадим файл с именем twitter_app.py а затем добавим в него код, как показано ниже.

Импортируйте библиотеки, которые мы будем использовать, как показано ниже:

 import socket import sys import requests import requests_oauthlib import json

И добавьте переменные, которые будут использоваться в OAuth для подключения к Twitter, как показано ниже:

 # Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Теперь мы создадим новую функцию с именем get_tweets , которая будет вызывать URL-адрес Twitter API и возвращать ответ для потока твитов.

 def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

Затем создайте функцию, которая берет ответ из приведенного выше и извлекает текст твитов из всего объекта JSON твитов. После этого он отправляет каждый твит в экземпляр Spark Streaming (будет обсуждаться позже) через TCP-соединение.

 def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print("Tweet Text: " + tweet_text) print ("------------------------------------------") tcp_connection.send(tweet_text + '\n') except: e = sys.exc_info()[0] print("Error: %s" % e)

Теперь мы сделаем основную часть, которая создаст соединения сокетов хоста приложения, с которыми будет соединяться искра. Здесь мы настроим IP-адрес как localhost , так как все они будут работать на одной машине и на порту 9009 . Затем мы вызовем метод get_tweets , который мы сделали выше, для получения твитов из Twitter и передаем его ответ вместе с подключением к сокету в send_tweets_to_spark для отправки твитов в Spark.

 TCP_IP = "localhost" TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print("Waiting for TCP connection...") conn, addr = s.accept() print("Connected... Starting getting tweets.") resp = get_tweets() send_tweets_to_spark(resp, conn)

Настройка нашего потокового приложения Apache Spark

Давайте создадим наше потоковое приложение Spark, которое будет обрабатывать входящие твиты в режиме реального времени, извлекать из них хэштеги и подсчитывать, сколько хэштегов было упомянуто.

Иллюстрация: потоковая передача Spark позволяет обрабатывать входящие твиты и извлекать хэштеги в режиме реального времени.

Во-первых, мы должны создать экземпляр Spark Context sc , затем мы создали Streaming Context ssc из sc с пакетным интервалом в две секунды, который будет выполнять преобразование для всех потоков, получаемых каждые две секунды. Обратите внимание, что мы установили уровень журнала ERROR , чтобы отключить большинство журналов, которые пишет Spark.

Здесь мы определили контрольную точку, чтобы разрешить периодическую контрольную точку RDD; это обязательно для использования в нашем приложении, так как мы будем использовать преобразования с отслеживанием состояния (будет обсуждаться позже в том же разделе).

Затем мы определяем наш основной поток данных DStream, который будет подключаться к серверу сокетов, который мы создали ранее на порту 9009 , и читать твиты с этого порта. Каждая запись в DStream будет твитом.

 from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName("TwitterStreamApp") # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel("ERROR") # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint("checkpoint_TwitterApp") # read data from port 9009 dataStream = ssc.socketTextStream("localhost",9009)

Теперь мы определим нашу логику преобразования. Сначала мы разделим все твиты на слова и поместим их в слова RDD. Затем мы будем фильтровать только хэштеги из всех слов и сопоставлять их с парой (hashtag, 1) и помещать их в хэштеги RDD.

Затем нам нужно подсчитать, сколько раз упоминался хэштег. Мы можем сделать это с помощью функции reduceByKey . Эта функция подсчитает, сколько раз хэштег упоминался в каждом пакете, т. е. сбросит счетчики в каждом пакете.

В нашем случае нам нужно рассчитать количество для всех пакетов, поэтому мы будем использовать другую функцию с именем updateStateByKey , так как эта функция позволяет вам поддерживать состояние RDD при обновлении его новыми данными. Этот способ называется Stateful Transformation .

Обратите внимание, что для использования updateStateByKey вам необходимо настроить контрольную точку, что мы и сделали на предыдущем шаге.

 # split each tweet into words words = dataStream.flatMap(lambda line: line.split(" ")) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()

updateStateByKey принимает в качестве параметра функцию, называемую функцией update . Он работает с каждым элементом в RDD и выполняет требуемую логику.

В нашем случае мы создали функцию обновления с aggregate_tags_count , которая будет суммировать все new_values ​​для каждого хэштега и добавлять их к total_sum , который является суммой по всем пакетам, и сохранять данные в RDD tags_totals .

 def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

Затем мы выполняем обработку RDD tags_totals в каждом пакете, чтобы преобразовать ее во временную таблицу с помощью Spark SQL Context, а затем выполняем оператор select, чтобы получить десять первых хэштегов с их количеством и поместить их в фрейм данных hashtag_counts_df .

 def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print("----------- %s -----------" % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable("hashtags") # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10") hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print("Error: %s" % e)

Последним шагом в нашем приложении Spark является отправка фрейма данных hashtag_counts_df в приложение панели мониторинга. Итак, мы преобразуем фрейм данных в два массива, один для хэштегов, а другой для их количества. Затем мы отправим их в приложение панели мониторинга через REST API.

 def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

Наконец, вот пример вывода Spark Streaming во время запуска и печати hashtag_counts_df , вы заметите, что вывод печатается ровно каждые две секунды в соответствии с пакетными интервалами.

Пример вывода потоковой передачи Twitter Spark, напечатанный в соответствии с настройками пакетного интервала.

Создайте простую панель мониторинга в реальном времени для представления данных

Теперь мы создадим простое приложение панели мониторинга, которое Spark будет обновлять в режиме реального времени. Мы создадим его с помощью Python, Flask и Charts.js.

Во-первых, давайте создадим проект Python со структурой, показанной ниже, загрузим и добавим файл Chart.js в статический каталог.

Иллюстрация: Создание проекта Python для использования в анализе хэштегов Twitter

Затем в файле app.py мы создадим функцию с именем update_data , которая будет вызываться Spark через URL-адрес http://localhost:5001/updateData для обновления глобальных массивов меток и значений.

Кроме того, создается функция refresh_graph_data , которая вызывается запросом AJAX для возврата новых обновленных меток и массивов значений в виде JSON. Функция get_chart_page будет отображать страницу chart.html при вызове.

 from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route("/") def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print("labels now: " + str(labels)) print("data now: " + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return "error",400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print("labels received: " + str(labels)) print("data received: " + str(values)) return "success",201 if __name__ == "__main__": app.run(host='localhost', port=5001)

Теперь давайте создадим простую диаграмму в файле chart.html , чтобы отображать данные хэштега и обновлять их в режиме реального времени. Как определено ниже, нам нужно импортировать библиотеки JavaScript Chart.js и jquery.min.js .

В теге body мы должны создать холст и присвоить ему идентификатор, чтобы ссылаться на него при отображении диаграммы с помощью JavaScript на следующем шаге.

 <!DOCTYPE html> <html> <head> <meta charset="utf-8"/> <title>Top Trending Twitter Hashtags</title> <script src='static/Chart.js'></script> <script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script> </head> <body> <h2>Top Trending Twitter Hashtags</h2> <div> <canvas></canvas> </div> </body> </html>

Теперь давайте построим диаграмму, используя приведенный ниже код JavaScript. Сначала мы получаем элемент холста, а затем создаем новый объект диаграммы, передаем ему элемент холста и определяем его объект данных, как показано ниже.

Обратите внимание, что метки данных и сами данные ограничены метками и переменными значений, которые возвращаются при рендеринге страницы при вызове функции get_chart_page в файле app.py

Последняя оставшаяся часть — это функция, настроенная на выполнение Ajax-запроса каждую секунду и вызов URL-адреса /refreshData , который выполнит refresh_graph_data в app.py и вернет новые обновленные данные, а затем обновит char, который отображает новые данные.

 <script> var ctx = document.getElementById("chart"); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} "{{item}}", {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000); </script>

Совместный запуск приложений

Давайте запустим три приложения в указанном ниже порядке: 1. Клиент приложения Twitter. 2. Приложение Искра. 3. Веб-приложение информационной панели.

Затем вы можете получить доступ к панели мониторинга в реальном времени, используя URL-адрес <http://localhost:5001/>

Теперь вы можете видеть, как ваша диаграмма обновляется, как показано ниже:

Анимация: диаграмма хэштегов в Твиттере в режиме реального времени

Примеры использования потоковой передачи Apache в реальной жизни

Мы узнали, как выполнять простую аналитику данных в режиме реального времени с помощью Spark Streaming и интегрировать ее напрямую с простой информационной панелью с помощью веб-службы RESTful. Из этого примера видно, насколько мощным является Spark, поскольку он захватывает огромный поток данных, преобразует их и извлекает ценную информацию, которую можно легко использовать для принятия решений в кратчайшие сроки. Существует множество полезных вариантов использования, которые можно реализовать и которые могут служить различным отраслям, таким как новости или маркетинг.

Иллюстрация: хэштеги можно использовать для извлечения ценной информации и настроений, применимых в различных отраслях.

Пример новостной индустрии

Мы можем отслеживать наиболее часто упоминаемые хэштеги, чтобы знать, о каких темах люди говорят больше всего в социальных сетях. Кроме того, мы можем отслеживать определенные хэштеги и их твиты, чтобы знать, что люди говорят о конкретных темах или событиях в мире.

Маркетинговый пример

Мы можем собирать поток твитов и, проводя анализ настроений, классифицировать их и определять интересы людей, чтобы нацеливать их на предложения, связанные с их интересами.

Кроме того, существует множество вариантов использования, которые можно применять специально для анализа больших данных и которые могут использоваться во многих отраслях. Чтобы узнать больше о примерах использования Apache Spark, я предлагаю вам ознакомиться с одним из наших предыдущих постов.

Я рекомендую вам прочитать больше о Spark Streaming здесь, чтобы узнать больше о его возможностях и выполнить более продвинутое преобразование данных для получения дополнительной информации в режиме реального времени с его использованием.