Ir en tiempo real con Redis Pub/Sub

Publicado: 2022-03-11

Escalar una aplicación web es casi siempre un desafío interesante, independientemente de la complejidad involucrada. Sin embargo, las aplicaciones web en tiempo real plantean problemas de escalabilidad únicos. Por ejemplo, para poder escalar horizontalmente una aplicación web de mensajería que usa WebSockets para comunicarse con sus clientes, deberá sincronizar de alguna manera todos sus nodos de servidor. Si la aplicación no se creó teniendo esto en cuenta, es posible que escalarla horizontalmente no sea una opción fácil.

En este artículo, analizaremos la arquitectura de una aplicación web simple para compartir imágenes y enviar mensajes en tiempo real. Aquí, nos centraremos en los diversos componentes, como Redis Pub/Sub, involucrados en la creación de una aplicación en tiempo real y veremos cómo todos desempeñan su función en la arquitectura general.

Ir en tiempo real con Redis Pub/Sub

Ir en tiempo real con Redis Pub/Sub
Pío

En cuanto a la funcionalidad, la aplicación es muy ligera. Permite la carga de imágenes y comentarios en tiempo real sobre esas imágenes. Además, cualquier usuario puede tocar la imagen y otros usuarios podrán ver un efecto ondulante en su pantalla.

El código fuente completo de esta aplicación está disponible en GitHub.

Cosas que necesitamos

Ir

Usaremos el lenguaje de programación Go. No hay ninguna razón especial por la que elegimos Go para este artículo, además de que la sintaxis de Go es limpia y su semántica es más fácil de seguir. Y luego está, por supuesto, el sesgo del autor. Sin embargo, todos los conceptos discutidos en este artículo se pueden traducir fácilmente al idioma de su elección.

Comenzar con Go es fácil. Su distribución binaria se puede descargar desde el sitio oficial. En caso de que esté en Windows, hay un instalador MSI para Go en su página de descarga. O, en caso de que su sistema operativo (afortunadamente) ofrezca un administrador de paquetes:

Arco Linux:

 pacman -S go

Ubuntu:

 apt-get install golang

Mac OS X:

 brew install go

Este funcionará solo si tenemos Homebrew instalado.

MongoDB

¿Por qué usar MongoDB si tenemos Redis, preguntas? Como se mencionó anteriormente, Redis es un almacén de datos en memoria. Aunque puede conservar los datos en el disco, usar Redis para ese propósito probablemente no sea la mejor manera de hacerlo. Usaremos MongoDB para almacenar metadatos y mensajes de imágenes cargadas.

Podemos descargar MongoDB desde su web oficial. En algunas distribuciones de Linux, esta es la forma preferida de instalar MongoDB. Sin embargo, aún debería poder instalarse utilizando el administrador de paquetes de la mayoría de las distribuciones.

Arco Linux:

 pacman -S mongodb

Ubuntu:

 apt-get install mongodb

Mac OS X:

 brew install mongodb

Dentro de nuestro código Go, usaremos el paquete mgo (pronunciado mango). No solo está probado en batalla, el paquete de controladores ofrece una API realmente limpia y simple.

Si no eres un experto en MongoDB, no te preocupes en absoluto. El uso de este servicio de base de datos es mínimo en nuestra aplicación de muestra y es casi irrelevante para el enfoque de este artículo: la arquitectura Pub/Sub.

Amazonas S3

Usaremos Amazon S3 para almacenar las imágenes cargadas por el usuario. No hay mucho que hacer aquí, excepto asegurarse de que tenemos una cuenta lista para Amazon Web Services y un depósito temporal creado.

Almacenar los archivos cargados en el disco local no es una opción porque no queremos confiar en la identidad de nuestros nodos web de ninguna manera. Queremos que los usuarios puedan conectarse a cualquiera de los nodos web disponibles y seguir viendo el mismo contenido.

Para interactuar con el depósito de Amazon S3 desde nuestro código Go, usaremos AdRoll/goamz, una bifurcación del paquete goamz de Canonical con algunas diferencias.

redis

Por último, pero no menos importante: Redis. Podemos instalarlo usando el administrador de paquetes de nuestra distribución:

Arco Linux:

 pacman -S redis

Ubuntu:

 apt-get install redis-server

Mac OS X:

 brew install redis

O busque su código fuente y compílelo usted mismo. Redis no tiene otras dependencias que GCC y libc para construirlo:

 wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make

Una vez que Redis esté instalado y funcionando, inicie una terminal e ingrese a la CLI de Redis:

 redis-cli

Intente ingresar los siguientes comandos y vea si obtiene el resultado esperado:

 SET answer 41 INCR answer GET answer

El primer comando almacena "41" contra la clave "respuesta", el segundo comando incrementa el valor, el tercer comando imprime el valor almacenado contra la clave dada. El resultado debe leer "42".

Puede obtener más información sobre todos los comandos que admite Redis en su sitio web oficial.

Usaremos el paquete Go redigo para conectarnos a Redis desde el código de nuestra aplicación.

Eche un vistazo a Redis Pub/Sub

El patrón de publicación-suscripción es una forma de pasar mensajes a un número arbitrario de remitentes. Los remitentes de estos mensajes (editores) no identifican explícitamente a los destinatarios. En su lugar, los mensajes se envían por un canal en el que cualquier cantidad de destinatarios (suscriptores) pueden estar esperándolos.

Configuración simple de publicación y suscripción

En nuestro caso, podemos tener cualquier cantidad de nodos web ejecutándose detrás de un balanceador de carga. En un momento dado, dos usuarios que miran la misma imagen pueden no estar conectados al mismo nodo. Aquí es donde entra en juego Redis Pub/Sub. Siempre que un nodo web necesite observar un cambio (por ejemplo, el usuario crea un nuevo mensaje), utilizará Redis Pub/Sub para transmitir esa información a todos los nodos web relevantes. Lo que, a su vez, propagará la información a los clientes relevantes para que puedan obtener la lista actualizada de mensajes redis.

Dado que el patrón de publicación-suscripción nos permite enviar mensajes en canales con nombre, podemos tener cada nodo web conectado a Redis y suscritos solo a aquellos canales en los que estén interesados ​​sus usuarios conectados. Por ejemplo, si dos usuarios miran el misma imagen pero están conectados a dos nodos web diferentes de muchos nodos web, entonces solo esos dos nodos web necesitan suscribirse al canal correspondiente. Cualquier mensaje publicado en ese canal se entregará solo a esos dos nodos web.

Suena demasiado bueno para ser verdad? Podemos probarlo usando la CLI de Redis. Inicie tres instancias de redis-cli . Ejecute el siguiente comando en primera instancia:

 SUBSCRIBE somechannel

Ejecute el siguiente comando en la segunda instancia de Redis CLI:

 SUBSCRIBE someotherchannel

Ejecute los siguientes comandos en la tercera instancia de Redis CLI:

 PUBLISH somechannel lorem PUBLISH someotherchannel ipsum

Observe cómo la primera instancia recibió "lorem" pero no "ipsum", y cómo la segunda instancia recibió "ipsum" pero no "lorem".

Redis Pub/Sub en acción

Cabe mencionar que una vez que un cliente de Redis ingresa al modo suscriptor, ya no puede realizar ninguna operación más que suscribirse a más canales o darse de baja de los suscritos. Esto significa que cada nodo web deberá mantener dos conexiones a Redis, una para conectarse a Redis como suscriptor y la otra para publicar mensajes en los canales para que cualquier nodo web suscrito a esos canales pueda recibirlos.

En tiempo real y escalable

Antes de comenzar a explorar lo que sucede detrás de escena, clonemos el repositorio:

 mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...

… y compilarlo:

 go build ./cmd/tonesad

Para ejecutar la aplicación, primero cree un archivo llamado .env (preferiblemente copiando el archivo env-sample.txt):

 cp env-sample.txt .env

Complete el archivo .env con todas las variables de entorno necesarias:

 MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}

Finalmente ejecute el binario construido:

 PORT=9091 ./tonesad -env-file=.env

El nodo web ahora debería estar ejecutándose y ser accesible a través de http://localhost:9091.

ejemplo en vivo

Para probar si todavía funciona cuando se escala horizontalmente, puede activar varios nodos web iniciándolos con diferentes números de puerto:

 PORT=9092 ./tonesad -env-file=.env
 PORT=9093 ./tonesad -env-file=.env

… y accediendo a ellos a través de sus correspondientes URLs: http://localhost:9092 y http://localhost:9093.

ejemplo en vivo

Entre bastidores

En lugar de pasar por todos los pasos del desarrollo de la aplicación, nos centraremos en algunas de las partes más importantes. Aunque no todos estos son 100 % relevantes para Redis Pub/Sub y sus implicaciones en tiempo real, siguen siendo relevantes para la estructura general de la aplicación y facilitarán el seguimiento una vez que profundicemos más.

Para simplificar las cosas, no nos vamos a preocupar por la autenticación del usuario. Las cargas serán anónimas y estarán disponibles para todos los que conozcan la URL. Todos los espectadores pueden enviar mensajes y podrán elegir su propio alias. Adaptar el mecanismo de autenticación adecuado y las capacidades de privacidad debería ser trivial y está más allá del alcance de este artículo.

Datos persistentes

Este es fácil.

Cada vez que un usuario carga una imagen, la almacenamos en Amazon S3 y luego almacenamos la ruta a ella en MongoDB contra dos ID: una ID de objeto BSON (la favorita de MongoDB) y otra ID corta de 8 caracteres (algo agradable a la vista). Esto va a la colección de "cargas" de nuestra base de datos y tiene una estructura como esta:

 type Upload struct { ID bson.ObjectId `bson:"_id"` ShortID string `bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64 `bson:"size"` }

El campo Tipo se utiliza para indicar el tipo de contenido multimedia que contiene esta "carga". ¿Significa esto que admitimos medios que no sean imágenes? Lamentablemente no. Pero el campo se ha dejado ahí para que actúe como un recordatorio de que aquí no estamos necesariamente limitados a las imágenes.

A medida que los usuarios se envían mensajes entre sí, estos se almacenan en una colección diferente. Sí, lo has adivinado: “mensajes”.

 type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Content string `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }

Lo único interesante aquí es el campo UploadID, que se usa para asociar mensajes a una carga en particular.

Puntos finales de la API

Esta aplicación tiene esencialmente tres puntos finales.

POST /api/cargas

El controlador de este punto final espera un envío de "datos de formulario/varias partes" con la imagen en el campo "archivo". El comportamiento del controlador es más o menos como sigue:

 func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID: id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }

Go requiere que todos los errores se manejen explícitamente. Esto se hizo en el prototipo, pero se omite en los fragmentos de este artículo para mantener el enfoque en las partes críticas.

En el controlador de este punto final de API, esencialmente estamos leyendo el archivo pero limitando su tamaño a un valor específico. Si la carga supera este valor, se rechaza la solicitud. De lo contrario, se genera una ID de BSON y se utiliza para cargar la imagen en Amazon S3 antes de conservar la entidad de carga en MongoDB.

Hay una ventaja y una desventaja en la forma en que se generan los ID de objetos BSON. Se generan en el extremo del cliente. Sin embargo, la estrategia utilizada para generar Object ID hace que la probabilidad de colisión sea tan minúscula que es seguro generarlos en el lado del cliente. Por otro lado, los valores de los ID de objetos generados suelen ser secuenciales y eso es algo que a Amazon S3 no le gusta mucho. Una solución fácil a esto es prefijar el nombre del archivo con una cadena aleatoria.

OBTENER /api/cargas/{id}/mensajes

Esta API se utiliza para obtener mensajes recientes y mensajes que se han publicado después de un tiempo determinado.

 func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr) msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }

Cuando se notifica al navegador de un usuario sobre un nuevo mensaje en una carga que el usuario está mirando actualmente, obtiene los nuevos mensajes utilizando este punto final.

POST /api/uploads/{id}/mensajes

Y finalmente el controlador que crea mensajes y notifica a todos:

 func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(&body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }

Este controlador es tan similar a los otros que es casi aburrido incluso incluirlo aquí. ¿O es eso? Observe cómo hay una llamada de función hub.Emit() al final de la función. ¿Qué es el centro que dices? Ahí es donde ocurre toda la magia de Pub/Sub.

Hub: Donde WebSockets se encuentra con Redis

Hub es donde unimos WebSockets con los canales Pub/Sub de Redis. Y, casualmente, el paquete que estamos usando para manejar WebSockets dentro de nuestros servidores web se llama pegamento.

Hub esencialmente mantiene algunas estructuras de datos que crean un mapeo entre todos los WebSockets conectados a todos los canales que les interesan. Por ejemplo, un WebSocket en la pestaña del navegador del usuario que apunta a una imagen cargada en particular debería estar naturalmente interesado en todas las notificaciones relevantes. lo.

El paquete hub implementa seis funciones:

  • Suscribir
  • Darse de bajaTodos
  • Emitir
  • EmitLocal
  • InitHub
  • EmpuñaduraSocket

Suscribirse y darse de bajaTodos

 func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }

Esta función, al igual que la mayoría de las otras en este paquete, bloquea un mutex de lectura/escritura mientras se ejecuta. Esto es para que podamos modificar con seguridad las estructuras de datos primitivos, variables, sockets y temas . La primera variable, sockets , asigna sockets a nombres de canales, mientras que la segunda, topics , asigna nombres de canales a sockets. En esta función construimos estos mapeos. Cada vez que vemos que el socket se suscribe a un nuevo nombre de canal, hacemos nuestra conexión Redis, subconn , suscribirse a ese canal en Redis usando subconn.Subscribe . Esto hace que Redis reenvíe todas las notificaciones de ese canal a este nodo web.

Y, del mismo modo, en la función UnsubscribeAll , derribamos el mapeo:

 func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }

Cuando eliminamos el último socket de la estructura de datos interesada en un canal en particular, cancelamos la suscripción del canal en Redis usando subconn.Unsubscribe .

Emitir

 func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }

Esta función publica un mensaje m en el canal t mediante la conexión de publicación a Redis.

EmitLocal

 func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }

InitHub

 func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data)) case error: panic(v) } } }() return nil }

En la función InitHub , estamos creando dos conexiones a Redis: una para suscribirse a los canales que le interesan a este nodo web y otra para publicar mensajes. Una vez que se establecen las conexiones, comenzamos una nueva rutina Go con un ciclo que se ejecuta para siempre esperando recibir mensajes a través de la conexión del suscriptor a Redis. Cada vez que recibe un mensaje, lo emite localmente (es decir, a todos los WebSockets conectados a este nodo web).

EmpuñaduraSocket

Y finalmente, HandleSocket es donde esperamos que los mensajes lleguen a través de WebSockets o se limpien después de que se cierre la conexión:

 func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1]) case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+","+fields[3]) } }) }

JavaScript de interfaz de usuario

Dado que Glue viene con su propia biblioteca JavaScript de front-end, es mucho más fácil manejar WebSockets (o retroceder al sondeo XHR cuando los WebSockets no están disponibles):

 var socket = glue() socket.onMessage(function(data) { data = data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) break case 'touch': var coords = data[1].split(',') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)

En el lado del cliente, escuchamos cualquier mensaje que ingrese a través de WebSocket. Dado que Glue transmite todos los mensajes como cadenas, codificamos toda la información usando patrones específicos:

  • Nuevo mensaje: “mensaje:{messageID}”
  • Haga clic en la imagen: "toque:{coordX},{coordY}", donde coordX y coordY son las coordenadas basadas en porcentajes de la ubicación del clic del usuario en la imagen

Cuando el usuario crea un nuevo mensaje, usamos la API "POST /api/uploads/{uploadID}/messages" para crear un nuevo mensaje. Esto se hace usando el método de creación en la colección de backbone para mensajes:

 messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })

Cuando el usuario hace clic en la imagen, calculamos la posición del clic en porcentaje del ancho y el alto de la imagen y enviamos la información directamente a través de WebSocket.

 socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left) / $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())

El resumen

Descripción general de la arquitectura de la aplicación

Cuando el usuario escribe un mensaje y presiona la tecla Intro, el cliente invoca el punto final de la API "POST /api/uploads/{id}/messages". Esto, a su vez, crea una entidad de mensaje en la base de datos y publica una cadena "message:{messageID}" a través de Redis Pub/Sub en el canal "upload:{uploadID}" a través del paquete hub.

Redis reenvía esta cadena a cada nodo web (suscriptor) interesado en el canal "upload:{uploadID}". Los nodos web que reciben esta cadena iteran sobre todos los WebSockets relevantes para el canal y envían la cadena al cliente a través de sus conexiones WebSocket. Los clientes que reciben esta cadena comienzan a obtener nuevos mensajes del servidor utilizando "GET /api/uploads/{id}/messages".

De manera similar, para propagar eventos de clic en la imagen, el cliente envía directamente un mensaje a través del WebSocket que se parece a "toque cargar:{uploadID} {coordX} {coordY}". Este mensaje termina en el paquete concentrador donde se publica en el mismo canal "upload:{uploadID}". Como resultado, la cadena se distribuye a todos los usuarios que miran la imagen cargada. El cliente, al recibir esta cadena, la analiza para extraer las coordenadas y representa un círculo que crece y se desvanece para resaltar la ubicación del clic momentáneamente.

Envolver

En este artículo hemos visto un vistazo de cómo el patrón de publicación-suscripción puede ayudar a resolver el problema de escalar aplicaciones web en tiempo real en gran medida y con relativa facilidad.

La aplicación de muestra existe para servir como un campo de juego para experimentar con Redis Pub/Sub. Pero, como se mencionó anteriormente, las ideas se pueden implementar en casi cualquier otro lenguaje de programación popular.