记录组级订阅过滤器 - Amazon CloudWatch 日志
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

记录组级订阅过滤器

你可以将订阅过滤器与 Kinesis Data Streams、Lambda 或 Firehose 一起使用。通过订阅筛选条件发送到接收服务的日志将经过 base64 编码并使用 gzip 格式进行压缩。

您可以使用筛选条件和模式语法搜索日志数据。

示例 1:Kinesis Data Streams 订阅筛选条件

以下示例将订阅过滤器与包含 Amazon CloudTrail 事件的日志组相关联。订阅过滤器会将通过 “根” Amazon 凭据记录的所有活动发送到 Kinesis Data Streams 中名为 “RootAccess” 的直播中。有关如何向 CloudWatch 日志发送 Amazon CloudTrail 事件的更多信息,请参阅Amazon CloudTrail 用户指南中的向 CloudWatch 日志发送 CloudTrail 事件

注意

在创建 流之前,请计算将生成日志数据的卷。请确保创建具有足够分片的 流来处理此卷。如果流没有足够的分片,日志流将受限制。有关流卷限制的更多信息,请参阅限额和限制

受限的交付项最多可在 24 小时内重试。24 小时后,失败的交付项将被丢弃。

要减轻节流的风险,您可以执行以下步骤:

  • 使用 CloudWatch 指标监控您的直播。这可以帮助您识别任何节流并相应地调整配置。例如,该DeliveryThrottling指标可用于跟踪在将数据转发到订阅目标时限制 CloudWatch 日志的日志事件的数量。有关监控的更多信息,请参阅使用 CloudWatch 指标进行监控

  • 为您在 Kinesis Data Streams 中的流使用按需容量模式。按需模式会随着工作负载的增加或减少立即进行调整。有关按需容量模式的更多信息,请参阅按需模式

  • 限制您的 CloudWatch 订阅筛选模式,使其与 Kinesis Data Streams 中的直播容量相匹配。如果您向流发送的数据过多,则可能需要减小筛选器大小或调整筛选条件。

为 Kinesis Data Streams 创建订阅筛选条件
  1. 使用以下命令创建目标 流:

    $ C:\> aws kinesis create-stream --stream-name "RootAccess" --shard-count 1
  2. 请耐心等待,直到 流变为活动状态(这可能需要一两分钟)。你可以使用以下 Kinesis Data St reams describe-stream 命令来检查。StreamDescription StreamStatus财产。此外,请注意 StreamDescription.streamArn 的值,因为您将在以后的步骤中使用它:

    aws kinesis describe-stream --stream-name "RootAccess"

    下面是示例输出:

    { "StreamDescription": { "StreamStatus": "ACTIVE", "StreamName": "RootAccess", "StreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/RootAccess", "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "EndingHashKey": "340282366920938463463374607431768211455", "StartingHashKey": "0" }, "SequenceNumberRange": { "StartingSequenceNumber": "49551135218688818456679503831981458784591352702181572610" } } ] } }
  3. 创建 IAM 角色,该角色将授予 CloudWatch 日志将数据放入您的直播的权限。首先,您需要在文件 (例如 ~/TrustPolicyForCWL-Kinesis.json) 中创建信任策略。使用文本编辑器创建此策略。请勿使用 IAM 控制台来创建策略。

    此策略包括 aws:SourceArn 全局条件上下文密钥,有助于避免出现混淆代理安全问题。有关更多信息,请参阅 混淆代理问题防范

    { "Statement": { "Effect": "Allow", "Principal": { "Service": "logs.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringLike": { "aws:SourceArn": "arn:aws:logs:region:123456789012:*" } } } }
  4. 使用 create-role 命令创建 IAM 角色,并指定信任策略文件。请记下返回的 Role.Arn 值,因为后面的步骤中将会用到它:

    aws iam create-role --role-name CWLtoKinesisRole --assume-role-policy-document file://~/TrustPolicyForCWL-Kinesis.json

    下面是输出的一个示例。

    { "Role": { "AssumeRolePolicyDocument": { "Statement": { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "logs.amazonaws.com" }, "Condition": { "StringLike": { "aws:SourceArn": { "arn:aws:logs:region:123456789012:*" } } } } }, "RoleId": "AAOIIAH450GAB4HC5F431", "CreateDate": "2015-05-29T13:46:29.431Z", "RoleName": "CWLtoKinesisRole", "Path": "/", "Arn": "arn:aws:iam::123456789012:role/CWLtoKinesisRole" } }
  5. 创建权限策略以定义 CloudWatch 日志可以对您的账户执行的操作。首先,您将在文件 (例如 ~/PermissionsForCWL-Kinesis.json) 中创建权限策略。使用文本编辑器创建此策略。请勿使用 IAM 控制台来创建策略。

    { "Statement": [ { "Effect": "Allow", "Action": "kinesis:PutRecord", "Resource": "arn:aws:kinesis:region:123456789012:stream/RootAccess" } ] }
  6. 使用以下put-role-policy命令将权限策略与角色关联:

    aws iam put-role-policy --role-name CWLtoKinesisRole --policy-name Permissions-Policy-For-CWL --policy-document file://~/PermissionsForCWL-Kinesis.json
  7. 在直播处于 “活动” 状态且您已创建 IAM 角色后,您可以创建 CloudWatch 日志订阅筛选器。订阅筛选条件将立即让实时日志数据开始从所选日志组流动到您的 流:

    aws logs put-subscription-filter \ --log-group-name "CloudTrail/logs" \ --filter-name "RootAccess" \ --filter-pattern "{$.userIdentity.type = Root}" \ --destination-arn "arn:aws:kinesis:region:123456789012:stream/RootAccess" \ --role-arn "arn:aws:iam::123456789012:role/CWLtoKinesisRole"
  8. 设置订阅过滤器后,Lo CloudWatch gs 会将所有与过滤器模式匹配的传入日志事件转发到您的直播中。您可以抓取 Kinesis Data Streams 分片迭代器,并使用 Kinesis Data Streams get-records 命令提取一些 Kinesis Data Streams 记录以验证是否发生该过程:

    aws kinesis get-shard-iterator --stream-name RootAccess --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON
    { "ShardIterator": "AAAAAAAAAAFGU/kLvNggvndHq2UIFOw5PZc6F01s3e3afsSscRM70JSbjIefg2ub07nk1y6CDxYR1UoGHJNP4m4NFUetzfL+wev+e2P4djJg4L9wmXKvQYoE+rMUiFq+p4Cn3IgvqOb5dRA0yybNdRcdzvnC35KQANoHzzahKdRGb9v4scv+3vaq+f+OIK8zM5My8ID+g6rMo7UKWeI4+IWiK2OSh0uP" }
    aws kinesis get-records --limit 10 --shard-iterator "AAAAAAAAAAFGU/kLvNggvndHq2UIFOw5PZc6F01s3e3afsSscRM70JSbjIefg2ub07nk1y6CDxYR1UoGHJNP4m4NFUetzfL+wev+e2P4djJg4L9wmXKvQYoE+rMUiFq+p4Cn3IgvqOb5dRA0yybNdRcdzvnC35KQANoHzzahKdRGb9v4scv+3vaq+f+OIK8zM5My8ID+g6rMo7UKWeI4+IWiK2OSh0uP"

    请注意,您可能需要在 Kinesis Data Streams 开始返回数据之前调用几次此命令。

    您将看到包含一组记录的响应。Kinesis Data Streams 记录中的数据属性采用 Base64 编码并使用 gzip 格式进行压缩。您可使用以下 Unix 命令检查命令行中的原始数据:

    echo -n "<Content of Data>" | base64 -d | zcat

    Base64 解码和解压缩数据被格式化为 JSON 并具有以下结构:

    { "owner": "111111111111", "logGroup": "CloudTrail/logs", "logStream": "111111111111_CloudTrail/logs_us-east-1", "subscriptionFilters": [ "Destination" ], "messageType": "DATA_MESSAGE", "logEvents": [ { "id": "31953106606966983378809025079804211143289615424298221568", "timestamp": 1432826855000, "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}" }, { "id": "31953106606966983378809025079804211143289615424298221569", "timestamp": 1432826855000, "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}" }, { "id": "31953106606966983378809025079804211143289615424298221570", "timestamp": 1432826855000, "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}" } ] }

    上述数据架构中的关键元素如下:

    owner

    原始日志数据的 Amazon 账户 ID。

    logGroup

    原始日志数据的日志组名称。

    logStream

    原始日志数据的日志流名称。

    subscriptionFilters

    与原始日志数据匹配的订阅筛选条件名称的列表。

    messageType

    数据消息将使用“DATA_MESSAGE”类型。有时, CloudWatch 日志可能会发出 “CONTROL_MESSAGE” 类型的 Kinesis Data Streams 记录,主要用于检查目标是否可达。

    logEvents

    表示为一组日志事件记录的实际日志数据。“id”属性是每个日志事件的唯一标识符。

示例 2:带有以下内容的订阅过滤器 Amazon Lambda

在此示例中,您将创建一个 Lo CloudWatch gs 订阅过滤器,用于将日志数据发送到您的 Amazon Lambda 函数。

注意

在创建 Lambda 函数之前,请计算将生成的日志数据量。请确保创建一个处理此卷的函数。如果函数没有足够的卷,日志流将受限制。有关 Lambda 限制的更多信息,请参阅 Amazon Lambda 限制

为 Lambda 创建订阅筛选条件
  1. 创建 Amazon Lambda 函数。

    确保您已设置 Lambda 执行角色。有关更多信息,请参阅 Amazon Lambda 开发人员指南中的步骤 2.2:创建 IAM 角色(执行角色)

  2. 打开文本编辑器,并用以下内容创建名为 helloWorld.js 的文件:

    var zlib = require('zlib'); exports.handler = function(input, context) { var payload = Buffer.from(input.awslogs.data, 'base64'); zlib.gunzip(payload, function(e, result) { if (e) { context.fail(e); } else { result = JSON.parse(result.toString()); console.log("Event Data:", JSON.stringify(result, null, 2)); context.succeed(); } }); };
  3. 压缩 helloWorld.js 文件,并用名称 helloWorld.zip 保存它。

  4. 使用以下命令,其中的角色是您在第一步中设置的 Lambda 执行角色:

    aws lambda create-function \ --function-name helloworld \ --zip-file fileb://file-path/helloWorld.zip \ --role lambda-execution-role-arn \ --handler helloWorld.handler \ --runtime nodejs12.x
  5. 授 CloudWatch 予 Logs 执行函数的权限。使用以下命令,将占位符账户替换为您自己的账户,将占位符日志组替换为要处理的日志组:

    aws lambda add-permission \ --function-name "helloworld" \ --statement-id "helloworld" \ --principal "logs.amazonaws.com" \ --action "lambda:InvokeFunction" \ --source-arn "arn:aws:logs:region:123456789123:log-group:TestLambda:*" \ --source-account "123456789012"
  6. 使用以下命令创建订阅筛选条件,将占位符账户替换为您自己的账户,将占位符日志组替换为要处理的日志组:

    aws logs put-subscription-filter \ --log-group-name myLogGroup \ --filter-name demo \ --filter-pattern "" \ --destination-arn arn:aws:lambda:region:123456789123:function:helloworld
  7. (可选) 使用样本日志事件进行测试。在出现命令提示符时,运行以下命令,以将一条简单的日志消息放入已订阅的流中。

    要查看 Lambda 函数的输出,请导航至 Lambda 函数,在此处可以查看 /aws/lambda/helloworld 中的输出:

    aws logs put-log-events --log-group-name myLogGroup --log-stream-name stream1 --log-events "[{\"timestamp\":<CURRENT TIMESTAMP MILLIS> , \"message\": \"Simple Lambda Test\"}]"

    您将看到包含一组 Lambda 的响应。Lambda 记录中的数据属性采用 base64 编码并使用 gzip 格式进行压缩。Lambda 接收的实际负载采用以下格式:{ "awslogs": {"data": "BASE64ENCODED_GZIP_COMPRESSED_DATA"} }。您可以使用以下 Unix 命令检查命令行中的原始数据:

    echo -n "<BASE64ENCODED_GZIP_COMPRESSED_DATA>" | base64 -d | zcat

    Base64 解码和解压缩数据被格式化为 JSON 并具有以下结构:

    { "owner": "123456789012", "logGroup": "CloudTrail", "logStream": "123456789012_CloudTrail_us-east-1", "subscriptionFilters": [ "Destination" ], "messageType": "DATA_MESSAGE", "logEvents": [ { "id": "31953106606966983378809025079804211143289615424298221568", "timestamp": 1432826855000, "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}" }, { "id": "31953106606966983378809025079804211143289615424298221569", "timestamp": 1432826855000, "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}" }, { "id": "31953106606966983378809025079804211143289615424298221570", "timestamp": 1432826855000, "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}" } ] }

    上述数据架构中的关键元素如下:

    owner

    原始日志数据的 Amazon 账户 ID。

    logGroup

    原始日志数据的日志组名称。

    logStream

    原始日志数据的日志流名称。

    subscriptionFilters

    与原始日志数据匹配的订阅筛选条件名称的列表。

    messageType

    数据消息将使用“DATA_MESSAGE”类型。有时, CloudWatch 日志可能会发出 “CONTROL_MESSAGE” 类型的 Lambda 记录,主要用于检查目标是否可达。

    logEvents

    表示为一组日志事件记录的实际日志数据。“id”属性是每个日志事件的唯一标识符。

示例 3:使用 Amazon Data Firehose 的订阅筛选条件

在此示例中,您将创建一个 CloudWatch 日志订阅,该订阅将与您定义的筛选条件匹配的所有传入日志事件发送到您的 Amazon Data Firehose 传输流。从 CloudWatch 日志发送到 Amazon Data Firehose 的数据已使用 gzip 第 6 级压缩进行压缩,因此您无需在 Firehose 传输流中使用压缩。然后,您可以使用 Firehose 中的解压缩功能自动解压缩日志。有关更多信息,请参阅使用日志写入 Kinesis Data CloudWatch Fire hose。

注意

在创建 Firehose 流之前,请计算将生成的日志数据量。请务必创建能够处理此音量的 Firehose 直播。如果流无法处理卷,则日志流将受限制。有关 Firehose 直播容量限制的更多信息,请参阅亚马逊数据 Firehose 数据限制。

为 Firehose 创建订阅过滤器
  1. 创建 Amazon Simple Storage Service (Amazon S3) 存储桶 我们建议您使用专为 Logs 创建的存储 CloudWatch 桶。但是,如果要使用现有存储桶,请跳至第 2 步。

    运行以下命令,将占位符区域替换为您想使用的区域:

    aws s3api create-bucket --bucket my-bucket --create-bucket-configuration LocationConstraint=region

    下面是示例输出:

    { "Location": "/my-bucket" }
  2. 创建 IAM 角色,授予 Amazon Data Firehose 将数据放入您的 Amazon S3 存储桶的权限。

    有关更多信息,请参阅《亚马逊数据 Firehose 开发者指南》中的使用亚马逊数据 Firehose 控制访问权限

    首先,使用文本编辑器在文件 ~/TrustPolicyForFirehose.json 中创建一个信任策略,具体如下所示:

    { "Statement": { "Effect": "Allow", "Principal": { "Service": "firehose.amazonaws.com" }, "Action": "sts:AssumeRole" } }
  3. 使用 create-role 命令创建 IAM 角色,并指定信任策略文件。请记下返回的 Role.Arn 值,因为后面的步骤中将会用到它:

    aws iam create-role \ --role-name FirehosetoS3Role \ --assume-role-policy-document file://~/TrustPolicyForFirehose.json { "Role": { "AssumeRolePolicyDocument": { "Statement": { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "firehose.amazonaws.com" } } }, "RoleId": "AAOIIAH450GAB4HC5F431", "CreateDate": "2015-05-29T13:46:29.431Z", "RoleName": "FirehosetoS3Role", "Path": "/", "Arn": "arn:aws:iam::123456789012:role/FirehosetoS3Role" } }
  4. 创建权限策略以定义 Firehose 可以对您的账户执行哪些操作。首先,使用文本编辑器在文件 ~/PermissionsForFirehose.json 中创建权限策略:

    { "Statement": [ { "Effect": "Allow", "Action": [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::my-bucket", "arn:aws:s3:::my-bucket/*" ] } ] }
  5. 使用以下 put-role-policy命令将权限策略与角色关联:

    aws iam put-role-policy --role-name FirehosetoS3Role --policy-name Permissions-Policy-For-Firehose --policy-document file://~/PermissionsForFirehose.json
  6. 按如下方式创建目标 Firehose 交付流,将 roLearn 和 Bucketarn 的占位符值替换为您创建的角色和存储桶 AR N:

    aws firehose create-delivery-stream \ --delivery-stream-name 'my-delivery-stream' \ --s3-destination-configuration \ '{"RoleARN": "arn:aws:iam::123456789012:role/FirehosetoS3Role", "BucketARN": "arn:aws:s3:::my-bucket"}'

    请注意,对于已交付的亚马逊 S3 对象,Firehose 会自动使用 YYYY/MM/DD/HH UTC 时间格式的前缀。您可以指定要添加在该时间格式前缀前面的额外前缀。如果前缀以正斜杠 (/) 结束,则在 Amazon S3 存储桶中显示为文件夹。

  7. 请耐心等待,直到流变为活动状态 (这可能需要几分钟时间)。你可以使用 Fire describe-delivery-streamhose 命令来检查。DeliveryStreamDescription DeliveryStreamStatus财产。此外,请注意DeliveryStreamDescription。 DeliveryStreamARN 值,因为你将在后面的步骤中需要它:

    aws firehose describe-delivery-stream --delivery-stream-name "my-delivery-stream" { "DeliveryStreamDescription": { "HasMoreDestinations": false, "VersionId": "1", "CreateTimestamp": 1446075815.822, "DeliveryStreamARN": "arn:aws:firehose:us-east-1:123456789012:deliverystream/my-delivery-stream", "DeliveryStreamStatus": "ACTIVE", "DeliveryStreamName": "my-delivery-stream", "Destinations": [ { "DestinationId": "destinationId-000000000001", "S3DestinationDescription": { "CompressionFormat": "UNCOMPRESSED", "EncryptionConfiguration": { "NoEncryptionConfig": "NoEncryption" }, "RoleARN": "delivery-stream-role", "BucketARN": "arn:aws:s3:::my-bucket", "BufferingHints": { "IntervalInSeconds": 300, "SizeInMBs": 5 } } } ] } }
  8. 创建 IAM 角色,该角色向 CloudWatch 日志授予将数据放入 Firehose 传输流的权限。首先,使用文本编辑器在文件 ~/TrustPolicyForCWL.json 中创建信任策略:

    此策略包括 aws:SourceArn 全局条件上下文密钥,有助于避免出现混淆代理安全问题。有关更多信息,请参阅 混淆代理问题防范

    { "Statement": { "Effect": "Allow", "Principal": { "Service": "logs.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringLike": { "aws:SourceArn": "arn:aws:logs:region:123456789012:*" } } } }
  9. 使用 create-role 命令创建 IAM 角色,并指定信任策略文件。请记下返回的 Role.Arn 值,因为后面的步骤中将会用到它:

    aws iam create-role \ --role-name CWLtoKinesisFirehoseRole \ --assume-role-policy-document file://~/TrustPolicyForCWL.json { "Role": { "AssumeRolePolicyDocument": { "Statement": { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "logs.amazonaws.com" }, "Condition": { "StringLike": { "aws:SourceArn": "arn:aws:logs:region:123456789012:*" } } } }, "RoleId": "AAOIIAH450GAB4HC5F431", "CreateDate": "2015-05-29T13:46:29.431Z", "RoleName": "CWLtoKinesisFirehoseRole", "Path": "/", "Arn": "arn:aws:iam::123456789012:role/CWLtoKinesisFirehoseRole" } }
  10. 创建权限策略以定义 CloudWatch 日志可以对您的账户执行的操作。首先,使用文本编辑器创建权限策略文件 (例如 ~/PermissionsForCWL.json):

    { "Statement":[ { "Effect":"Allow", "Action":["firehose:PutRecord"], "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/delivery-stream-name"] } ] }
  11. 使用 put-role-policy以下命令将权限策略与角色关联:

    aws iam put-role-policy --role-name CWLtoKinesisFirehoseRole --policy-name Permissions-Policy-For-CWL --policy-document file://~/PermissionsForCWL.json
  12. 在 Amazon Data Firehose 传输流处于活动状态并且您已创建 IAM 角色之后,您可以创建 CloudWatch 日志订阅筛选条件。订阅筛选器会立即启动实时日志数据从所选日志组流向您的 Amazon Data Firehose 传输流:

    aws logs put-subscription-filter \ --log-group-name "CloudTrail" \ --filter-name "Destination" \ --filter-pattern "{$.userIdentity.type = Root}" \ --destination-arn "arn:aws:firehose:region:123456789012:deliverystream/my-delivery-stream" \ --role-arn "arn:aws:iam::123456789012:role/CWLtoKinesisFirehoseRole"
  13. 设置订阅筛选器后,Lo CloudWatch gs 会将所有与筛选模式匹配的传入日志事件转发到您的 Amazon Data Firehose 传输流。您的数据将根据在 Amazon Data Firehose 传输流中设置的时间缓冲间隔开始出现在您的 Amazon S3 中。经过足够的时间后,您可以通过检查您的 Amazon S3 存储桶验证您的数据。

    aws s3api list-objects --bucket 'my-bucket' --prefix 'firehose/' { "Contents": [ { "LastModified": "2015-10-29T00:01:25.000Z", "ETag": "\"a14589f8897f4089d3264d9e2d1f1610\"", "StorageClass": "STANDARD", "Key": "firehose/2015/10/29/00/my-delivery-stream-2015-10-29-00-01-21-a188030a-62d2-49e6-b7c2-b11f1a7ba250", "Owner": { "DisplayName": "cloudwatch-logs", "ID": "1ec9cf700ef6be062b19584e0b7d84ecc19237f87b5" }, "Size": 593 }, { "LastModified": "2015-10-29T00:35:41.000Z", "ETag": "\"a7035b65872bb2161388ffb63dd1aec5\"", "StorageClass": "STANDARD", "Key": "firehose/2015/10/29/00/my-delivery-stream-2015-10-29-00-35-40-7cc92023-7e66-49bc-9fd4-fc9819cc8ed3", "Owner": { "DisplayName": "cloudwatch-logs", "ID": "1ec9cf700ef6be062b19584e0b7d84ecc19237f87b6" }, "Size": 5752 } ] }
    aws s3api get-object --bucket 'my-bucket' --key 'firehose/2015/10/29/00/my-delivery-stream-2015-10-29-00-01-21-a188030a-62d2-49e6-b7c2-b11f1a7ba250' testfile.gz { "AcceptRanges": "bytes", "ContentType": "application/octet-stream", "LastModified": "Thu, 29 Oct 2015 00:07:06 GMT", "ContentLength": 593, "Metadata": {} }

    Amazon S3 对象中的数据以 gzip 格式压缩。您可以使用以下 Unix 命令检查命令行中的原始数据:

    zcat testfile.gz