如何使用 Tornado 创建一个简单的 Python WebSocket 服务器
已发表: 2022-03-11随着实时 Web 应用程序的普及,WebSockets 已成为其实现的关键技术。 您必须不断按下重新加载按钮才能从服务器接收更新的日子已经一去不复返了。 想要提供实时更新的 Web 应用程序不再需要轮询服务器以获取更改 - 相反,服务器会在更改发生时将更改推送到流中。 强大的 Web 框架已经开始支持开箱即用的 WebSockets。 例如,Ruby on Rails 5 更进一步,增加了对动作电缆的支持。
在 Python 世界中,存在许多流行的 Web 框架。 诸如 Django 之类的框架几乎提供了构建 Web 应用程序所需的一切,它所缺少的任何东西都可以用 Django 可用的数千个插件之一来弥补。 然而,由于 Python 或其大多数 Web 框架的工作方式,处理长期存在的连接很快就会成为一场噩梦。 线程模型和全局解释器锁通常被认为是 Python 的致命弱点。
但所有这一切都开始改变了。 借助 Python 3 的某些新功能和 Python 已经存在的框架(例如 Tornado),处理长寿命连接不再是一个挑战。 Tornado 在 Python 中提供了 Web 服务器功能,在处理长连接时特别有用。
在本文中,我们将了解如何使用 Tornado 在 Python 中构建简单的 WebSocket 服务器。 演示应用程序将允许我们上传制表符分隔值 (TSV) 文件,对其进行解析并使其内容在唯一的 URL 上可用。
Tornado 和 WebSockets
Tornado 是一个异步网络库,专门处理事件驱动的网络。 由于它自然可以同时容纳数万个打开的连接,因此服务器可以利用这一点并在单个节点内处理大量 WebSocket 连接。 WebSocket 是一种通过单个 TCP 连接提供全双工通信通道的协议。 由于它是一个开放式套接字,因此该技术使 Web 连接成为有状态的,并促进与服务器之间的实时数据传输。 服务器保持客户端的状态,可以轻松实现基于 WebSockets 的实时聊天应用或网页游戏。
WebSockets 旨在在 Web 浏览器和服务器中实现,目前在所有主要的 Web 浏览器中都受支持。 一个连接打开一次,消息可以在连接关闭之前来回传输多次。
安装 Tornado 相当简单。 它在 PyPI 中列出,可以使用 pip 或 easy_install 安装:
pip install tornadoTornado 带有自己的 WebSockets 实现。 就本文而言,这几乎就是我们所需要的。
WebSockets 在行动
使用 WebSocket 的优点之一是它的状态属性。 这改变了我们通常认为客户端-服务器通信的方式。 这种情况的一个特殊用例是服务器需要执行长时间缓慢的进程并逐渐将结果流式传输回客户端。
在我们的示例应用程序中,用户将能够通过 WebSocket 上传文件。 在连接的整个生命周期内,服务器会将解析后的文件保留在内存中。 根据请求,服务器可以将文件的一部分发送回前端。 此外,该文件将在一个 URL 上可用,然后多个用户可以查看该 URL。 如果在同一 URL 上上传了另一个文件,查看它的每个人都将能够立即看到新文件。
对于前端,我们将使用 AngularJS。 这个框架和库将使我们能够轻松处理文件上传和分页。 然而,对于与 WebSockets 相关的所有内容,我们将使用标准的 JavaScript 函数。
这个简单的应用程序将分解为三个单独的文件:
- parser.py:我们的 Tornado 服务器与请求处理程序实现的地方
- templates/index.html:前端 HTML 模板
- static/parser.js:用于我们的前端 JavaScript
打开一个 WebSocket
在前端,可以通过实例化 WebSocket 对象来建立 WebSocket 连接:
new WebSocket(WEBSOCKET_URL);这是我们在页面加载时必须做的事情。 实例化 WebSocket 对象后,必须附加处理程序以处理三个重要事件:
- open:建立连接时触发
- message:从服务器接收到消息时触发
- close:连接关闭时触发
$scope.init = function() { $scope.ws = new WebSocket('ws://' + location.host + '/parser/ws'); $scope.ws.binaryType = 'arraybuffer'; $scope.ws.onopen = function() { console.log('Connected.') }; $scope.ws.onmessage = function(evt) { $scope.$apply(function () { message = JSON.parse(evt.data); $scope.currentPage = parseInt(message['page_no']); $scope.totalRows = parseInt(message['total_number']); $scope.rows = message['data']; }); }; $scope.ws.onclose = function() { console.log('Connection is closed...'); }; } $scope.init();由于这些事件处理程序不会自动触发 AngularJS 的 $scope 生命周期,因此处理程序函数的内容需要包装在 $apply 中。 如果您有兴趣,AngularJS 的特定包可以更容易地将 WebSocket 集成到 AngularJS 应用程序中。
值得一提的是,断开的 WebSocket 连接不会自动重新建立,并且会在触发关闭事件处理程序时要求应用程序尝试重新连接。 这有点超出了本文的范围。
选择要上传的文件
由于我们正在使用 AngularJS 构建单页应用程序,因此尝试以古老的方式提交带有文件的表单是行不通的。 为了让事情变得更简单,我们将使用 Danial Farid 的 ng-file-upload 库。 使用它,我们需要做的就是允许用户上传文件,只需在前端模板中添加一个带有特定 AngularJS 指令的按钮:
<button class="btn btn-default" type="file" ngf-select="uploadFile($file, $invalidFiles)" accept=".tsv" ngf-max-size="10MB">Select File</button> 该库允许我们设置可接受的文件扩展名和大小。 单击此按钮,就像任何<input type=”file”>元素一样,将打开标准文件选择器。
上传文件
当您要传输二进制数据时,您可以在数组缓冲区和 blob 中进行选择。 如果它只是像图像文件这样的原始数据,请选择 blob 并在服务器中正确处理。 数组缓冲区用于固定长度的二进制缓冲区,像TSV这样的文本文件可以以字节串的格式传输。 此代码段显示如何以数组缓冲区格式上传文件。
$scope.uploadFile = function(file, errFiles) { ws = $scope.ws; $scope.f = file; $scope.errFile = errFiles && errFiles[0]; if (file) { reader = new FileReader(); rawData = new ArrayBuffer(); reader.onload = function(evt) { rawData = evt.target.result; ws.send(rawData); } reader.readAsArrayBuffer(file); } }ng-file-upload 指令提供了一个 uploadFile 函数。 在这里,您可以使用 FileReader 将文件转换为数组缓冲区,并通过 WebSocket 发送。
请注意,通过 WebSocket 通过将大文件读入数组缓冲区来发送大文件可能不是上传它们的最佳方式,因为它会很快占用大量内存,从而导致糟糕的体验。

在服务器上接收文件
Tornado 使用 4 位操作码确定消息类型,并返回 str 表示二进制数据和 unicode 表示文本。
if opcode == 0x1: # UTF-8 data self._message_bytes_in += len(data) try: decoded = data.decode("utf-8") except UnicodeDecodeError: self._abort() return self._run_callback(self.handler.on_message, decoded) elif opcode == 0x2: # Binary data self._message_bytes_in += len(data) self._run_callback(self.handler.on_message, data)在 Tornado Web 服务器中,数组缓冲区以 str 类型接收。
在此示例中,我们期望的内容类型是 TSV,因此文件被解析并转换为字典。 当然,在实际应用中,有更明智的方法来处理任意上传。
def make_message(self, page_no=1): page_size = 100 return { "page_no": page_no, "total_number": len(self.rows), "data": self.rows[page_size * (page_no - 1):page_size * page_no] } def on_message(self, message): if isinstance(message, str): self.rows = [csv.reader([line], delimiter="\t").next() for line in (x.strip() for x in message.splitlines()) if line] self.write_message(self.make_message())请求页面
由于我们的目标是在小页面块中显示上传的 TSV 数据,因此我们需要一种请求特定页面的方法。 为了简单起见,我们将简单地使用相同的 WebSocket 连接将页码发送到我们的服务器。
$scope.pageChanged = function() { ws = $scope.ws; ws.send($scope.currentPage); }服务器将以 unicode 格式接收此消息:
def on_message(self, message): if isinstance(message, unicode): page_no = int(message) self.write_message(self.make_message(page_no))尝试使用来自 Tornado WebSocket 服务器的 dict 进行响应将自动将其编码为 JSON 格式。 所以只发送一个包含 100 行内容的 dict 是完全可以的。
与他人共享访问权限
为了能够与多个用户共享对同一上传的访问权限,我们需要能够唯一地识别上传。 每当用户通过 WebSocket 连接到服务器时,都会生成一个随机 UUID 并将其分配给他们的连接。
def open(self, doc_uuid=None): if doc_uuid is None: self.uuid = str(uuid.uuid4()) uuid.uuid4()生成随机 UUID,str() 将 UUID 转换为标准格式的十六进制数字字符串。
如果另一个具有 UUID 的用户连接到服务器,则将相应的 FileHandler 实例添加到以 UUID 为键的字典中,并在连接关闭时删除。
@classmethod @tornado.gen.coroutine def add_clients(cls, doc_uuid, client): with (yield lock.acquire()): if doc_uuid in cls.clients: clients_with_uuid = FileHandler.clients[doc_uuid] clients_with_uuid.append(client) else: FileHandler.clients[doc_uuid] = [client] @classmethod @tornado.gen.coroutine def remove_clients(cls, doc_uuid, client): with (yield lock.acquire()): if doc_uuid in cls.clients: clients_with_uuid = FileHandler.clients[doc_uuid] clients_with_uuid.remove(client) if len(clients_with_uuid) == 0: del cls.clients[doc_uuid]同时添加或删除客户端时,客户端字典可能会引发 KeyError。 由于 Tornado 是一个异步网络库,它为同步提供了锁定机制。 带有协程的简单锁适合这种处理客户端字典的情况。
如果任何用户上传文件或在页面之间移动,所有具有相同 UUID 的用户都会查看相同的页面。
@classmethod def send_messages(cls, doc_uuid): clients_with_uuid = cls.clients[doc_uuid] message = cls.make_message(doc_uuid) for client in clients_with_uuid: try: client.write_message(message) except: logging.error("Error sending message", exc_info=True)跑在 Nginx 后面
实现 WebSockets 非常简单,但是在生产环境中使用它时需要考虑一些棘手的事情。 Tornado 是一个 Web 服务器,因此它可以直接获取用户的请求,但出于多种原因,将其部署在 Nginx 后面可能是更好的选择。 然而,通过 Nginx 使用 WebSocket 需要付出更多的努力:
http { upstream parser { server 127.0.0.1:8080; } server { location ^~ /parser/ws { proxy_pass http://parser; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; } } } 两个proxy_set_header指令使 Nginx 将必要的标头传递给后端服务器,这是升级到 WebSocket 连接所必需的。
下一步是什么?
在本文中,我们实现了一个简单的 Python Web 应用程序,它使用 WebSockets 来维护服务器和每个客户端之间的持久连接。 使用像 Tornado 这样的现代异步网络框架,在 Python 中同时保持数以万计的打开连接是完全可行的。
尽管此演示应用程序的某些实现方面可能会有所不同,但我希望它仍然有助于演示 https://www.toptal.com/tornado 框架中 WebSockets 的使用。 演示应用程序的源代码可在 GitHub 上获得。
