Работа в режиме реального времени с Redis Pub/Sub
Опубликовано: 2022-03-11Масштабирование веб-приложения почти всегда является интересной задачей, независимо от сложности. Однако веб-приложения реального времени создают уникальные проблемы с масштабируемостью. Например, чтобы иметь возможность горизонтального масштабирования веб-приложения для обмена сообщениями, которое использует WebSockets для связи со своими клиентами, ему потребуется каким-то образом синхронизировать все его серверные узлы. Если приложение не было создано с учетом этого, то его горизонтальное масштабирование может быть непростым вариантом.
В этой статье мы рассмотрим архитектуру простого веб-приложения для обмена изображениями и обмена сообщениями в реальном времени. Здесь мы сосредоточимся на различных компонентах, таких как Redis Pub/Sub, участвующих в создании приложения реального времени, и посмотрим, как все они играют свою роль в общей архитектуре.
По функциональности приложение очень легкое. Он позволяет загружать изображения и комментировать их в реальном времени. Кроме того, любой пользователь может нажать на изображение, и другие пользователи смогут увидеть эффект ряби на своем экране.
Весь исходный код этого приложения доступен на GitHub.
Вещи, которые нам нужны
Идти
Мы будем использовать язык программирования Go. Нет особой причины, по которой мы выбираем Go для этой статьи, кроме того, что синтаксис Go понятен, а его семантика легче для понимания. Ну и тут конечно авторская необъективность. Однако все концепции, обсуждаемые в этой статье, можно легко перевести на любой язык по вашему выбору.
Начать работу с Go легко. Его бинарный дистрибутив можно скачать с официального сайта. Если вы работаете в Windows, на их странице загрузки есть установщик MSI для Go. Или, если ваша операционная система (к счастью) предлагает менеджер пакетов:
Арх Линукс:
pacman -S go
Убунту:
apt-get install golang
Mac OS X:
brew install go
Этот будет работать, только если у нас установлен Homebrew.
MongoDB
Вы спросите, зачем использовать MongoDB, если у нас есть Redis? Как упоминалось ранее, Redis — это хранилище данных в памяти. Хотя он может сохранять данные на диск, использование Redis для этой цели, вероятно, не лучший способ. Мы будем использовать MongoDB для хранения загруженных метаданных изображений и сообщений.
Мы можем скачать MongoDB с их официального сайта. В некоторых дистрибутивах Linux это предпочтительный способ установки MongoDB. Тем не менее, его все равно можно установить с помощью диспетчера пакетов большинства дистрибутивов.
Арх Линукс:
pacman -S mongodb
Убунту:
apt-get install mongodb
Mac OS X:
brew install mongodb
В нашем коде Go мы будем использовать пакет mgo (произносится как манго). Пакет драйверов не только проверен в бою, но и предлагает действительно чистый и простой API.
Если вы не являетесь экспертом по MongoDB, не беспокойтесь. Использование этой службы базы данных минимально в нашем примере приложения и почти не имеет отношения к теме этой статьи: архитектуре Pub/Sub.
Амазонка S3
Мы будем использовать Amazon S3 для хранения загруженных пользователем изображений. Здесь нечего делать, кроме как убедиться, что у нас есть готовая учетная запись Amazon Web Services и создана временная корзина.
Сохранение загруженных файлов на локальный диск не вариант, потому что мы не хотим никоим образом полагаться на идентичность наших веб-узлов. Мы хотим, чтобы пользователи могли подключаться к любому из доступных веб-узлов и при этом видеть тот же контент.
Для взаимодействия с корзиной Amazon S3 из нашего кода Go мы будем использовать AdRoll/goamz, ответвление пакета goamz от Canonical с некоторыми отличиями.
Редис
И последнее, но не менее важное: Redis. Мы можем установить его с помощью менеджера пакетов нашего дистрибутива:
Арх Линукс:
pacman -S redis
Убунту:
apt-get install redis-server
Mac OS X:
brew install redis
Или возьмите его исходный код и скомпилируйте его самостоятельно. Redis не имеет никаких зависимостей, кроме GCC и libc для его сборки:
wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make
После установки и запуска Redis запустите терминал и войдите в интерфейс командной строки Redis:
redis-cli
Попробуйте ввести следующие команды и посмотрите, получите ли вы ожидаемый результат:
SET answer 41 INCR answer GET answer
Первая команда записывает «41» для ключа «ответ», вторая команда увеличивает значение, третья команда печатает значение, сохраненное для данного ключа. Результат должен читаться как «42».
Вы можете узнать больше обо всех командах, которые поддерживает Redis, на их официальном сайте.
Мы будем использовать пакет Go redigo для подключения к Redis из кода нашего приложения.
Загляните в Redis Pub/Sub
Шаблон публикации-подписки — это способ передачи сообщений произвольному числу отправителей. Отправители этих сообщений (издатели) явно не идентифицируют целевых получателей. Вместо этого сообщения отправляются по каналу, на котором их может ожидать любое количество получателей (подписчиков).
В нашем случае у нас может быть любое количество веб-узлов, работающих за балансировщиком нагрузки. В любой момент два пользователя, просматривающие одно и то же изображение, не могут быть подключены к одному и тому же узлу. Именно здесь в игру вступает Redis Pub/Sub. Всякий раз, когда веб-узлу требуется наблюдение за изменением (например, новое сообщение создается пользователем), он будет использовать Redis Pub/Sub для передачи этой информации всем соответствующим веб-узлам. Что, в свою очередь, передаст информацию соответствующим клиентам, чтобы они могли получить обновленный список сообщенийredis.
Поскольку шаблон публикации-подписки позволяет нам отправлять сообщения по именованным каналам, мы можем подключить каждый веб-узел к Redis и подписаться только на те каналы, которые интересуют их подключенных пользователей. Например, если два пользователя просматривают одно и то же изображение, но подключены к двум разным веб-узлам из множества веб-узлов, то только эти два веб-узла должны подписаться на соответствующий канал. Любое сообщение, опубликованное на этом канале, будет доставлено только этим двум веб-узлам.
Звучит слишком хорошо, чтобы быть правдой? Мы можем попробовать это с помощью интерфейса командной строки Redis. Запустите три экземпляра redis-cli
. Выполните следующую команду в первом экземпляре:
SUBSCRIBE somechannel
Выполните следующую команду во втором экземпляре Redis CLI:
SUBSCRIBE someotherchannel
Выполните следующие команды в третьем экземпляре Redis CLI:
PUBLISH somechannel lorem PUBLISH someotherchannel ipsum
Обратите внимание, как первый экземпляр получил «lorem», но не «ipsum», и как второй экземпляр получил «ipsum», но не «lorem».
Стоит отметить, что как только клиент Redis переходит в режим подписчика, он больше не может выполнять какие-либо операции, кроме как подписаться на большее количество каналов или отказаться от подписки на подписанные. Это означает, что каждый веб-узел должен будет поддерживать два подключения к Redis: одно для подключения к Redis в качестве подписчика, а другое для публикации сообщений в каналах, чтобы любой веб-узел, подписанный на эти каналы, мог их получать.
В режиме реального времени и масштабируемость
Прежде чем мы начнем исследовать, что происходит за кулисами, давайте клонируем репозиторий:
mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...
… и скомпилируйте его:
go build ./cmd/tonesad
Чтобы запустить приложение, прежде всего создайте файл с именем .env (желательно, скопировав файл env-sample.txt):
cp env-sample.txt .env
Заполните файл .env всеми необходимыми переменными среды:
MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}
Наконец, запустите построенный двоичный файл:
PORT=9091 ./tonesad -env-file=.env
Теперь веб-узел должен быть запущен и доступен через http://localhost:9091.
Чтобы проверить, работает ли он по-прежнему при горизонтальном масштабировании, вы можете развернуть несколько веб-узлов, запустив их с разными номерами портов:
PORT=9092 ./tonesad -env-file=.env
PORT=9093 ./tonesad -env-file=.env
… и доступ к ним через соответствующие URL-адреса: http://localhost:9092 и http://localhost:9093.
За кулисами
Вместо того, чтобы рассматривать каждый шаг разработки приложения, мы сосредоточимся на некоторых наиболее важных частях. Хотя не все из них на 100 % относятся к Redis Pub/Sub и его последствиям в реальном времени, они по-прежнему имеют отношение к общей структуре приложения и облегчат отслеживание, когда мы углубимся.
Для простоты мы не будем беспокоиться об аутентификации пользователя. Загрузки будут анонимными и доступными для всех, кто знает URL-адрес. Все зрители могут отправлять сообщения и иметь возможность выбрать свой собственный псевдоним. Адаптация надлежащего механизма аутентификации и возможностей конфиденциальности должна быть тривиальной и выходит за рамки этой статьи.
Сохраняющиеся данные
Это легко.
Всякий раз, когда пользователь загружает изображение, мы сохраняем его в Amazon S3, а затем сохраняем путь к нему в MongoDB с двумя идентификаторами: один идентификатор объекта BSON (любимый в MongoDB) и другой короткий идентификатор длиной 8 символов (несколько приятный для глаз). Это входит в коллекцию «uploads» нашей базы данных и имеет такую структуру:
type Upload struct { ID bson.ObjectId `bson:"_id"` ShortID string `bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64 `bson:"size"` }
Поле « Вид » используется для указания типа медиафайлов, которые содержит эта «загрузка». Означает ли это, что мы поддерживаем другие носители, кроме изображений? К сожалению нет. Но поле было оставлено там, чтобы служить напоминанием о том, что здесь мы не обязательно ограничены изображениями.
Когда пользователи отправляют сообщения друг другу, они сохраняются в другой коллекции. Да, вы уже догадались: «сообщения».

type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Content string `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }
Единственным интересным моментом здесь является поле UploadID, которое используется для связывания сообщений с конкретной загрузкой.
Конечные точки API
Это приложение по существу имеет три конечных точки.
ПОСТ/апи/загрузки
Обработчик этой конечной точки ожидает отправки «multipart/form-data» с изображением в поле «file». Поведение обработчика примерно следующее:
func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID: id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }
Go требует, чтобы все ошибки обрабатывались явно. Это было сделано в прототипе, но опущено во фрагментах этой статьи, чтобы сосредоточить внимание на критических частях.
В обработчике этой конечной точки API мы, по сути, читаем файл, но ограничиваем его размер до определенного значения. Если загрузка превышает это значение, запрос отклоняется. В противном случае создается идентификатор BSON, который используется для загрузки изображения в Amazon S3 перед сохранением объекта загрузки в MongoDB.
В способе генерации идентификаторов объектов BSON есть свои плюсы и минусы. Они генерируются на стороне клиента. Однако стратегия, используемая для генерации идентификатора объекта, делает вероятность столкновения настолько незначительной, что их безопасно генерировать на стороне клиента. С другой стороны, значения сгенерированных идентификаторов объектов обычно являются последовательными, и это то, что Amazon S3 не очень любит. Простым обходным путем для этого является префикс имени файла со случайной строкой.
ПОЛУЧИТЬ /api/uploads/{id}/messages
Этот API используется для получения последних сообщений и сообщений, которые были опубликованы после определенного времени.
func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr) msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }
Когда браузер пользователя уведомляется о новом сообщении при загрузке, которую пользователь в данный момент просматривает, он извлекает новые сообщения, используя эту конечную точку.
POST /api/uploads/{id}/messages
И, наконец, обработчик, который создает сообщения и уведомляет всех:
func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(&body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }
Этот обработчик настолько похож на другие, что даже включать его здесь почти скучно. Или это? Обратите внимание на вызов функции hub.Emit() в самом конце функции. Что такое хаб, вы говорите? Вот где происходит вся магия Pub/Sub.
Hub: Где WebSockets встречаются с Redis
Hub — это место, где мы склеиваем WebSockets с каналами Redis Pub/Sub. И, по совпадению, пакет, который мы используем для обработки веб-сокетов на наших веб-серверах, называется клей.
Hub по сути поддерживает несколько структур данных, которые создают сопоставление между всеми подключенными веб-сокетами и всеми интересующими их каналами. Например, веб-сокет на вкладке браузера пользователя, указывающий на конкретное загруженное изображение, естественно, должен быть заинтересован во всех соответствующих уведомлениях. к этому.
Пакет хаба реализует шесть функций:
- Подписаться
- ОтписатьсяВсе
- Испускают
- EmitLocal
- InitHub
- Ручка сокета
Подписаться и ОтписатьсяВсе
func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }
Эта функция, как и большинство других в этом пакете, удерживает блокировку мьютекса чтения/записи во время своего выполнения. Это делается для того, чтобы мы могли безопасно модифицировать примитивные структуры данных, переменные сокеты и топики . Первая переменная, sockets , сопоставляет сокеты с именами каналов, а вторая, Topics , сопоставляет имена каналов с сокетами. В этой функции мы строим эти отображения. Всякий раз, когда мы видим, что сокет подписывается на новое имя канала, мы создаем наше соединение с Redis, subconn , подписываемся на этот канал в Redis, используя subconn.Subscribe . Это заставляет Redis пересылать все уведомления по этому каналу на этот веб-узел.
И, аналогично, в функции UnsubscribeAll мы разрываем маппинг:
func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }
Когда мы удаляем последний сокет из структуры данных, интересующей конкретный канал, мы отписываемся от канала в Redis с помощью subconn.Unsubscribe .
Испускают
func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }
Эта функция публикует сообщение m на канале t , используя соединение публикации с Redis.
EmitLocal
func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }
InitHub
func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data)) case error: panic(v) } } }() return nil }
В функции InitHub мы создаем два подключения к Redis: одно для подписки на каналы, которые интересуют этот веб-узел, и другое для публикации сообщений. Как только соединения установлены, мы запускаем новую процедуру Go с вечно работающим циклом, ожидающим получения сообщений через абонентское соединение с Redis. Каждый раз, когда он получает сообщение, он отправляет его локально (т. е. всем веб-сокетам, подключенным к этому веб-узлу).
Ручка сокета
И, наконец, HandleSocket — это то место, где мы ждем сообщений, приходящих через WebSockets, или очищаем их после закрытия соединения:
func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1]) case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+","+fields[3]) } }) }
Внешний JavaScript
Поскольку Glue поставляется со своей собственной библиотекой JavaScript для внешнего интерфейса, намного проще обрабатывать WebSockets (или откат к опросу XHR, когда WebSockets недоступны):
var socket = glue() socket.onMessage(function(data) { data = data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) break case 'touch': var coords = data[1].split(',') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)
На стороне клиента мы прослушиваем любое сообщение, поступающее через WebSocket. Поскольку клей передает все сообщения в виде строк, мы кодируем всю информацию в нем с помощью определенных шаблонов:
- Новое сообщение: «сообщение: {messageID}»
- Кликните по изображению: «touch:{coordX},{coordY}», где coordX и coordY — это процентная координата местоположения клика пользователя на изображении.
Когда пользователь создает новое сообщение, мы используем API «POST /api/uploads/{uploadID}/messages» для создания нового сообщения. Это делается с помощью метода create в базовой коллекции сообщений:
messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })
Когда пользователь щелкает изображение, мы вычисляем положение щелчка в процентах от ширины и высоты изображения и отправляем информацию напрямую через WebSocket.
socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left) / $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())
Обзор
Когда пользователь вводит сообщение и нажимает клавишу ввода, клиент вызывает конечную точку API «POST /api/uploads/{id}/messages». Это, в свою очередь, создает объект сообщения в базе данных и публикует строку «message: {messageID}» через Redis Pub/Sub на канале «upload: {uploadID}» через пакет концентратора.
Redis пересылает эту строку каждому веб-узлу (подписчику), заинтересованному в канале «upload: {uploadID}». Веб-узлы, получающие эту строку, перебирают все веб-сокеты, относящиеся к каналу, и отправляют строку клиенту через свои соединения с веб-сокетами. Клиенты, получившие эту строку, начинают получать новые сообщения с сервера, используя команду «GET /api/uploads/{id}/messages».
Точно так же для распространения событий щелчка по изображению клиент напрямую отправляет сообщение через WebSocket, которое выглядит примерно так: «touch upload: {uploadID} {coordX} {coordY}». Это сообщение попадает в пакет-концентратор, где оно публикуется на том же канале «upload: {uploadID}». В результате строка распространяется среди всех пользователей, просматривающих загруженное изображение. Клиент, получив эту строку, анализирует ее, чтобы извлечь координаты, и отображает растущий и исчезающий круг, чтобы на мгновение выделить место щелчка.
Заворачивать
В этой статье мы увидели проблеск того, как шаблон публикации-подписки может помочь решить проблему масштабирования веб-приложений в реальном времени в значительной степени и с относительной легкостью.
Образец приложения служит площадкой для экспериментов с Redis Pub/Sub. Но, как упоминалось ранее, идеи можно реализовать практически на любом другом популярном языке программирования.