Andare in tempo reale con Redis Pub/Sub
Pubblicato: 2022-03-11Il ridimensionamento di un'app Web è quasi sempre una sfida interessante, indipendentemente dalla complessità coinvolta. Tuttavia, le app Web in tempo reale pongono problemi di scalabilità unici. Ad esempio, per poter ridimensionare orizzontalmente un'app Web di messaggistica che utilizza WebSocket per comunicare con i suoi client, dovrà sincronizzare in qualche modo tutti i suoi nodi server. Se l'app non è stata creata con questo in mente, ridimensionarla orizzontalmente potrebbe non essere un'opzione facile.
In questo articolo, illustreremo l'architettura di una semplice app Web di messaggistica e condivisione di immagini in tempo reale. Qui, ci concentreremo sui vari componenti, come Redis Pub/Sub, coinvolti nella creazione di un'app in tempo reale e vedremo come svolgono tutti il loro ruolo nell'architettura generale.
Per quanto riguarda la funzionalità, l'applicazione è molto leggera. Consente il caricamento di immagini e commenti in tempo reale su tali immagini. Inoltre, qualsiasi utente può toccare l'immagine e gli altri utenti potranno vedere un effetto increspato sul proprio schermo.
L'intero codice sorgente di questa app è disponibile su GitHub.
Cose di cui abbiamo bisogno
andare
Useremo il linguaggio di programmazione Go. Non c'è alcun motivo speciale per cui abbiamo scelto Go per questo articolo, a parte il fatto che la sintassi di Go è pulita e la sua semantica è più facile da seguire. E poi c'è ovviamente il pregiudizio dell'autore. Tuttavia, tutti i concetti discussi in questo articolo possono essere facilmente tradotti nella lingua di tua scelta.
Iniziare con Go è facile. La sua distribuzione binaria può essere scaricata dal sito ufficiale. Nel caso in cui tu sia su Windows, c'è un programma di installazione MSI per Go nella loro pagina di download. Oppure, nel caso in cui il tuo sistema operativo (fortunatamente) offra un gestore di pacchetti:
Arch Linux:
pacman -S go
Ubuntu:
apt-get install golang
Mac OS X:
brew install go
Questo funzionerà solo se abbiamo installato Homebrew.
MongoDB
Perché usare MongoDB se abbiamo Redis, chiedi? Come accennato in precedenza, Redis è un archivio dati in memoria. Sebbene possa persistere i dati su disco, l'utilizzo di Redis a tale scopo probabilmente non è il modo migliore per procedere. Utilizzeremo MongoDB per archiviare i metadati e i messaggi delle immagini caricati.
Possiamo scaricare MongoDB dal loro sito ufficiale. In alcune distribuzioni Linux, questo è il modo preferito per installare MongoDB. Dovrebbe comunque essere ancora installabile utilizzando il gestore di pacchetti della maggior parte della distribuzione.
Arch Linux:
pacman -S mongodb
Ubuntu:
apt-get install mongodb
Mac OS X:
brew install mongodb
All'interno del nostro codice Go, utilizzeremo il pacchetto mgo (pronunciato mango). Non solo è testato in battaglia, il pacchetto driver offre un'API davvero pulita e semplice.
Se non sei un esperto di MongoDB, non preoccuparti affatto. L'uso di questo servizio di database è minimo nella nostra app di esempio ed è quasi irrilevante per il focus di questo articolo: architettura Pub/Sub.
Amazon S3
Utilizzeremo Amazon S3 per archiviare le immagini caricate dall'utente. Non c'è molto da fare qui, tranne assicurarsi di avere un account Amazon Web Services pronto e un bucket temporaneo creato.
La memorizzazione dei file caricati sul disco locale non è un'opzione perché non vogliamo fare affidamento sull'identità dei nostri nodi Web in alcun modo. Vogliamo che gli utenti siano in grado di connettersi a uno qualsiasi dei nodi Web disponibili e possano comunque vedere lo stesso contenuto.
Per interagire con il bucket Amazon S3 dal nostro codice Go, utilizzeremo AdRoll/goamz, un fork del pacchetto goamz di Canonical con alcune differenze.
Redis
Ultimo, ma non meno importante: Redis. Possiamo installarlo usando il gestore di pacchetti della nostra distribuzione:
Arch Linux:
pacman -S redis
Ubuntu:
apt-get install redis-server
Mac OS X:
brew install redis
Oppure, prendi il suo codice sorgente e compilalo tu stesso. Redis non ha dipendenze diverse da GCC e libc per compilarlo:
wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make
Una volta che Redis è installato e funzionante, avvia un terminale e accedi alla CLI di Redis:
redis-cli
Prova a inserire i seguenti comandi e vedi se ottieni l'output previsto:
SET answer 41 INCR answer GET answer
Il primo comando memorizza “41” rispetto alla chiave “risposta”, il secondo comando incrementa il valore, il terzo comando stampa il valore memorizzato rispetto alla chiave data. Il risultato dovrebbe essere "42".
Puoi saperne di più su tutti i comandi supportati da Redis sul loro sito Web ufficiale.
Utilizzeremo il pacchetto Go redigo per connetterci a Redis dall'interno del nostro codice dell'app.
Dai un'occhiata a Redis Pub/Sub
Il modello publish-subscribe è un modo per passare messaggi a un numero arbitrario di mittenti. I mittenti di questi messaggi (editori) non identificano esplicitamente i destinatari di destinazione. Invece, i messaggi vengono inviati su un canale su cui un numero qualsiasi di destinatari (abbonati) può attenderli.
Nel nostro caso, possiamo avere un numero qualsiasi di nodi Web in esecuzione dietro un sistema di bilanciamento del carico. In un dato momento, due utenti che guardano la stessa immagine potrebbero non essere collegati allo stesso nodo. È qui che entra in gioco Redis Pub/Sub. Ogni volta che un nodo web ha bisogno di osservare una modifica (ad esempio un nuovo messaggio viene creato dall'utente), utilizzerà Redis Pub/Sub per trasmettere tali informazioni a tutti i nodi web rilevanti. Che, a sua volta, propagherà le informazioni ai client interessati in modo che possano recuperare l'elenco aggiornato dei messaggi redis.
Poiché il modello di pubblicazione-sottoscrizione ci consente di inviare messaggi su canali con nome, possiamo avere ogni nodo Web connesso a Redis e iscriversi solo a quei canali a cui i loro utenti connessi sono interessati. Ad esempio, se due utenti stanno entrambi guardando il stessa immagine ma sono collegati a due nodi Web diversi su molti nodi Web, solo questi due nodi Web devono iscriversi al canale corrispondente. Qualsiasi messaggio pubblicato su quel canale verrà consegnato solo a quei due nodi web.
Sembra troppo bello per essere vero? Possiamo provarlo utilizzando la CLI di Redis. Avvia tre istanze di redis-cli
. Eseguire il comando seguente in prima istanza:
SUBSCRIBE somechannel
Eseguire il comando seguente nella seconda istanza della CLI Redis:
SUBSCRIBE someotherchannel
Eseguire i seguenti comandi nella terza istanza di Redis CLI:
PUBLISH somechannel lorem PUBLISH someotherchannel ipsum
Si noti come il primo grado abbia ricevuto "lorem" ma non "ipsum" e come il secondo grado abbia ricevuto "ipsum" ma non "lorem".
Vale la pena ricordare che una volta che un client Redis entra in modalità abbonato, non può più eseguire alcuna operazione se non quella di iscriversi a più canali o disiscriversi da quelli iscritti. Ciò significa che ogni nodo web dovrà mantenere due connessioni a Redis, una per connettersi a Redis come abbonato e l'altra per pubblicare messaggi sui canali in modo che qualsiasi nodo web sottoscritto a quei canali possa riceverli.
In tempo reale e scalabile
Prima di iniziare a esplorare cosa sta succedendo dietro le quinte, cloniamo il repository:
mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...
… e compilalo:
go build ./cmd/tonesad
Per eseguire l'app, creare prima di tutto un file denominato .env (preferibilmente copiando il file env-sample.txt):
cp env-sample.txt .env
Compila il file .env con tutte le variabili di ambiente necessarie:
MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}
Infine esegui il binario compilato:
PORT=9091 ./tonesad -env-file=.env
Il nodo Web dovrebbe ora essere in esecuzione ed essere accessibile tramite http://localhost:9091.
Per verificare se funziona ancora quando ridimensionato orizzontalmente, puoi avviare più nodi Web avviandolo con numeri di porta diversi:
PORT=9092 ./tonesad -env-file=.env
PORT=9093 ./tonesad -env-file=.env
… e accedendovi tramite gli URL corrispondenti: http://localhost:9092 e http://localhost:9093.
Dietro le quinte
Invece di seguire ogni fase dello sviluppo dell'app, ci concentreremo su alcune delle parti più importanti. Sebbene non tutti questi siano rilevanti al 100% per Redis Pub/Sub e le sue implicazioni in tempo reale, sono comunque rilevanti per la struttura generale dell'app e renderanno più facile seguirli una volta che ci addentreremo più in profondità.
Per semplificare le cose, non ci preoccuperemo dell'autenticazione dell'utente. I caricamenti saranno anonimi e disponibili per tutti coloro che conoscono l'URL. Tutti gli spettatori possono inviare messaggi e avranno la possibilità di scegliere il proprio alias. L'adattamento del meccanismo di autenticazione e delle funzionalità di privacy adeguati dovrebbe essere banale e va oltre lo scopo di questo articolo.
Dati persistenti
Questo è facile.
Ogni volta che un utente carica un'immagine, la memorizziamo in Amazon S3 e quindi memorizziamo il percorso in MongoDB rispetto a due ID: un ID oggetto BSON (il preferito di MongoDB) e un altro ID corto di 8 caratteri (piuttosto piacevole per gli occhi). Questo va nella raccolta dei "caricamenti" del nostro database e ha una struttura come questa:
type Upload struct { ID bson.ObjectId `bson:"_id"` ShortID string `bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64 `bson:"size"` }
Il campo Tipo viene utilizzato per indicare il tipo di supporto contenuto in questo "caricamento". Questo significa che supportiamo media diversi dalle immagini? Sfortunatamente no. Ma il campo è stato lasciato lì per ricordare che non siamo necessariamente limitati alle immagini qui.
Quando gli utenti si scambiano messaggi, questi vengono archiviati in una raccolta diversa. Sì, avete indovinato: "messaggi".

type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Content string `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }
L'unico bit interessante qui è il campo UploadID, che viene utilizzato per associare i messaggi a un caricamento particolare.
Endpoint API
Questa applicazione ha essenzialmente tre endpoint.
POST /api/caricamenti
Il gestore per questo endpoint prevede un invio "multipart/form-data" con l'immagine nel campo "file". Il comportamento del gestore è più o meno il seguente:
func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID: id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }
Go richiede che tutti gli errori vengano gestiti in modo esplicito. Questo è stato fatto nel prototipo, ma è stato omesso dagli snippet in questo articolo per mantenere l'attenzione sulle parti critiche.
Nel gestore di questo endpoint API, stiamo essenzialmente leggendo il file ma ne limitiamo le dimensioni a un valore specifico. Se il caricamento supera questo valore, la richiesta viene rifiutata. In caso contrario, viene generato un ID BSON e utilizzato per caricare l'immagine su Amazon S3 prima di rendere persistente l'entità di caricamento su MongoDB.
C'è un pro e un contro nel modo in cui vengono generati gli ID oggetto BSON. Sono generati sul lato client. Tuttavia, la strategia utilizzata per generare l'ID oggetto rende la probabilità di collisione così minima che è sicuro generarli sul lato client. D'altra parte, i valori degli ID oggetto generati sono generalmente sequenziali e questo è qualcosa che Amazon S3 non ama. Una soluzione semplice a questo è anteporre al nome del file una stringa casuale.
OTTIENI /api/uploads/{id}/messages
Questa API viene utilizzata per recuperare i messaggi recenti e i messaggi che sono stati pubblicati dopo un determinato periodo di tempo.
func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr) msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }
Quando il browser di un utente riceve una notifica di un nuovo messaggio su un caricamento che l'utente sta attualmente guardando, recupera i nuovi messaggi utilizzando questo endpoint.
POST /api/uploads/{id}/messages
E infine il gestore che crea messaggi e notifica a tutti:
func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(&body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }
Questo gestore è così simile agli altri che è quasi noioso includerlo anche qui. O è? Nota come c'è una chiamata di funzione hub.Emit() alla fine della funzione. Qual è l'hub dici? È qui che accade tutta la magia Pub/Sub.
Hub: dove WebSockets incontra Redis
Hub è il punto in cui incolliamo WebSocket con i canali Pub/Sub di Redis. E, guarda caso, il pacchetto che stiamo usando per gestire i WebSocket all'interno dei nostri server web si chiama colla.
Hub mantiene essenzialmente alcune strutture di dati che creano una mappatura tra tutti i WebSocket collegati a tutti i canali a cui sono interessati. Ad esempio, un WebSocket nella scheda del browser dell'utente puntato a una particolare immagine caricata dovrebbe naturalmente essere interessato a tutte le notifiche pertinenti ad esso.
Il pacchetto hub implementa sei funzioni:
- sottoscrivi
- Annulla iscrizioneTutto
- Emettere
- EmitLocal
- InitHub
- Presa per maniglia
Iscriviti e cancellatiTutto
func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }
Questa funzione, proprio come la maggior parte delle altre in questo pacchetto, mantiene un blocco su un mutex di lettura/scrittura durante l'esecuzione. Questo è così che possiamo modificare in sicurezza le primitive strutture dati variabili socket e argomenti . La prima variabile, sockets , associa i socket ai nomi dei canali, mentre la seconda, topics , associa i nomi dei canali ai socket. In questa funzione costruiamo queste mappature. Ogni volta che vediamo socket iscriversi a un nuovo nome di canale, effettuiamo la nostra connessione Redis, subconn , iscriviamo a quel canale su Redis usando subconn.Subscribe . Questo fa sì che Redis inoltri tutte le notifiche su quel canale a questo nodo web.
E, allo stesso modo, nella funzione UnsubscribeAll , abbattiamo la mappatura:
func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }
Quando rimuoviamo l'ultimo socket dalla struttura dati interessata a un particolare canale, annulliamo l'iscrizione al canale in Redis usando subconn.Unsubscribe .
Emettere
func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }
Questa funzione pubblica un messaggio m sul canale t utilizzando la connessione di pubblicazione a Redis.
EmitLocal
func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }
InitHub
func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data)) case error: panic(v) } } }() return nil }
Nella funzione InitHub , stiamo creando due connessioni a Redis: una per iscriversi ai canali a cui questo nodo web è interessato e l'altra per pubblicare messaggi. Una volta stabilite le connessioni, iniziamo una nuova routine Go con un ciclo in esecuzione per sempre in attesa di ricevere messaggi attraverso la connessione dell'abbonato a Redis. Ogni volta che riceve un messaggio, lo emette localmente (cioè a tutti i WebSocket collegati a questo nodo web).
Presa per maniglia
E infine, HandleSocket è il punto in cui attendiamo che i messaggi arrivino tramite WebSocket o puliscano dopo la chiusura della connessione:
func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1]) case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+","+fields[3]) } }) }
JavaScript front-end
Poiché la colla viene fornita con la propria libreria JavaScript front-end, è molto più semplice gestire i WebSocket (o eseguire il fallback al polling XHR quando i WebSocket non sono disponibili):
var socket = glue() socket.onMessage(function(data) { data = data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) break case 'touch': var coords = data[1].split(',') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)
Sul lato client, stiamo ascoltando qualsiasi messaggio in arrivo tramite WebSocket. Poiché la colla trasmette tutti i messaggi come stringhe, codifichiamo tutte le informazioni in essa contenute utilizzando schemi specifici:
- Nuovo messaggio: "messaggio:{ID messaggio}"
- Fare clic sull'immagine: "touch:{coordX},{coordY}", dove coordX e coordY sono le coordinate in base alla percentuale della posizione del clic dell'utente sull'immagine
Quando l'utente crea un nuovo messaggio, utilizziamo l'API "POST /api/uploads/{uploadID}/messages" per creare un nuovo messaggio. Questo viene fatto usando il metodo create sulla raccolta backbone per i messaggi:
messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })
Quando l'utente fa clic sull'immagine, calcoliamo la posizione del clic in percentuale della larghezza e dell'altezza dell'immagine e inviamo le informazioni direttamente tramite WebSocket.
socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left) / $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())
La panoramica
Quando l'utente digita un messaggio e preme il tasto Invio, il client richiama l'endpoint API "POST /api/uploads/{id}/messages". Questo a sua volta crea un'entità messaggio nel database e pubblica una stringa "message:{messageID}" tramite Redis Pub/Sub sul canale "upload:{uploadID}" tramite il pacchetto hub.
Redis inoltra questa stringa ad ogni nodo web (abbonato) interessato al canale “upload:{uploadID}”. I nodi Web che ricevono questa stringa eseguono l'iterazione su tutti i WebSocket rilevanti per il canale e inviano la stringa al client tramite le loro connessioni WebSocket. I client che ricevono questa stringa iniziano a recuperare nuovi messaggi dal server utilizzando "GET /api/uploads/{id}/messages".
Allo stesso modo, per propagare eventi di clic sull'immagine, il client invia direttamente un messaggio tramite WebSocket che assomiglia a "touch upload:{uploadID} {coordX} {coordY}". Questo messaggio finisce nel pacchetto hub dove viene pubblicato sullo stesso canale "upload:{uploadID}". Di conseguenza, la stringa viene distribuita a tutti gli utenti che guardano l'immagine caricata. Il client, dopo aver ricevuto questa stringa, la analizza per estrarre le coordinate e crea un cerchio in dissolvenza crescente per evidenziare momentaneamente la posizione del clic.
Incartare
In questo articolo abbiamo visto un assaggio di come il modello publish-subscribe può aiutare a risolvere il problema del ridimensionamento delle app Web in tempo reale in larga misura e con relativa facilità.
L'app di esempio esiste per fungere da playground per la sperimentazione con Redis Pub/Sub. Ma, come accennato in precedenza, le idee possono essere implementate in quasi tutti gli altri linguaggi di programmazione popolari.