使用 Redis Pub/Sub 实现实时

已发表: 2022-03-11

无论涉及的复杂性如何,扩展 Web 应用程序几乎总是一个有趣的挑战。 但是,实时 Web 应用程序会带来独特的可扩展性问题。 例如,为了能够水平扩展使用 WebSocket 与其客户端通信的消息传递 Web 应用程序,它需要以某种方式同步其所有服务器节点。 如果应用程序的构建没有考虑到这一点,那么水平扩展它可能不是一个简单的选择。

在本文中,我们将介绍一个简单的实时图像共享和消息传递 Web 应用程序的架构。 在这里,我们将重点关注构建实时应用程序所涉及的各种组件,例如 Redis Pub/Sub,并了解它们如何在整体架构中发挥作用。

使用 Redis Pub/Sub 实现实时

使用 Redis Pub/Sub 实现实时
鸣叫

功能方面,应用程序非常轻巧。 它允许上传图像和对这些图像的实时评论。 此外,任何用户都可以点击图像,其他用户将能够在他们的屏幕上看到涟漪效果。

该应用程序的完整源代码可在 GitHub 上找到。

我们需要的东西

我们将使用编程语言 Go。 我们在本文中选择 Go 并没有什么特别的原因,除了 Go 的语法简洁而且它的语义更容易理解之外。 当然还有作者的偏见。 但是,本文中讨论的所有概念都可以轻松翻译成您选择的语言。

Go 入门很容易。 它的二进制发行版可以从官方网站下载。 如果您使用的是 Windows,则在其下载页面上有一个适用于 Go 的 MSI 安装程序。 或者,如果您的操作系统(幸运的是)提供了包管理器:

Arch Linux:

 pacman -S go

Ubuntu:

 apt-get install golang

Mac OS X:

 brew install go

这个只有在我们安装了 Homebrew 的情况下才能工作。

MongoDB

如果我们有 Redis,为什么要使用 MongoDB,你问? 如前所述,Redis 是一种内存数据存储。 尽管它可以将数据持久化到磁盘,但为此目的使用 Redis 可能不是最好的方法。 我们将使用 MongoDB 来存储上传的图像元数据和消息。

我们可以从他们的官方网站下载 MongoDB。 在某些 Linux 发行版中,这是安装 MongoDB 的首选方式。 尽管如此,它仍然应该可以使用大多数发行版的包管理器进行安装。

Arch Linux:

 pacman -S mongodb

Ubuntu:

 apt-get install mongodb

Mac OS X:

 brew install mongodb

在我们的 Go 代码中,我们将使用包 mgo(发音为 mango)。 它不仅经过实战测试,驱动程序包还提供了一个非常干净和简单的 API。

如果您不是 MongoDB 专家,请不要担心。 在我们的示例应用程序中,此数据库服务的使用很少,并且与本文的重点几乎无关:Pub/Sub 架构。

亚马逊 S3

我们将使用 Amazon S3 来存储用户上传的图像。 这里没有什么可做的,除了确保我们有一个 Amazon Web Services 就绪账户和一个临时存储桶。

将上传的文件存储到本地磁盘不是一种选择,因为我们不想以任何方式依赖我们的 Web 节点的身份。 我们希望用户能够连接到任何可用的 Web 节点,并且仍然能够看到相同的内容。

为了通过我们的 Go 代码与 Amazon S3 存储桶进行交互,我们将使用 AdRoll/goamz,它是 Canonical 的 goamz 包的一个分支,但有一些不同之处。

雷迪斯

最后但并非最不重要的:Redis。 我们可以使用我们发行版的包管理器来安装它:

Arch Linux:

 pacman -S redis

Ubuntu:

 apt-get install redis-server

Mac OS X:

 brew install redis

或者,获取它的源代码并自己编译。 Redis 除了 GCC 和 libc 之外没有其他依赖项来构建它:

 wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make

Redis 安装并运行后,启动终端并进入 Redis 的 CLI:

 redis-cli

尝试输入以下命令,看看是否得到预期的输出:

 SET answer 41 INCR answer GET answer

第一个命令针对键“answer”存储“41”,第二个命令增加值,第三个命令打印针对给定键存储的值。 结果应为“42”。

您可以在其官方网站上了解有关 Redis 支持的所有命令的更多信息。

我们将使用 Go 包 redigo 从我们的应用程序代码中连接到 Redis。

查看 Redis Pub/Sub

发布-订阅模式是一种将消息传递给任意数量的发送者的方式。 这些消息的发件人(发布者)没有明确标识目标收件人。 取而代之的是,消息在一个通道上发送出去,任何数量的接收者(订阅者)都可以在该通道上等待它们。

简单的发布订阅配置

在我们的例子中,我们可以在负载均衡器后面运行任意数量的 Web 节点。 在任何给定时刻,查看同一图像的两个用户可能不会连接到同一个节点。 这就是 Redis Pub/Sub 发挥作用的地方。 每当一个 Web 节点需要观察变化时(例如用户创建了一条新消息),它将使用 Redis Pub/Sub 将该信息广播到所有相关的 Web 节点。 反过来,这会将信息传播给相关的客户端,以便他们可以获取更新的消息列表。

由于发布-订阅模式允许我们在命名通道上发送消息,我们可以让每个 Web 节点连接到 Redis,并且只订阅他们连接的用户感兴趣的通道。例如,如果两个用户都在看相同的图像,但连接到许多 web 节点中的两个不同的 web 节点,则只有这两个 web 节点需要订阅相应的频道。 在该频道上发布的任何消息都将仅传递到这两个 Web 节点。

听起来好得令人难以置信? 我们可以使用 Redis 的 CLI 进行尝试。 启动三个redis-cli实例。 在第一个实例中执行以下命令:

 SUBSCRIBE somechannel

在第二个 Redis CLI 实例中执行以下命令:

 SUBSCRIBE someotherchannel

在 Redis CLI 的第三个实例中执行以下命令:

 PUBLISH somechannel lorem PUBLISH someotherchannel ipsum

请注意第一个实例如何收到“lorem”但没有收到“ipsum”,以及第二个实例如何收到“ipsum”但没有收到“lorem”。

Redis Pub/Sub 实战

值得一提的是,Redis 客户端一旦进入订阅者模式,除了订阅更多频道或退订已订阅的频道外,无法再进行任何操作。 这意味着每个 Web 节点都需要维护两个到 Redis 的连接,一个作为订阅者连接到 Redis,另一个在通道上发布消息,以便订阅这些通道的任何 Web 节点都可以接收它们。

实时和可扩展

在我们开始探索幕后发生的事情之前,让我们克隆存储库:

 mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...

...并编译它:

 go build ./cmd/tonesad

要运行应用程序,首先创建一个名为 .env 的文件(最好通过复制文件 env-sample.txt):

 cp env-sample.txt .env

使用所有必要的环境变量填写 .env 文件:

 MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}

最后运行构建的二进制文件:

 PORT=9091 ./tonesad -env-file=.env

Web 节点现在应该正在运行并且可以通过 http://localhost:9091 访问。

活生生的例子

要测试它在水平扩展时是否仍然有效,您可以通过使用不同的端口号启动多个 Web 节点:

 PORT=9092 ./tonesad -env-file=.env
 PORT=9093 ./tonesad -env-file=.env

…并通过相应的 URL 访问它们:http://localhost:9092 和 http://localhost:9093。

活生生的例子

幕后花絮

我们将专注于一些最重要的部分,而不是遍历应用程序开发的每一步。 尽管并非所有这些都与 Redis Pub/Sub 及其实时影响 100% 相关,但它们仍然与应用程序的整体结构相关,并且一旦我们深入研究,它们将更容易遵循。

为简单起见,我们不会为用户身份验证而烦恼。 上传将是匿名的,所有知道该 URL 的人都可以使用。 所有查看者都可以发送消息,并且可以选择自己的别名。 调整适当的身份验证机制和隐私功能应该是微不足道的,并且超出了本文的范围。

持久化数据

这很容易。

每当用户上传图像时,我们将其存储在 Amazon S3 中,然后针对两个 ID 将其路径存储在 MongoDB 中:一个是 BSON 对象 ID(MongoDB 最喜欢的),另一个是 8 个字符长的短 ID(有点令人赏心悦目)。 这进入我们数据库的“上传”集合,并具有如下结构:

 type Upload struct { ID bson.ObjectId `bson:"_id"` ShortID string `bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64 `bson:"size"` }

字段Kind用于指示此“上传”包含的媒体类型。 这是否意味着我们支持图像以外的媒体? 很不幸的是,不行。 但是该字段已留在那里以提醒我们,我们不一定限于此处的图像。

当用户互相发送消息时,它们被存储在不同的集合中。 是的,您已经猜到了:“消息”。

 type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Content string `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }

这里唯一有趣的是 UploadID 字段,它用于将消息与特定的上传相关联。

API 端点

该应用程序本质上具有三个端点。

发布 /api/上传

此端点的处理程序需要“multipart/form-data”提交,其中包含“file”字段中的图像。 处理程序的行为大致如下:

 func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID: id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }

Go 要求明确处理所有错误。 这已在原型中完成,但在本文的片段中省略以保持对关键部分的关注。

在此 API 端点的处理程序中,我们实际上是在读取文件,但将其大小限制为特定值。 如果上传超过此值,则请求被拒绝。 否则,将生成一个 BSON ID 并将其用于将图像上传到 Amazon S3,然后再将上传实体保存到 MongoDB。

BSON 对象 ID 的生成方式有利有弊。 它们是在客户端生成的。 然而,用于生成对象 ID 的策略使得冲突的概率如此之小,以至于在客户端生成它们是安全的。 另一方面,生成的对象 ID 的值通常是连续的,这是 Amazon S3 不太喜欢的。 一个简单的解决方法是在文件名前面加上一个随机字符串。

获取 /api/uploads/{id}/messages

此 API 用于获取最近的消息,以及在特定时间之后发布的消息。

 func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr) msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }

当用户的浏览器收到有关用户当前正在查看的上传的新消息的通知时,它会使用此端点获取新消息。

POST /api/uploads/{id}/messages

最后是创建消息并通知所有人的处理程序:

 func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(&body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }

这个处理程序与其他处理程序非常相似,以至于将它包含在此处几乎是无聊的。 或者是吗? 注意在函数的最后有一个函数调用hub.Emit() 。 你说的枢纽是什么? 这就是所有 Pub/Sub 魔法发生的地方。

Hub:WebSockets 与 Redis 相遇的地方

Hub 是我们将 WebSocket 与 Redis 的 Pub/Sub 通道粘合在一起的地方。 而且,巧合的是,我们用来在 Web 服务器中处理 WebSocket 的包称为胶水。

Hub 本质上维护了一些数据结构,这些数据结构在所有连接的 WebSocket 与他们感兴趣的所有通道之间创建映射。例如,用户浏览器选项卡上指向特定上传图像的 WebSocket 自然应该对所有相关的通知感兴趣给它。

hub包实现了六个功能:

  • 订阅
  • 全部退订
  • 发射
  • 发射本地
  • 初始化集线器
  • 手柄插座

订阅和取消订阅全部

func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }

这个函数,就像这个包中的大多数其他函数一样,在执行时持有一个读/写互斥锁。 这样我们就可以安全地修改原始数据结构变量socketstopics 。 第一个变量sockets将套接字映射到通道名称,而第二个变量topic将通道名称映射到套接字。 在这个函数中,我们构建了这些映射。 每当我们看到 socket 订阅了一个新的频道名称时,我们都会使用subconn.Subscribe使我们的 Redis 连接subconn订阅 Redis 上的该频道。 这使得 Redis 将该通道上的所有通知转发到该 Web 节点。

同样,在UnsubscribeAll函数中,我们拆掉映射:

 func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }

当我们从对特定频道感兴趣的数据结构中删除最后一个套接字时,我们使用subconn.Unsubscribe取消订阅 Redis 中的频道。

发射

func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }

此函数使用与 Redis 的发布连接在通道t上发布消息m

发射本地

func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }

初始化集线器

func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data)) case error: panic(v) } } }() return nil }

InitHub函数中,我们创建了两个到 Redis 的连接:一个用于订阅该 Web 节点感兴趣的频道,另一个用于发布消息。 建立连接后,我们将启动一个新的 Go 例程,其中一个循环永远运行,等待通过订阅者连接到 Redis 接收消息。 每次它接收到消息时,它都会在本地发出它(即发送到连接到此 Web 节点的所有 WebSockets)。

手柄插座

最后, HandleSocket是我们等待消息通过 WebSockets 或在连接关闭后清理的地方:

 func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1]) case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+","+fields[3]) } }) }

前端 JavaScript

由于glue自带了自己的前端JavaScript库,处理WebSockets(或者当WebSockets不可用时回退到XHR轮询)要容易得多:

 var socket = glue() socket.onMessage(function(data) { data = data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) break case 'touch': var coords = data[1].split(',') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)

在客户端,我们正在监听通过 WebSocket 传入的任何消息。 由于胶水将所有消息作为字符串传输,我们使用特定模式对其中的所有信息进行编码:

  • 新消息:“消息:{messageID}”
  • 点击图片:“touch:{coordX},{coordY}”,其中coordX和coordY是用户在图片上点击位置的百分比坐标

当用户创建新消息时,我们使用“POST /api/uploads/{uploadID}/messages” API 来创建新消息。 这是在消息的主干集合上使用create方法完成的:

 messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })

当用户点击图片时,我们计算点击位置占图片宽度和高度的百分比,并直接通过 WebSocket 发送信息。

 socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left) / $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())

概述

应用程序架构概述

当用户输入消息并按下回车键时,客户端会调用“POST /api/uploads/{id}/messages”API 端点。 这反过来又在数据库中创建了一个消息实体,并通过集线器包在通道“upload:{uploadID}”上通过 Redis Pub/Sub 发布字符串“message:{messageID}”。

Redis 将此字符串转发给对频道“upload:{uploadID}”感兴趣的每个 Web 节点(订阅者)。 接收此字符串的 Web 节点遍历与通道相关的所有 WebSocket,并通过其 WebSocket 连接将字符串发送到客户端。 接收此字符串的客户端开始使用“GET /api/uploads/{id}/messages”从服务器获取新消息。

类似地,为了在图像上传播点击事件,客户端直接通过 WebSocket 发送一条消息,类似于“touch upload:{uploadID} {coordX} {coordY}”。 此消息最终在集线器包中发布,并在同一频道“upload:{uploadID}”上发布。 结果,字符串被分发给查看上传图像的所有用户。 客户端在收到此字符串后对其进行解析以提取坐标,并呈现一个逐渐消失的圆圈以暂时突出单击位置。

包起来

在本文中,我们看到了发布-订阅模式如何帮助解决在很大程度上相对轻松地扩展实时 Web 应用程序的问题。

示例应用程序可用作试验 Redis Pub/Sub 的游乐场。 但是,如前所述,这些想法几乎可以用任何其他流行的编程语言来实现。