เรียลไทม์กับ Redis Pub/Sub

เผยแพร่แล้ว: 2022-03-11

การปรับขนาดเว็บแอปมักเป็นความท้าทายที่น่าสนใจ โดยไม่คำนึงถึงความซับซ้อนที่เกี่ยวข้อง อย่างไรก็ตาม เว็บแอปแบบเรียลไทม์ก่อให้เกิดปัญหาด้านความสามารถในการปรับขนาดที่ไม่เหมือนใคร ตัวอย่างเช่น เพื่อให้สามารถปรับขนาดเว็บแอปข้อความที่ใช้ WebSockets เพื่อสื่อสารกับไคลเอ็นต์ในแนวนอนได้ จะต้องซิงโครไนซ์โหนดเซิร์ฟเวอร์ทั้งหมดด้วยวิธีใดวิธีหนึ่ง หากแอปไม่ได้สร้างขึ้นโดยคำนึงถึงสิ่งนี้ การปรับขนาดในแนวนอนอาจไม่ใช่ตัวเลือกที่ง่าย

ในบทความนี้ เราจะอธิบายเกี่ยวกับสถาปัตยกรรมของเว็บแอปการแชร์รูปภาพและการรับส่งข้อความแบบเรียลไทม์ที่เรียบง่าย ในที่นี้ เราจะเน้นที่องค์ประกอบต่างๆ เช่น Redis Pub/Sub ที่เกี่ยวข้องกับการสร้างแอปแบบเรียลไทม์และดูว่าองค์ประกอบทั้งหมดมีบทบาทอย่างไรในสถาปัตยกรรมโดยรวม

เรียลไทม์กับ Redis Pub/Sub

เรียลไทม์กับ Redis Pub/Sub
ทวีต

ในแง่ของการใช้งาน แอพพลิเคชั่นนั้นเบามาก อนุญาตให้อัปโหลดรูปภาพและแสดงความคิดเห็นแบบเรียลไทม์บนรูปภาพเหล่านั้น นอกจากนี้ ผู้ใช้ทุกคนสามารถแตะที่รูปภาพและผู้ใช้รายอื่นจะสามารถเห็นเอฟเฟกต์คลื่นบนหน้าจอของพวกเขา

ซอร์สโค้ดทั้งหมดของแอพนี้มีอยู่ใน GitHub

สิ่งที่เราต้องการ

ไป

เราจะใช้ภาษาโปรแกรม Go ไม่มีเหตุผลพิเศษว่าทำไมเราถึงเลือก Go สำหรับบทความนี้ นอกจากว่าวากยสัมพันธ์ของ Go นั้นสะอาดและความหมายของมันง่ายต่อการติดตาม และแน่นอนว่ามีอคติของผู้เขียน อย่างไรก็ตาม แนวคิดทั้งหมดที่กล่าวถึงในบทความนี้สามารถแปลเป็นภาษาที่คุณเลือกได้อย่างง่ายดาย

การเริ่มต้นใช้งาน Go เป็นเรื่องง่าย สามารถดาวน์โหลดการแจกแจงแบบไบนารีได้จากเว็บไซต์ทางการ ในกรณีที่คุณใช้ Windows จะมีตัวติดตั้ง MSI สำหรับ Go ในหน้าดาวน์โหลด หรือในกรณีที่ระบบปฏิบัติการของคุณ (โชคดี) มีตัวจัดการแพ็คเกจ:

อาร์คลินุกซ์:

 pacman -S go

อูบุนตู:

 apt-get install golang

Mac OS X:

 brew install go

อันนี้จะใช้ได้ก็ต่อเมื่อเราติดตั้ง Homebrew ไว้

MongoDB

ทำไมต้องใช้ MongoDB ถ้าเรามี Redis คุณถาม? ดังที่ได้กล่าวไว้ก่อนหน้านี้ Redis เป็นที่เก็บข้อมูลในหน่วยความจำ แม้ว่าจะสามารถเก็บข้อมูลไว้ในดิสก์ได้ แต่การใช้ Redis เพื่อจุดประสงค์นั้นอาจไม่ใช่วิธีที่ดีที่สุด เราจะใช้ MongoDB เพื่อจัดเก็บข้อมูลเมตาดาต้าและข้อความที่อัปโหลด

เราสามารถดาวน์โหลด MongoDB ได้จากเว็บไซต์ทางการของพวกเขา ในลีนุกซ์รุ่นบางรุ่น นี่เป็นวิธีที่แนะนำในการติดตั้ง MongoDB มันควรจะยังสามารถติดตั้งได้โดยใช้ตัวจัดการแพ็คเกจของการแจกจ่ายส่วนใหญ่

อาร์คลินุกซ์:

 pacman -S mongodb

อูบุนตู:

 apt-get install mongodb

Mac OS X:

 brew install mongodb

ภายในรหัส Go ของเรา เราจะใช้แพ็คเกจ mgo (ออกเสียงว่ามะม่วง) ไม่เพียงแต่จะทดสอบในสนามรบเท่านั้น แต่แพ็คเกจไดรเวอร์ยังนำเสนอ API ที่เรียบง่ายและสะอาดตาอีกด้วย

หากคุณไม่ใช่ผู้เชี่ยวชาญ MongoDB ไม่ต้องกังวลเลย การใช้บริการฐานข้อมูลนี้มีน้อยในแอปตัวอย่างของเรา และแทบไม่เกี่ยวข้องกับจุดสนใจของบทความนี้เลย: สถาปัตยกรรม Pub/Sub

อเมซอน S3

เราจะใช้ Amazon S3 เพื่อจัดเก็บภาพที่ผู้ใช้อัปโหลด ไม่มีอะไรให้ทำมากนักที่นี่ ยกเว้นตรวจสอบให้แน่ใจว่าเรามีบัญชีที่พร้อมใช้งานของ Amazon Web Services และสร้างที่ฝากข้อมูลชั่วคราว

การจัดเก็บไฟล์ที่อัปโหลดไปยังดิสก์ในเครื่องไม่ใช่ตัวเลือกเพราะเราไม่ต้องการพึ่งพาข้อมูลประจำตัวของโหนดเว็บของเราไม่ว่าด้วยวิธีใด เราต้องการให้ผู้ใช้สามารถเชื่อมต่อกับเว็บโหนดใด ๆ ที่มีอยู่และยังสามารถดูเนื้อหาเดียวกันได้

ในการโต้ตอบกับบัคเก็ต Amazon S3 จากโค้ด Go เราจะใช้ AdRoll/goamz ซึ่งเป็นส่วนย่อยของแพ็กเกจ goamz ของ Canonical ซึ่งมีความแตกต่างบางประการ

Redis

สุดท้าย แต่ไม่ท้ายสุด: Redis เราสามารถติดตั้งได้โดยใช้ตัวจัดการแพ็คเกจของการแจกจ่ายของเรา:

อาร์คลินุกซ์:

 pacman -S redis

อูบุนตู:

 apt-get install redis-server

Mac OS X:

 brew install redis

หรือดึงซอร์สโค้ดและคอมไพล์ด้วยตัวเอง Redis ไม่มีการพึ่งพาอื่นนอกจาก GCC และ libc สำหรับการสร้าง:

 wget http://download.redis.io/redis-stable.tar.gz tar xvzf redis-stable.tar.gz cd redis-stable make

เมื่อติดตั้งและใช้งาน Redis แล้ว ให้เริ่มเทอร์มินัลแล้วป้อน CLI ของ Redis:

 redis-cli

ลองป้อนคำสั่งต่อไปนี้และดูว่าคุณได้รับผลลัพธ์ที่ต้องการหรือไม่:

 SET answer 41 INCR answer GET answer

คำสั่งแรกเก็บ "41" เทียบกับคีย์ "คำตอบ" คำสั่งที่สองเพิ่มค่า คำสั่งที่สามพิมพ์ค่าที่เก็บไว้กับคีย์ที่กำหนด ผลลัพธ์ควรอ่านว่า “42”

คุณสามารถเรียนรู้เพิ่มเติมเกี่ยวกับคำสั่งทั้งหมดที่ Redis รองรับได้จากเว็บไซต์ทางการ

เราจะใช้แพ็คเกจ Go Redigo เพื่อเชื่อมต่อกับ Redis จากภายในรหัสแอปของเรา

แอบดู Redis Pub/Sub

รูปแบบการเผยแพร่สมัครสมาชิกเป็นวิธีส่งข้อความไปยังผู้ส่งตามจำนวนที่กำหนด ผู้ส่งข้อความเหล่านี้ (ผู้เผยแพร่) ไม่ได้ระบุผู้รับที่เป็นเป้าหมายอย่างชัดเจน แต่ข้อความจะถูกส่งออกไปในช่องที่ผู้รับ (สมาชิก) จำนวนเท่าใดก็ได้สามารถรอพวกเขาอยู่

การกำหนดค่าการเผยแพร่สมัครอย่างง่าย

ในกรณีของเรา เราสามารถมีเว็บโหนดจำนวนเท่าใดก็ได้ที่ทำงานอยู่เบื้องหลังโหลดบาลานเซอร์ ในช่วงเวลาใดก็ตาม ผู้ใช้สองคนที่ดูภาพเดียวกันอาจไม่สามารถเชื่อมต่อกับโหนดเดียวกันได้ นี่คือที่มาของ Redis Pub/Sub เมื่อใดก็ตามที่เว็บโหนดต้องการสังเกตการเปลี่ยนแปลง (เช่น ข้อความใหม่ถูกสร้างขึ้นโดยผู้ใช้) จะใช้ Redis Pub/Sub เพื่อเผยแพร่ข้อมูลนั้นไปยังโหนดเว็บที่เกี่ยวข้องทั้งหมด ซึ่งในทางกลับกันจะเผยแพร่ข้อมูลไปยังไคลเอนต์ที่เกี่ยวข้องเพื่อที่พวกเขาจะได้ดึงรายการอัพเดทของ messageredis

เนื่องจากรูปแบบการเผยแพร่-สมัครรับข้อมูลช่วยให้เราสามารถส่งข้อความบนช่องที่มีชื่อ เราจึงสามารถให้แต่ละเว็บโหนดเชื่อมต่อกับ Redis และสมัครรับข้อมูลเฉพาะช่องที่ผู้ใช้ที่เชื่อมต่อของพวกเขาสนใจ ตัวอย่างเช่น หากผู้ใช้สองคนกำลังดู ภาพเดียวกันแต่เชื่อมต่อกับโหนดเว็บที่แตกต่างกันสองโหนดจากโหนดเว็บจำนวนมาก ดังนั้นมีเพียงสองโหนดเว็บเท่านั้นที่ต้องสมัครรับข้อมูลจากแชนเนลที่เกี่ยวข้อง ข้อความใดๆ ที่เผยแพร่บนแชนเนลนั้นจะถูกส่งไปยังโหนดเว็บทั้งสองนั้นเท่านั้น

ฟังดูดีเกินไปที่จะเป็นจริง? เราสามารถทดลองใช้ CLI ของ Redis เริ่มต้นสามอินสแตนซ์ของ redis-cli ดำเนินการคำสั่งต่อไปนี้ในอินสแตนซ์แรก:

 SUBSCRIBE somechannel

ดำเนินการคำสั่งต่อไปนี้ในอินสแตนซ์ Redis CLI ที่สอง:

 SUBSCRIBE someotherchannel

ดำเนินการคำสั่งต่อไปนี้ในอินสแตนซ์ที่สามของ Redis CLI:

 PUBLISH somechannel lorem PUBLISH someotherchannel ipsum

สังเกตว่าตัวอย่างแรกได้รับ "lorem" แต่ไม่ได้รับ "ipsum" และวิธีที่อินสแตนซ์ที่สองได้รับ "ipsum" แต่ไม่ได้รับ "lorem"

Redis Pub/Sub ในการดำเนินการ

เป็นมูลค่าการกล่าวขวัญว่าเมื่อไคลเอนต์ Redis เข้าสู่โหมดสมาชิก จะไม่สามารถดำเนินการใด ๆ นอกเหนือจากการสมัครรับข้อมูลจากช่องเพิ่มเติมหรือยกเลิกการสมัครรับข้อมูลจากช่อง ซึ่งหมายความว่าแต่ละโหนดของเว็บจะต้องรักษาการเชื่อมต่อกับ Redis สองครั้ง โหนดหนึ่งเชื่อมต่อกับ Redis ในฐานะสมาชิก และอีกโหนดเพื่อเผยแพร่ข้อความบนแชนเนล เพื่อให้เว็บโหนดใดๆ ที่สมัครรับข้อมูลจากแชนเนลเหล่านั้นสามารถรับได้

เรียลไทม์และปรับขนาดได้

ก่อนที่เราจะเริ่มสำรวจสิ่งที่เกิดขึ้นเบื้องหลัง ให้เราทำการโคลนที่เก็บ:

 mkdir tonesa cd tonesa export GOPATH=`pwd` mkdir -p src/github.com/hjr265/tonesa cd src/github.com/hjr265/tonesa git clone https://github.com/hjr265/tonesa.git . go get ./...

… และรวบรวมมัน:

 go build ./cmd/tonesad

ในการเรียกใช้แอป ก่อนอื่นให้สร้างไฟล์ชื่อ .env (ควรคัดลอกไฟล์ env-sample.txt):

 cp env-sample.txt .env

กรอกไฟล์ .env ด้วยตัวแปรสภาพแวดล้อมที่จำเป็นทั้งหมด:

 MONGO_URL=mongodb://127.0.0.1/tonesa REDIS_URL=redis://127.0.0.1 AWS_ACCESS_KEY_ID={Your-AWS-Access-Key-ID-Goes-Here} AWS_SECRET_ACCESS_KEY={And-Your-AWS-Secret-Access-Key} S3_BUCKET_NAME={And-S3-Bucket-Name}

ในที่สุดก็รันไบนารีที่สร้างขึ้น:

 PORT=9091 ./tonesad -env-file=.env

โหนดเว็บควรจะทำงานอยู่ในขณะนี้และสามารถเข้าถึงได้ผ่าน http://localhost:9091

ตัวอย่างสด

หากต้องการทดสอบว่ายังคงใช้งานได้เมื่อปรับขนาดในแนวนอน ให้สร้างโหนดเว็บหลายโหนดโดยเริ่มด้วยหมายเลขพอร์ตที่แตกต่างกัน:

 PORT=9092 ./tonesad -env-file=.env
 PORT=9093 ./tonesad -env-file=.env

… และเข้าถึงได้ผ่าน URL ที่เกี่ยวข้อง: http://localhost:9092 และ http://localhost:9093

ตัวอย่างสด

เบื้องหลัง

แทนที่จะทำทุกขั้นตอนในการพัฒนาแอพ เราจะเน้นที่ส่วนที่สำคัญที่สุดบางส่วน แม้ว่าจะไม่ใช่ทั้งหมดที่เกี่ยวข้องกับ Redis Pub/Sub และนัยตามเวลาจริง 100% แต่ก็ยังมีความเกี่ยวข้องกับโครงสร้างโดยรวมของแอป และจะทำให้ติดตามได้ง่ายขึ้นเมื่อเราเจาะลึกลงไปอีก

เพื่อให้ง่ายขึ้น เราจะไม่กังวลเกี่ยวกับการตรวจสอบผู้ใช้ การอัปโหลดจะไม่ระบุชื่อและใช้ได้กับทุกคนที่ทราบ URL ผู้ชมทุกคนสามารถส่งข้อความและสามารถเลือกนามแฝงของตนเองได้ การปรับกลไกการตรวจสอบสิทธิ์ที่เหมาะสมและความสามารถด้านความเป็นส่วนตัวควรเป็นเรื่องเล็กน้อย และอยู่นอกเหนือขอบเขตของบทความนี้

ข้อมูลที่คงอยู่

อันนี้ง่าย

เมื่อใดก็ตามที่ผู้ใช้อัปโหลดภาพ เราจะจัดเก็บไว้ใน Amazon S3 แล้วเก็บเส้นทางไปยัง MongoDB โดยเทียบกับสอง ID: BSON Object ID หนึ่งรายการ (รายการโปรดของ MongoDB) และ ID แบบสั้นยาว 8 อักขระอีกตัว (ค่อนข้างน่าพอใจ) สิ่งนี้จะเข้าไปในคอลเล็กชัน "อัปโหลด" ของฐานข้อมูลของเราและมีโครงสร้างดังนี้:

 type Upload struct { ID bson.ObjectId `bson:"_id"` ShortID string `bson:"shortID"` Kind Kind `bson:"kind"` Content Blob `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` } type Blob struct { Path string `bson:"path"` Size int64 `bson:"size"` }

ฟิลด์ ชนิด ใช้เพื่อระบุชนิดของสื่อที่ "อัปโหลด" นี้มี นี่หมายความว่าเราสนับสนุนสื่ออื่นที่ไม่ใช่รูปภาพหรือไม่ น่าเสียดายที่ แต่ฟิลด์ถูกทิ้งไว้ที่นั่นเพื่อทำหน้าที่เป็นเครื่องเตือนใจว่าเราไม่จำเป็นต้อง จำกัด เฉพาะรูปภาพที่นี่

เมื่อผู้ใช้ส่งข้อความถึงกัน จะถูกเก็บไว้ในคอลเลกชั่นที่แตกต่างกัน ใช่ คุณเดาได้แล้วว่า: "ข้อความ"

 type Message struct { ID bson.ObjectId `bson:"_id"` UploadID bson.ObjectId `bson:"uploadID"` AuthorName string `bson:"anonName"` Content string `bson:"content"` CreatedAt time.Time `bson:"createdAt"` ModifiedAt time.Time `bson:"modifiedAt"` }

สิ่งที่น่าสนใจเพียงอย่างเดียวที่นี่คือฟิลด์ UploadID ซึ่งใช้เพื่อเชื่อมโยงข้อความกับการอัปโหลดเฉพาะ

ปลายทาง API

แอปพลิเคชันนี้มีจุดสิ้นสุดสามจุด

POST /api/uploads

ตัวจัดการสำหรับปลายทางนี้คาดว่าจะส่ง "หลายส่วน/ข้อมูลแบบฟอร์ม" พร้อมรูปภาพในช่อง "ไฟล์" พฤติกรรมของตัวจัดการจะประมาณดังนี้:

 func HandleUploadCreate(w http.ResponseWriter, r *http.Request) { f, h, _ := r.FormFile("file") b := bytes.Buffer{} n, _ := io.Copy(&b, io.LimitReader(f, data.MaxUploadContentSize+10)) if n > data.MaxUploadContentSize { ServeBadRequest(w, r) return } id := bson.NewObjectId() upl := data.Upload{ ID: id, Kind: data.Image, Content: data.Blob{ Path: "/uploads/" + id.Hex(), Size: n, }, } data.Bucket.Put(upl.Content.Path, b.Bytes(), h.Header.Get("Content-Type"), s3.Private, s3.Options{}) upl.Put() // Respond with newly created upload entity (JSON encoded) }

Go ต้องการข้อผิดพลาดทั้งหมดเพื่อจัดการอย่างชัดเจน สิ่งนี้ได้ทำในต้นแบบแล้ว แต่ละเว้นจากตัวอย่างในบทความนี้เพื่อเน้นที่ส่วนที่สำคัญ

ในตัวจัดการจุดปลาย API นี้ เรากำลังอ่านไฟล์โดยพื้นฐานแล้ว แต่จำกัดขนาดของไฟล์ไว้ที่ค่าเฉพาะ หากการอัปโหลดเกินค่านี้ คำขอจะถูกปฏิเสธ มิฉะนั้น ระบบจะสร้าง BSON ID และใช้เพื่ออัปโหลดรูปภาพไปยัง Amazon S3 ก่อนที่จะยืนยันเอนทิตีการอัปโหลดไปยัง MongoDB

มีข้อดีและข้อเสียในการสร้างรหัสวัตถุ BSON พวกเขาจะถูกสร้างขึ้นบนฝั่งไคลเอ็นต์ อย่างไรก็ตาม กลยุทธ์ที่ใช้ในการสร้าง Object ID ทำให้ความน่าจะเป็นของการชนกันนั้นน้อยมากจนสามารถสร้างความปลอดภัยได้ทางฝั่งไคลเอ็นต์ ในทางกลับกัน ค่าของ Object ID ที่สร้างขึ้นมักจะเป็นแบบต่อเนื่องและนั่นคือสิ่งที่ Amazon S3 ไม่ค่อยชอบ วิธีแก้ปัญหาอย่างง่ายคือใส่ชื่อไฟล์นำหน้าด้วยสตริงสุ่ม

รับ /api/uploads/{id}/messages

API นี้ใช้เพื่อดึงข้อความล่าสุดและข้อความที่โพสต์หลังจากช่วงเวลาหนึ่ง

 func ServeMessageList(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } sinceStr := r.URL.Query().Get("since") var msgs []data.Message if sinceStr != "" { since, _ := time.Parse(time.RFC3339, sinceStr) msgs, _ = data.ListMessagesByUploadID(upl.ID, since, 16) } else { msgs, _ = data.ListRecentMessagesByUploadID(upl.ID, 16) } // Respond with message entities (JSON encoded) }

เมื่อเบราว์เซอร์ของผู้ใช้ได้รับแจ้งเกี่ยวกับข้อความใหม่ในการอัปโหลดที่ผู้ใช้กำลังดูอยู่ เบราว์เซอร์จะดึงข้อความใหม่โดยใช้ปลายทางนี้

POST /api/uploads/{id}/messages

และสุดท้าย ตัวจัดการที่สร้างข้อความและแจ้งให้ทุกคนทราบ:

 func HandleMessageCreate(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) idStr := vars["id"] if !bson.IsObjectIdHex(idStr) { ServeNotFound(w, r) return } upl, _ := data.GetUpload(bson.ObjectIdHex(idStr)) if upl == nil { ServeNotFound(w, r) return } body := Message{} json.NewDecoder(r.Body).Decode(&body) msg := data.Message{} msg.UploadID = upl.ID msg.AuthorName = body.AuthorName msg.Content = body.Content msg.Put() // Respond with newly created message entity (JSON encoded) hub.Emit("upload:"+upl.ID.Hex(), "message:"+msg.ID.Hex()) }

ตัวจัดการนี้คล้ายกับตัวจัดการอื่น ๆ ที่เกือบจะน่าเบื่อที่จะรวมไว้ที่นี่ หรือว่า? สังเกตว่ามีฟังก์ชัน call hub.Emit() ที่ส่วนท้ายสุดของฟังก์ชันอย่างไร อะไรคือฮับที่คุณพูด? นั่นคือที่ที่เวทมนตร์ Pub/Sub ทั้งหมดเกิดขึ้น

Hub: ที่ WebSockets พบกับ Redis

Hub คือที่ที่เราติด WebSockets กับช่อง Pub/Sub ของ Redis และบังเอิญว่าแพ็คเกจที่เราใช้เพื่อจัดการ WebSockets ภายในเว็บเซิร์ฟเวอร์ของเรานั้นเรียกว่ากาว

โดยพื้นฐานแล้ว Hub จะรักษาโครงสร้างข้อมูลบางอย่างที่สร้างการจับคู่ระหว่าง WebSockets ที่เชื่อมต่อทั้งหมดกับช่องทางทั้งหมดที่พวกเขาสนใจ ตัวอย่างเช่น WebSocket บนแท็บเบราว์เซอร์ของผู้ใช้ที่ชี้ไปที่รูปภาพที่อัปโหลดโดยเฉพาะควรสนใจการแจ้งเตือนทั้งหมดที่เกี่ยวข้อง ไปที่มัน

แพ็คเกจฮับดำเนินการหกฟังก์ชั่น:

  • ติดตาม
  • UnsubscribeAll
  • ปล่อย
  • EmitLocal
  • InitHub
  • HandleSocket

สมัครสมาชิกและยกเลิกการสมัครทั้งหมด

 func Subscribe(s *glue.Socket, t string) error { l.Lock() defer l.Unlock() _, ok := sockets[s] if !ok { sockets[s] = map[string]bool{} } sockets[s][t] = true _, ok = topics[t] if !ok { topics[t] = map[*glue.Socket]bool{} err := subconn.Subscribe(t) if err != nil { return err } } topics[t][s] = true return nil }

ฟังก์ชันนี้ เหมือนกับฟังก์ชันอื่นๆ ในแพ็คเกจนี้ ล็อค mutex แบบอ่าน/เขียนไว้ในขณะที่ดำเนินการ เพื่อให้เราสามารถปรับเปลี่ยนโครงสร้างข้อมูลดั้งเดิม ซ็อกเก็ต และ หัวข้อ ตัวแปรได้อย่างปลอดภัย ตัวแปรแรก, sockets , แม็พ sockets กับชื่อแชนเนล ในขณะที่ตัวที่สอง, topic , แม็พชื่อแชนเนลกับ sockets ในฟังก์ชันนี้ เราสร้างการแมปเหล่านี้ เมื่อใดก็ตามที่เราเห็น socket สมัครรับชื่อช่องใหม่ เราจะทำการเชื่อมต่อ Redis, subconn , สมัครสมาชิกช่องนั้นบน Redis โดยใช้ subconn.Subscribe สิ่งนี้ทำให้ Redis ส่งต่อการแจ้งเตือนทั้งหมดในช่องนั้นไปยังโหนดเว็บนี้

และในทำนองเดียวกัน ในฟังก์ชัน UnsubscribeAll เราฉีกการแมปลง:

 func UnsubscribeAll(s *glue.Socket) error { l.Lock() defer l.Unlock() for t := range sockets[s] { delete(topics[t], s) if len(topics[t]) == 0 { delete(topics, t) err := subconn.Unsubscribe(t) if err != nil { return err } } } delete(sockets, s) return nil }

เมื่อเราลบซ็อกเก็ตสุดท้ายออกจากโครงสร้างข้อมูลที่สนใจในช่องใดช่องหนึ่ง เราจะยกเลิกการสมัครรับข้อมูลจากช่องใน Redis โดยใช้ subconn.Unsubscribe

ปล่อย

 func Emit(t string, m string) error { _, err := pubconn.Do("PUBLISH", t, m) return err }

ฟังก์ชันนี้เผยแพร่ข้อความ m บนช่อง t โดยใช้การเชื่อมต่อเผยแพร่ไปยัง Redis

EmitLocal

 func EmitLocal(t string, m string) { l.RLock() defer l.RUnlock() for s := range topics[t] { s.Write(m) } }

InitHub

 func InitHub(url string) error { c, _ := redis.DialURL(url) pubconn = c c, _ = redis.DialURL(url) subconn = redis.PubSubConn{c} go func() { for { switch v := subconn.Receive().(type) { case redis.Message: EmitLocal(v.Channel, string(v.Data)) case error: panic(v) } } }() return nil }

ในฟังก์ชัน InitHub เรากำลังสร้างการเชื่อมต่อกับ Redis สองครั้ง: ช่องทางแรกสำหรับสมัครรับข้อมูลจากแชนเนลที่โหนดเว็บนี้สนใจ และอีกช่องทางหนึ่งเพื่อเผยแพร่ข้อความ เมื่อสร้างการเชื่อมต่อแล้ว เราจะเริ่มรูทีน Go ใหม่ด้วยการวนซ้ำที่รอรับข้อความผ่านการเชื่อมต่อสมาชิกไปยัง Redis อย่างถาวร ทุกครั้งที่ได้รับข้อความ ข้อความจะส่งข้อความในเครื่อง (เช่น ไปยัง WebSockets ทั้งหมดที่เชื่อมต่อกับโหนดเว็บนี้)

HandleSocket

และสุดท้าย HandleSocket เป็นที่ที่เรารอให้ข้อความผ่าน WebSockets หรือล้างข้อมูลหลังจากการเชื่อมต่อปิด:

 func HandleSocket(s *glue.Socket) { s.OnClose(func() { UnsubscribeAll(s) }) s.OnRead(func(data string) { fields := strings.Fields(data) if len(fields) == 0 { return } switch fields[0] { case "watch": if len(fields) != 2 { return } Subscribe(s, fields[1]) case "touch": if len(fields) != 4 { return } Emit(fields[1], "touch:"+fields[2]+","+fields[3]) } }) }

JavaScript ฟรอนต์เอนด์

เนื่องจากกาวมาพร้อมกับไลบรารี JavaScript ส่วนหน้าจึงง่ายกว่ามากในการจัดการ WebSockets (หรือทางเลือกสำรอง XHR เมื่อ WebSockets ไม่พร้อมใช้งาน):

 var socket = glue() socket.onMessage(function(data) { data = data.split(':') switch(data[0]) { case 'message': messages.fetch({ data: { since: _.first(messages.pluck('createdAt')) || '' }, add: true, remove: false }) break case 'touch': var coords = data[1].split(',') showTouchBubble(coords) break } }) socket.send('watch upload:'+upload.id)

ทางฝั่งไคลเอ็นต์ เรากำลังรับฟังข้อความที่ส่งเข้ามาทาง WebSocket เนื่องจากกาวส่งข้อความทั้งหมดเป็นสตริง เราจึงเข้ารหัสข้อมูลทั้งหมดในนั้นโดยใช้รูปแบบเฉพาะ:

  • ข้อความใหม่: “ข้อความ:{messageID}”
  • คลิกที่ภาพ: “touch:{coordX},{coordY}” โดยที่ coordX และ coordY เป็นพิกัดตามเปอร์เซ็นต์ของตำแหน่งที่ผู้ใช้คลิกบนรูปภาพ

เมื่อผู้ใช้สร้างข้อความใหม่ เราจะใช้ API “POST /api/uploads/{uploadID}/messages” เพื่อสร้างข้อความใหม่ ทำได้โดยใช้เมธอด create ในคอลเลกชั่นแบ็คโบนสำหรับข้อความ:

 messages.create({ authorName: $messageAuthorNameEl.val(), content: $messageContentEl.val(), createdAt: '' }, { at: 0 })

เมื่อผู้ใช้คลิกที่รูปภาพ เราจะคำนวณตำแหน่งของการคลิกเป็นเปอร์เซ็นต์ของความกว้างและความสูงของรูปภาพ และส่งข้อมูลผ่าน WebSocket โดยตรง

 socket.send('touch upload:'+upload.id+' '+(event.pageX - offset.left) / $contentImgEl.width()+' '+(event.pageY - offset.top) / $contentImgEl.height())

ภาพรวม

ภาพรวมสถาปัตยกรรมของแอปพลิเคชัน

เมื่อผู้ใช้พิมพ์ข้อความและกดปุ่ม Enter ไคลเอ็นต์จะเรียกใช้ปลายทาง API "POST /api/uploads/{id}/messages" ซึ่งจะสร้างเอนทิตีข้อความในฐานข้อมูลและเผยแพร่สตริง “message:{messageID}” ผ่าน Redis Pub/Sub ในช่อง “upload:{uploadID}” ผ่านแพ็คเกจฮับ

Redis ส่งต่อสตริงนี้ไปยังทุกโหนดของเว็บ (สมาชิก) ที่สนใจในช่อง “upload:{uploadID}” โหนดของเว็บที่ได้รับสตริงนี้วนซ้ำใน WebSockets ทั้งหมดที่เกี่ยวข้องกับแชนเนล และส่งสตริงไปยังไคลเอ็นต์ผ่านการเชื่อมต่อ WebSocket ลูกค้าที่ได้รับสตริงนี้จะเริ่มดึงข้อความใหม่จากเซิร์ฟเวอร์โดยใช้ "GET /api/uploads/{id}/messages"

ในทำนองเดียวกัน สำหรับการเผยแพร่เหตุการณ์การคลิกบนรูปภาพ ไคลเอ็นต์จะส่งข้อความโดยตรงผ่าน WebSocket ที่ดูเหมือน "การอัปโหลดด้วยการแตะ:{uploadID} {coordX} {coordY}" ข้อความนี้สิ้นสุดในแพ็คเกจฮับที่มีการเผยแพร่ในช่องเดียวกันของช่อง “upload:{uploadID}” ด้วยเหตุนี้ สตริงจึงถูกแจกจ่ายให้กับผู้ใช้ทุกคนที่ดูรูปภาพที่อัปโหลด เมื่อได้รับสตริงนี้ ลูกค้าจะแยกวิเคราะห์เพื่อแยกพิกัดและแสดงวงกลมที่ค่อยๆ จางลงเพื่อเน้นตำแหน่งการคลิกชั่วขณะ

สรุป

ในบทความนี้ เราได้เห็นเพียงแวบเดียวว่ารูปแบบการเผยแพร่และสมัครรับข้อมูลสามารถช่วยแก้ปัญหาการปรับขนาดเว็บแอปแบบเรียลไทม์ในระดับที่ดีและสะดวกมากได้อย่างไร

มีแอปตัวอย่างเพื่อใช้เป็นสนามเด็กเล่นสำหรับการทดลองกับ Redis Pub/Sub แต่ดังที่กล่าวไว้ก่อนหน้านี้ แนวคิดนี้สามารถนำไปใช้ในภาษาโปรแกรมยอดนิยมอื่นๆ ได้เกือบทั้งหมด