Em tempo real com o Redis Pub/Sub

Publicados: 2022-03-11

Dimensionar um aplicativo da Web é quase sempre um desafio interessante, independentemente da complexidade envolvida. No entanto, os aplicativos da Web em tempo real apresentam problemas exclusivos de escalabilidade. Por exemplo, para poder dimensionar horizontalmente um aplicativo Web de mensagens que usa WebSockets para se comunicar com seus clientes, ele precisará sincronizar de alguma forma todos os seus nós de servidor. Se o aplicativo não foi construído com isso em mente, dimensioná-lo horizontalmente pode não ser uma opção fácil.

Neste artigo, vamos percorrer a arquitetura de um aplicativo web simples de compartilhamento de imagens e mensagens em tempo real. Aqui, focaremos nos vários componentes, como o Redis Pub/Sub, envolvidos na criação de um aplicativo em tempo real e veremos como todos eles desempenham seu papel na arquitetura geral.

Em tempo real com o Redis Pub/Sub

Em tempo real com o Redis Pub/Sub
Tweet

Em termos de funcionalidade, o aplicativo é muito leve. Permite o upload de imagens e comentários em tempo real sobre essas imagens. Além disso, qualquer usuário pode tocar na imagem e outros usuários poderão ver um efeito de ondulação em sua tela.

Todo o código-fonte deste aplicativo está disponível no GitHub.

Coisas que precisamos

Ir

Usaremos a linguagem de programação Go. Não há nenhuma razão especial para escolhermos Go para este artigo, além de que a sintaxe do Go é limpa e sua semântica é mais fácil de seguir. E depois há, é claro, o viés do autor. No entanto, todos os conceitos discutidos neste artigo podem ser facilmente traduzidos para o idioma de sua escolha.

Começar a usar o Go é fácil. Sua distribuição binária pode ser baixada do site oficial. Caso você esteja no Windows, há um instalador MSI para Go em sua página de download. Ou, caso seu sistema operacional (felizmente) ofereça um gerenciador de pacotes:

Arco Linux:

 pacman -S go

Ubuntu:

 apt-get install golang

Mac OS X:

 brew install go

Este só funcionará se tivermos o Homebrew instalado.

MongoDB

Por que usar o MongoDB se temos Redis, você pergunta? Como mencionado anteriormente, o Redis é um armazenamento de dados na memória. Embora possa persistir dados em disco, usar o Redis para essa finalidade provavelmente não é o melhor caminho a seguir. Usaremos o MongoDB para armazenar metadados e mensagens de imagens carregadas.

Podemos baixar o MongoDB de seu site oficial. Em algumas distribuições Linux, essa é a maneira preferida de instalar o MongoDB. Ele ainda deve ser instalável usando o gerenciador de pacotes da maioria das distribuições.

Arco Linux:

 pacman -S mongodb

Ubuntu:

 apt-get install mongodb

Mac OS X:

 brew install mongodb

Dentro do nosso código Go, usaremos o pacote mgo (pronuncia-se mango). Além de ser testado em batalha, o pacote de drivers oferece uma API realmente limpa e simples.

Se você não é um especialista em MongoDB, não se preocupe. O uso desse serviço de banco de dados é mínimo em nosso aplicativo de amostra e é quase irrelevante para o foco deste artigo: arquitetura Pub/Sub.

Amazon S3

Usaremos o Amazon S3 para armazenar as imagens carregadas pelo usuário. Não há muito o que fazer aqui, exceto certificar-se de que temos uma conta pronta para Amazon Web Services e um bucket temporário criado.

Armazenar os arquivos carregados no disco local não é uma opção porque não queremos confiar na identidade de nossos nós da web de forma alguma. Queremos que os usuários possam se conectar a qualquer um dos nós da web disponíveis e ainda possam ver o mesmo conteúdo.

Para interagir com o bucket do Amazon S3 a partir do nosso código Go, usaremos AdRoll/goamz, um fork do pacote goamz da Canonical com algumas diferenças.

Redis

Por último, mas não menos importante: Redis. Podemos instalá-lo usando o gerenciador de pacotes da nossa distribuição:

Arco Linux:

 pacman -S redis

Ubuntu:

 apt-get install redis-server

Mac OS X:

 brew install redis

Ou busque seu código-fonte e compile você mesmo. O Redis não tem dependências além do GCC e libc para construí-lo:

 wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make

Depois que o Redis estiver instalado e em execução, inicie um terminal e entre na CLI do Redis:

 redis-cli

Tente digitar os seguintes comandos e veja se você obtém a saída esperada:

 SET answer 41 INCR answer GET answer

O primeiro comando armazena “41” na chave “answer”, o segundo comando incrementa o valor, o terceiro comando imprime o valor armazenado na chave fornecida. O resultado deve ser “42”.

Você pode saber mais sobre todos os comandos que o Redis suporta em seu site oficial.

Usaremos o pacote Go redigo para conectar ao Redis de dentro do código do nosso aplicativo.

Dê uma olhada no Redis Pub/Sub

O padrão de publicação-assinatura é uma maneira de passar mensagens para um número arbitrário de remetentes. Os remetentes dessas mensagens (editores) não identificam explicitamente os destinatários visados. Em vez disso, as mensagens são enviadas em um canal no qual qualquer número de destinatários (assinantes) pode estar esperando por elas.

Configuração simples de publicação-assinatura

No nosso caso, podemos ter qualquer número de nós da web rodando atrás de um balanceador de carga. A qualquer momento, dois usuários olhando para a mesma imagem podem não estar conectados ao mesmo nó. É aqui que o Redis Pub/Sub entra em ação. Sempre que um nó da Web precisar observar uma alteração (por exemplo, uma nova mensagem é criada pelo usuário), ele usará o Redis Pub/Sub para transmitir essas informações para todos os nós da Web relevantes. Que, por sua vez, propagará a informação para os clientes relevantes para que eles possam buscar a lista atualizada de messagesredis.

Como o padrão de publicação-assinatura nos permite despachar mensagens em canais nomeados, podemos ter cada nó da Web conectado ao Redis e inscrito apenas nos canais nos quais seus usuários conectados estão interessados. Por exemplo, se dois usuários estiverem olhando para o mesma imagem, mas estão conectados a dois nós da web diferentes de muitos nós da web, então apenas esses dois nós da web precisam se inscrever no canal correspondente. Qualquer mensagem publicada nesse canal será entregue apenas a esses dois nós da web.

Parece bom demais para ser verdade? Podemos experimentá-lo usando a CLI do Redis. Inicie três instâncias de redis-cli . Execute o seguinte comando na primeira instância:

 SUBSCRIBE somechannel

Execute o seguinte comando na segunda instância da CLI do Redis:

 SUBSCRIBE someotherchannel

Execute os seguintes comandos na terceira instância do Redis CLI:

 PUBLISH somechannel lorem PUBLISH someotherchannel ipsum

Observe como a primeira instância recebeu “lorem” mas não “ipsum”, e como a segunda instância recebeu “ipsum” mas não “lorem”.

Redis Pub/Sub em ação

Vale ressaltar que, uma vez que um cliente Redis entra no modo de assinante, ele não pode mais realizar nenhuma operação além de assinar mais canais ou cancelar a assinatura dos assinantes. Isso significa que cada web node precisará manter duas conexões com o Redis, uma para se conectar ao Redis como assinante e outra para publicar mensagens nos canais para que qualquer web node inscrito nesses canais possa recebê-las.

Em tempo real e escalável

Antes de começarmos a explorar o que está acontecendo nos bastidores, vamos clonar o repositório:

 mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...

… e compile:

 go build ./cmd/tonesad

Para executar o aplicativo, primeiro crie um arquivo chamado .env (de preferência copiando o arquivo env-sample.txt):

 cp env-sample.txt .env

Preencha o arquivo .env com todas as variáveis ​​de ambiente necessárias:

 MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}

Por fim, execute o binário construído:

 PORT=9091 ./tonesad -env-file=.env

O nó da web agora deve estar em execução e acessível via http://localhost:9091.

Exemplo ao vivo

Para testar se ainda funciona quando dimensionado horizontalmente, você pode ativar vários nós da Web iniciando-o com diferentes números de porta:

 PORT=9092 ./tonesad -env-file=.env
 PORT=9093 ./tonesad -env-file=.env

… e acessando-os por meio de seus URLs correspondentes: http://localhost:9092 e http://localhost:9093.

Exemplo ao vivo

Por trás das cenas

Em vez de passar por todas as etapas do desenvolvimento do aplicativo, vamos nos concentrar em algumas das partes mais importantes. Embora nem todos sejam 100% relevantes para o Redis Pub/Sub e suas implicações em tempo real, eles ainda são relevantes para a estrutura geral do aplicativo e facilitarão o acompanhamento quando nos aprofundarmos.

Para manter as coisas simples, não vamos nos preocupar com a autenticação do usuário. Os uploads serão anônimos e estarão disponíveis para todos que souberem a URL. Todos os espectadores podem enviar mensagens e terão a capacidade de escolher seu próprio alias. Adaptar o mecanismo de autenticação adequado e os recursos de privacidade deve ser trivial e está além do escopo deste artigo.

Dados persistentes

Este é fácil.

Sempre que um usuário carrega uma imagem, nós a armazenamos no Amazon S3 e, em seguida, armazenamos o caminho para ela no MongoDB em relação a dois IDs: um BSON Object ID (o favorito do MongoDB) e outro ID curto de 8 caracteres (um pouco agradável aos olhos). Isso vai para a coleção de “uploads” do nosso banco de dados e tem uma estrutura assim:

 type Upload struct { ID bson.ObjectId `bson:"_id"` ShortID string `bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64 `bson:"size"` }

O campo Tipo é usado para indicar o tipo de mídia que este “upload” contém. Isso significa que oferecemos suporte a outras mídias além de imagens? Infelizmente não. Mas o campo foi deixado lá para servir de lembrete de que não estamos necessariamente limitados a imagens aqui.

À medida que os usuários enviam mensagens uns aos outros, eles são armazenados em uma coleção diferente. Sim, você adivinhou: “mensagens”.

 type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Content string `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }

A única parte interessante aqui é o campo UploadID, que é usado para associar mensagens a um upload específico.

Pontos de extremidade da API

Esse aplicativo tem essencialmente três endpoints.

POST /api/uploads

O manipulador para este endpoint espera um envio “multipart/form-data” com a imagem no campo “file”. O comportamento do manipulador é aproximadamente o seguinte:

 func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID: id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }

Go requer que todos os erros sejam tratados explicitamente. Isso foi feito no protótipo, mas foi omitido nos trechos deste artigo para manter o foco nas partes críticas.

No manipulador desse endpoint da API, estamos essencialmente lendo o arquivo, mas limitando seu tamanho a um valor específico. Se o upload exceder esse valor, a solicitação será rejeitada. Caso contrário, um BSON ID é gerado e usado para fazer upload da imagem no Amazon S3 antes de persistir a entidade de upload no MongoDB.

Há um pró e um contra na maneira como os IDs de objeto BSON são gerados. Eles são gerados no cliente final. No entanto, a estratégia usada para gerar o ID do objeto torna a probabilidade de colisão tão minúscula que é seguro gerá-los no lado do cliente. Por outro lado, os valores de IDs de objeto gerados geralmente são sequenciais e isso é algo que o Amazon S3 não gosta muito. Uma solução fácil para isso é prefixar o nome do arquivo com uma string aleatória.

GET /api/uploads/{id}/messages

Essa API é usada para buscar mensagens recentes e mensagens que foram postadas após um determinado horário.

 func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr) msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }

Quando o navegador de um usuário é notificado sobre uma nova mensagem em um upload que o usuário está visualizando no momento, ele busca as novas mensagens usando esse ponto de extremidade.

POST /api/uploads/{id}/messages

E finalmente o handler que cria mensagens e notifica a todos:

 func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(&body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }

Este manipulador é tão parecido com os outros que é quase chato incluí-lo aqui. Ou é? Observe como há uma chamada de função hub.Emit() bem no final da função. O que é hub você diz? É aí que toda a mágica do Pub/Sub acontece.

Hub: onde os WebSockets encontram o Redis

Hub é onde colamos WebSockets com os canais Pub/Sub do Redis. E, coincidentemente, o pacote que estamos usando para lidar com WebSockets em nossos servidores web é chamado de cola.

O Hub mantém essencialmente algumas estruturas de dados que criam um mapeamento entre todos os WebSockets conectados para todos os canais nos quais eles estão interessados. Por exemplo, um WebSocket na guia do navegador do usuário apontado para uma determinada imagem carregada deve naturalmente estar interessado em todas as notificações relevantes para isso.

O pacote hub implementa seis funções:

  • Se inscrever
  • Cancelar inscrição
  • Emitir
  • Emitir Local
  • InitHub
  • HandleSocket

Assinar e Cancelar Todas

 func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }

Esta função, assim como a maioria das outras neste pacote, mantém um bloqueio em um mutex de leitura/gravação enquanto está sendo executado. Isso é para que possamos modificar com segurança as variáveis ​​de estruturas de dados primitivas sockets e tópicos . A primeira variável, sockets , mapeia sockets para nomes de canais, enquanto a segunda, topics , mapeia nomes de canais para sockets. Nesta função construímos esses mapeamentos. Sempre que vemos socket subscribe para um novo nome de canal, fazemos nossa conexão Redis, subconn , assinamos esse canal no Redis usando subconn.Subscribe . Isso faz com que o Redis encaminhe todas as notificações nesse canal para este nó da web.

E, da mesma forma, na função UnsubscribeAll , desmontamos o mapeamento:

 func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }

Quando removemos o último soquete da estrutura de dados interessado em um canal específico, cancelamos a assinatura do canal no Redis usando subconn.Unsubscribe .

Emitir

 func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }

Essa função publica uma mensagem m no canal t usando a conexão de publicação com o Redis.

Emitir Local

 func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }

InitHub

 func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data)) case error: panic(v) } } }() return nil }

Na função InitHub , estamos criando duas conexões com o Redis: uma para assinar os canais que este web node está interessado e outra para publicar mensagens. Uma vez que as conexões são estabelecidas, iniciamos uma nova rotina Go com um loop rodando eternamente esperando para receber mensagens através da conexão do assinante com o Redis. Toda vez que recebe uma mensagem, ele a emite localmente (ou seja, para todos os WebSockets conectados a este web node).

HandleSocket

E, finalmente, HandleSocket é onde esperamos que as mensagens cheguem através do WebSockets ou sejam limpas após o fechamento da conexão:

 func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1]) case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+","+fields[3]) } }) }

JavaScript de front-end

Como a cola vem com sua própria biblioteca JavaScript de front-end, é muito mais fácil lidar com WebSockets (ou fallback para pesquisa XHR quando WebSockets não estão disponíveis):

 var socket = glue() socket.onMessage(function(data) { data = data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) break case 'touch': var coords = data[1].split(',') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)

Do lado do cliente, estamos atentos a qualquer mensagem recebida pelo WebSocket. Como a cola transmite todas as mensagens como strings, codificamos todas as informações nela usando padrões específicos:

  • Nova mensagem: “message:{messageID}”
  • Clique na imagem: “touch:{coordX},{coordY}”, onde coordX e coordY são a coordenada baseada em porcentagem do local de clique do usuário na imagem

Quando o usuário cria uma nova mensagem, usamos a API “POST /api/uploads/{uploadID}/messages” para criar uma nova mensagem. Isso é feito usando o método create na coleção de backbone para mensagens:

 messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })

Quando o usuário clica na imagem, calculamos a posição do clique em porcentagem da largura e altura da imagem e enviamos as informações diretamente pelo WebSocket.

 socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left) / $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())

A visão geral

Visão geral da arquitetura do aplicativo

Quando o usuário digita uma mensagem e pressiona a tecla Enter, o cliente invoca o endpoint da API “POST /api/uploads/{id}/messages”. Isso, por sua vez, cria uma entidade de mensagem no banco de dados e publica uma string “message:{messageID}” via Redis Pub/Sub no canal “upload:{uploadID}” por meio do pacote do hub.

O Redis encaminha essa string para cada nó da web (assinante) interessado no canal “upload:{uploadID}”. Os nós da Web que recebem essa string iteram em todos os WebSockets relevantes para o canal e enviam a string para o cliente por meio de suas conexões WebSocket. Os clientes que recebem esta string começam a buscar novas mensagens do servidor usando o “GET /api/uploads/{id}/messages”.

Da mesma forma, para propagar eventos de clique na imagem, o cliente envia diretamente uma mensagem através do WebSocket que se parece com “touch upload:{uploadID} {coordX} {coordY}”. Esta mensagem acaba no pacote do hub onde é publicada no mesmo canal “upload:{uploadID}”. Como resultado, a string é distribuída para todos os usuários que observam a imagem carregada. O cliente, ao receber essa string, a analisa para extrair as coordenadas e renderiza um círculo crescente e desvanecido para destacar momentaneamente o local do clique.

Embrulhar

Neste artigo, vimos um vislumbre de como o padrão de publicação-assinatura pode ajudar a resolver o problema de dimensionar aplicativos da Web em tempo real em grande medida e com relativa facilidade.

O aplicativo de amostra existe para servir como um playground para experimentar o Redis Pub/Sub. Mas, como mencionado anteriormente, as ideias podem ser implementadas em quase qualquer outra linguagem de programação popular.