Namekoを使用したPythonマイクロサービスの概要
公開: 2022-03-11序章
マイクロサービスのアーキテクチャパターンは、柔軟性と復元力を考慮して、人気が高まっているアーキテクチャスタイルです。 Kubernetesなどのテクノロジーとともに、マイクロサービスアーキテクチャを使用したアプリケーションのブートストラップがこれまでになく簡単になりました。
Martin Fowlerのブログの古典的な記事によると、マイクロサービスのアーキテクチャスタイルは次のように要約できます。
つまり、マイクロサービスアーキテクチャスタイルは、単一のアプリケーションを小さなサービスのスイートとして開発するためのアプローチであり、それぞれが独自のプロセスで実行され、軽量のメカニズム(多くの場合HTTPリソースAPI)と通信します。 これらのサービスはビジネス機能を中心に構築されており、完全に自動化された展開機構によって独立して展開できます。
つまり、マイクロサービスアーキテクチャに準拠したアプリケーションは、通信プロトコルを使用して相互に通信する複数の独立した動的サービスで構成されます。 HTTP(およびREST)を使用するのが一般的ですが、これから説明するように、AMQP(Advanced Message Queuing Protocol)を介したRPC(Remote Procedure Call)などの他のタイプの通信プロトコルを使用できます。
マイクロサービスのパターンは、SOA(サービス指向アーキテクチャー)の特定のケースと考えることができます。 ただし、SOAでは、ESB(エンタープライズサービスバス)を使用してサービス間の通信を管理するのが一般的です。 ESBは通常、非常に洗練されており、複雑なメッセージルーティングおよびビジネスルールアプリケーションの機能を備えています。 マイクロサービスでは、「スマートエンドポイントとダムパイプ」という代替アプローチを採用するのが一般的です。つまり、サービス自体にすべてのビジネスロジックと複雑さ(高い凝集度)が含まれている必要がありますが、サービス間の接続は次のように単純である必要があります。可能(高デカップリング)。つまり、サービスは、他のどのサービスが通信するかを必ずしも知る必要はありません。 これは、アーキテクチャレベルで適用される関心の分離です。
マイクロサービスのもう1つの側面は、各サービス内でどのテクノロジーを使用する必要があるかについての強制がないことです。 他のサービスと通信できる任意のソフトウェアスタックを使用してサービスを記述できる必要があります。 各サービスには、独自のライフサイクル管理もあります。 つまり、企業では、さまざまなテクノロジーや管理方法を使用して、チームが別々のサービスに取り組むことが可能です。 各チームはビジネス能力に関心を持ち、より機敏な組織の構築を支援します。
Pythonマイクロサービス
これらの概念を念頭に置いて、この記事では、Pythonを使用した概念実証マイクロサービスアプリケーションの構築に焦点を当てます。 そのために、PythonマイクロサービスフレームワークであるNamekoを使用します。 RPC over AMQPが組み込まれているため、サービス間で簡単に通信できます。 また、このチュートリアルで使用するHTTPクエリ用のシンプルなインターフェイスもあります。 ただし、HTTPエンドポイントを公開するマイクロサービスを作成する場合は、Flaskなどの別のフレームワークを使用することをお勧めします。 Flaskを使用してRPCを介してNamekoメソッドを呼び出すには、FlaskとNamekoを相互運用するためだけに構築されたラッパーであるflask_namekoを使用できます。
基本環境の設定
NamekoのWebサイトから抽出した、可能な限り単純な例を実行することから始めて、目的に合わせて拡張してみましょう。 まず、Dockerをインストールする必要があります。 この例ではPython3を使用するため、Python3もインストールされていることを確認してください。 次に、python virtualenvを作成し、 $ pip install nameko
pipinstallnamekoを実行します。
Namekoを実行するには、RabbitMQメッセージブローカーが必要です。 Namekoサービス間の通信を担当します。 ただし、マシンにもう1つの依存関係をインストールする必要がないため、心配する必要はありません。 Dockerを使用すると、事前構成されたイメージをダウンロードして実行し、完了したらコンテナーを停止するだけです。 デーモン、 apt-get
またはdnf install
はありません。
$ docker run -p 5672:5672 --hostname nameko-rabbitmq rabbitmq:3
を実行してRabbitMQコンテナーを開始します(これを行うにはsudoが必要になる場合があります)。 これにより、最新バージョン3のRabbitMQを使用してDockerコンテナーが起動し、デフォルトのポート5672を介して公開されます。
マイクロサービスを使用したHelloWorld
先に進み、次の内容のhello.py
というファイルを作成します。
from nameko.rpc import rpc class GreetingService: name = "greeting_service" @rpc def hello(self, name): return "Hello, {}!".format(name)
Namekoサービスはクラスです。 これらのクラスは、拡張機能として実装されているエントリポイントを公開します。 組み込みの拡張機能には、RPCメソッド、イベントリスナー、HTTPエンドポイント、またはタイマーを表すエントリポイントを作成する機能が含まれています。 PostgreSQLデータベースやRedisなどとやり取りするために使用できるコミュニティ拡張機能もあります。独自の拡張機能を作成することができます。
先に進んで、例を実行してみましょう。 デフォルトのポートでRabbitMQを実行している場合は、 $ nameko run hello
だけです。 RabbitMQを見つけて、自動的に接続します。 次に、サービスをテストするには、別のターミナルで$ nameko shell
を実行します。 これにより、同じRabbitMQインスタンスに接続するインタラクティブシェルが作成されます。 すばらしいのは、RPC over AMQPを使用することで、Namekoが自動サービス検出を実装していることです。 RPCメソッドを呼び出すとき、namekoは対応する実行中のサービスを見つけようとします。
Namekoシェルを実行すると、名前空間にn
という特別なオブジェクトが追加されます。 このオブジェクトを使用すると、イベントをディスパッチしてRPC呼び出しを実行できます。 サービスへのRPC呼び出しを行うには、次を実行します。
> >> n.rpc.greetingservice.hello(name='world') 'Hello, world!'
同時通話
これらのサービスクラスは、呼び出しが行われた瞬間にインスタンス化され、呼び出しが完了した後に破棄されます。 したがって、それらは本質的にステートレスである必要があります。つまり、呼び出し間でオブジェクトまたはクラスの状態を維持しようとしないでください。 これは、サービス自体がステートレスでなければならないことを意味します。 すべてのサービスがステートレスであるという前提で、Namekoはイベントレットグリーンスレッドを使用して同時実行性を活用できます。 インスタンス化されたサービスは「ワーカー」と呼ばれ、同時に実行されるワーカーの最大数を構成できます。
Namekoの同時実行性を実際に確認するには、応答を返す前にプロシージャ呼び出しにスリープを追加してソースコードを変更します。
from time import sleep from nameko.rpc import rpc class GreetingService: name = "greeting_service" @rpc def hello(self, name): sleep(5) return "Hello, {}!".format(name)
非同期が有効になっていないtime
モジュールからのsleep
を使用しています。 ただし、 nameko run
を使用してサービスを実行すると、 sleep(5)
などのブロック呼び出しからのトリガーの結果に自動的にパッチが適用されます。
現在、プロシージャコールからの応答時間は約5秒かかると予想されます。 ただし、namekoシェルから実行すると、次のスニペットの動作はどうなりますか?
res = [] for i in range(5): hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) res.append(hello_res) for hello_res in res: print(hello_res.result())
Namekoは、RPCエントリポイントごとにノンブロッキングのcall_async
メソッドを提供し、プロキシ応答オブジェクトを返します。このオブジェクトは、結果を照会できます。 result
メソッドは、応答プロキシで呼び出されると、応答が返されるまでブロックされます。
予想どおり、この例は約5秒で実行されます。 各ワーカーはsleep
呼び出しが終了するのを待ってブロックされますが、これによって別のワーカーの開始が停止することはありません。 たとえば、このsleep
呼び出しを便利なブロッキングI / Oデータベース呼び出しに置き換えると、非常に高速な同時サービスが得られます。
先に説明したように、Namekoはメソッドが呼び出されたときにワーカーを作成します。 ワーカーの最大数は構成可能です。 デフォルトでは、その数は10に設定されています。上記のスニペットのrange(5)
を、たとえばrange(20)に変更してテストできます。 これにより、 hello
メソッドが20回呼び出され、実行に10秒かかるはずです。
> >> res = [] > >> for i in range(20): ... hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) ... res.append(hello_res) > >> for hellores in res: ... print(hello_res.result()) Hello, 0! Hello, 1! Hello, 2! Hello, 3! Hello, 4! Hello, 5! Hello, 6! Hello, 7! Hello, 8! Hello, 9! Hello, 10! Hello, 11! Hello, 12! Hello, 13! Hello, 14! Hello, 15! Hello, 16! Hello, 17! Hello, 18! Hello, 19!
ここで、そのhello
メソッドを呼び出す同時ユーザーが多すぎる(10人を超える)と仮定します。 一部のユーザーは、応答を予想される5秒以上待ってハングアップします。 1つの解決策は、たとえば構成ファイルを使用してデフォルト設定をオーバーライドすることにより、作業の数を増やすことでした。 ただし、呼び出されたメソッドが大量のデータベースクエリに依存しているために、サーバーがすでに10個のワーカーで制限に達している場合、ワーカーの数を増やすと、応答時間がさらに長くなる可能性があります。
サービスのスケーリング
より良い解決策は、Namekoマイクロサービス機能を使用することです。 これまで、1つのサーバー(コンピューター)のみを使用し、RabbitMQの1つのインスタンスとサービスの1つのインスタンスを実行していました。 実稼働環境では、呼び出しが多すぎるサービスを実行しているノードの数を任意に増やす必要があります。 メッセージブローカーの信頼性を高めたい場合は、RabbitMQクラスターを構築することもできます。
サービスのスケーリングをシミュレートするには、 $ nameko run hello
を使用して、別のターミナルを開き、以前と同じようにサービスを実行します。 これにより、さらに10人のワーカーを実行する可能性のある別のサービスインスタンスが開始されます。 ここで、 range(20)
を使用してそのスニペットを再度実行してみてください。 実行するのに再び5秒かかるはずです。 複数のサービスインスタンスが実行されている場合、Namekoは使用可能なインスタンス間でRPCリクエストをラウンドロビンします。
Namekoは、クラスター内のこれらのメソッド呼び出しを堅牢に処理するように構築されています。 それをテストするには、切り取ったものを実行してみてください。終了する前に、Namekoサービスを実行している端末の1つに移動し、 Ctrl+C
を2回押します。 これにより、ワーカーが終了するのを待たずにホストがシャットダウンされます。 Namekoは、呼び出しを別の利用可能なサービスインスタンスに再割り当てします。
実際には、後で説明するように、Dockerを使用してサービスをコンテナ化し、Kubernetesなどのオーケストレーションツールを使用して、サービスを実行しているノードやメッセージブローカーなどの他の依存関係を管理します。 Kubernetesを使用して正しく実行すれば、予期しないピークの影響を受けずに、堅牢な分散システムでアプリケーションを効果的に変換できます。 また、Kubernetesではダウンタイムゼロのデプロイが可能です。 したがって、新しいバージョンのサービスを展開しても、システムの可用性には影響しません。
実稼働環境では、特にデプロイメント中に、同じサービスの複数の異なるバージョンが同時に実行される可能性があるため、下位互換性を念頭に置いてサービスを構築することが重要です。 Kubernetesを使用する場合、デプロイ中に、実行中の新しいコンテナーが十分にある場合にのみ、古いバージョンのコンテナーがすべて強制終了されます。
Namekoの場合、同じサービスの複数の異なるバージョンを同時に実行しても問題はありません。 ラウンドロビン方式で呼び出しを分散するため、呼び出しは古いバージョンまたは新しいバージョンを通過する可能性があります。 これをテストするには、サービスが古いバージョンを実行している1つの端末を維持し、サービスモジュールを次のように編集します。
from time import sleep from nameko.rpc import rpc class GreetingService: name = "greeting_service" @rpc def hello(self, name): sleep(5) return "Hello, {}! (version 2)".format(name)
そのサービスを別の端末から実行すると、2つのバージョンが同時に実行されます。 ここで、テストスニペットを再度実行すると、両方のバージョンが表示されます。

> >> res = [] > >> for i in range(5): ... hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) ... res.append(hello_res) > >> for hellores in res: ... print(hello_res.result()) Hello, 0! Hello, 1! (version 2) Hello, 2! Hello, 3! (version 2) Hello, 4!
複数のインスタンスの操作
これで、Namekoを効果的に操作する方法と、スケーリングがどのように機能するかがわかりました。 さらに一歩進んで、Dockerエコシステムのツールであるdocker-composeをさらに使用してみましょう。 これは、単一のサーバーにデプロイする場合に機能します。これは、マイクロサービスアーキテクチャの利点の多くを活用しないため、絶対に理想的ではありません。 繰り返しになりますが、より適切なインフラストラクチャが必要な場合は、Kubernetesなどのオーケストレーションツールを使用して、コンテナーの分散システムを管理できます。 したがって、先に進んでdocker-composeをインストールします。
繰り返しになりますが、RabbitMQインスタンスをデプロイするだけで、すべてのサービスがそのRabbitMQインスタンスにアクセスできるため、残りはNamekoが処理します。 この例の完全なソースコードは、このGitHubリポジトリで入手できます。
Namekoの機能をテストするための簡単な旅行アプリケーションを作成しましょう。 そのアプリケーションは、空港や旅行の登録を可能にします。 各空港は単に空港の名前として保存され、旅行では出発地と目的地の空港のIDが保存されます。 システムのアーキテクチャは次のようになります。
理想的には、各マイクロサービスには独自のデータベースインスタンスがあります。 ただし、簡単にするために、TripsとAirportsの両方のマイクロサービスが共有する単一のRedisデータベースを作成しました。 ゲートウェイマイクロサービスは、単純なRESTのようなAPIを介してHTTPリクエストを受信し、RPCを使用して空港や旅行と通信します。
ゲートウェイマイクロサービスから始めましょう。 その構造は単純で、Flaskのようなフレームワークから来た人なら誰でもよく知っているはずです。 基本的に2つのエンドポイントを定義し、それぞれがGETメソッドとPOSTメソッドの両方を許可します。
import json from nameko.rpc import RpcProxy from nameko.web.handlers import http class GatewayService: name = 'gateway' airports_rpc = RpcProxy('airports_service') trips_rpc = RpcProxy('trips_service') @http('GET', '/airport/<string:airport_id>') def get_airport(self, request, airport_id): airport = self.airports_rpc.get(airport_id) return json.dumps({'airport': airport}) @http('POST', '/airport') def post_airport(self, request): data = json.loads(request.get_data(as_text=True)) airport_id = self.airports_rpc.create(data['airport']) return airport_id @http('GET', '/trip/<string:trip_id>') def get_trip(self, request, trip_id): trip = self.trips_rpc.get(trip_id) return json.dumps({'trip': trip}) @http('POST', '/trip') def post_trip(self, request): data = json.loads(request.get_data(as_text=True)) trip_id = self.trips_rpc.create(data['airport_from'], data['airport_to']) return trip_id
それでは、空港サービスを見てみましょう。 予想どおり、2つのRPCメソッドが公開されています。 get
メソッドは、Redisデータベースにクエリを実行し、指定されたIDの空港を返すだけです。 create
メソッドは、ランダムなIDを生成し、空港情報を保存して、IDを返します。
import uuid from nameko.rpc import rpc from nameko_redis import Redis class AirportsService: name = "airports_service" redis = Redis('development') @rpc def get(self, airport_id): airport = self.redis.get(airport_id) return airport @rpc def create(self, airport): airport_id = uuid.uuid4().hex self.redis.set(airport_id, airport) return airport_id
nameko_redis
拡張子をどのように使用しているかに注意してください。 コミュニティ拡張機能リストをご覧ください。 拡張機能は、依存性注入を採用する方法で実装されます。 Namekoは、各ワーカーが使用する実際の拡張オブジェクトの開始を処理します。
AirportsとTripsマイクロサービスの間に大きな違いはありません。 Tripsマイクロサービスは次のようになります。
import uuid from nameko.rpc import rpc from nameko_redis import Redis class AirportsService: name = "trips_service" redis = Redis('development') @rpc def get(self, trip_id): trip = self.redis.get(trip_id) return trip @rpc def create(self, airport_from_id, airport_to_id): trip_id = uuid.uuid4().hex self.redis.set(trip_id, { "from": airport_from_id, "to": airport_to_id }) return trip_id
各マイクロサービスのDockerfile
も非常に簡単です。 唯一の依存関係はnameko
であり、Airports and Tripsサービスの場合は、 nameko-redis
もインストールする必要があります。 これらの依存関係は、各サービスのrequirements.txt
に記載されています。 AirportsサービスのDockerfileは次のようになります。
FROM python:3 RUN apt-get update && apt-get -y install netcat && apt-get clean WORKDIR /app COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt COPY config.yml ./ COPY run.sh ./ COPY airports.py ./ RUN chmod +x ./run.sh CMD ["./run.sh"]
それと他のサービスのDockerfileとの唯一の違いは、ソースファイル(この場合はairports.py
)であり、それに応じて変更する必要があります。
run.sh
スクリプトは、RabbitMQまで待機し、Airports and Tripsサービスの場合は、Redisデータベースの準備が整います。 次のスニペットは、空港のrun.sh
のコンテンツを示しています。 繰り返しになりますが、他のサービスについては、 aiports
からgateway
に変更するか、それに応じてtrips
します。
#!/bin/bash until nc -z ${RABBIT_HOST} ${RABBIT_PORT}; do echo "$(date) - waiting for rabbitmq..." sleep 1 done until nc -z ${REDIS_HOST} ${REDIS_PORT}; do echo "$(date) - waiting for redis..." sleep 1 done nameko run --config config.yml airports
これで、サービスを実行する準備が整いました。
$ docker-compose up
システムをテストしてみましょう。 次のコマンドを実行します。
$ curl -i -d "{\"airport\": \"first_airport\"}" localhost:8000/airport HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:05:53 GMT f2bddf0e506145f6ba0c28c247c54629
その最後の行は、空港用に生成されたIDです。 動作しているかどうかをテストするには、次のコマンドを実行します。
$curl localhost:8000/airport/f2bddf0e506145f6ba0c28c247c54629 {"airport": "first_airport"} Great, now let's add another airport: $ curl -i -d "{\"airport\": \"second_airport\"}" localhost:8000/airport HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:06:00 GMT 565000adcc774cfda8ca3a806baec6b5
今、私たちは2つの空港を手に入れました、それは旅行を形成するのに十分です。 今すぐ旅行を作成しましょう:
$ curl -i -d "{\"airport_from\": \"f2bddf0e506145f6ba0c28c247c54629\", \"airport_to\": \"565000adcc774cfda8ca3a806baec6b5\"}" localhost:8000/trip HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:09:10 GMT 34ca60df07bc42e88501178c0b6b95e4
前と同じように、その最後の行はトリップIDを表します。 正しく挿入されたかどうかを確認しましょう。
$ curl localhost:8000/trip/34ca60df07bc42e88501178c0b6b95e4 {"trip": "{'from': 'f2bddf0e506145f6ba0c28c247c54629', 'to': '565000adcc774cfda8ca3a806baec6b5'}"}
概要
NamekoがRabbitMQのローカル実行インスタンスを作成し、それに接続していくつかのテストを実行することにより、どのように機能するかを見てきました。 次に、得られた知識を適用して、マイクロサービスアーキテクチャを使用した単純なシステムを作成しました。
非常にシンプルですが、私たちのシステムは、本番環境に対応した展開の外観に非常に近いものです。 FalconやFlaskなどのHTTPリクエストを処理するには、別のフレームワークを使用することをお勧めします。 どちらも優れたオプションであり、たとえばゲートウェイサービスを中断したい場合に、他のHTTPベースのマイクロサービスを簡単に作成できます。 Flaskには、Namekoと対話するためのプラグインがすでにあるという利点がありますが、nameko-proxyを任意のフレームワークから直接使用できます。
ナメコもテストがとても簡単です。 ここでは簡単にするためにテストについては説明していませんが、Namekoのテストドキュメントを確認してください。
マイクロサービスアーキテクチャ内のすべての可動部分を使用して、堅牢なロギングシステムを確保する必要があります。 ビルドするには、Python Logging:仲間のToptalerとPython開発者による詳細なチュートリアル:SonNguyenKimを参照してください。