Passer au temps réel avec Redis Pub/Sub
Publié: 2022-03-11La mise à l'échelle d'une application Web est presque toujours un défi intéressant, quelle que soit la complexité impliquée. Cependant, les applications Web en temps réel posent des problèmes d'évolutivité uniques. Par exemple, pour pouvoir mettre à l'échelle horizontalement une application Web de messagerie qui utilise WebSockets pour communiquer avec ses clients, elle devra en quelque sorte synchroniser tous ses nœuds de serveur. Si l'application n'a pas été conçue dans cet esprit, sa mise à l'échelle horizontale peut ne pas être une option facile.
Dans cet article, nous allons parcourir l'architecture d'une simple application Web de partage d'images et de messagerie en temps réel. Ici, nous nous concentrerons sur les différents composants, tels que Redis Pub/Sub, impliqués dans la création d'une application en temps réel et verrons comment ils jouent tous leur rôle dans l'architecture globale.
Côté fonctionnalité, l'application est très légère. Il permet le téléchargement d'images et de commentaires en temps réel sur ces images. De plus, n'importe quel utilisateur peut appuyer sur l'image et les autres utilisateurs pourront voir un effet d'entraînement sur leur écran.
L'intégralité du code source de cette application est disponible sur GitHub.
Les choses dont nous avons besoin
Va
Nous utiliserons le langage de programmation Go. Il n'y a pas de raison particulière pour laquelle nous choisissons Go pour cet article, à part le fait que la syntaxe de Go est propre et que sa sémantique est plus facile à suivre. Et puis il y a bien sûr le parti pris de l'auteur. Cependant, tous les concepts abordés dans cet article peuvent facilement être traduits dans la langue de votre choix.
Démarrer avec Go est facile. Sa distribution binaire peut être téléchargée depuis le site officiel. Si vous êtes sous Windows, il existe un programme d'installation MSI pour Go sur leur page de téléchargement. Ou, au cas où votre système d'exploitation propose (heureusement) un gestionnaire de paquets :
ArchLinux :
pacman -S go
Ubuntu :
apt-get install golang
Mac OS X:
brew install go
Celui-ci ne fonctionnera que si Homebrew est installé.
MongoDB
Pourquoi utiliser MongoDB si nous avons Redis, demandez-vous ? Comme mentionné précédemment, Redis est un magasin de données en mémoire. Bien qu'il puisse conserver les données sur le disque, l'utilisation de Redis à cette fin n'est probablement pas la meilleure solution. Nous utiliserons MongoDB pour stocker les métadonnées et les messages des images téléchargées.
Nous pouvons télécharger MongoDB depuis leur site officiel. Dans certaines distributions Linux, c'est la méthode préférée d'installation de MongoDB. Il devrait néanmoins être installable à l'aide du gestionnaire de packages de la plupart des distributions.
ArchLinux :
pacman -S mongodb
Ubuntu :
apt-get install mongodb
Mac OS X:
brew install mongodb
Dans notre code Go, nous utiliserons le package mgo (prononcez mangue). Non seulement il est testé au combat, mais le package de pilotes offre une API vraiment propre et simple.
Si vous n'êtes pas un expert de MongoDB, ne vous inquiétez pas du tout. L'utilisation de ce service de base de données est minime dans notre exemple d'application et n'a pratiquement aucun rapport avec l'objet de cet article : l'architecture Pub/Sub.
AmazonS3
Nous utiliserons Amazon S3 pour stocker les images téléchargées par l'utilisateur. Il n'y a pas grand-chose à faire ici, sauf s'assurer que nous avons un compte Amazon Web Services prêt et qu'un compartiment temporaire a été créé.
Stocker les fichiers téléchargés sur le disque local n'est pas une option car nous ne voulons en aucun cas nous fier à l'identité de nos nœuds Web. Nous voulons que les utilisateurs puissent se connecter à n'importe lequel des nœuds Web disponibles et puissent toujours voir le même contenu.
Pour interagir avec le compartiment Amazon S3 à partir de notre code Go, nous utiliserons AdRoll/goamz, un fork du package goamz de Canonical avec quelques différences.
Redis
Dernier point, mais non le moindre : Redis. Nous pouvons l'installer à l'aide du gestionnaire de packages de notre distribution :
ArchLinux :
pacman -S redis
Ubuntu :
apt-get install redis-server
Mac OS X:
brew install redis
Ou récupérez son code source et compilez-le vous-même. Redis n'a pas de dépendances autres que GCC et libc pour le construire :
wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make
Une fois que Redis est installé et en cours d'exécution, démarrez un terminal et entrez la CLI de Redis :
redis-cli
Essayez d'entrer les commandes suivantes et voyez si vous obtenez le résultat attendu :
SET answer 41 INCR answer GET answer
La première commande stocke "41" contre la clé "réponse", la deuxième commande incrémente la valeur, la troisième commande imprime la valeur stockée contre la clé donnée. Le résultat devrait être "42".
Vous pouvez en savoir plus sur toutes les commandes prises en charge par Redis sur leur site officiel.
Nous utiliserons le package Go redigo pour nous connecter à Redis à partir de notre code d'application.
Coup d'œil sur Redis Pub/Sub
Le modèle de publication-abonnement est un moyen de transmettre des messages à un nombre arbitraire d'expéditeurs. Les expéditeurs de ces messages (éditeurs) n'identifient pas explicitement les destinataires ciblés. Au lieu de cela, les messages sont envoyés sur un canal sur lequel n'importe quel nombre de destinataires (abonnés) peuvent les attendre.
Dans notre cas, nous pouvons avoir n'importe quel nombre de nœuds Web exécutés derrière un équilibreur de charge. A un instant donné, deux utilisateurs regardant la même image peuvent ne pas être connectés au même nœud. C'est là que Redis Pub/Sub entre en jeu. Chaque fois qu'un nœud Web doit observer un changement (par exemple, un nouveau message est créé par l'utilisateur), il utilisera Redis Pub/Sub pour diffuser cette information à tous les nœuds Web concernés. Ce qui, à son tour, propagera les informations aux clients concernés afin qu'ils puissent récupérer la liste mise à jour des messagesredis.
Étant donné que le modèle de publication-abonnement nous permet d'envoyer des messages sur des canaux nommés, nous pouvons avoir chaque nœud Web connecté à Redis et s'abonner uniquement aux canaux qui intéressent leurs utilisateurs connectés. Par exemple, si deux utilisateurs regardent tous les deux le même image mais sont connectés à deux nœuds Web différents parmi de nombreux nœuds Web, seuls ces deux nœuds Web doivent s'abonner au canal correspondant. Tout message publié sur ce canal sera livré à ces deux nœuds Web uniquement.
Cela semble trop beau pour être vrai ? Nous pouvons l'essayer en utilisant la CLI de Redis. Démarrez trois instances de redis-cli
. Exécutez la commande suivante en premier lieu :
SUBSCRIBE somechannel
Exécutez la commande suivante dans la deuxième instance de CLI Redis :
SUBSCRIBE someotherchannel
Exécutez les commandes suivantes dans la troisième instance de Redis CLI :
PUBLISH somechannel lorem PUBLISH someotherchannel ipsum
Remarquez comment la première instance a reçu « lorem » mais pas « ipsum », et comment la deuxième instance a reçu « ipsum » mais pas « lorem ».
Il convient de mentionner qu'une fois qu'un client Redis est entré en mode abonné, il ne peut plus effectuer aucune opération autre que s'abonner à plus de chaînes ou se désabonner de celles qui sont abonnées. Cela signifie que chaque nœud Web devra maintenir deux connexions à Redis, l'une pour se connecter à Redis en tant qu'abonné et l'autre pour publier des messages sur les canaux afin que tout nœud Web abonné à ces canaux puisse les recevoir.
Temps réel et évolutif
Avant de commencer à explorer ce qui se passe derrière la scène, clonons le référentiel :
mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...
… et compilez-le :
go build ./cmd/tonesad
Pour exécuter l'application, créez tout d'abord un fichier nommé .env (de préférence en copiant le fichier env-sample.txt) :
cp env-sample.txt .env
Remplissez le fichier .env avec toutes les variables d'environnement nécessaires :
MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}
Enfin, exécutez le binaire construit :
PORT=9091 ./tonesad -env-file=.env
Le nœud Web devrait maintenant fonctionner et être accessible via http://localhost:9091.
Pour tester s'il fonctionne toujours lorsqu'il est mis à l'échelle horizontalement, vous pouvez faire tourner plusieurs nœuds Web en le démarrant avec des numéros de port différents :
PORT=9092 ./tonesad -env-file=.env
PORT=9093 ./tonesad -env-file=.env
… et en y accédant via leurs URL correspondantes : http://localhost:9092 et http://localhost:9093.
Dans les coulisses
Au lieu de passer par chaque étape du développement de l'application, nous nous concentrerons sur certaines des parties les plus importantes. Bien que tous ne soient pas pertinents à 100 % pour Redis Pub/Sub et ses implications en temps réel, ils sont toujours pertinents pour la structure globale de l'application et faciliteront le suivi une fois que nous aurons approfondi.
Pour garder les choses simples, nous n'allons pas nous soucier de l'authentification des utilisateurs. Les téléchargements seront anonymes et disponibles pour tous ceux qui connaissent l'URL. Tous les téléspectateurs peuvent envoyer des messages et auront la possibilité de choisir leur propre alias. L'adaptation d'un mécanisme d'authentification et de fonctionnalités de confidentialité appropriés devrait être triviale et dépasse le cadre de cet article.
Données persistantes
Celui-ci est facile.
Chaque fois qu'un utilisateur télécharge une image, nous la stockons dans Amazon S3, puis stockons le chemin d'accès à celle-ci dans MongoDB avec deux ID : un ID d'objet BSON (le favori de MongoDB) et un autre ID court de 8 caractères (plutôt agréable à regarder). Cela va dans la collection "téléchargements" de notre base de données et a une structure comme celle-ci :
type Upload struct { ID bson.ObjectId `bson:"_id"` ShortID string `bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64 `bson:"size"` }
Le champ Kind est utilisé pour indiquer le type de média que contient ce « upload ». Cela signifie-t-il que nous prenons en charge d'autres médias que les images ? Malheureusement non. Mais le champ a été laissé là pour rappeler qu'ici nous ne sommes pas forcément limités aux images.

Au fur et à mesure que les utilisateurs s'envoient des messages, ils sont stockés dans une collection différente. Oui, vous l'avez deviné : « messages ».
type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Content string `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }
Le seul élément intéressant ici est le champ UploadID, qui est utilisé pour associer des messages à un téléchargement particulier.
Points de terminaison de l'API
Cette application a essentiellement trois points de terminaison.
POST /api/téléchargements
Le gestionnaire de ce point de terminaison attend une soumission "multipart/form-data" avec l'image dans le champ "file". Le comportement du gestionnaire est à peu près le suivant :
func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID: id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }
Go nécessite que toutes les erreurs soient gérées explicitement. Cela a été fait dans le prototype, mais est omis des extraits de cet article pour garder l'accent sur les parties critiques.
Dans le gestionnaire de ce point de terminaison d'API, nous lisons essentiellement le fichier mais limitons sa taille à une valeur spécifique. Si le téléchargement dépasse cette valeur, la demande est rejetée. Sinon, un ID BSON est généré et utilisé pour télécharger l'image sur Amazon S3 avant de conserver l'entité de téléchargement sur MongoDB.
Il y a un pour et un contre dans la façon dont les ID d'objet BSON sont générés. Ils sont générés côté client. Cependant, la stratégie utilisée pour générer l'ID d'objet rend la probabilité de collision si minime qu'il est sûr de les générer côté client. D'un autre côté, les valeurs des ID d'objet générés sont généralement séquentielles et c'est quelque chose qu'Amazon S3 n'aime pas vraiment. Une solution simple consiste à préfixer le nom de fichier avec une chaîne aléatoire.
GET /api/uploads/{id}/messages
Cette API est utilisée pour récupérer les messages récents et les messages qui ont été publiés après un certain temps.
func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr) msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }
Lorsque le navigateur d'un utilisateur est informé d'un nouveau message sur un téléchargement que l'utilisateur est en train de consulter, il récupère les nouveaux messages à l'aide de ce point de terminaison.
POST /api/uploads/{id}/messages
Et enfin le gestionnaire qui crée des messages et notifie tout le monde :
func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(&body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }
Ce gestionnaire est tellement similaire aux autres qu'il est presque ennuyeux de l'inclure ici. Ou est-ce? Remarquez qu'il y a un appel de fonction hub.Emit() à la toute fin de la fonction. C'est quoi hub tu dis ? C'est là que toute la magie Pub/Sub se produit.
Hub : où WebSockets rencontre Redis
Hub est l'endroit où nous collons WebSockets avec les canaux Pub/Sub de Redis. Et, par coïncidence, le package que nous utilisons pour gérer WebSockets au sein de nos serveurs Web s'appelle glue.
Hub maintient essentiellement quelques structures de données qui créent un mappage entre tous les WebSockets connectés à tous les canaux qui les intéressent. Par exemple, un WebSocket sur l'onglet du navigateur de l'utilisateur pointé vers une image téléchargée particulière devrait naturellement être intéressé par toutes les notifications pertinentes à cela.
Le package hub implémente six fonctions :
- S'abonner
- Se désabonnerTout
- Émettre
- EmitLocal
- InitHub
- PoignéeDouille
S'abonner et se désabonnerTout
func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }
Cette fonction, comme la plupart des autres dans ce paquet, détient un verrou sur un mutex en lecture/écriture pendant son exécution. Ceci afin que nous puissions modifier en toute sécurité les variables des structures de données primitives sockets et topic . La première variable, sockets , mappe les sockets aux noms de canaux, tandis que la seconde, topics , mappe les noms de canaux aux sockets. Dans cette fonction, nous construisons ces mappages. Chaque fois que nous voyons socket s'abonner à un nouveau nom de canal, nous établissons notre connexion Redis, subconn , abonnez-vous à ce canal sur Redis en utilisant subconn.Subscribe . Cela oblige Redis à transférer toutes les notifications de ce canal vers ce nœud Web.
Et, de même, dans la fonction UnsubscribeAll , nous supprimons le mappage :
func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }
Lorsque nous supprimons le dernier socket de la structure de données intéressée par un canal particulier, nous nous désinscrivons du canal dans Redis en utilisant subconn.Unsubscribe .
Émettre
func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }
Cette fonction publie un message m sur le canal t en utilisant la connexion de publication à Redis.
EmitLocal
func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }
InitHub
func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data)) case error: panic(v) } } }() return nil }
Dans la fonction InitHub , nous créons deux connexions à Redis : une pour s'abonner aux canaux qui intéressent ce nœud Web et l'autre pour publier des messages. Une fois les connexions établies, nous commençons une nouvelle routine Go avec une boucle qui s'exécute en permanence en attendant de recevoir des messages via la connexion de l'abonné à Redis. Chaque fois qu'il reçoit un message, il l'émet localement (c'est-à-dire à tous les WebSockets connectés à ce nœud web).
PoignéeDouille
Et enfin, HandleSocket est l'endroit où nous attendons que les messages arrivent via WebSockets ou nettoyons après la fermeture de la connexion :
func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1]) case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+","+fields[3]) } }) }
JavaScript frontal
Étant donné que glue est livré avec sa propre bibliothèque JavaScript frontale, il est beaucoup plus facile de gérer les WebSockets (ou de recourir à l'interrogation XHR lorsque les WebSockets ne sont pas disponibles) :
var socket = glue() socket.onMessage(function(data) { data = data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) break case 'touch': var coords = data[1].split(',') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)
Côté client, nous écoutons tout message entrant via WebSocket. Étant donné que glue transmet tous les messages sous forme de chaînes, nous encodons toutes les informations qu'il contient en utilisant des modèles spécifiques :
- Nouveau message : "message :{messageID}"
- Cliquez sur l'image : "touch :{coordX},{coordY}", où coordX et coordY sont les coordonnées basées sur le pourcentage de l'emplacement du clic de l'utilisateur sur l'image
Lorsque l'utilisateur crée un nouveau message, nous utilisons l'API "POST /api/uploads/{uploadID}/messages" pour créer un nouveau message. Cela se fait à l'aide de la méthode create sur la collection backbone pour les messages :
messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })
Lorsque l'utilisateur clique sur l'image, nous calculons la position du clic en pourcentage de la largeur et de la hauteur de l'image et envoyons les informations directement via le WebSocket.
socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left) / $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())
L'aperçu
Lorsque l'utilisateur saisit un message et appuie sur la touche Entrée, le client appelle le point de terminaison de l'API « POST /api/uploads/{id}/messages ». Cela crée à son tour une entité de message dans la base de données et publie une chaîne "message :{messageID}" via Redis Pub/Sub sur le canal "upload :{uploadID}" via le package hub.
Redis transmet cette chaîne à chaque nœud Web (abonné) intéressé sur le canal "upload : {uploadID}". Les nœuds Web recevant cette chaîne parcourent tous les WebSockets pertinents pour le canal et envoient la chaîne au client via leurs connexions WebSocket. Les clients recevant cette chaîne commencent à récupérer les nouveaux messages du serveur à l'aide de « GET /api/uploads/{id}/messages ».
De même, pour propager les événements de clic sur l'image, le client envoie directement un message via le WebSocket qui ressemble à quelque chose comme "touch upload :{uploadID} {coordX} {coordY}". Ce message se retrouve dans le package hub où il est publié sur le même canal "upload :{uploadID}". En conséquence, la chaîne est distribuée à tous les utilisateurs regardant l'image téléchargée. Le client, lors de la réception de cette chaîne, l'analyse pour extraire les coordonnées et restitue un cercle croissant et décroissant pour mettre en évidence momentanément l'emplacement du clic.
Emballer
Dans cet article, nous avons vu un aperçu de la façon dont le modèle de publication-abonnement peut aider à résoudre le problème de la mise à l'échelle des applications Web en temps réel dans une large mesure et avec une relative facilité.
L'exemple d'application existe pour servir de terrain de jeu pour expérimenter Redis Pub/Sub. Mais, comme mentionné précédemment, les idées peuvent être implémentées dans presque tous les autres langages de programmation populaires.