ELK 到 AWS:轻松管理日志

已发表: 2022-03-11

Elasticsearch 是一个强大的软件解决方案,旨在快速搜索大量数据中的信息。 与 Logstash 和 Kibana 结合,形成了非正式的“ELK 堆栈”,通常用于收集、临时存储、分析和可视化日志数据。 通常还需要一些其他软件,例如 Filebeat 将日志从服务器发送到 Logstash,Elastalert 根据对存储在 Elasticsearch 中的数据运行的一些分析结果生成警报。

ELK Stack 很强大,但是……

我使用 ELK 管理日志的经验非常复杂。 一方面,它非常强大,而且它的能力范围令人印象深刻。 另一方面,设置起来很棘手,维护起来也很头疼。

事实是,Elasticsearch 总体来说非常好,可以用在各种各样的场景中; 它甚至可以用作搜索引擎! 由于它不是专门用于管理日志数据,因此需要更多的配置工作来自定义其行为以满足管理此类数据的特定需求。

设置 ELK 集群非常棘手,需要我尝试一些参数才能最终启动并运行。 然后是配置它的工作。 就我而言,我需要配置五种不同的软件(Filebeat、Logstash、Elasticsearch、Kibana 和 Elastalert)。 这可能是一项相当乏味的工作,因为我必须通读文档并调试不与下一个对话的链中的一个元素。 即使在您最终启动并运行集群之后,您仍然需要对其执行日常维护操作:打补丁、升级操作系统包、检查 CPU、RAM 和磁盘使用情况,根据需要进行细微调整等。

Logstash 更新后,我的整个 ELK 堆栈停止工作。 经过仔细检查,结果发现,出于某种原因,ELK 开发人员决定更改其配置文件中的关键字并将其复数。 那是最后一根稻草,并决定寻找更好的解决方案(至少是针对我的特定需求的更好解决方案)。

我想存储 Apache 和各种 PHP 和节点应用程序生成的日志,并解析它们以找到指示软件错误的模式。 我找到的解决方案如下:

  • 在目标上安装 CloudWatch 代理。
  • 配置 CloudWatch 代理以将日志发送到 CloudWatch 日志。
  • 触发 Lambda 函数的调用以处理日志。
  • 如果找到模式,Lambda 函数会将消息发布到 Slack 通道。
  • 在可能的情况下,将筛选器应用于 CloudWatch 日志组,以避免为每个日志调用 Lambda 函数(这可能会很快增加成本)。

而且,在高层次上,就是这样! 一个 100% 无服务器解决方案,无需任何维护即可正常工作,并且无需任何额外工作即可很好地扩展。 这种无服务器解决方案相对于服务器集群的优势有很多:

  • 本质上,您将定期在集群服务器上执行的所有日常维护操作现在都由云提供商负责。 任何底层服务器都会在你不知情的情况下为你打补丁、升级和维护。
  • 您不再需要监控集群,并将所有扩展问题委托给云提供商。 实际上,如上所述的无服务器设置将自动扩展,您无需执行任何操作!
  • 上述解决方案需要较少的配置,并且云提供商不太可能将重大更改带入配置格式。
  • 最后,编写一些 CloudFormation 模板将所有内容作为基础架构即代码非常容易。 做同样的事情来建立一个完整的 ELK 集群需要很多工作。

配置 Slack 警报

所以现在让我们进入细节! 让我们探索一下 CloudFormation 模板对于此类设置的外观,以及用于提醒工程师的 Slack webhook。 我们需要先配置所有的 Slack 设置,所以让我们深入研究一下。

 AWSTemplateFormatVersion: 2010-09-09 Description: Setup log processing Parameters: SlackWebhookHost: Type: String Description: Host name for Slack web hooks Default: hooks.slack.com SlackWebhookPath: Type: String Description: Path part of the Slack webhook URL Default: /services/YOUR/SLACK/WEBHOOK

您需要为此设置 Slack 工作区,请查看此 WebHooks for Slack 指南以获取更多信息。

创建 Slack 应用程序并配置传入挂钩后,挂钩 URL 将成为 CloudFormation 堆栈的参数。

 Resources: ApacheAccessLogGroup: Type: AWS::Logs::LogGroup Properties: RetentionInDays: 100 # Or whatever is good for you ApacheErrorLogGroup: Type: AWS::Logs::LogGroup Properties: RetentionInDays: 100 # Or whatever is good for you

在这里,我们创建了两个日志组:一个用于Apache 访问日志,另一个用于Apache 错误日志

我没有为日志数据配置任何生命周期机制,因为它超出了本文的范围。 在实践中,您可能希望缩短保留窗口并设计 S3 生命周期策略以在一段时间后将它们移至 Glacier。

用于处理访问日志的 Lambda 函数

现在让我们实现将处理 Apache 访问日志的 Lambda 函数。

 BasicLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

在这里,我们创建了一个将附加到 Lambda 函数的 IAM 角色,以允许它们执行其职责。 实际上, AWSLambdaBasicExecutionRole是(尽管它的名称)由 AWS 提供的 IAM 策略。 它只允许 Lambda 函数创建其日志组并在该组中创建日志流,然后将其自己的日志发送到 CloudWatch Logs。

 ProcessApacheAccessLogFunction: Type: AWS::Lambda::Function Properties: Handler: index.handler Role: !GetAtt BasicLambdaExecutionRole.Arn Runtime: python3.7 Timeout: 10 Environment: Variables: SLACK_WEBHOOK_HOST: !Ref SlackWebHookHost SLACK_WEBHOOK_PATH: !Ref SlackWebHookPath Code: ZipFile: | import base64 import gzip import json import os from http.client import HTTPSConnection def handler(event, context): tmp = event['awslogs']['data'] # `awslogs.data` is base64-encoded gzip'ed JSON tmp = base64.b64decode(tmp) tmp = gzip.decompress(tmp) tmp = json.loads(tmp) events = tmp['logEvents'] for event in events: raw_log = event['message'] log = json.loads(raw_log) if log['status'][0] == "5": # This is a 5XX status code print(f"Received an Apache access log with a 5XX status code: {raw_log}") slack_host = os.getenv('SLACK_WEBHOOK_HOST') slack_path = os.getenv('SLACK_WEBHOOK_PATH') print(f"Sending Slack post to: host={slack_host}, path={slack_path}, url={url}, content={raw_log}") cnx = HTTPSConnection(slack_host, timeout=5) cnx.request("POST", slack_path, json.dumps({'text': raw_log})) # It's important to read the response; if the cnx is closed too quickly, Slack might not post the msg resp = cnx.getresponse() resp_content = resp.read() resp_code = resp.status assert resp_code == 200

所以在这里我们定义一个 Lambda 函数来处理 Apache 访问日志。 请注意,我没有使用 Apache 默认的通用日志格式。 我像这样配置了访问日志格式(您会注意到它本质上会生成格式为 JSON 的日志,这使得后续处理变得更加容易):

 LogFormat "{\"vhost\": \"%v:%p\", \"client\": \"%a\", \"user\": \"%u\", \"timestamp\": \"%{%Y-%m-%dT%H:%M:%S}t\", \"request\": \"%r\", \"status\": \"%>s\", \"size\": \"%O\", \"referer\": \"%{Referer}i\", \"useragent\": \"%{User-Agent}i\"}" json

此 Lambda 函数是用 Python 3 编写的。它采用从 CloudWatch 发送的日志行并可以搜索模式。 在上面的示例中,它仅检测导致 5XX 状态代码的 HTTP 请求并将消息发布到 Slack 通道。

您可以在模式检测方面做任何您喜欢的事情,而且它是一种真正的编程语言 (Python),而不仅仅是 Logstash 或 Elastalert 配置文件中的正则表达式模式,这为您提供了很多实现复杂模式识别的机会.

修订控制

关于修订控制的简短说明:我发现在 CloudFormation 模板中为小型实用程序 Lambda 函数(例如这个函数)内嵌代码是完全可以接受和方便的。 当然,对于涉及许多 Lambda 函数和层的大型项目,这很可能不方便,您需要使用 SAM。

 ApacheAccessLogFunctionPermission: Type: AWS::Lambda::Permission Properties: FunctionName: !Ref ProcessApacheAccessLogFunction Action: lambda:InvokeFunction Principal: logs.amazonaws.com SourceArn: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*

以上内容允许 CloudWatch Logs 调用您的 Lambda 函数。 提醒一句:我发现使用SourceAccount属性会导致与SourceArn发生冲突。

一般来说,当调用 Lambda 函数的服务在同一个 AWS 账户中时,我建议不要包含它。 SourceArn都会禁止其他账户调用 Lambda 函数。

 ApacheAccessLogSubscriptionFilter: Type: AWS::Logs::SubscriptionFilter DependsOn: ApacheAccessLogFunctionPermission Properties: LogGroupName: !Ref ApacheAccessLogGroup DestinationArn: !GetAtt ProcessApacheAccessLogFunction.Arn FilterPattern: "{$.status = 5*}"

订阅筛选器资源是 CloudWatch Logs 和 Lambda 之间的链接。 在这里,发送到ApacheAccessLogGroup的日志将被转发到我们上面定义的 Lambda 函数,但只有那些通过过滤模式的日志。 在这里,过滤器模式需要一些 JSON 作为输入(过滤器模式以 '{' 开头并以 '}' 结尾),并且仅当它具有以“5”开头的字段status时才会匹配日志条目。

这意味着我们只有在 Apache 返回的 HTTP 状态码是 500 码时才调用 Lambda 函数,这通常意味着正在发生一些非常糟糕的事情。 这样可以确保我们不会过多地调用 Lambda 函数,从而避免不必要的成本。

有关过滤模式的更多信息,请参阅 Amazon CloudWatch 文档。 CloudWatch 过滤器模式非常好,但显然不如 Grok 强大。

请注意DependsOn字段,它确保 CloudWatch Logs 可以在创建订阅之前实际调用 Lambda 函数。 这只是蛋糕上的一颗樱桃,它很可能是不必要的,因为在真实情况下,Apache 可能不会在至少几秒钟之前收到请求(例如:将 EC2 实例与负载均衡器链接,并获取负载平衡器将 EC2 实例的状态识别为健康)。

用于处理错误日志的 Lambda 函数

现在让我们看一下将处理 Apache 错误日志的 Lambda 函数。

 ProcessApacheErrorLogFunction: Type: AWS::Lambda::Function Properties: Handler: index.handler Role: !GetAtt BasicLambdaExecutionRole.Arn Runtime: python3.7 Timeout: 10 Environment: Variables: SLACK_WEBHOOK_HOST: !Ref SlackWebHookHost SLACK_WEBHOOK_PATH: !Ref SlackWebHookPath Code: ZipFile: | import base64 import gzip import json import os from http.client import HTTPSConnection def handler(event, context): tmp = event['awslogs']['data'] # `awslogs.data` is base64-encoded gzip'ed JSON tmp = base64.b64decode(tmp) tmp = gzip.decompress(tmp) tmp = json.loads(tmp) events = tmp['logEvents'] for event in events: raw_log = event['message'] log = json.loads(raw_log) if log['level'] in ["error", "crit", "alert", "emerg"]: # This is a serious error message msg = log['msg'] if msg.startswith("PHP Notice") or msg.startswith("PHP Warning"): print(f"Ignoring PHP notices and warnings: {raw_log}") else: print(f"Received a serious Apache error log: {raw_log}") slack_host = os.getenv('SLACK_WEBHOOK_HOST') slack_path = os.getenv('SLACK_WEBHOOK_PATH') print(f"Sending Slack post to: host={slack_host}, path={slack_path}, url={url}, content={raw_log}") cnx = HTTPSConnection(slack_host, timeout=5) cnx.request("POST", slack_path, json.dumps({'text': raw_log})) # It's important to read the response; if the cnx is closed too quickly, Slack might not post the msg resp = cnx.getresponse() resp_content = resp.read() resp_code = resp.status assert resp_code == 200

第二个 Lambda 函数处理 Apache 错误日志,并且仅在遇到严重错误时才会向 Slack 发布消息。 在这种情况下,PHP 通知和警告消息的严重程度不足以触发警报。

同样,此函数要求 Apache 错误日志为 JSON 格式。 所以这是我一直在使用的错误日志格式字符串:

 ErrorLogFormat "{\"vhost\": \"%v\", \"timestamp\": \"%{cu}t\", \"module\": \"%-m\", \"level\": \"%l\", \"pid\": \"%-P\", \"tid\": \"%-T\", \"oserror\": \"%-E\", \"client\": \"%-a\", \"msg\": \"%M\"}"
 ApacheErrorLogFunctionPermission: Type: AWS::Lambda::Permission Properties: FunctionName: !Ref ProcessApacheErrorLogFunction Action: lambda:InvokeFunction Principal: logs.amazonaws.com SourceArn: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:* SourceAccount: !Ref AWS::AccountId

此资源授予 CloudWatch Logs 调用您的 Lambda 函数的权限。

 ApacheErrorLogSubscriptionFilter: Type: AWS::Logs::SubscriptionFilter DependsOn: ApacheErrorLogFunctionPermission Properties: LogGroupName: !Ref ApacheErrorLogGroup DestinationArn: !GetAtt ProcessApacheErrorLogFunction.Arn FilterPattern: '{$.msg != "PHP Warning*" && $.msg != "PHP Notice*"}'

最后,我们使用 Apache 错误日志组的订阅过滤器将 CloudWatch Logs 与 Lambda 函数链接起来。 请注意过滤模式,它确保以“PHP 警告”或“PHP 通知”开头的消息的日志不会触发对 Lambda 函数的调用。

最后的想法、定价和可用性

关于成本的最后一句话:这个解决方案比运行 ELK 集群便宜得多。 存储在 CloudWatch 中的日志定价与 S3 相同,Lambda 允许每月 100 万次调用作为其免费套餐的一部分。 对于具有中等到大量流量的网站(假设您使用 CloudWatch Logs 过滤器),这可能就足够了,特别是如果您编码良好并且没有太多错误!

此外,请注意 Lambda 函数最多支持 1,000 个并发调用。 在撰写本文时,这是 AWS 中无法更改的硬限制。 但是,您可以预期对上述函数的调用将持续大约 30-40 毫秒。 这应该足够快以处理相当大的流量。 如果您的工作量如此之大以至于您达到了这个限制,您可能需要一个基于 Kinesis 的更复杂的解决方案,我可能会在以后的文章中介绍。


进一步阅读 Toptal 工程博客:

  • 使用 AWS SSM 的 SSH 日志记录和会话管理