在 Celery for Python 中編排後台作業工作流
已發表: 2022-03-11現代 Web 應用程序及其底層系統比以往任何時候都更快、響應更快。 但是,仍然有很多情況下,您希望將繁重任務的執行轉移到整個系統架構的其他部分,而不是在主線程上處理它們。 識別此類任務就像檢查它們是否屬於以下類別之一一樣簡單:
- 定期任務 - 您將安排在特定時間或間隔後運行的作業,例如,每月報告生成或每天運行兩次的網絡爬蟲。
- 第三方任務——Web 應用程序必須快速為用戶提供服務,而無需在頁面加載時等待其他操作完成,例如,發送電子郵件或通知或將更新傳播到內部工具(例如為 A/B 測試或系統日誌收集數據) )。
- 長時間運行的作業——資源昂貴的作業,用戶在計算結果時需要等待,例如,複雜的工作流執行(DAG 工作流)、圖形生成、類似 Map-Reduce 的任務以及提供媒體內容(視頻、聲音的)。
執行後台任務的直接解決方案是在單獨的線程或進程中運行它。 Python 是一種高級圖靈完備的編程語言,遺憾的是它沒有提供與 Erlang、Go、Java、Scala 或 Akka 相匹配的規模的內置並發。 這些基於 Tony Hoare 的通信順序過程 (CSP)。 另一方面,Python 線程由全局解釋器鎖 (GIL) 協調和調度,這可以防止多個本機線程同時執行 Python 字節碼。 擺脫 GIL 是 Python 開發人員經常討論的話題,但這不是本文的重點。 Python 中的並發編程是過時的,儘管歡迎您在 Toptaler Marcus McCurdy 的Python 多線程教程中閱讀它。 因此,一致地設計進程之間的通信是一個容易出錯的過程,會導致代碼耦合和系統可維護性差,更不用說它會對可伸縮性產生負面影響。 此外,Python 進程是操作系統(OS)下的正常進程,與整個 Python 標準庫一起成為重量級。 隨著應用程序中進程數量的增加,從一個這樣的進程切換到另一個進程變得非常耗時。
為了更好地理解 Python 的並發性,請觀看 David Beazley 在 PyCon'15 上的精彩演講。
一個更好的解決方案是服務於分佈式隊列或其眾所周知的兄弟範式,稱為發布-訂閱。 如圖 1 所示,有兩種類型的應用程序,其中一種稱為發布者發送消息,另一種稱為訂閱者接收消息。 這兩個代理不直接相互交互,甚至不知道彼此。 發布者將消息發送到中央隊列或代理,訂閱者從該代理接收感興趣的消息。 這種方法有兩個主要優點:
- 可擴展性——代理不需要在網絡中相互了解。 他們以主題為重點。 因此,這意味著每個都可以以異步方式繼續正常運行,而不管對方如何。
- 松耦合——每個代理都代表系統的一部分(服務、模塊)。 由於它們是鬆散耦合的,因此每個都可以在數據中心之外單獨擴展。
有很多消息系統支持這種範式並提供簡潔的 API,由 TCP 或 HTTP 協議驅動,例如 JMS、RabbitMQ、Redis Pub/Sub、Apache ActiveMQ 等。

什麼是芹菜?
Celery 是 Python 世界中最受歡迎的後台作業管理器之一。 Celery 與 RabbitMQ 或 Redis 等多個消息代理兼容,並且可以充當生產者和消費者。
Celery 是一個基於分佈式消息傳遞的異步任務隊列/作業隊列。 它專注於實時操作,但也支持調度。 執行單元,稱為任務,使用多處理、Eventlet 或 gevent 在一個或多個工作服務器上同時執行。 任務可以異步執行(在後台)或同步執行(等到準備好)。 – 芹菜項目
要開始使用 Celery,只需按照官方文檔中的分步指南進行操作。
本文的重點是讓您更好地了解 Celery 可以涵蓋哪些用例。 在本文中,我們不僅會展示有趣的示例,還會嘗試學習如何將 Celery 應用到實際任務中,例如後台郵件、報告生成、日誌記錄和錯誤報告。 我將分享我在模擬之外測試任務的方法,最後,我將提供一些官方文檔中沒有(很好)記錄的技巧,這些技巧我花了幾個小時的研究才自己發現。
如果您之前沒有使用過 Celery,我鼓勵您先按照官方教程進行嘗試。
激起你的食慾
如果這篇文章引起了您的興趣並讓您想立即深入研究代碼,那麼請關注此 GitHub 存儲庫以獲取本文中使用的代碼。 那裡的README
文件將為您提供運行和使用示例應用程序的快速而骯髒的方法。
芹菜的第一步
首先,我們將通過一系列實際示例向讀者展示 Celery 如何簡單而優雅地解決看似不平凡的任務。 所有示例都將在 Django 框架內呈現; 但是,它們中的大多數可以很容易地移植到其他 Python 框架(Flask、Pyramid)。
項目佈局由 Cookiecutter Django 生成; 但是,我只保留了一些依賴項,在我看來,這些依賴項有助於這些用例的開發和準備。 我還為這篇文章和應用程序刪除了不必要的模塊,以減少噪音並使代碼更易於理解。
- celery_uncovered/ - celery_uncovered/__init__.py - celery_uncovered/{toyex,tricks,advex} - celery_uncovered/celery.py - config/settings/{base,local,test}.py - config/urls.py - manage.py
-
celery_uncovered/{toyex,tricks,advex}
包含我們將在這篇文章中介紹的不同應用程序。 每個應用程序都包含一組按所需的 Celery 理解水平組織的示例。 -
celery_uncovered/celery.py
定義了一個 Celery 實例。
文件: celery_uncovered/celery.py
:
from __future__ import absolute_import import os from celery import Celery, signals # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local') app = Celery('celery_uncovered') # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks()
然後我們需要確保 Celery 將與 Django 一起啟動。 出於這個原因,我們將應用程序導入到celery_uncovered/__init__.py
中。
文件: celery_uncovered/__init__.py
:
from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa __all__ = ['celery_app'] __version__ = '0.0.1' __version_info__ = tuple([int(num) if num.isdigit() else num for num in __version__.replace('-', '.', 1).split('.')])
config/settings
是我們的應用和 Celery 的配置來源。 根據執行環境,Django 會啟動相應的設置: local.py
用於開發或test.py
用於測試。 如果需要,您還可以通過創建新的 python 模塊(例如prod.py
)來定義自己的環境。 芹菜配置以CELERY_
為前綴。 對於這篇文章,我將 RabbitMQ 配置為代理,將 SQLite 配置為結果 bac-end。
文件: config/local.py
:
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='amqp://guest:guest@localhost:5672//') CELERY_RESULT_BACKEND = 'django-db+sqlite:///results.sqlite'
場景 1 - 報告生成和導出
我們將介紹的第一個案例是報告生成和導出。 在此示例中,您將學習如何定義一個生成 CSV 報告的任務,並使用 celerybeat 定期安排它。
用例描述:從 GitHub 獲取每個選定時間段(日、週、月)的 500 個最熱門的存儲庫,按主題對它們進行分組,並將結果導出到 CSV 文件。
如果我們提供的 HTTP 服務將執行通過單擊標有“生成報告”的按鈕觸發的此功能,則應用程序將停止並等待任務完成,然後再發送回 HTTP 響應。 這是不好的。 我們希望我們的 Web 應用程序速度快,並且我們不希望我們的用戶在後端計算結果時等待。 與其等待結果產生,我們寧願通過 Celery 中的註冊隊列將任務排隊到工作進程,並用task_id
響應前端。 然後前端將使用task_id
以異步方式(例如,AJAX)查詢任務結果,並讓用戶隨時了解任務進度。 最後,當該過程完成時,可以將結果作為文件提供,以便通過 HTTP 下載。
實施細節
首先,讓我們將流程分解為盡可能小的單元並創建一個管道:
- Fetchers是負責從 GitHub 服務獲取存儲庫的工作人員。
- 聚合器是負責將結果合併到一個列表中的工作人員。
- Importer是生成 GitHub 中最熱門存儲庫的 CSV 報告的工作人員。

獲取存儲庫是使用 GitHub 搜索 API GET /search/repositories
的 HTTP 請求。 但是,應該處理 GitHub API 服務的一個限制:該 API 每個請求最多返回 100 個存儲庫,而不是 500 個。我們可以一次發送 5 個請求,但我們不想讓用戶等待五個單獨的請求,因為 HTTP 請求是 I/O 綁定操作。 相反,我們可以使用適當的頁面參數執行五個並發 HTTP 請求。 所以頁面將在 [1..5] 範圍內。 讓我們在toyex/tasks.py
模塊中定義一個名為fetch_hot_repos/3 -> list
的任務:
文件: celery_uncovered/toyex/local.py
@shared_task def fetch_hot_repos(since, per_page, page): payload = { 'sort': 'stars', 'order': 'desc', 'q': 'created:>={date}'.format(date=since), 'per_page': per_page, 'page': page, 'access_token': settings.GITHUB_OAUTH} headers = {'Accept': 'application/vnd.github.v3+json'} connect_timeout, read_timeout = 5.0, 30.0 r = requests.get( 'https://api.github.com/search/repositories', params=payload, headers=headers, timeout=(connect_timeout, read_timeout)) items = r.json()[u'items'] return items
所以fetch_hot_repos
創建一個對 GitHub API 的請求,並用一個存儲庫列表來響應用戶。 它接收三個參數來定義我們的請求負載:
-
since
- 在創建日期過濾存儲庫。 -
per_page
— 每個請求返回的結果數(限制為 100)。 -
page
--- 請求的頁碼(在 [1..5] 範圍內)。
注意:為了使用 GitHub Search API,您需要一個 OAuth 令牌來通過身份驗證檢查。 在我們的例子中,它保存在GITHUB_OAUTH
下的設置中。
接下來,我們需要定義一個主任務,負責匯總結果並將其導出到 CSV 文件中: produce_hot_repo_report_task/2->filepath:
文件: celery_uncovered/toyex/local.py
@shared_task def produce_hot_repo_report(period, ref_date=None): # 1. parse date ref_date_str = strf_date(period, ref_date=ref_date) # 2. fetch and join fetch_jobs = group([ fetch_hot_repos.s(ref_date_str, 100, 1), fetch_hot_repos.s(ref_date_str, 100, 2), fetch_hot_repos.s(ref_date_str, 100, 3), fetch_hot_repos.s(ref_date_str, 100, 4), fetch_hot_repos.s(ref_date_str, 100, 5) ]) # 3. group by language and # 4. create csv return chord(fetch_jobs)(build_report_task.s(ref_date_str)).get() @shared_task def build_report_task(results, ref_date): all_repos = [] for repos in results: all_repos += [Repository(repo) for repo in repos] # 3. group by language grouped_repos = {} for repo in all_repos: if repo.language in grouped_repos: grouped_repos[repo.language].append(repo.name) else: grouped_repos[repo.language] = [repo.name] # 4. create csv lines = [] for lang in sorted(grouped_repos.keys()): lines.append([lang] + grouped_repos[lang]) filename = '{media}/github-hot-repos-{date}.csv'.format(media=settings.MEDIA_ROOT, date=ref_date) return make_csv(filename, lines)
此任務使用celery.canvas.group
執行fetch_hot_repos/3
的五個並發調用。 等待這些結果,然後簡化為存儲庫對象列表。 然後我們的結果集按主題分組,最後導出到MEDIA_ROOT/
目錄下生成的 CSV 文件。
為了定期調度任務,您可能需要在配置文件的調度列表中添加一個條目:
文件: config/local.py
from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { 'produce-csv-reports': { 'task': 'celery_uncovered.toyex.tasks.produce_hot_repo_report_task', 'schedule': crontab(minute=0, hour=0) # midnight, 'args': ('today',) }, }
試一試
為了啟動和測試任務是如何工作的,首先我們需要啟動 Celery 進程:
$ celery -A celery_uncovered worker -l info
接下來,我們需要創建celery_uncovered/media/
目錄。 然後,您將能夠通過 Shell 或 Celerybeat 測試其功能:
殼牌:
from datetime import date from celery_uncovered.toyex.tasks import produce_hot_repo_report_task produce_hot_repo_report_task.delay('today').get(timeout=5)
芹菜節拍:
# Start celerybeat with the following command $ celery -A celery_uncovered beat -l info
您可以在MEDIA_ROOT/
目錄下查看結果。
場景 2 - 通過電子郵件報告服務器 500錯誤
Celery 最常見的用例之一是發送電子郵件通知。 電子郵件通知是一種離線 I/O 綁定操作,它利用本地 SMTP 服務器或第三方 SES。 有許多涉及發送電子郵件的用例,對於其中的大多數,用戶無需等到此過程完成即可接收 HTTP 響應。 這就是為什麼首選在後台執行此類任務並立即響應用戶的原因。

用例描述:通過 Celery 向管理員電子郵件報告 50X 錯誤。
Python 和 Django 具有執行系統日誌記錄的必要背景。 我不會詳細介紹 Python 的日誌記錄實際上是如何工作的。 但是,如果您以前從未嘗試過或需要復習,請閱讀內置日誌記錄模塊的文檔。 您肯定希望在您的生產環境中使用它。 Django 有一個名為 AdminEmailHandler 的特殊記錄器處理程序,它會為它收到的每條日誌消息向管理員發送電子郵件。
實施細節
主要思想是擴展AdminEmailHandler
類的send_mail
方法,使其可以通過 Celery 發送郵件。 這可以如下圖所示完成:

首先,我們需要設置一個名為report_error_task
的任務,它使用提供的主題和消息調用mail_admins
:
文件: celery_uncovered/toyex/tasks.py
@shared_task def report_error_task(subject, message, *args, **kwargs): mail_admins(subject, message, *args, **kwargs)
接下來,我們實際上擴展了 AdminEmailHandler 以便它在內部只調用定義的 Celery 任務:
文件: celery_uncovered/toyex/admin_email.py
from django.utils.log import AdminEmailHandler from celery_uncovered.handlers.tasks import report_error_task class CeleryHandler(AdminEmailHandler): def send_mail(self, subject, message, *args, **kwargs): report_error_task.delay(subject, message, *args, **kwargs)
最後,我們需要設置日誌記錄。 Django 中的日誌配置相當簡單。 您需要覆蓋LOGGING
以便日誌引擎開始使用新定義的處理程序:
文件config/settings/local.py
LOGGING = { 'version': 1, 'disable_existing_loggers': False, ..., 'handlers': { ... 'mail_admins': { 'level': 'ERROR', 'filters': ['require_debug_true'], 'class': 'celery_uncovered.toyex.log_handlers.admin_email.CeleryHandler' } }, 'loggers': { 'django': { 'handlers': ['console', 'mail_admins'], 'level': 'INFO', }, ... } }
請注意,我有意設置處理程序過濾器require_debug_true
以便在應用程序在調試模式下運行時測試此功能。
試一試
為了測試它,我準備了一個 Django 視圖,它在localhost:8000/report-error
提供“除零”操作。 您還需要啟動 MailHog Docker 容器來測試電子郵件是否實際發送。
$ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ CELERY_TASKSK_ALWAYS_EAGER=False python manage.py runserver $ # with your browser navigate to [http://localhost:8000](http://localhost:8000) $ # now check your outgoing emails by vising web UI [http://localhost:8025](http://localhost:8025)
額外細節
作為一個郵件測試工具,我設置了 MailHog 並配置了 Django 郵件以將其用於 SMTP 傳遞。 有很多方法可以部署和運行 MailHog。 我決定使用 Docker 容器。 您可以在相應的 README 文件中找到詳細信息:
文件: docker/mailhog/README.md
$ docker build . -f docker/mailhog/Dockerfile -t mailhog/mailhog:latest $ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ # navigate with your browser to localhost:8025
要將您的應用程序配置為使用 MailHog,您需要在配置中添加以下行:
文件: config/settings/local.py
EMAIL_BACKEND = env('DJANGO_EMAIL_BACKEND', default='django.core.mail.backends.smtp.EmailBackend') EMAIL_PORT = 1025 EMAIL_HOST = env('EMAIL_HOST', default='mailhog')
超越默認的 Celery 任務
Celery 任務可以由任何可調用函數創建。 默認情況下,任何用戶定義的任務都被注入celery.app.task.Task
作為父(抽象)類。 此類包含異步運行任務(通過網絡將其傳遞給 Celery worker)或同步(用於測試目的)、創建簽名和許多其他實用程序的功能。 在接下來的示例中,我們將嘗試擴展Celery.app.task.Task
,然後將其用作基類,以便為我們的任務添加一些有用的行為。
場景 3 - 每個任務的文件記錄
在我的一個項目中,我正在開發一個應用程序,該應用程序為最終用戶提供類似於提取、轉換、加載 (ETL) 的工具,該工具能夠攝取並過濾大量分層數據。 後端分為兩個模塊:
- 用 Celery 編排數據處理管道
- 使用 Go 進行數據處理
Celery 部署了一個 Celerybeat 實例和 40 多個工作人員。 有 20 多個不同的任務組成了管道和編排活動。 每個這樣的任務都可能在某個時候失敗。 所有這些故障都被轉儲到每個工人的系統日誌中。 在某些時候,調試和維護 Celery 層開始變得不方便。 最終,我們決定將任務日誌隔離到一個特定於任務的文件中。
用例描述:擴展 Celery 以便每個任務將其標準輸出和錯誤記錄到文件中
Celery 為 Python 應用程序提供了對其內部工作的極大控制。 它附帶一個熟悉的信號框架。 使用 Celery 的應用程序可以訂閱其中的一些,以增強某些操作的行為。 我們將利用任務級信號來提供對單個任務生命週期的詳細跟踪。 Celery 總是帶有一個日誌記錄後端,我們將從它中受益,同時只在少數幾個地方稍微覆蓋以實現我們的目標。
實施細節
Celery 已經支持每個任務的日誌記錄。 要保存到文件,有必要將日誌輸出分派到正確的位置。 在我們的例子中,任務的正確位置是與任務名稱匹配的文件。 在 Celery 實例上,我們將使用動態推斷的日誌處理程序覆蓋內置的日誌配置。 可以訂閱celeryd_after_setup
信號,然後在那裡配置系統日誌記錄:
文件: celery_uncovered/toyex/celery_conf.py
@signals.celeryd_after_setup.connect def configure_task_logging(instance=None, **kwargs): tasks = instance.app.tasks.keys() LOGS_DIR = settings.ROOT_DIR.path('logs') if not os.path.exists(str(LOGS_DIR)): os.makedirs(str(LOGS_DIR)) print 'dir created' default_handler = { 'level': 'DEBUG', 'filters': None, 'class': 'logging.FileHandler', 'filename': '' } default_logger = { 'handlers': [], 'level': 'DEBUG', 'propogate': True } LOG_CONFIG = { 'version': 1, # 'incremental': True, 'disable_existing_loggers': False, 'handlers': {}, 'loggers': {} } for task in tasks: task = str(task) if not task.startswith('celery_uncovered.'): continue task_handler = copy_dict(default_handler) task_handler['filename'] = str(LOGS_DIR.path(task + ".log")) task_logger = copy_dict(default_logger) task_logger['handlers'] = [task] LOG_CONFIG['handlers'][task] = task_handler LOG_CONFIG['loggers'][task] = task_logger logging.config.dictConfig(LOG_CONFIG)
請注意,對於在 Celery 應用程序中註冊的每個任務,我們正在構建一個相應的記錄器及其處理程序。 每個處理程序都是logging.FileHandler
類型,因此每個這樣的實例都接收一個文件名作為輸入。 運行此程序所需要做的就是將此模塊導入文件末尾的celery_uncovered/celery.py
:
import celery_uncovered.tricks.celery_conf
可以通過調用get_task_logger(task_name)
來接收特定的任務記錄器。 為了概括每個任務的這種行為,有必要使用一些實用方法稍微擴展celery.current_app.Task
:
文件: celery_uncovered/tricks/celery_ext.py
class LoggingTask(current_app.Task): abstract = True ignore_result = False @property def logger(self): logger = get_task_logger(self.name) return logger def log_msg(self, msg, *msg_args): self.logger.debug(msg, *msg_args)
現在,在調用task.log_msg("Hello, my name is: %s", task.request.id)
的情況下,日誌輸出將被路由到任務名稱下的相應文件。
試一試
為了啟動和測試這個任務是如何工作的,首先啟動 Celery 進程:
$ celery -A celery_uncovered worker -l info
然後您將能夠通過 Shell 測試功能:
from datetime import date from celery_uncovered.tricks.tasks import add add.delay(1, 3)
最後,要查看結果,導航到celery_uncovered/logs
目錄並打開名為celery_uncovered.tricks.tasks.add.log
的相應日誌文件。 多次運行此任務後,您可能會看到類似以下內容:
Result of 1 + 2 = 3 Result of 1 + 2 = 3 ...
場景 4 - 範圍感知任務
讓我們想像一個基於 Celery 和 Django 構建的面向國際用戶的 Python 應用程序。 用戶可以設置他們使用您的應用程序的語言(區域設置)。
您必須設計一個多語言、區域感知的電子郵件通知系統。 要發送電子郵件通知,您已經註冊了一個由特定隊列處理的特殊 Celery 任務。 此任務接收一些關鍵參數作為輸入和當前用戶區域設置,以便以用戶選擇的語言發送電子郵件。
現在想像我們有很多這樣的任務,但是每個任務都接受一個語言環境參數。 在這種情況下,在更高的抽象層次上解決它不是更好嗎? 在這裡,我們看到瞭如何做到這一點。
用例描述:自動從一個執行上下文繼承作用域,並將其作為參數注入到當前執行上下文中。
實施細節
同樣,正如我們對任務日誌所做的那樣,我們想要擴展一個基本任務類celery.current_app.Task
並覆蓋一些負責調用任務的方法。 出於本演示的目的,我將覆蓋celery.current_app.Task::apply_async
方法。 該模塊還有一些額外的任務可以幫助您製作功能齊全的替代品。
文件: celery_uncovered/tricks/celery_ext.py
class ScopeBasedTask(current_app.Task): abstract = True ignore_result = False default_locale_id = DEFAULT_LOCALE_ID scope_args = ('locale_id',) def __init__(self, *args, **kwargs): super(ScopeBasedTask, self).__init__(*args, **kwargs) self.set_locale(locale=kwargs.get('locale_id', None)) def set_locale(self, scenario_id=None): self.locale_id = self.default_locale_id if locale_id: self.locale_id = locale_id else: self.locale_id = get_current_locale().id def apply_async(self, args=None, kwargs=None, **other_kwargs): self.inject_scope_args(kwargs) return super(ScopeBasedTask, self).apply_async(args=args, kwargs=kwargs, **other_kwargs) def __call__(self, *args, **kwargs): task_rv = super(ScopeBasedTask, self).__call__(*args, **kwargs) return task_rv def inject_scope_args(self, kwargs): for arg in self.scope_args: if arg not in kwargs: kwargs[arg] = getattr(self, arg)
關鍵線索是默認將當前語言環境作為鍵值參數傳遞給調用任務。 如果使用特定語言環境作為參數調用任務,則它不會更改。
試一試
為了測試這個功能,讓我們定義一個ScopeBasedTask
類型的虛擬任務。 它通過語言環境 ID 定位文件並將其內容讀取為 JSON:
文件: celery_uncovered/tricks/tasks.py
@shared_task(bind=True, base=ScopeBasedTask) def read_scenario_file_task(self, **kwargs): fixture_parts = ["locales", "sc_%i.json" % kwargs['scenario_id']] return read_fixture(*fixture_parts)
現在您需要做的是重複啟動 Celery、啟動 shell 以及在不同場景下測試此任務的執行的步驟。 Fixtures 位於celery_uncovered/tricks/fixtures/locales/
目錄下。
結論
這篇文章旨在從不同的角度探索 Celery。 我在常規示例中演示了 Celery,例如郵件和報告生成,以及一些有趣的小眾業務用例的共享技巧。 Celery 建立在數據驅動的理念之上,您的團隊可以通過將其作為系統堆棧的一部分來簡化他們的生活。 如果你有基本的 Python 經驗,開發基於 Celery 的服務並不是很複雜,而且你應該能夠很快上手。 默認配置足以滿足大多數用途,但如果需要,它們可以非常靈活。
我們的團隊選擇使用 Celery 作為後台作業和長時間運行任務的編排後端。 我們將它廣泛用於各種用例,本文中只提到了其中的幾個。 我們每天攝取和分析千兆字節的數據,但這只是水平擴展技術的開始。