如何使用 Tornado 創建一個簡單的 Python WebSocket 服務器

已發表: 2022-03-11

隨著實時 Web 應用程序的普及,WebSockets 已成為其實現的關鍵技術。 您必須不斷按下重新加載按鈕才能從服務器接收更新的日子已經一去不復返了。 想要提供實時更新的 Web 應用程序不再需要輪詢服務器以獲取更改 - 相反,服務器會在更改發生時將更改推送到流中。 強大的 Web 框架已經開始支持開箱即用的 WebSockets。 例如,Ruby on Rails 5 更進一步,增加了對動作電纜的支持。

在 Python 世界中,存在許多流行的 Web 框架。 諸如 Django 之類的框架幾乎提供了構建 Web 應用程序所需的一切,它所缺少的任何東西都可以用 Django 可用的數千個插件之一來彌補。 然而,由於 Python 或其大多數 Web 框架的工作方式,處理長期存在的連接很快就會成為一場噩夢。 線程模型和全局解釋器鎖通常被認為是 Python 的致命弱點。

但所有這一切都開始改變了。 借助 Python 3 的某些新功能和 Python 已經存在的框架(例如 Tornado),處理長壽命連接不再是一個挑戰。 Tornado 在 Python 中提供了 Web 服務器功能,在處理長連接時特別有用。

在本文中,我們將了解如何使用 Tornado 在 Python 中構建簡單的 WebSocket 服務器。 演示應用程序將允許我們上傳製表符分隔值 (TSV) 文件,對其進行解析並使其內容在唯一的 URL 上可用。

Tornado 和 WebSockets

Tornado 是一個異步網絡庫,專門處理事件驅動的網絡。 由於它自然可以同時容納數万個打開的連接,因此服務器可以利用這一點並在單個節點內處理大量 WebSocket 連接。 WebSocket 是一種通過單個 TCP 連接提供全雙工通信通道的協議。 由於它是一個開放式套接字,因此該技術使 Web 連接成為有狀態的,並促進與服務器之間的實時數據傳輸。 服務器保持客戶端的狀態,可以輕鬆實現基於 WebSockets 的實時聊天應用或網頁遊戲。

WebSockets 旨在在 Web 瀏覽器和服務器中實現,目前在所有主要的 Web 瀏覽器中都受支持。 一個連接打開一次,消息可以在連接關閉之前來回傳輸多次。

安裝 Tornado 相當簡單。 它在 PyPI 中列出,可以使用 pip 或 easy_install 安裝:

 pip install tornado

Tornado 帶有自己的 WebSockets 實現。 就本文而言,這幾乎就是我們所需要的。

WebSockets 在行動

使用 WebSocket 的優點之一是它的狀態屬性。 這改變了我們通常認為客戶端-服務器通信的方式。 這種情況的一個特殊用例是服務器需要執行長時間緩慢的進程並逐漸將結果流式傳輸回客戶端。

在我們的示例應用程序中,用戶將能夠通過 WebSocket 上傳文件。 在連接的整個生命週期內,服務器會將解析後的文件保留在內存中。 根據請求,服務器可以將文件的一部分發送回前端。 此外,該文件將在一個 URL 上可用,然後多個用戶可以查看該 URL。 如果在同一 URL 上上傳了另一個文件,查看它的每個人都將能夠立即看到新文件。

對於前端,我們將使用 AngularJS。 這個框架和庫將使我們能夠輕鬆處理文件上傳和分頁。 然而,對於與 WebSockets 相關的所有內容,我們將使用標準的 JavaScript 函數。

這個簡單的應用程序將分解為三個單獨的文件:

  • parser.py:我們的 Tornado 服務器與請求處理程序實現的地方
  • templates/index.html:前端 HTML 模板
  • static/parser.js:用於我們的前端 JavaScript

打開一個 WebSocket

在前端,可以通過實例化 WebSocket 對象來建立 WebSocket 連接:

 new WebSocket(WEBSOCKET_URL);

這是我們在頁面加載時必須做的事情。 實例化 WebSocket 對像後,必須附加處理程序以處理三個重要事件:

  • open:建立連接時觸發
  • message:從服務器接收到消息時觸發
  • close:連接關閉時觸發
$scope.init = function() { $scope.ws = new WebSocket('ws://' + location.host + '/parser/ws'); $scope.ws.binaryType = 'arraybuffer'; $scope.ws.onopen = function() { console.log('Connected.') }; $scope.ws.onmessage = function(evt) { $scope.$apply(function () { message = JSON.parse(evt.data); $scope.currentPage = parseInt(message['page_no']); $scope.totalRows = parseInt(message['total_number']); $scope.rows = message['data']; }); }; $scope.ws.onclose = function() { console.log('Connection is closed...'); }; } $scope.init();

由於這些事件處理程序不會自動觸發 AngularJS 的 $scope 生命週期,因此處理程序函數的內容需要包裝在 $apply 中。 如果您有興趣,AngularJS 的特定包可以更容易地將 WebSocket 集成到 AngularJS 應用程序中。

值得一提的是,斷開的 WebSocket 連接不會自動重新建立,並且會在觸發關閉事件處理程序時要求應用程序嘗試重新連接。 這有點超出了本文的範圍。

選擇要上傳的文件

由於我們正在使用 AngularJS 構建單頁應用程序,因此嘗試以古老的方式提交帶有文件的表單是行不通的。 為了讓事情變得更簡單,我們將使用 Danial Farid 的 ng-file-upload 庫。 使用它,我們需要做的就是允許用戶上傳文件,只需在前端模板中添加一個帶有特定 AngularJS 指令的按鈕:

 <button class="btn btn-default" type="file" ngf-select="uploadFile($file, $invalidFiles)" accept=".tsv" ngf-max-size="10MB">Select File</button>

該庫允許我們設置可接受的文件擴展名和大小。 單擊此按鈕,就像任何<input type=”file”>元素一樣,將打開標准文件選擇器。

上傳文件

當您要傳輸二進制數據時,您可以在數組緩衝區和 blob 中進行選擇。 如果它只是像圖像文件這樣的原始數據,請選擇 blob 並在服務器中正確處理。 數組緩衝區用於固定長度的二進制緩衝區,像TSV這樣的文本文件可以以字節串的格式傳輸。 此代碼段顯示如何以數組緩衝區格式上傳文件。

 $scope.uploadFile = function(file, errFiles) { ws = $scope.ws; $scope.f = file; $scope.errFile = errFiles && errFiles[0]; if (file) { reader = new FileReader(); rawData = new ArrayBuffer(); reader.onload = function(evt) { rawData = evt.target.result; ws.send(rawData); } reader.readAsArrayBuffer(file); } }

ng-file-upload 指令提供了一個 uploadFile 函數。 在這裡,您可以使用 FileReader 將文件轉換為數組緩衝區,並通過 WebSocket 發送。

請注意,通過 WebSocket 通過將大文件讀入數組緩衝區來發送大文件可能不是上傳它們的最佳方式,因為它會很快佔用大量內存,從而導致糟糕的體驗。

在服務器上接收文件

桌子

Tornado 使用 4 位操作碼確定消息類型,並返回 str 表示二進制數據和 unicode 表示文本。

 if opcode == 0x1: # UTF-8 data self._message_bytes_in += len(data) try: decoded = data.decode("utf-8") except UnicodeDecodeError: self._abort() return self._run_callback(self.handler.on_message, decoded) elif opcode == 0x2: # Binary data self._message_bytes_in += len(data) self._run_callback(self.handler.on_message, data)

在 Tornado Web 服務器中,數組緩衝區以 str 類型接收。

在此示例中,我們期望的內容類型是 TSV,因此文件被解析並轉換為字典。 當然,在實際應用中,有更明智的方法來處理任意上傳。

 def make_message(self, page_no=1): page_size = 100 return { "page_no": page_no, "total_number": len(self.rows), "data": self.rows[page_size * (page_no - 1):page_size * page_no] } def on_message(self, message): if isinstance(message, str): self.rows = [csv.reader([line], delimiter="\t").next() for line in (x.strip() for x in message.splitlines()) if line] self.write_message(self.make_message())

請求頁面

由於我們的目標是在小頁面塊中顯示上傳的 TSV 數據,因此我們需要一種請求特定頁面的方法。 為了簡單起見,我們將簡單地使用相同的 WebSocket 連接將頁碼發送到我們的服務器。

 $scope.pageChanged = function() { ws = $scope.ws; ws.send($scope.currentPage); }

服務器將以 unicode 格式接收此消息:

 def on_message(self, message): if isinstance(message, unicode): page_no = int(message) self.write_message(self.make_message(page_no))

嘗試使用來自 Tornado WebSocket 服務器的 dict 進行響應將自動將其編碼為 JSON 格式。 所以只發送一個包含 100 行內容的 dict 是完全可以的。

與他人共享訪問權限

為了能夠與多個用戶共享對同一上傳的訪問權限,我們需要能夠唯一地識別上傳。 每當用戶通過 WebSocket 連接到服務器時,都會生成一個隨機 UUID 並將其分配給他們的連接。

 def open(self, doc_uuid=None): if doc_uuid is None: self.uuid = str(uuid.uuid4())

uuid.uuid4()生成隨機 UUID,str() 將 UUID 轉換為標準格式的十六進制數字字符串。

如果另一個具有 UUID 的用戶連接到服務器,則將相應的 FileHandler 實例添加到以 UUID 為鍵的字典中,並在連接關閉時刪除。

 @classmethod @tornado.gen.coroutine def add_clients(cls, doc_uuid, client): with (yield lock.acquire()): if doc_uuid in cls.clients: clients_with_uuid = FileHandler.clients[doc_uuid] clients_with_uuid.append(client) else: FileHandler.clients[doc_uuid] = [client] @classmethod @tornado.gen.coroutine def remove_clients(cls, doc_uuid, client): with (yield lock.acquire()): if doc_uuid in cls.clients: clients_with_uuid = FileHandler.clients[doc_uuid] clients_with_uuid.remove(client) if len(clients_with_uuid) == 0: del cls.clients[doc_uuid]

同時添加或刪除客戶端時,客戶端字典可能會引發 KeyError。 由於 Tornado 是一個異步網絡庫,它為同步提供了鎖定機制。 帶有協程的簡單鎖適合這種處理客戶端字典的情況。

如果任何用戶上傳文件或在頁面之間移動,所有具有相同 UUID 的用戶都會查看相同的頁面。

 @classmethod def send_messages(cls, doc_uuid): clients_with_uuid = cls.clients[doc_uuid] message = cls.make_message(doc_uuid) for client in clients_with_uuid: try: client.write_message(message) except: logging.error("Error sending message", exc_info=True)

跑在 Nginx 後面

實現 WebSockets 非常簡單,但是在生產環境中使用它時需要考慮一些棘手的事情。 Tornado 是一個 Web 服務器,因此它可以直接獲取用戶的請求,但出於多種原因,將其部署在 Nginx 後面可能是更好的選擇。 然而,通過 Nginx 使用 WebSocket 需要付出更多的努力:

 http { upstream parser { server 127.0.0.1:8080; } server { location ^~ /parser/ws { proxy_pass http://parser; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; } } }

兩個proxy_set_header指令使 Nginx 將必要的標頭傳遞給後端服務器,這是升級到 WebSocket 連接所必需的。

下一步是什麼?

在本文中,我們實現了一個簡單的 Python Web 應用程序,它使用 WebSockets 來維護服務器和每個客戶端之間的持久連接。 使用像 Tornado 這樣的現代異步網絡框架,在 Python 中同時保持數以萬計的打開連接是完全可行的。

儘管此演示應用程序的某些實現方面可能會有所不同,但我希望它仍然有助於演示 https://www.toptal.com/tornado 框架中 WebSockets 的使用。 演示應用程序的源代碼可在 GitHub 上獲得。

相關: Python 多線程教程:並發和並行