查询 Amazon CloudTrail 日志
Amazon CloudTrail 是一个记录 Amazon 亚马逊云科技账户的 API 调用和事件。
CloudTrail 日志包含有关对 Amazon Web Services 进行的任何 API 调用的详细信息,包括控制台。CloudTrail 生成加密的日志文件并将其存储在 Amazon S3 中。有关更多信息,请参阅《Amazon CloudTrail 用户指南》。
注意
如果您需要跨账户、区域和日期对 CloudTrail 事件信息执行 SQL 查询,请考虑使用 CloudTrail Lake。CloudTrail Lake 是一种可用于创建跟踪的替代 Amazon 服务,可将来自企业的信息聚合到单个可搜索的事件数据存储中。它不使用 Amazon S3 存储桶存储,而是将事件存储在某个数据湖中,从而支持更丰富、更快的查询。您可以使用它创建 SQL 查询,以便在自定义时间范围内跨组织和区域搜索事件。由于 CloudTrail Lake 查询是在 CloudTrail 控制台中执行,因此使用 CloudTrail Lake 不需要 Athena。有关更多信息,请参阅 CloudTrail Lake 文档。
将 Athena 与 CloudTrail 日志结合使用是加强对 Amazon Web Service 活动进行分析的强有力方法。例如,您可以使用查询来确定趋势,并根据属性 (如源 IP 地址或用户) 进一步隔离活动。
常见的应用程序是使用 CloudTrail 日志分析运营活动的安全性和合规性。有关详细示例的信息,请参阅 Amazon 大数据博客文章:使用 Amazon CloudTrail 和 Amazon Athena 分析安全性、合规性和运营活动
您可以使用 Athena 通过指定日志文件的 LOCATION
直接从 Amazon S3 查询这些日志文件。您可以通过两种方式之一来执行此操作:
-
通过直接从 CloudTrail 控制台为 CloudTrail 日志文件创建表。
-
通过在 Athena 控制台中手动为 CloudTrail 日志文件创建表。
主题
了解 CloudTrail 日志和 Athena 表
在开始创建表之前,应了解有关 CloudTrail 以及它如何存储数据的更多信息。这有助于创建所需的表,无论您从 CloudTrail 控制台或从 Athena 创建它们都是如此。
CloudTrail 将日志另存为采用压缩 gzip 格式的 JSON 文本文件 (*.json.gzip)。日志文件的位置取决于您如何设置跟踪、您正在记录的一个 Amazon Web Services 区域 或多个区域以及其他因素。
有关日志存储位置、JSON 结构和记录文件内容的更多信息,请参阅《Amazon CloudTrail 用户指南》中的以下主题:
要收集日志并将其保存到 Amazon S3,请从 Amazon Web Services Management Console 启用 CloudTrail。有关更多信息,请参阅《Amazon CloudTrail 用户指南》中的 创建跟踪记录。
请记下您在其中保存日志的目标 Amazon S3 存储桶。将 LOCATION
子句替换为至 CloudTrail 日志位置的路径和要使用的一组对象。该示例使用特定账户的日志的 LOCATION
值,但您可以使用最适合您的应用程序的明确度程度。
例如:
-
要分析多个账户中的数据,您可以通过使用
LOCATION 's3://MyLogFiles/AWSLogs/'
,回滚LOCATION
说明符来指示所有AWSLogs
。 -
要分析特定日期、账户和区域中的数据,请使用
LOCATION 's3://MyLogFiles/123456789012/CloudTrail/us-east-1/2016/03/14/'.
如果使用对象层次结构中的最高级别,则可以在使用 Athena 查询时获得最高程度的灵活度。
使用 CloudTrail 控制台为 CloudTrail 日志创建 Athena 表
您可以创建未分区的 Athena 表,来直接从较旧的 CloudTrail 控制台查询 CloudTrail 日志。若要从 CloudTrail 控制台创建 Athena 表,您需要使用具有在 Athena 中创建表的足够权限的角色登录。
注意
您不能使用 CloudTrail 控制台为组织跟踪记录日志创建 Athena 表。相反,请使用 Athena 控制台手动创建表,以便指定正确的存储位置。有关组织跟踪的信息,请参阅《Amazon CloudTrail 用户指南》中的为组织创建跟踪记录。
-
有关设置 Athena 权限的信息,请参阅 设置。
-
有关创建带分区的表的信息,请参阅使用手动分区在 Athena 中为 CloudTrail 日志创建表。
使用 CloudTrail 控制台为 CloudTrail 跟踪记录创建 Athena 表
访问 https://console.aws.amazon.com/cloudtrail/
,打开 CloudTrail 控制台。 -
在导航窗格中,选择事件历史记录。
-
选择 Create Athena table(创建 Athena 表)。
-
对于 Storage location(存储位置),使用向下箭头选择其中存储了供跟踪查询的日志文件的 Amazon S3 存储桶。
注意
要查找与跟踪记录关联的存储桶的名称,请选择 CloudTrail 导航窗格中的 Trails(跟踪记录),然后查看跟踪记录的 S3 bucket(S3 存储桶)列。要查看 Amazon S3 中存储桶的位置,请在 S3 bucket(S3 存储桶)列中选择存储桶的链接。这样,就可以将 Amazon S3 控制台打开到 CloudTrail 存储桶位置。
-
选择创建表。将使用包含 Amazon S3 存储桶名称的默认名称创建表。
使用手动分区在 Athena 中为 CloudTrail 日志创建表
可以在 Athena 控制台中为 CloudTrail 日志文件手动创建表,然后在 Athena 中运行查询。
使用 Athena 控制台为 CloudTrail 跟踪记录创建 Athena 表
-
将以下 DDL 语句复制并粘贴到 Athena 控制台查询编辑器中。
CREATE EXTERNAL TABLE cloudtrail_logs ( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING>, ec2RoleDelivery:string, webIdFederationData:map<string,string> > >, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< arn:STRING, accountid:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING, vpcendpointid STRING, eventCategory STRING, tlsDetails struct< tlsVersion:string, cipherSuite:string, clientProvidedHostHeader:string> ) PARTITIONED BY (region string, year string, month string, day string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://
CloudTrail_bucket_name
/AWSLogs/Account_ID
/CloudTrail/';注意
建议使用示例中所示的
org.apache.hive.hcatalog.data.JsonSerDe
。虽然存在com.amazon.emr.hive.serde.CloudTrailSerde
,但目前还无法处理一些较新的 CloudTrail 字段。 -
(可选)删除表格中所有非必填字段。如果您只需要读取一组特定的列,则您的表定义可以排除其他列。
-
修改
s3://
以指向包含您的日志数据的 Amazon S3 存储桶。CloudTrail_bucket_name
/AWSLogs/Account_ID/
CloudTrail/ -
验证字段是否正确列出。有关 CloudTrail 记录中的完整字段列表的更多信息,请参阅 CloudTrail 记录内容。
下面的示例使用了 Hive JSON SerDe。在此示例中,字段
requestparameters
、responseelements
和additionaleventdata
作为查询中的STRING
类型列出,但在 JSON 中使用STRUCT
数据类型。因此,要将数据移出这些字段,请使用JSON_EXTRACT
函数。有关更多信息,请参阅 从 JSON 中提取数据。为了提高性能,此示例按 Amazon Web Services 区域、年份、月份和日期对数据进行分区。 -
在 Athena 控制台中运行
CREATE TABLE
语句。 -
使用 ALTER TABLE ADD PARTITION 命令加载分区,以便您可以查询它们,如以下示例所示。
ALTER TABLE
table_name
ADD PARTITION (region='us-east-1', year='2019', month='02', day='01') LOCATION 's3://cloudtrail_bucket_name
/AWSLogs/Account_ID
/CloudTrail/us-east-1/2019/02/01/
'
使用手动分区为整个组织范围的跟踪创建表
要在 Athena 中为整个组织的 CloudTrail 日志文件创建表,请按照 使用手动分区在 Athena 中为 CloudTrail 日志创建表 中的步骤操作,但需要按照以下过程中的说明进行修改。
为整个组织的 CloudTrail 日志记录创建 Athena 表
-
在
CREATE TABLE
语句中,修改LOCATION
子句以包含组织 ID,如下例所示:LOCATION 's3://
cloudtrail_bucket_name
/AWSLogs/organization_id
/Account_ID
/CloudTrail/' -
在
PARTITIONED BY
子句中,为账户 ID 添加一个字符串条目,如下例所示:PARTITIONED BY (account string, region string, year string, month string, day string)
以下示例显示的是综合结果:
... PARTITIONED BY (account string, region string, year string, month string, day string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://
cloudtrail_bucket_name
/AWSLogs/organization_id
/Account_ID
/CloudTrail/' -
ALTER TABLE
语句的ADD PARTITION
子句包含账户 ID,如下例所示:ALTER TABLE table_name ADD PARTITION (account='
111122223333
', region='us-east-1', year='2022', month='08', day='08') -
ALTER TABLE
语句的LOCATION
子句包含组织 ID、账户 ID 以及您要添加的分区,如下例所示:LOCATION 's3://
cloudtrail_bucket_name
/AWSLogs/organization_id
/Account_ID
/CloudTrail/us-east-1/2022/08/08/'以下示例
ALTER TABLE
语句显示的是综合结果:ALTER TABLE table_name ADD PARTITION (account='
111122223333
', region='us-east-1', year='2022', month='08', day='08') LOCATION 's3://cloudtrail_bucket_name
/AWSLogs/organization_id
/111122223333
/CloudTrail/us-east-1/2022/08/08/'
使用分区投影在 Athena 中为 CloudTrail 日志创建表
由于 CloudTrail 日志具有一个已知结构,您可以预先指定该结构的分区方案,因此可以使用 Athena 分区投影功能减少查询运行时间并自动管理分区。当添加新数据时,分区投影会自动添加新分区。这样就不必使用 ALTER TABLE ADD PARTITION
手动添加分区了。
以下示例 CREATE TABLE
语句会自动在 CloudTrail 日志上从指定日期开始到当前为单个 Amazon Web Services 区域 使用分区投影。在 LOCATION
和 storage.location.template
子句中,将存储桶
、account-id
和 aws-region
占位符替换为对应的相同值。对于 projection.timestamp.range
,将 2020
/01
/01
替换为要使用的开始日期。成功运行查询后,您可以查询表。您无需运行 ALTER TABLE ADD PARTITION
来加载分区。
CREATE EXTERNAL TABLE cloudtrail_logs_pp( eventVersion STRING, userIdentity STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, invokedBy: STRING, accessKeyId: STRING, userName: STRING, sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING, creationDate: STRING>, sessionIssuer: STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, userName: STRING>, ec2RoleDelivery:string, webIdFederationData:map<string,string> > >, eventTime STRING, eventSource STRING, eventName STRING, awsRegion STRING, sourceIpAddress STRING, userAgent STRING, errorCode STRING, errorMessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestId STRING, eventId STRING, readOnly STRING, resources ARRAY<STRUCT< arn: STRING, accountId: STRING, type: STRING>>, eventType STRING, apiVersion STRING, recipientAccountId STRING, serviceEventDetails STRING, sharedEventID STRING, vpcendpointid STRING, eventCategory STRING, tlsDetails struct< tlsVersion:string, cipherSuite:string, clientProvidedHostHeader:string> ) PARTITIONED BY ( `timestamp` string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://
bucket
/AWSLogs/account-id
/CloudTrail/aws-region
' TBLPROPERTIES ( 'projection.enabled'='true', 'projection.timestamp.format'='yyyy/MM/dd', 'projection.timestamp.interval'='1', 'projection.timestamp.interval.unit'='DAYS', 'projection.timestamp.range'='2020
/01
/01
,NOW', 'projection.timestamp.type'='date', 'storage.location.template'='s3://bucket
/AWSLogs/account-id
/CloudTrail/aws-region
/${timestamp}')
更多有关分区投影的信息,请参阅 使用 Amazon Athena 分区投影。
查询嵌套字段
由于 userIdentity
和 resources
字段是嵌套的数据类型,查询这些内容需要特殊处理。
userIdentity
对象由嵌套 STRUCT
类型组成。可以使用点分隔字段以分隔待查询的字段,如下例所示:
SELECT eventsource, eventname, useridentity.sessioncontext.attributes.creationdate, useridentity.sessioncontext.sessionissuer.arn FROM cloudtrail_logs WHERE useridentity.sessioncontext.sessionissuer.arn IS NOT NULL ORDER BY eventsource, eventname LIMIT 10
resources
字段是一个 STRUCT
对象数组。对于这些数组,请使用 CROSS JOIN UNNEST
来取消嵌套数组,以便您可以查询其对象。
下面的示例将返回资源 ARN 以 example/datafile.txt
结尾的所有行。为了便于读取,replacearn:aws:s3:::
子字符串。
SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as s3_resource, eventname, eventtime, useragent FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE unnested.resources_entry.ARN LIKE '%example/datafile.txt' ORDER BY eventtime
以下是 DeleteBucket
事件的示例查询。查询将从 resources
对象中提取存储桶的名称以及存储桶所属的账户 ID。
SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as deleted_bucket, eventtime AS time_deleted, useridentity.username, unnested.resources_entry.accountid as bucket_acct_id FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE eventname = 'DeleteBucket' ORDER BY eventtime
有关取消嵌套的更多信息,请参阅 筛选数组。
示例查询
以下示例显示从在 CloudTrail 事件日志创建的表,返回所有匿名(未签名)请求的查询部分。此查询选择 useridentity.accountid
匿名并且 useridentity.arn
未指定的那些请求:
SELECT * FROM
cloudtrail_logs
WHERE eventsource = 's3.amazonaws.com' AND eventname in ('GetObject') AND useridentity.accountid = 'anonymous' AND useridentity.arn IS NULL AND requestparameters LIKE '%[your bucket name ]%';
有关更多信息,请参阅 Amazon 大数据博客文章:使用 Amazon CloudTrail 和 Amazon Athena 分析安全性、合规性和运营活动
有关查询 CloudTrail 日志的提示
要浏览 CloudTrail 日志数据,请使用下列提示:
-
在查询日志之前,请验证您的日志表是否看似与使用手动分区在 Athena 中为 CloudTrail 日志创建表中的表一样。如果不是第一个表,请使用以下命令删除现有表:
DROP TABLE cloudtrail_logs
。 -
删除现有表后,重新创建它。有关更多信息,请参阅 使用手动分区在 Athena 中为 CloudTrail 日志创建表。
确认正确列出了 Athena 查询中的字段。有关 CloudTrail 记录中的完整字段列表的信息,请参阅 CloudTrail 记录内容。
如果您的查询包含 JSON 格式的字段,例如
STRUCT
,请从 JSON 中提取数据。有关更多信息,请参阅 从 JSON 中提取数据。关于针对 CloudTrail 表发布查询的一些建议如下:
-
首先查看哪些用户调用了哪些 API 操作以及来自哪些源 IP 地址。
-
将以下基本 SQL 查询用作您的模板。将查询粘贴到 Athena 控制台并运行它。
SELECT useridentity.arn, eventname, sourceipaddress, eventtime FROM cloudtrail_logs LIMIT 100;
-
修改查询以进一步探索您的数据。
-
为了提高性能,请包含
LIMIT
子句以返回指定的行子集。