Redis Pub/Subでリアルタイムに移行
公開: 2022-03-11Webアプリのスケーリングは、複雑さに関係なく、ほとんどの場合興味深い課題です。 ただし、リアルタイムWebアプリには、固有のスケーラビリティの問題があります。 たとえば、WebSocketを使用してクライアントと通信するメッセージングWebアプリを水平方向にスケーリングできるようにするには、何らかの方法ですべてのサーバーノードを同期する必要があります。 アプリがこれを念頭に置いて構築されていない場合は、水平方向にスケーリングするのは簡単なオプションではない可能性があります。
この記事では、シンプルなリアルタイムの画像共有およびメッセージングWebアプリのアーキテクチャについて説明します。 ここでは、リアルタイムアプリの構築に関与するRedis Pub / Subなどのさまざまなコンポーネントに焦点を当て、それらすべてがアーキテクチャ全体でどのように役割を果たすかを確認します。
機能面では、アプリケーションは非常に軽いです。 それは画像のアップロードとそれらの画像へのリアルタイムのコメントを可能にします。 さらに、どのユーザーも画像をタップでき、他のユーザーは画面に波打つ効果を見ることができます。
このアプリのソースコード全体はGitHubで入手できます。
必要なもの
行け
プログラミング言語Goを使用します。 この記事でGoを選択する特別な理由はありませんが、Goの構文はクリーンであり、そのセマンティクスはわかりやすいものです。 そしてもちろん、作者の偏見もあります。 ただし、この記事で説明するすべての概念は、選択した言語に簡単に翻訳できます。
Goの使用を開始するのは簡単です。 その二項分布は、公式サイトからダウンロードできます。 Windowsを使用している場合は、ダウンロードページにGo用のMSIインストーラーがあります。 または、オペレーティングシステムが(幸いなことに)パッケージマネージャーを提供している場合:
Arch Linux:
pacman -S go
Ubuntu:
apt-get install golang
Mac OS X:
brew install go
これは、Homebrewがインストールされている場合にのみ機能します。
MongoDB
RedisがあるのになぜMongoDBを使用するのですか? 前述のように、Redisはインメモリデータストアです。 データをディスクに永続化することはできますが、その目的でRedisを使用するのはおそらく最善の方法ではありません。 アップロードされた画像のメタデータとメッセージを保存するためにMongoDBを使用します。
MongoDBは、公式Webサイトからダウンロードできます。 一部のLinuxディストリビューションでは、これがMongoDBをインストールするための推奨される方法です。 それでも、ほとんどのディストリビューションのパッケージマネージャーを使用してインストールできるはずです。
Arch Linux:
pacman -S mongodb
Ubuntu:
apt-get install mongodb
Mac OS X:
brew install mongodb
Goコード内では、パッケージmgo(マンゴーと発音)を使用します。 バトルテストされているだけでなく、ドライバーパッケージは本当にクリーンでシンプルなAPIを提供します。
MongoDBの専門家でない場合でも、心配する必要はありません。 サンプルアプリでは、このデータベースサービスの使用は最小限であり、この記事の焦点であるPub/Subアーキテクチャとはほとんど関係ありません。
Amazon S3
ユーザーがアップロードした画像を保存するためにAmazonS3を使用します。 アマゾンウェブサービス対応のアカウントと一時的なバケットが作成されていることを確認する以外は、ここで行うことはあまりありません。
アップロードされたファイルをローカルディスクに保存することは、WebノードのIDにまったく依存したくないため、オプションではありません。 ユーザーが利用可能な任意のWebノードに接続できるようにし、それでも同じコンテンツを表示できるようにする必要があります。
GoコードからAmazonS3バケットを操作するには、CanonicalのgoamzパッケージのフォークであるAdRoll/goamzを使用しますがいくつかの違いがあります。
Redis
最後になりましたが、Redisです。 ディストリビューションのパッケージマネージャーを使用してインストールできます。
Arch Linux:
pacman -S redis
Ubuntu:
apt-get install redis-server
Mac OS X:
brew install redis
または、ソースコードを取得して、自分でコンパイルします。 Redisには、GCCとlibc以外の依存関係はありません。
wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make
Redisがインストールされて実行されたら、ターミナルを起動してRedisのCLIに入ります。
redis-cli
次のコマンドを入力して、期待どおりの出力が得られるかどうかを確認してください。
SET answer 41 INCR answer GET answer
最初のコマンドはキー「answer」に対して「41」を格納し、2番目のコマンドは値をインクリメントし、3番目のコマンドは指定されたキーに対して格納された値を出力します。 結果は「42」と表示されます。
Redisがサポートするすべてのコマンドの詳細については、公式Webサイトを参照してください。
Goパッケージredigoを使用して、アプリコード内からRedisに接続します。
Redis Pub/Subをのぞく
パブリッシュ/サブスクライブパターンは、任意の数の送信者にメッセージを渡す方法です。 これらのメッセージの送信者(発行者)は、対象の受信者を明示的に識別しません。 代わりに、メッセージは、任意の数の受信者(サブスクライバー)がメッセージを待機できるチャネルで送信されます。
この場合、ロードバランサーの背後で任意の数のWebノードを実行できます。 いつでも、同じ画像を見ている2人のユーザーが同じノードに接続していない可能性があります。 ここで、Redis Pub/Subが役立ちます。 Webノードが変更を監視する必要がある場合(たとえば、ユーザーが新しいメッセージを作成する場合)は常に、Redis Pub / Subを使用して、関連するすべてのWebノードにその情報をブロードキャストします。 これにより、関連するクライアントに情報が伝達され、更新されたmessagesredisのリストを取得できるようになります。
パブリッシュ/サブスクライブパターンでは、名前付きチャネルにメッセージをディスパッチできるため、各WebノードをRedisに接続し、接続されたユーザーが関心を持っているチャネルのみにサブスクライブできます。たとえば、2人のユーザーが両方とも同じ画像ですが、多くのWebノードのうち2つの異なるWebノードに接続されている場合、対応するチャネルにサブスクライブする必要があるのは、これら2つのWebノードのみです。 そのチャネルで公開されたメッセージは、これら2つのWebノードにのみ配信されます。
良すぎて真実ではないですか? RedisのCLIを使用して試すことができます。 redis-cli
の3つのインスタンスを開始します。 最初に次のコマンドを実行します。
SUBSCRIBE somechannel
2番目のRedisCLIインスタンスで次のコマンドを実行します。
SUBSCRIBE someotherchannel
RedisCLIの3番目のインスタンスで次のコマンドを実行します。
PUBLISH somechannel lorem PUBLISH someotherchannel ipsum
最初のインスタンスが「lorem」を受信したが「ipsum」を受信しなかった方法と、2番目のインスタンスが「ipsum」を受信したが「lorem」を受信しなかった方法に注目してください。
Redisクライアントがサブスクライバーモードに入ると、それ以上のチャネルにサブスクライブするか、サブスクライブされたチャネルからサブスクライブを解除する以外の操作を実行できなくなることに注意してください。 つまり、各WebノードはRedisへの2つの接続を維持する必要があります。1つはサブスクライバーとしてRedisに接続し、もう1つはチャネルでメッセージを公開して、それらのチャネルにサブスクライブしているWebノードがメッセージを受信できるようにします。
リアルタイムでスケーラブル
舞台裏で何が起こっているのかを探る前に、リポジトリのクローンを作成しましょう。
mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...
…そしてそれをコンパイルします:
go build ./cmd/tonesad
アプリを実行するには、まず.envという名前のファイルを作成します(できればファイルenv-sample.txtをコピーして)。
cp env-sample.txt .env
.envファイルに必要なすべての環境変数を入力します。
MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}
最後に、ビルドされたバイナリを実行します。
PORT=9091 ./tonesad -env-file=.env
これで、Webノードが実行され、http:// localhost:9091からアクセスできるようになります。
水平方向にスケーリングしても機能するかどうかをテストするには、異なるポート番号で起動して複数のWebノードを起動します。
PORT=9092 ./tonesad -env-file=.env
PORT=9093 ./tonesad -env-file=.env
…そして、対応するURL(http:// localhost:9092およびhttp:// localhost:9093)を介してそれらにアクセスします。
舞台裏
アプリの開発のすべてのステップを実行する代わりに、最も重要な部分のいくつかに焦点を当てます。 これらのすべてがRedisPub/ Subとそのリアルタイムの影響に100%関連しているわけではありませんが、アプリの全体的な構造に関連しているため、深く掘り下げていくとわかりやすくなります。
簡単にするために、ユーザー認証については気にしません。 アップロードは匿名で行われ、URLを知っているすべての人が利用できます。 すべての視聴者はメッセージを送信でき、自分のエイリアスを選択することができます。 適切な認証メカニズムとプライバシー機能を適応させることは簡単なことであり、この記事の範囲を超えています。
データの永続化
これは簡単です。
ユーザーが画像をアップロードするたびに、Amazon S3に画像を保存し、MongoDBに2つのID(1つはBSONオブジェクトID(MongoDBのお気に入り)、もう1つは短い8文字の長さのID(やや見た目が良い))に対してパスを保存します。 これはデータベースの「uploads」コレクションに入り、次のような構造になります。
type Upload struct { ID bson.ObjectId `bson:"_id"` ShortID string `bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64 `bson:"size"` }
[種類]フィールドは、この「アップロード」に含まれるメディアの種類を示すために使用されます。 これは、画像以外のメディアをサポートしているという意味ですか? 残念だけど違う。 ただし、ここでは必ずしも画像に限定されないことを思い出させるために、フィールドはそこに残されています。
ユーザーが互いにメッセージを送信すると、それらは別のコレクションに保存されます。 はい、あなたはそれを推測しました:「メッセージ」。
type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Content string `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }
ここでの唯一の興味深い点は、メッセージを特定のアップロードに関連付けるために使用されるUploadIDフィールドです。
APIエンドポイント
このアプリケーションには、基本的に3つのエンドポイントがあります。
POST / api / uploads
このエンドポイントのハンドラーは、「ファイル」フィールドに画像を含む「multipart/form-data」送信を想定しています。 ハンドラーの動作はおおまかに次のとおりです。

func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID: id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }
Goでは、すべてのエラーを明示的に処理する必要があります。 これはプロトタイプで行われていますが、重要な部分に焦点を当て続けるために、この記事のスニペットからは省略されています。
このAPIエンドポイントのハンドラーでは、基本的にファイルを読み取っていますが、そのサイズを特定の値に制限しています。 アップロードがこの値を超えると、リクエストは拒否されます。 それ以外の場合は、BSON IDが生成され、アップロードエンティティをMongoDBに永続化する前に、画像をAmazonS3にアップロードするために使用されます。
BSONオブジェクトIDの生成方法には賛否両論があります。 それらはクライアント側で生成されます。 ただし、オブジェクトIDの生成に使用される戦略では、衝突の可能性が非常に小さいため、クライアント側で安全に生成できます。 一方、生成されたオブジェクトIDの値は通常シーケンシャルであり、AmazonS3はあまり好きではありません。 これに対する簡単な回避策は、ファイル名の前にランダムな文字列を付けることです。
GET / api / uploads / {id} / messages
このAPIは、最近のメッセージ、および特定の時間後に投稿されたメッセージをフェッチするために使用されます。
func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr) msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }
ユーザーのブラウザーは、ユーザーが現在見ているアップロードの新しいメッセージについて通知を受けると、このエンドポイントを使用して新しいメッセージをフェッチします。
POST / api / uploads / {id} / messages
そして最後に、メッセージを作成して全員に通知するハンドラー:
func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(&body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }
このハンドラーは他のハンドラーと非常に似ているため、ここに含めるのはほとんど退屈です。 またはそれは? 関数の最後に関数呼び出しhub.Emit()があることに注意してください。 あなたが言うハブは何ですか? ここですべてのパブ/サブマジックが発生します。
ハブ:WebSocketがRedisと出会う場所
ハブは、WebSocketをRedisのPub/Subチャネルに接着する場所です。 また、偶然にも、Webサーバー内でWebSocketを処理するために使用しているパッケージはglueと呼ばれています。
Hubは基本的に、接続されているすべてのWebSocketと、関心のあるすべてのチャネルとの間のマッピングを作成するいくつかのデータ構造を維持します。たとえば、特定のアップロードされた画像を指すユーザーのブラウザータブのWebSocketは、関連するすべての通知に当然関心があるはずです。それに。
ハブパッケージは、次の6つの機能を実装します。
- 購読
- UnsubscribeAll
- エミット
- EmitLocal
- InitHub
- HandleSocket
購読および購読解除すべて
func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }
この関数は、このパッケージの他のほとんどの関数と同様に、実行中に読み取り/書き込みミューテックスのロックを保持します。 これは、プリミティブデータ構造変数ソケットとトピックを安全に変更できるようにするためです。 最初の変数socketsは、ソケットをチャネル名にマップし、2番目の変数topicsは、チャネル名をソケットにマップします。 この関数では、これらのマッピングを作成します。 ソケットが新しいチャネル名にサブスクライブしているのを確認するたびに、Redis接続subconnを作成し、 subconn.Subscribeを使用してRedisでそのチャネルにサブスクライブします。 これにより、Redisはそのチャネル上のすべての通知をこのWebノードに転送します。
同様に、 UnsubscribeAll関数では、マッピングを破棄します。
func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }
特定のチャネルに関係するデータ構造から最後のソケットを削除すると、 subconn.Unsubscribeを使用してRedisのチャネルからサブスクライブを解除します。
エミット
func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }
この関数は、Redisへの公開接続を使用してチャネルtにメッセージmを公開します。
EmitLocal
func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }
InitHub
func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data)) case error: panic(v) } } }() return nil }
InitHub関数では、Redisへの2つの接続を作成しています。1つはこのWebノードが関心のあるチャネルにサブスクライブするためのもので、もう1つはメッセージを公開するためのものです。 接続が確立されると、Redisへのサブスクライバー接続を介してメッセージを受信するのを待機しているループで新しいGoルーチンを開始します。 メッセージを受信するたびに、ローカルで(つまり、このWebノードに接続されているすべてのWebSocketに)メッセージを送信します。
HandleSocket
そして最後に、 HandleSocketは、メッセージがWebSocketを通過するのを待つか、接続が閉じた後にクリーンアップする場所です。
func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1]) case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+","+fields[3]) } }) }
フロントエンドJavaScript
接着剤には独自のフロントエンドJavaScriptライブラリが付属しているため、WebSocketの処理(またはWebSocketが利用できない場合のXHRポーリングへのフォールバック)がはるかに簡単になります。
var socket = glue() socket.onMessage(function(data) { data = data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) break case 'touch': var coords = data[1].split(',') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)
クライアント側では、WebSocketを介して着信するメッセージをリッスンしています。 接着剤はすべてのメッセージを文字列として送信するため、特定のパターンを使用してすべての情報をエンコードします。
- 新しいメッセージ:「message:{messageID}」
- 画像をクリックします:「touch:{coordX}、{coordY}」。ここで、coordXとcoordYは、画像上のユーザーのクリック位置のパーセンテージベースの座標です。
ユーザーが新しいメッセージを作成するとき、「POST / api / uploads / {uploadID}/messages」APIを使用して新しいメッセージを作成します。 これは、メッセージのバックボーンコレクションでcreateメソッドを使用して行われます。
messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })
ユーザーが画像をクリックすると、クリックの位置が画像の幅と高さのパーセンテージで計算され、WebSocketを介して直接情報が送信されます。
socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left) / $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())
概要
ユーザーがメッセージを入力してEnterキーを押すと、クライアントは「POST / api / uploads / {id}/messages」APIエンドポイントを呼び出します。 これにより、データベースにメッセージエンティティが作成され、ハブパッケージを介してチャネル「upload:{uploadID}」のRedis Pub / Subを介して文字列「message:{messageID}」がパブリッシュされます。
Redisは、この文字列をチャネル「upload:{uploadID}」に関心のあるすべてのWebノード(サブスクライバー)に転送します。 この文字列を受信するWebノードは、チャネルに関連するすべてのWebSocketを反復処理し、WebSocket接続を介して文字列をクライアントに送信します。 この文字列を受信したクライアントは、「GET / api / uploads / {id}/messages」を使用してサーバーから新しいメッセージのフェッチを開始します。
同様に、画像でクリックイベントを伝播する場合、クライアントはWebSocketを介して「touchupload:{uploadID}{coordX}{coordY}」のようなメッセージを直接送信します。 このメッセージは、チャネル「upload:{uploadID}」で公開されるハブパッケージで終了します。 その結果、文字列はアップロードされた画像を見ているすべてのユーザーに配布されます。 クライアントは、この文字列を受信すると、それを解析して座標を抽出し、徐々に消えていく円をレンダリングして、クリック位置を瞬間的に強調表示します。
要約
この記事では、パブリッシュ/サブスクライブパターンが、リアルタイムWebアプリのスケーリングの問題を比較的簡単に大幅に解決するのにどのように役立つかを垣間見ることができました。
サンプルアプリは、Redis Pub/Subを試すための遊び場として機能するために存在します。 ただし、前述のように、アイデアは他のほとんどすべての一般的なプログラミング言語で実装できます。