使用 CloudTrail Processing Library - Amazon CloudTrail
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 CloudTrail Processing Library

CloudTrail Processing Library 是一个 Java 库,它提供了一种处理 Amazon CloudTrail 日志的简单方法。您提供与您的 CloudTrail SQS 队列有关的配置详细信息并编写代码来处理事件。CloudTrail Processing Library 将完成其余操作。它会轮询您的 Amazon SQS 队列,读取和分析队列消息,下载 CloudTrail 日志文件,分析日志文件中的事件,并将事件作为 Java 对象传递至您的代码。

CloudTrail Processing Library 具有高度的可扩展性和容错性。它可以并行处理日志文件,以便您可以根据需要处理任意数量的日志。它会处理与网络超时和无法访问的资源相关的网络故障。

以下主题介绍了如何使用 CloudTrail Processing Library 处理您的 Java 项目中的 CloudTrail 日志。

此库作为一个 Apache 许可的开源项目在 GitHub 上提供:https://github.com/aws/aws-cloudtrail-processing-library。库源包括可用作您自己的项目基础的示例代码。

最低要求

要使用 CloudTrail Processing Library,您必须拥有以下项:

处理 CloudTrail 日志

处理您的 Java 应用程序中的 CloudTrail 日志:

将 CloudTrail Processing Library 添加到您的项目

要使用 CloudTrail Processing Library,请将其添加到您的 Java 项目的类路径中。

将库添加到 Apache Ant 项目

将 CloudTrail Processing Library 添加到 Apache Ant 项目
  1. 从 GitHub 下载或克隆 CloudTrail Processing Library 源代码:

  2. 从源构建 .jar 文件,如自述文件中所述:

    mvn clean install -Dgpg.skip=true
  3. 将生成的 .jar 文件复制到您的项目中,并将它添加到您项目的 build.xml 文件中。例如:

    <classpath> <pathelement path="${classpath}"/> <pathelement location="lib/aws-cloudtrail-processing-library-1.6.1.jar"/> </classpath>

将库添加到 Apache Maven 项目

CloudTrail Processing Library 可用于 Apache Maven。您可以将其添加到您的项目,方法是:在项目的 pom.xml 文件中编写单个依赖项。

将 CloudTrail Processing Library 添加到 Maven 项目
  • 打开您的 Maven 项目的 pom.xml 文件,并添加如下依赖项:

    <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-cloudtrail-processing-library</artifactId> <version>1.6.1</version> </dependency>

将库添加到 Eclipse 项目

将 CloudTrail Processing Library 添加到 Eclipse 项目
  1. 从 GitHub 下载或克隆 CloudTrail Processing Library 源代码:

  2. 从源构建 .jar 文件,如自述文件中所述:

    mvn clean install -Dgpg.skip=true
  3. 将生成的 aws-cloudtrail-processing-library-1.6.1.jar 复制到您项目中的目录(通常是 lib)。

  4. 在 Eclipse Project Explorer 中右键单击您的项目名称,选择 Build Path,然后选择 Configure

  5. Java Build Path 窗口中,选择 Libraries 选项卡。

  6. 选择添加 JAR...,然后导航到您复制 aws-cloudtrail-processing-library-1.6.1.jar 的路径。

  7. 选择 OK 以完成将 .jar 添加到您的项目的过程。

将库添加到 IntelliJ 项目

将 CloudTrail Processing Library 添加到 IntelliJ 项目
  1. 从 GitHub 下载或克隆 CloudTrail Processing Library 源代码:

  2. 从源构建 .jar 文件,如自述文件中所述:

    mvn clean install -Dgpg.skip=true
  3. File 中,选择 Project Structure

  4. 选择 Modules,然后选择 Dependencies

  5. 选择 + JARS or Directories,然后转至您构建 aws-cloudtrail-processing-library-1.6.1.jar 的路径。

  6. 选择 Apply,然后选择 OK 以完成将 .jar 添加到您的项目的过程。

配置 CloudTrail Processing Library

您可以通过创建运行时加载的类路径属性文件,或通过创建 ClientConfiguration 对象并手动设置选项来配置 CloudTrail Processing Library。

提供属性文件

您可以编写向您的应用程序提供配置选项的类路径属性文件。以下示例文件显示了您可设置的选项:

# AWS access key. (Required) accessKey = your_access_key # AWS secret key. (Required) secretKey = your_secret_key # The SQS URL used to pull CloudTrail notification from. (Required) sqsUrl = your_sqs_queue_url # The SQS end point specific to a region. sqsRegion = us-east-1 # A period of time during which Amazon SQS prevents other consuming components # from receiving and processing that message. visibilityTimeout = 60 # The S3 region to use. s3Region = us-east-1 # Number of threads used to download S3 files in parallel. Callbacks can be # invoked from any thread. threadCount = 1 # The time allowed, in seconds, for threads to shut down after # AWSCloudTrailEventProcessingExecutor.stop() is called. If they are still # running beyond this time, they will be forcibly terminated. threadTerminationDelaySeconds = 60 # The maximum number of AWSCloudTrailClientEvents sent to a single invocation # of processEvents(). maxEventsPerEmit = 10 # Whether to include raw event information in CloudTrailDeliveryInfo. enableRawEventInfo = false # Whether to delete SQS message when the CloudTrail Processing Library is unable to process the notification. deleteMessageUponFailure = false

以下参数为必需参数:

  • sqsUrl – 提供从中提取您的 CloudTrail 通知的 URL。如果不指定此值,则 AWSCloudTrailProcessingExecutor 会引发 IllegalStateException

  • accessKey – 您账户的唯一标识符,例如 AKIAIOSFODNN7EXAMPLE。

  • secretKey – 您账户的唯一标识符,例如 wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY。

accessKeysecretKey 参数会将您的 Amazon 凭证提供给库,因此库可代表您访问 Amazon。

其他参数的默认值由库设置。有关更多信息,请参阅 Amazon CloudTrail Processing Library 参考

创建 ClientConfiguration

您可以通过在 ClientConfiguration 对象上设置选项来提供针对 AWSCloudTrailProcessingExecutor 的选项,而不是在类路径属性中设置选项,如下面的示例所示:

ClientConfiguration basicConfig = new ClientConfiguration( "http://sqs.us-east-1.amazonaws.com/123456789012/queue2", new DefaultAWSCredentialsProviderChain()); basicConfig.setEnableRawEventInfo(true); basicConfig.setThreadCount(4); basicConfig.setnEventsPerEmit(20);

实施事件处理器

要处理 CloudTrail 日志,您必须实施接收 CloudTrail 日志数据的 EventsProcessor。以下是一个实施示例:

public class SampleEventsProcessor implements EventsProcessor { public void process(List<CloudTrailEvent> events) { int i = 0; for (CloudTrailEvent event : events) { System.out.println(String.format("Process event %d : %s", i++, event.getEventData())); } } }

实施 EventsProcessor 时,您将实施 process() 回调,AWSCloudTrailProcessingExecutor 使用该回调向您发送 CloudTrail 事件。事件在 CloudTrailClientEvent 对象的列表中提供。

CloudTrailClientEvent 对象提供您可用来读取 CloudTrail 事件和传输信息的 CloudTrailEventCloudTrailEventMetadata

该简单示例会输出传递到 SampleEventsProcessor 的每个事件的事件信息。在您自己的实现中,您可以按照所需方式处理日志。只要 AWSCloudTrailProcessingExecutor 有事件要发送并仍在运行,它就会继续将事件发送至您的 EventsProcessor

实例化和运行处理执行程序

在编写 EventsProcessor 并设置 CloudTrail Processing Library 的配置值(在属性文件中设置或通过使用 ClientConfiguration 类设置) 后,您可以使用这些元素来初始化并使用 AWSCloudTrailProcessingExecutor

使用 AWSCloudTrailProcessingExecutor 处理 CloudTrail 事件
  1. 实例化 AWSCloudTrailProcessingExecutor.Builder 对象。 Builder 的构造函数采用一个 EventsProcessor 对象和一个类路径属性文件名。

  2. 调用 Builderbuild() 工厂方法,以配置并获取 AWSCloudTrailProcessingExecutor 对象。

  3. 使用 AWSCloudTrailProcessingExecutorstart()stop() 方法开始和结束 CloudTrail 事件处理。

public class SampleApp { public static void main(String[] args) throws InterruptedException { AWSCloudTrailProcessingExecutor executor = new AWSCloudTrailProcessingExecutor.Builder(new SampleEventsProcessor(), "/myproject/cloudtrailprocessing.properties").build(); executor.start(); Thread.sleep(24 * 60 * 60 * 1000); // let it run for a while (optional) executor.stop(); // optional } }

高级主题

筛选要处理的事件

默认情况下,您的 Amazon SQS 队列的 S3 存储桶中的所有日志及其包含的所有事件均会发送到您的 EventsProcessor。CloudTrail Processing Library 提供了可选接口,您可实施这些接口来筛选用于获取 CloudTrail 日志的源并筛选您想处理的事件。

SourceFilter

您可以实施 SourceFilter 接口,以选择是否要处理来自提供的源的日志。SourceFilter 声明单个调用方法 filterSource(),该方法接收一个 CloudTrailSource 对象。要阻止源中的事件被处理,请从 false () 返回 filterSource()

CloudTrail Processing Library 会在对 Amazon SQS 队列上的日志执行库轮询后调用 filterSource() 方法。在库启动日志的事件筛选或处理之前会发生此事。

以下是一个实施示例:

public class SampleSourceFilter implements SourceFilter{ private static final int MAX_RECEIVED_COUNT = 3; private static List<String> accountIDs ; static { accountIDs = new ArrayList<>(); accountIDs.add("123456789012"); accountIDs.add("234567890123"); } @Override public boolean filterSource(CloudTrailSource source) throws CallbackException { source = (SQSBasedSource) source; Map<String, String> sourceAttributes = source.getSourceAttributes(); String accountId = sourceAttributes.get( SourceAttributeKeys.ACCOUNT_ID.getAttributeKey()); String receivedCount = sourceAttributes.get( SourceAttributeKeys.APPROXIMATE_RECEIVE_COUNT.getAttributeKey()); int approximateReceivedCount = Integer.parseInt(receivedCount); return approximateReceivedCount <= MAX_RECEIVED_COUNT && accountIDs.contains(accountId); } }

如果您未提供自己的 SourceFilter,则会使用 DefaultSourceFilter,这样将允许处理所有源 (它总是返回 true)。

EventFilter

您可以实施 EventFilter 接口,以选择是否要将 CloudTrail 事件发送到 EventsProcessorEventFilter 声明单个调用方法 filterEvent(),该方法接收一个 CloudTrailEvent 对象。要阻止事件被处理,请从 false () 返回 filterEvent()

CloudTrail Processing Library 会在对 Amazon SQS 队列上的日志执行库轮询后和执行源筛选后调用 filterEvent() 方法。这在库开始处理日志的事件之前发生。

请参阅下面的实施示例:

public class SampleEventFilter implements EventFilter{ private static final String EC2_EVENTS = "ec2.amazonaws.com"; @Override public boolean filterEvent(CloudTrailClientEvent clientEvent) throws CallbackException { CloudTrailEvent event = clientEvent.getEvent(); String eventSource = event.getEventSource(); String eventName = event.getEventName(); return eventSource.equals(EC2_EVENTS) && eventName.startsWith("Delete"); } }

如果您未提供自己的 EventFilter,则会使用 DefaultEventFilter,这样将允许处理所有事件 (它总是返回 true)。

处理数据事件

CloudTrail 处理数据事件时,无论是整数 (int) 还是 float(包含小数的数字),都会保留原始格式的数字。在数据事件的字段中包含整数的事件中,CloudTrail 历史上将这些数字处理为浮点数。目前,CloudTrail 通过保留原始格式来处理这些字段中的数字。

作为最佳实践,为了避免中断自动化,可以灵活地对待您用来处理或筛选 CloudTrail 数据事件的任何代码或自动化,并允许 intfloat 格式的数字。为了获得最佳结果,请使用 1.4.0 或更高版本的 CloudTrail Processing Library。

以下示例代码段显示了 float 格式的数字 2.0,用于数据事件的 ResponseParameters数据块中的 desiredCount 参数。

"eventName": "CreateService", "awsRegion": "us-east-1", "sourceIPAddress": "000.00.00.00", "userAgent": "console.amazonaws.com", "requestParameters": { "clientToken": "EXAMPLE", "cluster": "default", "desiredCount": 2.0 ...

以下示例代码段显示了 int 格式的数字 2,用于数据事件的 ResponseParameters数据块中的 desiredCount 参数。

"eventName": "CreateService", "awsRegion": "us-east-1", "sourceIPAddress": "000.00.00.00", "userAgent": "console.amazonaws.com", "requestParameters": { "clientToken": "EXAMPLE", "cluster": "default", "desiredCount": 2 ...

报告进度

实施 ProgressReporter 接口以自定义 CloudTrail Processing Library 进度的报告。ProgressReporter 声明两种方法:reportStart()reportEnd(),在以下操作的开头和结尾将调用这两种方法:

  • 轮询来自 Amazon SQS 的消息

  • 分析来自 Amazon SQS 的消息

  • 处理 CloudTrail 日志的 Amazon SQS 源

  • 删除来自 Amazon SQS 的消息

  • 下载 CloudTrail 日志文件

  • 处理 CloudTrail 日志文件

这两种方法均会收到一个 ProgressStatus 对象,该对象包含有关已执行操作的信息。progressState 成员包含标识当前操作的 ProgressState 枚举的一个成员。此成员可以包含 progressInfo 成员中的其他信息。此外,您从 reportStart() 返回的任何对象都会传递给 reportEnd(),以便您能够提供上下文信息,例如事件开始处理时的时间。

下面是一个实施示例,该示例提供了有关完成操作所需时间的信息:

public class SampleProgressReporter implements ProgressReporter { private static final Log logger = LogFactory.getLog(DefaultProgressReporter.class); @Override public Object reportStart(ProgressStatus status) { return new Date(); } @Override public void reportEnd(ProgressStatus status, Object startDate) { System.out.println(status.getProgressState().toString() + " is " + status.getProgressInfo().isSuccess() + " , and latency is " + Math.abs(((Date) startDate).getTime()-new Date().getTime()) + " milliseconds."); } }

如果您未实施自己的 ProgressReporter,则将改用 DefaultExceptionHandler (它会输出正在运行的状态的名称)。

处理错误

您可使用 ExceptionHandler 接口在日志处理期间发生异常时提供特殊处理。ExceptionHandler 声明单个调用方法 handleException(),该方法将接收一个带有关于已发生异常的上下文的 ProcessingLibraryException 对象。

您可以使用传入的 ProcessingLibraryExceptiongetStatus() 方法来查明发生异常时所执行的操作,并获取有关该操作的状态的附加信息。ProcessingLibraryException 派生自 Java 的标准 Exception 类,因此您也可以通过调用任一 Exception 方法来检索有关异常的信息。

请参阅下面的实施示例:

public class SampleExceptionHandler implements ExceptionHandler{ private static final Log logger = LogFactory.getLog(DefaultProgressReporter.class); @Override public void handleException(ProcessingLibraryException exception) { ProgressStatus status = exception.getStatus(); ProgressState state = status.getProgressState(); ProgressInfo info = status.getProgressInfo(); System.err.println(String.format( "Exception. Progress State: %s. Progress Information: %s.", state, info)); } }

如果您未提供自己的 ExceptionHandler,则改用 DefaultExceptionHandler (它会输出标准错误消息)。

注意

如果 deleteMessageUponFailure 参数为 true,则 CloudTrail Processing Library 不会区分一般异常与处理错误,并可能会删除队列消息。

  1. 例如,使用 SourceFilter 按时间戳筛选消息。

  2. 但是,您没有访问可接收 CloudTrail 日志文件的 S3 存储桶所需的权限。由于您没有所需的权限,因此会引发 AmazonServiceException。CloudTrail Processing Library 将此信息包含在 CallBackException 中。

  3. DefaultExceptionHandler 会将其记录为错误,但不会确定根本原因,这个原因就是您没有所需的权限。CloudTrail Processing Library 会将其视为处理错误,并删除该消息,即使该消息包含有效的 CloudTrail 日志文件也是如此。

如果您要用 SourceFilter 来筛选消息,请验证您的 ExceptionHandler 是否可将服务异常与处理错误区分开。

其他资源

有关 CloudTrail Processing Library 的更多信息,请参阅以下内容: