Accesați în timp real cu Redis Pub/Sub
Publicat: 2022-03-11Scalarea unei aplicații web este aproape întotdeauna o provocare interesantă, indiferent de complexitatea implicată. Cu toate acestea, aplicațiile web în timp real prezintă probleme unice de scalabilitate. De exemplu, pentru a putea scala orizontal o aplicație web de mesagerie care utilizează WebSockets pentru a comunica cu clienții săi, va trebui să sincronizeze cumva toate nodurile sale de server. Dacă aplicația nu a fost construită având în vedere acest lucru, atunci scalarea orizontală poate să nu fie o opțiune ușoară.
În acest articol, vom parcurge arhitectura unei aplicații web simple de partajare a imaginilor și mesagerie în timp real. Aici, ne vom concentra asupra diferitelor componente, cum ar fi Redis Pub/Sub, implicate în construirea unei aplicații în timp real și vom vedea cum își joacă toate rolul în arhitectura generală.
În ceea ce privește funcționalitatea, aplicația este foarte ușoară. Permite încărcarea de imagini și comentarii în timp real la acele imagini. În plus, orice utilizator poate atinge imaginea, iar alți utilizatori vor putea vedea un efect de ondulare pe ecranul lor.
Întregul cod sursă al acestei aplicații este disponibil pe GitHub.
Lucruri de care avem nevoie
Merge
Vom folosi limbajul de programare Go. Nu există niciun motiv special pentru care alegem Go pentru acest articol, în afară de faptul că sintaxa lui Go este curată și semantica sa este mai ușor de urmărit. Și apoi există, desigur, părtinirea autorului. Cu toate acestea, toate conceptele discutate în acest articol pot fi traduse cu ușurință în limba pe care o alegeți.
Începeți cu Go este ușor. Distribuția sa binară poate fi descărcată de pe site-ul oficial. În cazul în care sunteți pe Windows, există un program de instalare MSI pentru Go pe pagina lor de descărcare. Sau, în cazul în care sistemul dvs. de operare (din fericire) oferă un manager de pachete:
Arch Linux:
pacman -S go
Ubuntu:
apt-get install golang
Mac OS X:
brew install go
Acesta va funcționa numai dacă avem Homebrew instalat.
MongoDB
De ce să folosim MongoDB dacă avem Redis, vă întrebați? După cum am menționat mai devreme, Redis este un depozit de date în memorie. Deși poate persista datele pe disc, utilizarea Redis în acest scop probabil nu este cea mai bună cale de a merge. Vom folosi MongoDB pentru a stoca metadatele și mesajele imaginilor încărcate.
Putem descărca MongoDB de pe site-ul lor oficial. În unele distribuții Linux, aceasta este modalitatea preferată de instalare a MongoDB. Totuși, ar trebui să fie instalabil folosind majoritatea managerului de pachete al distribuției.
Arch Linux:
pacman -S mongodb
Ubuntu:
apt-get install mongodb
Mac OS X:
brew install mongodb
În codul nostru Go, vom folosi pachetul mgo (pronunțat mango). Nu numai că este testat în luptă, ci și pachetul de drivere oferă un API cu adevărat curat și simplu.
Dacă nu sunteți un expert MongoDB, nu vă faceți griji deloc. Utilizarea acestui serviciu de bază de date este minimă în aplicația noastră exemplu și este aproape irelevantă pentru subiectul acestui articol: arhitectura Pub/Sub.
Amazon S3
Vom folosi Amazon S3 pentru a stoca imaginile încărcate de utilizator. Nu sunt multe de făcut aici, cu excepția faptului că ne asigurăm că avem un cont Amazon Web Services pregătit și un bucket temporar creat.
Stocarea fișierelor încărcate pe discul local nu este o opțiune, deoarece nu vrem să ne bazăm în niciun fel pe identitatea nodurilor noastre web. Dorim ca utilizatorii să se poată conecta la oricare dintre nodurile web disponibile și să poată vedea în continuare același conținut.
Pentru a interacționa cu bucket-ul Amazon S3 din codul nostru Go, vom folosi AdRoll/goamz, o furcă a pachetului goamz de la Canonical cu unele diferențe.
Redis
Ultimul, dar nu în ultimul rând: Redis. Îl putem instala folosind managerul de pachete al distribuției noastre:
Arch Linux:
pacman -S redis
Ubuntu:
apt-get install redis-server
Mac OS X:
brew install redis
Sau, preluați codul sursă și compilați-l singur. Redis nu are alte dependențe decât GCC și libc pentru a-l construi:
wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make
Odată ce Redis este instalat și rulează, porniți un terminal și introduceți CLI-ul Redis:
redis-cli
Încercați să introduceți următoarele comenzi și vedeți dacă obțineți rezultatul așteptat:
SET answer 41 INCR answer GET answer
Prima comandă stochează „41” împotriva tastei „răspuns”, a doua comandă crește valoarea, a treia comandă imprimă valoarea stocată pe cheia dată. Rezultatul ar trebui să fie „42”.
Puteți afla mai multe despre toate comenzile pe care Redis le acceptă pe site-ul lor oficial.
Vom folosi pachetul Go redigo pentru a ne conecta la Redis din codul aplicației noastre.
Aruncă o privire la Redis Pub/Sub
Modelul de publicare-abonare este o modalitate de a transmite mesaje unui număr arbitrar de expeditori. Expeditorii acestor mesaje (editorii) nu identifică în mod explicit destinatarii vizați. În schimb, mesajele sunt trimise pe un canal pe care îi pot aștepta orice număr de destinatari (abonați).
În cazul nostru, putem avea orice număr de noduri web care rulează în spatele unui echilibrator de încărcare. În orice moment, doi utilizatori care se uită la aceeași imagine pot să nu fie conectați la același nod. Aici intervine Redis Pub/Sub. Ori de câte ori un nod web trebuie să observe o schimbare (de exemplu, un nou mesaj este creat de către utilizator), va folosi Redis Pub/Sub pentru a difuza informațiile către toate nodurile web relevante. Care, la rândul său, va propaga informațiile către clienții relevanți, astfel încât aceștia să poată prelua lista actualizată de mesajeredis.
Întrucât modelul de publicare-abonare ne permite să trimitem mesaje pe canale numite, putem avea fiecare nod web conectat la Redis și să ne abonam numai la acele canale de care sunt interesați utilizatorii lor conectați. De exemplu, dacă doi utilizatori se uită amândoi la aceeași imagine, dar sunt conectate la două noduri web diferite din mai multe noduri web, atunci numai acele două noduri web trebuie să se aboneze la canalul corespunzător. Orice mesaj publicat pe canalul respectiv va fi livrat numai către acele două noduri web.
Sună prea frumos pentru a fi adevărat? Îl putem încerca folosind CLI-ul Redis. Porniți trei instanțe de redis-cli
. Executați următoarea comandă în primă instanță:
SUBSCRIBE somechannel
Executați următoarea comandă în a doua instanță CLI Redis:
SUBSCRIBE someotherchannel
Executați următoarele comenzi în a treia instanță a CLI Redis:
PUBLISH somechannel lorem PUBLISH someotherchannel ipsum
Observați cum prima instanță a primit „lorem”, dar nu „ipsum”, și cum a doua instanță a primit „ipsum”, dar nu „lorem”.
De menționat este faptul că odată ce un client Redis intră în modul abonat, acesta nu mai poate efectua nicio operațiune decât să se aboneze la mai multe canale sau să se dezaboneze de la cele abonate. Aceasta înseamnă că fiecare nod web va trebui să mențină două conexiuni la Redis, una pentru a se conecta la Redis ca abonat și cealaltă pentru a publica mesaje pe canale, astfel încât orice nod web abonat la acele canale să le poată primi.
În timp real și scalabil
Înainte de a începe să explorăm ce se întâmplă în spatele scenei, haideți să clonăm depozitul:
mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...
... și compilați-l:
go build ./cmd/tonesad
Pentru a rula aplicația, în primul rând creați un fișier numit .env (de preferință prin copierea fișierului env-sample.txt):
cp env-sample.txt .env
Completați fișierul .env cu toate variabilele de mediu necesare:
MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}
În cele din urmă rulați binarul construit:
PORT=9091 ./tonesad -env-file=.env
Nodul web ar trebui să ruleze acum și să fie accesibil prin http://localhost:9091.
Pentru a testa dacă încă funcționează atunci când este scalat orizontal, puteți învârti mai multe noduri web, pornindu-l cu numere de porturi diferite:
PORT=9092 ./tonesad -env-file=.env
PORT=9093 ./tonesad -env-file=.env
… și accesarea acestora prin adresele URL corespunzătoare: http://localhost:9092 și http://localhost:9093.
În spatele scenelor
În loc să trecem prin fiecare pas în dezvoltarea aplicației, ne vom concentra pe unele dintre cele mai importante părți. Deși nu toate acestea sunt 100% relevante pentru Redis Pub/Sub și implicațiile sale în timp real, ele sunt încă relevante pentru structura generală a aplicației și vor face mai ușor de urmărit odată ce ne aprofundăm.
Pentru a menține lucrurile simple, nu ne vom deranja cu autentificarea utilizatorilor. Încărcările vor fi anonime și vor fi disponibile pentru toți cei care cunosc adresa URL. Toți spectatorii pot trimite mesaje și vor avea posibilitatea de a-și alege propriul alias. Adaptarea mecanismului de autentificare adecvat și a capabilităților de confidențialitate ar trebui să fie trivială și depășește domeniul de aplicare al acestui articol.
Date persistente
Acesta este ușor.
Ori de câte ori un utilizator încarcă o imagine, o stocăm în Amazon S3 și apoi stocăm calea către ea în MongoDB împotriva a două ID-uri: un ID de obiect BSON (favoritul lui MongoDB) și un alt ID scurt de 8 caractere (oarecum plăcut ochilor). Aceasta intră în colecția „încărcări” a bazei noastre de date și are o structură ca aceasta:
type Upload struct { ID bson.ObjectId `bson:"_id"` ShortID string `bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64 `bson:"size"` }
Câmpul Kind este folosit pentru a indica tipul de conținut media pe care îl conține această „încărcare”. Înseamnă asta că acceptăm alte media decât imaginile? Din pacate, nu. Dar domeniul a fost lăsat acolo pentru a acționa ca o reamintire că nu ne limităm neapărat la imagini aici.
Pe măsură ce utilizatorii își trimit mesaje unul altuia, acestea sunt stocate într-o colecție diferită. Da, ați ghicit: „mesaje”.

type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Content string `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }
Singurul bit interesant aici este câmpul UploadID, care este folosit pentru a asocia mesaje unei anumite încărcări.
Puncte finale API
Această aplicație are în esență trei puncte finale.
POST /api/uploads
Managerul pentru acest punct final se așteaptă la o trimitere „multipart/form-data” cu imaginea în câmpul „fișier”. Comportamentul handlerului este aproximativ după cum urmează:
func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID: id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }
Go necesită ca toate erorile să fie tratate în mod explicit. Acest lucru a fost făcut în prototip, dar este omis din fragmentele din acest articol pentru a menține accentul pe părțile critice.
În handlerul acestui punct final API, citim în esență fișierul, dar limităm dimensiunea acestuia la o anumită valoare. Dacă încărcarea depășește această valoare, cererea este respinsă. În caz contrar, un ID BSON este generat și utilizat pentru a încărca imaginea pe Amazon S3 înainte de a persista entitatea de încărcare în MongoDB.
Există un avantaj și un contra pentru modul în care sunt generate ID-urile obiectelor BSON. Ele sunt generate la nivelul clientului. Cu toate acestea, strategia folosită pentru a genera ID-ul obiectului face ca probabilitatea de coliziune să fie atât de mică încât este sigur să le generați pe partea clientului. Pe de altă parte, valorile ID-urilor de obiect generate sunt, de obicei, secvențiale și asta este ceva de care Amazon S3 nu-i place prea mult. O soluție ușoară la aceasta este să prefixați numele fișierului cu un șir aleatoriu.
GET /api/uploads/{id}/messages
Acest API este folosit pentru a prelua mesajele recente și mesajele care au fost postate după un anumit timp.
func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr) msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }
Când browserul unui utilizator este notificat despre un mesaj nou pe o încărcare pe care utilizatorul îl privește în prezent, preia noile mesaje folosind acest punct final.
POST /api/uploads/{id}/messages
Și, în sfârșit, handlerul care creează mesaje și anunță pe toată lumea:
func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(&body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }
Acest handler este atât de asemănător cu celelalte, încât este aproape plictisitor chiar și să-l includ aici. Sau este? Observați cum există un hub de apeluri de funcție. Emit() la sfârșitul funcției. Ce este hub zici? Acolo se întâmplă toată magia Pub/Sub.
Hub: Unde WebSockets întâlnesc Redis
Hub este locul în care lipim WebSocket-urile cu canalele Pub/Sub ale Redis. Și, întâmplător, pachetul pe care îl folosim pentru a gestiona WebSockets în serverele noastre web se numește glue.
Hub-ul menține, în esență, câteva structuri de date care creează o mapare între toate WebSocket-urile conectate la toate canalele de care sunt interesați. De exemplu, un WebSocket din fila browserului utilizatorului care indică o anumită imagine încărcată ar trebui să fie în mod natural interesat de toate notificările relevante. la ea.
Pachetul hub implementează șase funcții:
- Abonati-va
- DezabonareToate
- Emite
- EmitLocal
- InitHub
- ManerSocket
Abonare și dezabonareToate
func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }
Această funcție, la fel ca majoritatea celorlalte din acest pachet, deține o blocare pe un mutex de citire/scriere în timp ce se execută. Acest lucru este astfel încât să putem modifica în siguranță socket -urile și subiectele variabilelor structurilor de date primitive. Prima variabilă, sockets , mapează socket-uri cu numele canalelor, în timp ce a doua, topics , mapează socket-uri cu socket-uri. În această funcție construim aceste mapări. Ori de câte ori vedem socket abonați-vă la un nou nume de canal, ne facem conexiunea Redis, subconn , abonați-vă la acel canal pe Redis folosind subconn.Subscribe . Acest lucru face ca Redis să trimită toate notificările de pe acel canal către acest nod web.
Și, de asemenea, în funcția UnsubscribeAll , dărâmăm maparea:
func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }
Când eliminăm ultimul soclu din structura de date interesată de un anumit canal, ne dezabonăm de la canal în Redis folosind subconn.Unsubscribe .
Emite
func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }
Această funcție publică un mesaj m pe canalul t folosind conexiunea de publicare la Redis.
EmitLocal
func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }
InitHub
func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data)) case error: panic(v) } } }() return nil }
În funcția InitHub , creăm două conexiuni la Redis: una pentru abonarea la canalele de care este interesat acest nod web și cealaltă pentru a publica mesaje. Odată ce conexiunile sunt stabilite, începem o nouă rutină Go, cu o buclă care rulează mereu, așteptând să primească mesaje prin conexiunea de abonat la Redis. De fiecare dată când primește un mesaj, îl emite local (adică către toate WebSocket-urile conectate la acest nod web).
ManerSocket
Și, în sfârșit, HandleSocket este locul în care așteptăm ca mesajele să vină prin WebSockets sau să curățăm după ce conexiunea se închide:
func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1]) case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+","+fields[3]) } }) }
JavaScript front-end
Deoarece glue vine cu propria sa bibliotecă JavaScript front-end, este mult mai ușor să gestionați WebSockets (sau alternativă la sondarea XHR atunci când WebSockets nu sunt disponibile):
var socket = glue() socket.onMessage(function(data) { data = data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) break case 'touch': var coords = data[1].split(',') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)
Pe partea clientului, ascultăm orice mesaj care vine prin WebSocket. Deoarece glue transmite toate mesajele ca șiruri de caractere, codificăm toate informațiile din el folosind modele specifice:
- Mesaj nou: „message:{messageID}”
- Faceți clic pe imagine: „touch:{coordX},{coordY}”, unde coordX și coordY sunt coordonatele bazate pe procente ale locației clicului utilizatorului pe imagine
Când utilizatorul creează un mesaj nou, folosim API-ul „POST /api/uploads/{uploadID}/messages” pentru a crea un mesaj nou. Acest lucru se face utilizând metoda de creare a colecției backbone pentru mesaje:
messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })
Când utilizatorul face clic pe imagine, calculăm poziția clicului în procente din lățimea și înălțimea imaginii și trimitem informațiile direct prin WebSocket.
socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left) / $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())
Privire de ansamblu
Când utilizatorul introduce un mesaj și apasă tasta Enter, clientul invocă punctul final API „POST /api/uploads/{id}/messages”. Aceasta, la rândul său, creează o entitate de mesaj în baza de date și publică un șir „message:{messageID}” prin Redis Pub/Sub pe canalul „upload:{uploadID}” prin pachetul hub.
Redis redirecționează acest șir către fiecare nod web (abonat) interesat de canalul „upload:{uploadID}”. Nodurile Web care primesc acest șir iterează peste toate WebSocket-urile relevante pentru canal și trimite șirul către client prin conexiunile lor WebSocket. Clienții care primesc acest șir încep să preia mesaje noi de pe server folosind „GET /api/uploads/{id}/messages”.
În mod similar, pentru propagarea evenimentelor de clic pe imagine, clientul trimite direct un mesaj prin WebSocket care arată ceva de genul „încărcare la atingere:{uploadID} {coordX} {coordY}”. Acest mesaj ajunge în pachetul hub unde este publicat pe același canal „upload:{uploadID}”. Ca rezultat, șirul este distribuit tuturor utilizatorilor care se uită la imaginea încărcată. Clientul, la primirea acestui șir, îl analizează pentru a extrage coordonatele și redă un cerc care crește și se estompează pentru a evidenția locația clicului pe moment.
Învelire
În acest articol, am văzut o privire a modului în care modelul de publicare-abonare poate ajuta la rezolvarea problemei de scalare a aplicațiilor web în timp real într-o mare măsură și cu relativă ușurință.
Exemplul de aplicație există pentru a servi drept loc de joacă pentru experimentarea cu Redis Pub/Sub. Dar, așa cum am menționat mai devreme, ideile pot fi implementate în aproape orice alt limbaj de programare popular.