使用 Nameko 介紹 Python 微服務
已發表: 2022-03-11介紹
鑑於其靈活性和彈性,微服務架構模式是一種越來越受歡迎的架構風格。 與 Kubernetes 等技術一起,使用微服務架構引導應用程序變得前所未有的容易。
根據 Martin Fowler 博客中的一篇經典文章,微服務架構風格可以概括為:
簡而言之,微服務架構風格是一種將單個應用程序開發為一組小服務的方法,每個小服務都運行在自己的進程中並與輕量級機制(通常是 HTTP 資源 API)進行通信。 這些服務是圍繞業務能力構建的,並且可以通過全自動部署機制獨立部署。
換句話說,遵循微服務架構的應用程序由幾個獨立的、動態的服務組成,這些服務使用通信協議相互通信。 使用 HTTP(和 REST)很常見,但正如我們將看到的,我們可以使用其他類型的通信協議,例如 AMQP(高級消息隊列協議)上的 RPC(遠程過程調用)。
微服務模式可以被認為是 SOA(面向服務的架構)的一個特定案例。 然而,在 SOA 中,通常使用 ESB(企業服務總線)來管理服務之間的通信。 ESB 通常非常複雜,並且包含用於復雜消息路由和業務規則應用程序的功能。 在微服務中,更常見的是採用另一種方法:“智能端點和啞管道”,這意味著服務本身應該包含所有的業務邏輯和復雜性(高內聚),但服務之間的連接應該像可能(高度解耦),這意味著服務不一定需要知道哪些其他服務將與它進行通信。 這是在架構級別應用的關注點分離。
微服務的另一個方面是沒有強制規定每個服務中應該使用哪些技術。 您應該能夠使用可以與其他服務通信的任何軟件堆棧編寫服務。 每個服務也有自己的生命週期管理。 所有這一切意味著在一家公司中,可以讓團隊使用不同的技術甚至管理方法來處理不同的服務。 每個團隊都將關注業務能力,幫助建立一個更敏捷的組織。
Python 微服務
牢記這些概念,在本文中,我們將重點關注使用 Python 構建概念驗證微服務應用程序。 為此,我們將使用 Python 微服務框架 Nameko。 它內置了 RPC over AMQP,讓您可以輕鬆地在服務之間進行通信。 它還有一個簡單的 HTTP 查詢接口,我們將在本教程中使用它。 但是,為了編寫暴露 HTTP 端點的微服務,建議您使用其他框架,例如 Flask。 要使用 Flask 通過 RPC 調用 Nameko 方法,您可以使用 flask_nameko,這是一個為 Flask 與 Nameko 互操作而構建的包裝器。
設置基本環境
讓我們從運行從 Nameko 網站中提取的最簡單的示例開始,並根據我們的目的對其進行擴展。 首先,您需要安裝 Docker。 我們將在示例中使用 Python 3,因此請確保您也安裝了它。 然後,創建一個 python virtualenv 並運行$ pip install nameko
。
要運行 Nameko,我們需要 RabbitMQ 消息代理。 它將負責我們 Nameko 服務之間的通信。 不過不用擔心,因為您不需要在您的機器上再安裝一個依賴項。 使用 Docker,我們可以簡單地下載預配置的鏡像並運行它,完成後只需停止容器即可。 沒有守護進程, apt-get
或dnf install
。
通過運行$ docker run -p 5672:5672 --hostname nameko-rabbitmq rabbitmq:3
啟動 RabbitMQ 容器(您可能需要 sudo 來執行此操作)。 這將使用最新版本 3 RabbitMQ 啟動 Docker 容器,並通過默認端口 5672 公開它。
帶有微服務的 Hello World
繼續創建一個名為hello.py
的文件,其內容如下:
from nameko.rpc import rpc class GreetingService: name = "greeting_service" @rpc def hello(self, name): return "Hello, {}!".format(name)
Nameko 服務是類。 這些類公開了作為擴展實現的入口點。 內置擴展包括創建表示 RPC 方法、事件偵聽器、HTTP 端點或計時器的入口點的能力。 還有一些社區擴展可以用來與 PostgreSQL 數據庫、Redis 等進行交互……可以編寫自己的擴展。
讓我們繼續運行我們的示例。 如果你讓 RabbitMQ 在默認端口上運行,只需運行$ nameko run hello
。 它會找到 RabbitMQ 並自動連接到它。 然後,為了測試我們的服務,在另一個終端中運行$ nameko shell
。 這將創建一個交互式 shell,它將連接到同一個 RabbitMQ 實例。 很棒的是,通過使用 RPC over AMQP,Nameko 實現了自動服務發現。 當調用一個 RPC 方法時,nameko 會嘗試尋找對應的正在運行的服務。
運行 Nameko shell 時,您會在命名空間中添加一個名為n
的特殊對象。 該對象允許調度事件和進行 RPC 調用。 要對我們的服務進行 RPC 調用,請運行:
> >> n.rpc.greetingservice.hello(name='world') 'Hello, world!'
並發呼叫
這些服務類在調用時實例化並在調用完成後銷毀。 因此,它們本質上應該是無狀態的,這意味著您不應該嘗試在調用之間保持對像或類中的任何狀態。 這意味著服務本身必須是無狀態的。 假設所有服務都是無狀態的,Nameko 能夠通過使用 eventlet greenthreads 來利用並發性。 實例化的服務稱為“workers”,並且可以配置最大數量的同時運行的workers。
要在實踐中驗證 Nameko 並發性,請在返迴響應之前通過在過程調用中添加 sleep 來修改源代碼:
from time import sleep from nameko.rpc import rpc class GreetingService: name = "greeting_service" @rpc def hello(self, name): sleep(5) return "Hello, {}!".format(name)
我們正在使用time
模塊中的sleep
,該模塊未啟用異步。 但是,當使用nameko run
運行我們的服務時,它會自動修補來自阻塞調用(例如sleep(5)
的觸發器產量。
現在預計來自過程調用的響應時間大約需要 5 秒。 但是,當我們從 nameko shell 運行以下代碼片段時,它的行為會是什麼?
res = [] for i in range(5): hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) res.append(hello_res) for hello_res in res: print(hello_res.result())
Nameko 為每個 RPC 入口點提供了一個非阻塞的call_async
方法,返回一個代理回復對象,然後可以查詢其結果。 在響應代理上調用result
方法時,將被阻塞,直到返迴響應。
正如預期的那樣,這個例子只需要大約五秒鐘的時間。 每個工作人員都將被阻止等待sleep
調用完成,但這不會阻止另一個工作人員啟動。 例如,用一個有用的阻塞 I/O 數據庫調用替換這個sleep
調用,你就會得到一個非常快的並發服務。
如前所述,Nameko 在調用方法時創建工作程序。 最大工人數是可配置的。 默認情況下,該數字設置為 10。您可以測試將上述代碼段中的range(5)
更改為例如 range(20)。 這將調用hello
方法 20 次,現在應該需要 10 秒才能運行:
> >> res = [] > >> for i in range(20): ... hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) ... res.append(hello_res) > >> for hellores in res: ... print(hello_res.result()) Hello, 0! Hello, 1! Hello, 2! Hello, 3! Hello, 4! Hello, 5! Hello, 6! Hello, 7! Hello, 8! Hello, 9! Hello, 10! Hello, 11! Hello, 12! Hello, 13! Hello, 14! Hello, 15! Hello, 16! Hello, 17! Hello, 18! Hello, 19!
現在,假設您有太多(超過 10 個)並髮用戶調用該hello
方法。 一些用戶將等待超過預期的 5 秒以等待響應。 一種解決方案是通過使用例如配置文件覆蓋默認設置來增加作品的數量。 但是,如果您的服務器由於調用的方法依賴於一些繁重的數據庫查詢而已經達到了這十個工作人員的限制,那麼增加工作人員的數量可能會導致響應時間增加更多。
擴展我們的服務
更好的解決方案是使用 Nameko 微服務功能。 到目前為止,我們只使用了一台服務器(您的計算機),運行了一個 RabbitMQ 實例和一個服務實例。 在生產環境中,您將希望任意增加運行服務的節點數,該服務獲得過多調用。 如果您希望消息代理更可靠,也可以構建 RabbitMQ 集群。
要模擬服務擴展,我們可以簡單地打開另一個終端並像以前一樣運行服務,使用$ nameko run hello
。 這將啟動另一個服務實例,並有可能再運行十個工作人員。 現在,嘗試使用range(20)
再次運行該代碼段。 現在應該再次運行五秒鐘。 當有多個服務實例在運行時,Nameko 將在可用實例之間輪詢 RPC 請求。
Nameko 旨在穩健地處理集群中的這些方法調用。 要對此進行測試,請嘗試運行 snipped,在它完成之前,轉到運行 Nameko 服務的終端之一,然後按Ctrl+C
兩次。 這將在不等待工作人員完成的情況下關閉主機。 Nameko 會將調用重新分配給另一個可用的服務實例。
在實踐中,您將使用 Docker 將您的服務容器化,稍後我們將使用編排工具(例如 Kubernetes)來管理運行服務的節點和其他依賴項,例如消息代理。 如果做得正確,使用 Kubernetes,您將有效地將應用程序轉換為強大的分佈式系統,不受意外峰值的影響。 此外,Kubernetes 允許零停機部署。 因此,部署新版本的服務不會影響系統的可用性。
考慮到一些向後兼容性來構建服務很重要,因為在生產環境中,同一服務的多個不同版本可能會同時運行,尤其是在部署期間。 如果你使用 Kubernetes,在部署過程中它只會在有足夠多的新容器運行時殺死所有舊版本的容器。
對於 Nameko,同時運行同一服務的多個不同版本不是問題。 由於它以循環方式分發呼叫,因此呼叫可能會通過舊版本或新版本。 為了測試這一點,讓我們的服務在一個終端上運行舊版本,然後編輯服務模塊,如下所示:
from time import sleep from nameko.rpc import rpc class GreetingService: name = "greeting_service" @rpc def hello(self, name): sleep(5) return "Hello, {}! (version 2)".format(name)
如果您從另一個終端運行該服務,您將同時運行兩個版本。 現在,再次運行我們的測試片段,您將看到兩個版本都顯示:
> >> res = [] > >> for i in range(5): ... hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) ... res.append(hello_res) > >> for hellores in res: ... print(hello_res.result()) Hello, 0! Hello, 1! (version 2) Hello, 2! Hello, 3! (version 2) Hello, 4!
使用多個實例
現在我們知道瞭如何有效地使用 Nameko,以及縮放是如何工作的。 現在讓我們更進一步,使用 Docker 生態系統中的更多工具:docker-compose。 如果您要部署到單個服務器,這將起作用,這絕對不是理想的,因為您不會利用微服務架構的許多優勢。 同樣,如果您想擁有更合適的基礎架構,您可以使用 Kubernetes 等編排工具來管理分佈式容器系統。 所以,繼續安裝 docker-compose。

同樣,我們所要做的就是部署一個 RabbitMQ 實例,Nameko 將負責其餘的工作,因為所有服務都可以訪問該 RabbitMQ 實例。 此示例的完整源代碼可在此 GitHub 存儲庫中找到。
讓我們構建一個簡單的旅行應用程序來測試 Nameko 的功能。 該應用程序允許註冊機場和旅行。 每個機場都簡單地存儲為機場的名稱,而行程存儲起點和目的地機場的 ID。 我們系統的架構如下所示:
理想情況下,每個微服務都有自己的數據庫實例。 但是,為簡單起見,我創建了一個 Redis 數據庫供 Trips 和 Airports 微服務共享。 Gateway 微服務將通過一個簡單的類似 REST 的 API 接收 HTTP 請求,並使用 RPC 與 Airports 和 Trips 進行通信。
讓我們從網關微服務開始。 它的結構很簡單,任何來自像 Flask 這樣的框架的人都應該非常熟悉。 我們基本上定義了兩個端點,每個端點都允許 GET 和 POST 方法:
import json from nameko.rpc import RpcProxy from nameko.web.handlers import http class GatewayService: name = 'gateway' airports_rpc = RpcProxy('airports_service') trips_rpc = RpcProxy('trips_service') @http('GET', '/airport/<string:airport_id>') def get_airport(self, request, airport_id): airport = self.airports_rpc.get(airport_id) return json.dumps({'airport': airport}) @http('POST', '/airport') def post_airport(self, request): data = json.loads(request.get_data(as_text=True)) airport_id = self.airports_rpc.create(data['airport']) return airport_id @http('GET', '/trip/<string:trip_id>') def get_trip(self, request, trip_id): trip = self.trips_rpc.get(trip_id) return json.dumps({'trip': trip}) @http('POST', '/trip') def post_trip(self, request): data = json.loads(request.get_data(as_text=True)) trip_id = self.trips_rpc.create(data['airport_from'], data['airport_to']) return trip_id
現在讓我們來看看機場服務。 正如預期的那樣,它公開了兩個 RPC 方法。 get
方法將簡單地查詢 Redis 數據庫並返回給定 id 的機場。 create
方法會隨機生成一個id,存儲機場信息,返回id:
import uuid from nameko.rpc import rpc from nameko_redis import Redis class AirportsService: name = "airports_service" redis = Redis('development') @rpc def get(self, airport_id): airport = self.redis.get(airport_id) return airport @rpc def create(self, airport): airport_id = uuid.uuid4().hex self.redis.set(airport_id, airport) return airport_id
注意我們是如何使用nameko_redis
擴展的。 查看社區擴展列表。 擴展以採用依賴注入的方式實現。 Nameko 負責啟動每個工作人員將使用的實際擴展對象。
Airports 和 Trips 微服務之間沒有太大區別。 以下是 Trips 微服務的外觀:
import uuid from nameko.rpc import rpc from nameko_redis import Redis class AirportsService: name = "trips_service" redis = Redis('development') @rpc def get(self, trip_id): trip = self.redis.get(trip_id) return trip @rpc def create(self, airport_from_id, airport_to_id): trip_id = uuid.uuid4().hex self.redis.set(trip_id, { "from": airport_from_id, "to": airport_to_id }) return trip_id
每個微服務的Dockerfile
也非常簡單。 唯一的依賴是nameko
,對於 Airports 和 Trips 服務,還需要安裝nameko-redis
。 這些依賴項在每個服務的requirements.txt
中給出。 Airports 服務的 Dockerfile 如下所示:
FROM python:3 RUN apt-get update && apt-get -y install netcat && apt-get clean WORKDIR /app COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt COPY config.yml ./ COPY run.sh ./ COPY airports.py ./ RUN chmod +x ./run.sh CMD ["./run.sh"]
它與其他服務的 Dockerfile 之間的唯一區別是源文件(在本例中為airports.py
),應相應更改。
run.sh
腳本負責等待 RabbitMQ,對於 Airports 和 Trips 服務,Redis 數據庫已準備就緒。 以下代碼段顯示了機場的run.sh
的內容。 同樣,對於其他服務,只需從aiports
更改為gateway
或相應的trips
:
#!/bin/bash until nc -z ${RABBIT_HOST} ${RABBIT_PORT}; do echo "$(date) - waiting for rabbitmq..." sleep 1 done until nc -z ${REDIS_HOST} ${REDIS_PORT}; do echo "$(date) - waiting for redis..." sleep 1 done nameko run --config config.yml airports
我們的服務現在可以運行了:
$ docker-compose up
讓我們測試一下我們的系統。 運行命令:
$ curl -i -d "{\"airport\": \"first_airport\"}" localhost:8000/airport HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:05:53 GMT f2bddf0e506145f6ba0c28c247c54629
最後一行是為我們的機場生成的 id。 要測試它是否正常工作,請運行:
$curl localhost:8000/airport/f2bddf0e506145f6ba0c28c247c54629 {"airport": "first_airport"} Great, now let's add another airport: $ curl -i -d "{\"airport\": \"second_airport\"}" localhost:8000/airport HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:06:00 GMT 565000adcc774cfda8ca3a806baec6b5
現在我們有兩個機場,足以組成一次旅行。 現在讓我們創建一個行程:
$ curl -i -d "{\"airport_from\": \"f2bddf0e506145f6ba0c28c247c54629\", \"airport_to\": \"565000adcc774cfda8ca3a806baec6b5\"}" localhost:8000/trip HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:09:10 GMT 34ca60df07bc42e88501178c0b6b95e4
和以前一樣,最後一行代表行程 ID。 讓我們檢查它是否正確插入:
$ curl localhost:8000/trip/34ca60df07bc42e88501178c0b6b95e4 {"trip": "{'from': 'f2bddf0e506145f6ba0c28c247c54629', 'to': '565000adcc774cfda8ca3a806baec6b5'}"}
概括
我們通過創建本地運行的 RabbitMQ 實例、連接到它並執行多個測試來了解 Nameko 的工作原理。 然後,我們應用所獲得的知識來創建一個使用微服務架構的簡單系統。
儘管非常簡單,但我們的系統非常接近生產就緒部署的樣子。 您最好使用另一個框架來處理 HTTP 請求,例如 Falcon 或 Flask。 兩者都是很好的選擇,並且可以很容易地用於創建其他基於 HTTP 的微服務,例如,如果您想破壞您的網關服務。 Flask 的優勢在於已經有一個插件來與 Nameko 交互,但是您可以直接從任何框架使用 nameko-proxy。
Nameko 也很容易測試。 為了簡單起見,我們沒有在這裡介紹測試,但請查看 Nameko 的測試文檔。
對於微服務架構中的所有活動部件,您希望確保擁有一個強大的日誌記錄系統。 要構建一個,請參閱 Toptaler 和 Python 開發人員 Son Nguyen Kim 編寫的 Python 日誌記錄:深度教程。