使用 Redis Pub/Sub 實現實時

已發表: 2022-03-11

無論涉及的複雜性如何,擴展 Web 應用程序幾乎總是一個有趣的挑戰。 但是,實時 Web 應用程序會帶來獨特的可擴展性問題。 例如,為了能夠水平擴展使用 WebSocket 與其客戶端通信的消息傳遞 Web 應用程序,它需要以某種方式同步其所有服務器節點。 如果應用程序的構建沒有考慮到這一點,那麼水平擴展它可能不是一個簡單的選擇。

在本文中,我們將介紹一個簡單的實時圖像共享和消息傳遞 Web 應用程序的架構。 在這裡,我們將重點關注構建實時應用程序所涉及的各種組件,例如 Redis Pub/Sub,並了解它們如何在整體架構中發揮作用。

使用 Redis Pub/Sub 實現實時

使用 Redis Pub/Sub 實現實時
鳴叫

功能方面,應用程序非常輕巧。 它允許上傳圖像和對這些圖像的實時評論。 此外,任何用戶都可以點擊圖像,其他用戶將能夠在他們的屏幕上看到漣漪效果。

該應用程序的完整源代碼可在 GitHub 上找到。

我們需要的東西

我們將使用編程語言 Go。 我們在本文中選擇 Go 並沒有什麼特別的原因,除了 Go 的語法簡潔而且它的語義更容易理解之外。 當然還有作者的偏見。 但是,本文中討論的所有概念都可以輕鬆翻譯成您選擇的語言。

Go 入門很容易。 它的二進制發行版可以從官方網站下載。 如果您使用的是 Windows,則在其下載頁面上有一個適用於 Go 的 MSI 安裝程序。 或者,如果您的操作系統(幸運的是)提供了包管理器:

Arch Linux:

 pacman -S go

Ubuntu:

 apt-get install golang

Mac OS X:

 brew install go

這個只有在我們安裝了 Homebrew 的情況下才能工作。

MongoDB

如果我們有 Redis,為什麼要使用 MongoDB,你問? 如前所述,Redis 是一種內存數據存儲。 儘管它可以將數據持久化到磁盤,但為此目的使用 Redis 可能不是最好的方法。 我們將使用 MongoDB 來存儲上傳的圖像元數據和消息。

我們可以從他們的官方網站下載 MongoDB。 在某些 Linux 發行版中,這是安裝 MongoDB 的首選方式。 儘管如此,它仍然應該可以使用大多數發行版的包管理器進行安裝。

Arch Linux:

 pacman -S mongodb

Ubuntu:

 apt-get install mongodb

Mac OS X:

 brew install mongodb

在我們的 Go 代碼中,我們將使用包 mgo(發音為 mango)。 它不僅經過實戰測試,驅動程序包還提供了一個非常乾淨和簡單的 API。

如果您不是 MongoDB 專家,請不要擔心。 在我們的示例應用程序中,此數據庫服務的使用很少,並且與本文的重點幾乎無關:Pub/Sub 架構。

亞馬遜 S3

我們將使用 Amazon S3 來存儲用戶上傳的圖像。 這裡沒有什麼可做的,除了確保我們有一個 Amazon Web Services 就緒賬戶和一個臨時存儲桶。

將上傳的文件存儲到本地磁盤不是一種選擇,因為我們不想以任何方式依賴我們的 Web 節點的身份。 我們希望用戶能夠連接到任何可用的 Web 節點,並且仍然能夠看到相同的內容。

為了通過我們的 Go 代碼與 Amazon S3 存儲桶進行交互,我們將使用 AdRoll/goamz,它是 Canonical 的 goamz 包的一個分支,但有一些不同之處。

雷迪斯

最後但並非最不重要的:Redis。 我們可以使用我們發行版的包管理器來安裝它:

Arch Linux:

 pacman -S redis

Ubuntu:

 apt-get install redis-server

Mac OS X:

 brew install redis

或者,獲取它的源代碼並自己編譯。 Redis 除了 GCC 和 libc 之外沒有其他依賴項來構建它:

 wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make

Redis 安裝並運行後,啟動終端並進入 Redis 的 CLI:

 redis-cli

嘗試輸入以下命令,看看是否得到預期的輸出:

 SET answer 41 INCR answer GET answer

第一個命令針對鍵“answer”存儲“41”,第二個命令增加值,第三個命令打印針對給定鍵存儲的值。 結果應為“42”。

您可以在其官方網站上了解有關 Redis 支持的所有命令的更多信息。

我們將使用 Go 包 redigo 從我們的應用程序代碼中連接到 Redis。

查看 Redis Pub/Sub

發布-訂閱模式是一種將消息傳遞給任意數量的發送者的方式。 這些消息的發件人(發布者)沒有明確標識目標收件人。 取而代之的是,消息在一個通道上發送出去,任何數量的接收者(訂閱者)都可以在該通道上等待它們。

簡單的發布訂閱配置

在我們的例子中,我們可以在負載均衡器後面運行任意數量的 Web 節點。 在任何給定時刻,查看同一圖像的兩個用戶可能不會連接到同一個節點。 這就是 Redis Pub/Sub 發揮作用的地方。 每當一個 Web 節點需要觀察變化時(例如用戶創建了一條新消息),它將使用 Redis Pub/Sub 將該信息廣播到所有相關的 Web 節點。 反過來,這會將信息傳播給相關的客戶端,以便他們可以獲取更新的消息列表。

由於發布-訂閱模式允許我們在命名通道上發送消息,我們可以讓每個 Web 節點連接到 Redis,並且只訂閱他們連接的用戶感興趣的通道。例如,如果兩個用戶都在看相同的圖像,但連接到許多 web 節點中的兩個不同的 web 節點,則只有這兩個 web 節點需要訂閱相應的頻道。 在該頻道上發布的任何消息都將僅傳遞到這兩個 Web 節點。

聽起來好得令人難以置信? 我們可以使用 Redis 的 CLI 進行嘗試。 啟動三個redis-cli實例。 在第一個實例中執行以下命令:

 SUBSCRIBE somechannel

在第二個 Redis CLI 實例中執行以下命令:

 SUBSCRIBE someotherchannel

在 Redis CLI 的第三個實例中執行以下命令:

 PUBLISH somechannel lorem PUBLISH someotherchannel ipsum

請注意第一個實例如何收到“lorem”但沒有收到“ipsum”,以及第二個實例如何收到“ipsum”但沒有收到“lorem”。

Redis Pub/Sub 實戰

值得一提的是,Redis 客戶端一旦進入訂閱者模式,除了訂閱更多頻道或退訂已訂閱的頻道外,無法再進行任何操作。 這意味著每個 Web 節點都需要維護兩個到 Redis 的連接,一個作為訂閱者連接到 Redis,另一個在通道上發布消息,以便訂閱這些通道的任何 Web 節點都可以接收它們。

實時和可擴展

在我們開始探索幕後發生的事情之前,讓我們克隆存儲庫:

 mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...

...並編譯它:

 go build ./cmd/tonesad

要運行應用程序,首先創建一個名為 .env 的文件(最好通過複製文件 env-sample.txt):

 cp env-sample.txt .env

使用所有必要的環境變量填寫 .env 文件:

 MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}

最後運行構建的二進製文件:

 PORT=9091 ./tonesad -env-file=.env

Web 節點現在應該正在運行並且可以通過 http://localhost:9091 訪問。

活生生的例子

要測試它在水平擴展時是否仍然有效,您可以通過使用不同的端口號啟動多個 Web 節點:

 PORT=9092 ./tonesad -env-file=.env
 PORT=9093 ./tonesad -env-file=.env

…並通過相應的 URL 訪問它們:http://localhost:9092 和 http://localhost:9093。

活生生的例子

幕後花絮

我們將專注於一些最重要的部分,而不是遍歷應用程序開發的每一步。 儘管並非所有這些都與 Redis Pub/Sub 及其實時影響 100% 相關,但它們仍然與應用程序的整體結構相關,並且一旦我們深入研究,它們將更容易遵循。

為簡單起見,我們不會為用戶身份驗證而煩惱。 上傳將是匿名的,所有知道該 URL 的人都可以使用。 所有查看者都可以發送消息,並且可以選擇自己的別名。 調整適當的身份驗證機制和隱私功能應該是微不足道的,並且超出了本文的範圍。

持久化數據

這很容易。

每當用戶上傳圖像時,我們將其存儲在 Amazon S3 中,然後針對兩個 ID 將其路徑存儲在 MongoDB 中:一個是 BSON 對象 ID(MongoDB 最喜歡的),另一個是 8 個字符長的短 ID(有點令人賞心悅目)。 這進入我們數據庫的“上傳”集合,並具有如下結構:

 type Upload struct { ID bson.ObjectId `bson:"_id"` ShortID string `bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64 `bson:"size"` }

字段Kind用於指示此“上傳”包含的媒體類型。 這是否意味著我們支持圖像以外的媒體? 很不幸的是,不行。 但是該字段已留在那里以提醒我們,我們不一定限於此處的圖像。

當用戶互相發送消息時,它們被存儲在不同的集合中。 是的,您已經猜到了:“消息”。

 type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Content string `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }

這裡唯一有趣的是 UploadID 字段,它用於將消息與特定的上傳相關聯。

API 端點

該應用程序本質上具有三個端點。

發布 /api/上傳

此端點的處理程序需要“multipart/form-data”提交,其中包含“file”字段中的圖像。 處理程序的行為大致如下:

 func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID: id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }

Go 要求明確處理所有錯誤。 這已在原型中完成,但在本文的片段中省略以保持對關鍵部分的關注。

在此 API 端點的處理程序中,我們實際上是在讀取文件,但將其大小限制為特定值。 如果上傳超過此值,則請求被拒絕。 否則,將生成一個 BSON ID 並將其用於將圖像上傳到 Amazon S3,然後再將上傳實體保存到 MongoDB。

BSON 對象 ID 的生成方式有利有弊。 它們是在客戶端生成的。 然而,用於生成對象 ID 的策略使得衝突的概率如此之小,以至於在客戶端生成它們是安全的。 另一方面,生成的對象 ID 的值通常是連續的,這是 Amazon S3 不太喜歡的。 一個簡單的解決方法是在文件名前面加上一個隨機字符串。

獲取 /api/uploads/{id}/messages

此 API 用於獲取最近的消息,以及在特定時間之後發布的消息。

 func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr) msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }

當用戶的瀏覽器收到有關用戶當前正在查看的上傳的新消息的通知時,它會使用此端點獲取新消息。

POST /api/uploads/{id}/messages

最後是創建消息並通知所有人的處理程序:

 func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(&body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }

這個處理程序與其他處理程序非常相似,以至於將它包含在此處幾乎是無聊的。 或者是嗎? 注意在函數的最後有一個函數調用hub.Emit() 。 你說的樞紐是什麼? 這就是所有 Pub/Sub 魔法發生的地方。

Hub:WebSockets 與 Redis 相遇的地方

Hub 是我們將 WebSocket 與 Redis 的 Pub/Sub 通道粘合在一起的地方。 而且,巧合的是,我們用來在 Web 服務器中處理 WebSocket 的包稱為膠水。

Hub 本質上維護了一些數據結構,這些數據結構在所有連接的 WebSocket 與他們感興趣的所有通道之間創建映射。例如,用戶瀏覽器選項卡上指向特定上傳圖像的 WebSocket 自然應該對所有相關的通知感興趣給它。

hub包實現了六個功能:

  • 訂閱
  • 全部退訂
  • 發射
  • 發射本地
  • 初始化集線器
  • 手柄插座

訂閱和取消訂閱全部

func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }

這個函數,就像這個包中的大多數其他函數一樣,在執行時持有一個讀/寫互斥鎖。 這樣我們就可以安全地修改原始數據結構變量socketstopics 。 第一個變量sockets將套接字映射到通道名稱,而第二個變量topic將通道名稱映射到套接字。 在這個函數中,我們構建了這些映射。 每當我們看到 socket 訂閱了一個新的頻道名稱時,我們都會使用subconn.Subscribe使我們的 Redis 連接subconn訂閱 Redis 上的該頻道。 這使得 Redis 將該通道上的所有通知轉發到該 Web 節點。

同樣,在UnsubscribeAll函數中,我們拆掉映射:

 func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }

當我們從對特定頻道感興趣的數據結構中刪除最後一個套接字時,我們使用subconn.Unsubscribe取消訂閱 Redis 中的頻道。

發射

func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }

此函數使用與 Redis 的發布連接在通道t上發布消息m

發射本地

func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }

初始化集線器

func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data)) case error: panic(v) } } }() return nil }

InitHub函數中,我們創建了兩個到 Redis 的連接:一個用於訂閱該 Web 節點感興趣的頻道,另一個用於發布消息。 建立連接後,我們將啟動一個新的 Go 例程,其中一個循環永遠運行,等待通過訂閱者連接到 Redis 接收消息。 每次它接收到消息時,它都會在本地發出它(即發送到連接到此 Web 節點的所有 WebSockets)。

手柄插座

最後, HandleSocket是我們等待消息通過 WebSockets 或在連接關閉後清理的地方:

 func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1]) case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+","+fields[3]) } }) }

前端 JavaScript

由於glue自帶了自己的前端JavaScript庫,處理WebSockets(或者當WebSockets不可用時回退到XHR輪詢)要容易得多:

 var socket = glue() socket.onMessage(function(data) { data = data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) break case 'touch': var coords = data[1].split(',') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)

在客戶端,我們正在監聽通過 WebSocket 傳入的任何消息。 由於膠水將所有消息作為字符串傳輸,我們使用特定模式對其中的所有信息進行編碼:

  • 新消息:“消息:{messageID}”
  • 點擊圖片:“touch:{coordX},{coordY}”,其中coordX和coordY是用戶在圖片上點擊位置的百分比坐標

當用戶創建新消息時,我們使用“POST /api/uploads/{uploadID}/messages” API 來創建新消息。 這是在消息的主幹集合上使用create方法完成的:

 messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })

當用戶點擊圖片時,我們計算點擊位置佔圖片寬度和高度的百分比,並直接通過 WebSocket 發送信息。

 socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left) / $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())

概述

應用程序架構概述

當用戶輸入消息並按下回車鍵時,客戶端會調用“POST /api/uploads/{id}/messages”API 端點。 這反過來又在數據庫中創建了一個消息實體,並通過集線器包在通道“upload:{uploadID}”上通過 Redis Pub/Sub 發布字符串“message:{messageID}”。

Redis 將此字符串轉發給對頻道“upload:{uploadID}”感興趣的每個 Web 節點(訂閱者)。 接收此字符串的 Web 節點遍歷與通道相關的所有 WebSocket,並通過其 WebSocket 連接將字符串發送到客戶端。 接收此字符串的客戶端開始使用“GET /api/uploads/{id}/messages”從服務器獲取新消息。

類似地,為了在圖像上傳播點擊事件,客戶端直接通過 WebSocket 發送一條消息,類似於“touch upload:{uploadID} {coordX} {coordY}”。 此消息最終在集線器包中發布,並在同一頻道“upload:{uploadID}”上發布。 結果,字符串被分發給查看上傳圖像的所有用戶。 客戶端在收到此字符串後對其進行解析以提取坐標,並呈現一個逐漸消失的圓圈以暫時突出單擊位置。

包起來

在本文中,我們看到了發布-訂閱模式如何幫助解決在很大程度上相對輕鬆地擴展實時 Web 應用程序的問題。

示例應用程序可用作試驗 Redis Pub/Sub 的遊樂場。 但是,如前所述,這些想法幾乎可以用任何其他流行的編程語言來實現。