使用 Nameko 介绍 Python 微服务

已发表: 2022-03-11

介绍

鉴于其灵活性和弹性,微服务架构模式是一种越来越受欢迎的架构风格。 与 Kubernetes 等技术一起,使用微服务架构引导应用程序变得前所未有的容易。

根据 Martin Fowler 博客中的一篇经典文章,微服务架构风格可以概括为:

简而言之,微服务架构风格是一种将单个应用程序开发为一组小服务的方法,每个小服务都运行在自己的进程中并与轻量级机制(通常是 HTTP 资源 API)进行通信。 这些服务是围绕业务能力构建的,并且可以通过全自动部署机制独立部署。

换句话说,遵循微服务架构的应用程序由几个独立的、动态的服务组成,这些服务使用通信协议相互通信。 使用 HTTP(和 REST)很常见,但正如我们将看到的,我们可以使用其他类型的通信协议,例如 AMQP(高级消息队列协议)上的 RPC(远程过程调用)。

微服务模式可以被认为是 SOA(面向服务的架构)的一个特定案例。 然而,在 SOA 中,通常使用 ESB(企业服务总线)来管理服务之间的通信。 ESB 通常非常复杂,并且包含用于复杂消息路由和业务规则应用程序的功能。 在微服务中,更常见的是采用另一种方法:“智能端点和哑管道”,这意味着服务本身应该包含所有的业务逻辑和复杂性(高内聚),但服务之间的连接应该像可能(高度解耦),这意味着服务不一定需要知道哪些其他服务将与它进行通信。 这是在架构级别应用的关注点分离。

微服务的另一个方面是没有强制规定每个服务中应该使用哪些技术。 您应该能够使用可以与其他服务通信的任何软件堆栈编写服务。 每个服务也有自己的生命周期管理。 所有这一切意味着在一家公司中,可以让团队使用不同的技术甚至管理方法来处理不同的服务。 每个团队都将关注业务能力,帮助建立一个更敏捷的组织。

Python 微服务

牢记这些概念,在本文中,我们将重点关注使用 Python 构建概念验证微服务应用程序。 为此,我们将使用 Python 微服务框架 Nameko。 它内置了 RPC over AMQP,让您可以轻松地在服务之间进行通信。 它还有一个简单的 HTTP 查询接口,我们将在本教程中使用它。 但是,为了编写暴露 HTTP 端点的微服务,建议您使用其他框架,例如 Flask。 要使用 Flask 通过 RPC 调用 Nameko 方法,您可以使用 flask_nameko,这是一个为 Flask 与 Nameko 互操作而构建的包装器。

设置基本环境

让我们从运行从 Nameko 网站中提取的最简单的示例开始,并根据我们的目的对其进行扩展。 首先,您需要安装 Docker。 我们将在示例中使用 Python 3,因此请确保您也安装了它。 然后,创建一个 python virtualenv 并运行$ pip install nameko

要运行 Nameko,我们需要 RabbitMQ 消息代理。 它将负责我们 Nameko 服务之间的通信。 不过不用担心,因为您不需要在您的机器上再安装一个依赖项。 使用 Docker,我们可以简单地下载预配置的镜像并运行它,完成后只需停止容器即可。 没有守护进程, apt-getdnf install

带有 Nameko 的 Python 微服务与 RabbitMQ 代理交谈

通过运行$ docker run -p 5672:5672 --hostname nameko-rabbitmq rabbitmq:3启动 RabbitMQ 容器(您可能需要 sudo 来执行此操作)。 这将使用最新版本 3 RabbitMQ 启动 Docker 容器,并通过默认端口 5672 公开它。

带有微服务的 Hello World

继续创建一个名为hello.py的文件,其内容如下:

 from nameko.rpc import rpc class GreetingService: name = "greeting_service" @rpc def hello(self, name): return "Hello, {}!".format(name)

Nameko 服务是类。 这些类公开了作为扩展实现的入口点。 内置扩展包括创建表示 RPC 方法、事件侦听器、HTTP 端点或计时器的入口点的能力。 还有一些社区扩展可以用来与 PostgreSQL 数据库、Redis 等进行交互……可以编写自己的扩展。

让我们继续运行我们的示例。 如果你让 RabbitMQ 在默认端口上运行,只需运行$ nameko run hello 。 它会找到 RabbitMQ 并自动连接到它。 然后,为了测试我们的服务,在另一个终端中运行$ nameko shell 。 这将创建一个交互式 shell,它将连接到同一个 RabbitMQ 实例。 很棒的是,通过使用 RPC over AMQP,Nameko 实现了自动服务发现。 当调用一个 RPC 方法时,nameko 会尝试寻找对应的正在运行的服务。

两个 Nameko 服务通过 RabbitMQ RPC 通信

运行 Nameko shell 时,您会在命名空间中添加一个名为n的特殊对象。 该对象允许调度事件和进行 RPC 调用。 要对我们的服务进行 RPC 调用,请运行:

 > >> n.rpc.greetingservice.hello(name='world') 'Hello, world!'

并发呼叫

这些服务类在调用时实例化并在调用完成后销毁。 因此,它们本质上应该是无状态的,这意味着您不应该尝试在调用之间保持对象或类中的任何状态。 这意味着服务本身必须是无状态的。 假设所有服务都是无状态的,Nameko 能够通过使用 eventlet greenthreads 来利用并发性。 实例化的服务称为“workers”,并且可以配置最大数量的同时运行的workers。

要在实践中验证 Nameko 并发性,请在返回响应之前通过在过程调用中添加 sleep 来修改源代码:

 from time import sleep from nameko.rpc import rpc class GreetingService: name = "greeting_service" @rpc def hello(self, name): sleep(5) return "Hello, {}!".format(name)

我们正在使用time模块中的sleep ,该模块未启用异步。 但是,当使用nameko run运行我们的服务时,它会自动修补来自阻塞调用(例如sleep(5)的触发器产量。

现在预计来自过程调用的响应时间大约需要 5 秒。 但是,当我们从 nameko shell 运行以下代码片段时,它的行为会是什么?

 res = [] for i in range(5): hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) res.append(hello_res) for hello_res in res: print(hello_res.result())

Nameko 为每个 RPC 入口点提供了一个非阻塞的call_async方法,返回一个代理回复对象,然后可以查询其结果。 在响应代理上调用result方法时,将被阻塞,直到返回响应。

正如预期的那样,这个例子只需要大约五秒钟的时间。 每个工作人员都将被阻止等待sleep调用完成,但这不会阻止另一个工作人员启动。 例如,用一个有用的阻塞 I/O 数据库调用替换这个sleep调用,你就会得到一个非常快的并发服务。

如前所述,Nameko 在调用方法时创建工作程序。 最大工人数是可配置的。 默认情况下,该数字设置为 10。您可以测试将上述代码段中的range(5)更改为例如 range(20)。 这将调用hello方法 20 次,现在应该需要 10 秒才能运行:

 > >> res = [] > >> for i in range(20): ... hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) ... res.append(hello_res) > >> for hellores in res: ... print(hello_res.result()) Hello, 0! Hello, 1! Hello, 2! Hello, 3! Hello, 4! Hello, 5! Hello, 6! Hello, 7! Hello, 8! Hello, 9! Hello, 10! Hello, 11! Hello, 12! Hello, 13! Hello, 14! Hello, 15! Hello, 16! Hello, 17! Hello, 18! Hello, 19!

现在,假设您有太多(超过 10 个)并发用户调用该hello方法。 一些用户将等待超过预期的 5 秒以等待响应。 一种解决方案是通过使用例如配置文件覆盖默认设置来增加作品的数量。 但是,如果您的服务器由于调用的方法依赖于一些繁重的数据库查询而已经达到了这十个工作人员的限制,那么增加工作人员的数量可能会导致响应时间增加更多。

扩展我们的服务

更好的解决方案是使用 Nameko 微服务功能。 到目前为止,我们只使用了一台服务器(您的计算机),运行了一个 RabbitMQ 实例和一个服务实例。 在生产环境中,您将希望任意增加运行服务的节点数,该服务获得过多调用。 如果您希望消息代理更可靠,也可以构建 RabbitMQ 集群。

要模拟服务扩展,我们可以简单地打开另一个终端并像以前一样运行服务,使用$ nameko run hello 。 这将启动另一个服务实例,并有可能再运行十个工作人员。 现在,尝试使用range(20)再次运行该代码段。 现在应该再次运行五秒钟。 当有多个服务实例在运行时,Nameko 将在可用实例之间轮询 RPC 请求。

Nameko 旨在稳健地处理集群中的这些方法调用。 要对此进行测试,请尝试运行 snipped,在它完成之前,转到运行 Nameko 服务的终端之一,然后按Ctrl+C两次。 这将在不等待工作人员完成的情况下关闭主机。 Nameko 会将调用重新分配给另一个可用的服务实例。

在实践中,您将使用 Docker 将您的服务容器化,稍后我们将使用编排工具(例如 Kubernetes)来管理运行服务的节点和其他依赖项,例如消息代理。 如果做得正确,使用 Kubernetes,您将有效地将应用程序转换为强大的分布式系统,不受意外峰值的影响。 此外,Kubernetes 允许零停机部署。 因此,部署新版本的服务不会影响系统的可用性。

考虑到一些向后兼容性来构建服务很重要,因为在生产环境中,同一服务的多个不同版本可能会同时运行,尤其是在部署期间。 如果你使用 Kubernetes,在部署过程中它只会在有足够多的新容器运行时杀死所有旧版本的容器。

对于 Nameko,同时运行同一服务的多个不同版本不是问题。 由于它以循环方式分发呼叫,因此呼叫可能会通过旧版本或新版本。 为了测试这一点,让我们的服务在一个终端上运行旧版本,然后编辑服务模块,如下所示:

 from time import sleep from nameko.rpc import rpc class GreetingService: name = "greeting_service" @rpc def hello(self, name): sleep(5) return "Hello, {}! (version 2)".format(name)

如果您从另一个终端运行该服务,您将同时运行两个版本。 现在,再次运行我们的测试片段,您将看到两个版本都显示:

 > >> res = [] > >> for i in range(5): ... hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) ... res.append(hello_res) > >> for hellores in res: ... print(hello_res.result()) Hello, 0! Hello, 1! (version 2) Hello, 2! Hello, 3! (version 2) Hello, 4!

使用多个实例

现在我们知道了如何有效地使用 Nameko,以及缩放是如何工作的。 现在让我们更进一步,使用 Docker 生态系统中的更多工具:docker-compose。 如果您要部署到单个服务器,这将起作用,这绝对不是理想的,因为您不会利用微服务架构的许多优势。 同样,如果您想拥有更合适的基础架构,您可以使用 Kubernetes 等编排工具来管理分布式容器系统。 所以,继续安装 docker-compose。

同样,我们所要做的就是部署一个 RabbitMQ 实例,Nameko 将负责其余的工作,因为所有服务都可以访问该 RabbitMQ 实例。 此示例的完整源代码可在此 GitHub 存储库中找到。

让我们构建一个简单的旅行应用程序来测试 Nameko 的功能。 该应用程序允许注册机场和旅行。 每个机场都简单地存储为机场的名称,而行程存储起点和目的地机场的 ID。 我们系统的架构如下所示:

旅游应用图

理想情况下,每个微服务都有自己的数据库实例。 但是,为简单起见,我创建了一个 Redis 数据库供 Trips 和 Airports 微服务共享。 Gateway 微服务将通过一个简单的类似 REST 的 API 接收 HTTP 请求,并使用 RPC 与 Airports 和 Trips 进行通信。

让我们从网关微服务开始。 它的结构很简单,任何来自像 Flask 这样的框架的人都应该非常熟悉。 我们基本上定义了两个端点,每个端点都允许 GET 和 POST 方法:

 import json from nameko.rpc import RpcProxy from nameko.web.handlers import http class GatewayService: name = 'gateway' airports_rpc = RpcProxy('airports_service') trips_rpc = RpcProxy('trips_service') @http('GET', '/airport/<string:airport_id>') def get_airport(self, request, airport_id): airport = self.airports_rpc.get(airport_id) return json.dumps({'airport': airport}) @http('POST', '/airport') def post_airport(self, request): data = json.loads(request.get_data(as_text=True)) airport_id = self.airports_rpc.create(data['airport']) return airport_id @http('GET', '/trip/<string:trip_id>') def get_trip(self, request, trip_id): trip = self.trips_rpc.get(trip_id) return json.dumps({'trip': trip}) @http('POST', '/trip') def post_trip(self, request): data = json.loads(request.get_data(as_text=True)) trip_id = self.trips_rpc.create(data['airport_from'], data['airport_to']) return trip_id

现在让我们来看看机场服务。 正如预期的那样,它公开了两个 RPC 方法。 get方法将简单地查询 Redis 数据库并返回给定 id 的机场。 create方法会随机生成一个id,存储机场信息,返回id:

 import uuid from nameko.rpc import rpc from nameko_redis import Redis class AirportsService: name = "airports_service" redis = Redis('development') @rpc def get(self, airport_id): airport = self.redis.get(airport_id) return airport @rpc def create(self, airport): airport_id = uuid.uuid4().hex self.redis.set(airport_id, airport) return airport_id

注意我们是如何使用nameko_redis扩展的。 查看社区扩展列表。 扩展以采用依赖注入的方式实现。 Nameko 负责启动每个工作人员将使用的实际扩展对象。

Airports 和 Trips 微服务之间没有太大区别。 以下是 Trips 微服务的外观:

 import uuid from nameko.rpc import rpc from nameko_redis import Redis class AirportsService: name = "trips_service" redis = Redis('development') @rpc def get(self, trip_id): trip = self.redis.get(trip_id) return trip @rpc def create(self, airport_from_id, airport_to_id): trip_id = uuid.uuid4().hex self.redis.set(trip_id, { "from": airport_from_id, "to": airport_to_id }) return trip_id

每个微服务的Dockerfile也非常简单。 唯一的依赖是nameko ,对于 Airports 和 Trips 服务,还需要安装nameko-redis 。 这些依赖项在每个服务的requirements.txt中给出。 Airports 服务的 Dockerfile 如下所示:

 FROM python:3 RUN apt-get update && apt-get -y install netcat && apt-get clean WORKDIR /app COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt COPY config.yml ./ COPY run.sh ./ COPY airports.py ./ RUN chmod +x ./run.sh CMD ["./run.sh"]

它与其他服务的 Dockerfile 之间的唯一区别是源文件(在本例中为airports.py ),应相应更改。

run.sh脚本负责等待 RabbitMQ,对于 Airports 和 Trips 服务,Redis 数据库已准备就绪。 以下代码段显示了机场的run.sh的内容。 同样,对于其他服务,只需从aiports更改为gateway或相应的trips

 #!/bin/bash until nc -z ${RABBIT_HOST} ${RABBIT_PORT}; do echo "$(date) - waiting for rabbitmq..." sleep 1 done until nc -z ${REDIS_HOST} ${REDIS_PORT}; do echo "$(date) - waiting for redis..." sleep 1 done nameko run --config config.yml airports

我们的服务现在可以运行了:

$ docker-compose up

让我们测试一下我们的系统。 运行命令:

 $ curl -i -d "{\"airport\": \"first_airport\"}" localhost:8000/airport HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:05:53 GMT f2bddf0e506145f6ba0c28c247c54629

最后一行是为我们的机场生成的 id。 要测试它是否正常工作,请运行:

 $curl localhost:8000/airport/f2bddf0e506145f6ba0c28c247c54629 {"airport": "first_airport"} Great, now let's add another airport: $ curl -i -d "{\"airport\": \"second_airport\"}" localhost:8000/airport HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:06:00 GMT 565000adcc774cfda8ca3a806baec6b5

现在我们有两个机场,足以组成一次旅行。 现在让我们创建一个行程:

 $ curl -i -d "{\"airport_from\": \"f2bddf0e506145f6ba0c28c247c54629\", \"airport_to\": \"565000adcc774cfda8ca3a806baec6b5\"}" localhost:8000/trip HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:09:10 GMT 34ca60df07bc42e88501178c0b6b95e4

和以前一样,最后一行代表行程 ID。 让我们检查它是否正确插入:

 $ curl localhost:8000/trip/34ca60df07bc42e88501178c0b6b95e4 {"trip": "{'from': 'f2bddf0e506145f6ba0c28c247c54629', 'to': '565000adcc774cfda8ca3a806baec6b5'}"}

概括

我们通过创建本地运行的 RabbitMQ 实例、连接到它并执行多个测试来了解 Nameko 的工作原理。 然后,我们应用所获得的知识来创建一个使用微服务架构的简单系统。

尽管非常简单,但我们的系统非常接近生产就绪部署的样子。 您最好使用另一个框架来处理 HTTP 请求,例如 Falcon 或 Flask。 两者都是很好的选择,并且可以很容易地用于创建其他基于 HTTP 的微服务,例如,如果您想破坏您的网关服务。 Flask 的优势在于已经有一个插件来与 Nameko 交互,但是您可以直接从任何框架使用 nameko-proxy。

Nameko 也很容易测试。 为了简单起见,我们没有在这里介绍测试,但请查看 Nameko 的测试文档。

对于微服务架构中的所有活动部件,您希望确保拥有一个强大的日志记录系统。 要构建一个,请参阅 Toptaler 和 Python 开发人员 Son Nguyen Kim 编写的 Python 日志记录:深度教程。