本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 CloudTrail Processing Library
CloudTrail Processing Library 是一个 Java 库,它提供了一种处理 Amazon CloudTrail 日志的简单方法。您提供与您的 CloudTrail SQS 队列有关的配置详细信息并编写代码来处理事件。CloudTrail Processing Library 将完成其余操作。它会轮询您的 Amazon SQS 队列,读取和分析队列消息,下载 CloudTrail 日志文件,分析日志文件中的事件,并将事件作为 Java 对象传递至您的代码。
CloudTrail Processing Library 具有高度的可扩展性和容错性。它可以并行处理日志文件,以便您可以根据需要处理任意数量的日志。它会处理与网络超时和无法访问的资源相关的网络故障。
以下主题介绍了如何使用 CloudTrail Processing Library 处理您的 Java 项目中的 CloudTrail 日志。
此库作为一个 Apache 许可的开源项目在 GitHub 上提供:https://github.com/aws/aws-cloudtrail-processing-library
最低要求
要使用 CloudTrail Processing Library,您必须拥有以下项:
处理 CloudTrail 日志
处理您的 Java 应用程序中的 CloudTrail 日志:
将 CloudTrail Processing Library 添加到您的项目
要使用 CloudTrail Processing Library,请将其添加到您的 Java 项目的类路径中。
将库添加到 Apache Ant 项目
将 CloudTrail Processing Library 添加到 Apache Ant 项目
-
从 GitHub 下载或克隆 CloudTrail Processing Library 源代码:
-
从源构建 .jar 文件,如自述文件
中所述: mvn clean install -Dgpg.skip=true
-
将生成的 .jar 文件复制到您的项目中,并将它添加到您项目的
build.xml
文件中。例如:<classpath> <pathelement path="${classpath}"/> <pathelement location="lib/aws-cloudtrail-processing-library-1.6.1.jar"/> </classpath>
将库添加到 Apache Maven 项目
CloudTrail Processing Library 可用于 Apache Mavenpom.xml
文件中编写单个依赖项。
将 CloudTrail Processing Library 添加到 Maven 项目
-
打开您的 Maven 项目的
pom.xml
文件,并添加如下依赖项:<dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-cloudtrail-processing-library</artifactId> <version>1.6.1</version> </dependency>
将库添加到 Eclipse 项目
将 CloudTrail Processing Library 添加到 Eclipse 项目
-
从 GitHub 下载或克隆 CloudTrail Processing Library 源代码:
-
从源构建 .jar 文件,如自述文件
中所述: mvn clean install -Dgpg.skip=true
-
将生成的 aws-cloudtrail-processing-library-1.6.1.jar 复制到您项目中的目录(通常是
lib
)。 -
在 Eclipse Project Explorer 中右键单击您的项目名称,选择 Build Path,然后选择 Configure。
-
在 Java Build Path 窗口中,选择 Libraries 选项卡。
-
选择添加 JAR...,然后导航到您复制 aws-cloudtrail-processing-library-1.6.1.jar 的路径。
-
选择 OK 以完成将
.jar
添加到您的项目的过程。
将库添加到 IntelliJ 项目
将 CloudTrail Processing Library 添加到 IntelliJ 项目
-
从 GitHub 下载或克隆 CloudTrail Processing Library 源代码:
-
从源构建 .jar 文件,如自述文件
中所述: mvn clean install -Dgpg.skip=true
-
从 File 中,选择 Project Structure。
-
选择 Modules,然后选择 Dependencies。
-
选择 + JARS or Directories,然后转至您构建
aws-cloudtrail-processing-library-1.6.1.jar
的路径。 -
选择 Apply,然后选择 OK 以完成将
.jar
添加到您的项目的过程。
配置 CloudTrail Processing Library
您可以通过创建运行时加载的类路径属性文件,或通过创建 ClientConfiguration
对象并手动设置选项来配置 CloudTrail Processing Library。
提供属性文件
您可以编写向您的应用程序提供配置选项的类路径属性文件。以下示例文件显示了您可设置的选项:
# AWS access key. (Required) accessKey = your_access_key # AWS secret key. (Required) secretKey = your_secret_key # The SQS URL used to pull CloudTrail notification from. (Required) sqsUrl = your_sqs_queue_url # The SQS end point specific to a region. sqsRegion = us-east-1 # A period of time during which Amazon SQS prevents other consuming components # from receiving and processing that message. visibilityTimeout = 60 # The S3 region to use. s3Region = us-east-1 # Number of threads used to download S3 files in parallel. Callbacks can be # invoked from any thread. threadCount = 1 # The time allowed, in seconds, for threads to shut down after # AWSCloudTrailEventProcessingExecutor.stop() is called. If they are still # running beyond this time, they will be forcibly terminated. threadTerminationDelaySeconds = 60 # The maximum number of AWSCloudTrailClientEvents sent to a single invocation # of processEvents(). maxEventsPerEmit = 10 # Whether to include raw event information in CloudTrailDeliveryInfo. enableRawEventInfo = false # Whether to delete SQS message when the CloudTrail Processing Library is unable to process the notification. deleteMessageUponFailure = false
以下参数为必需参数:
-
sqsUrl
– 提供从中提取您的 CloudTrail 通知的 URL。如果不指定此值,则AWSCloudTrailProcessingExecutor
会引发IllegalStateException
。 -
accessKey
– 您账户的唯一标识符,例如 AKIAIOSFODNN7EXAMPLE。 -
secretKey
– 您账户的唯一标识符,例如 wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY。
accessKey
和 secretKey
参数会将您的 Amazon 凭证提供给库,因此库可代表您访问 Amazon。
其他参数的默认值由库设置。有关更多信息,请参阅 Amazon CloudTrail Processing Library 参考。
创建 ClientConfiguration
您可以通过在 ClientConfiguration
对象上设置选项来提供针对 AWSCloudTrailProcessingExecutor
的选项,而不是在类路径属性中设置选项,如下面的示例所示:
ClientConfiguration basicConfig = new ClientConfiguration( "http://sqs.us-east-1.amazonaws.com/123456789012/queue2", new DefaultAWSCredentialsProviderChain()); basicConfig.setEnableRawEventInfo(true); basicConfig.setThreadCount(4); basicConfig.setnEventsPerEmit(20);
实施事件处理器
要处理 CloudTrail 日志,您必须实施接收 CloudTrail 日志数据的 EventsProcessor
。以下是一个实施示例:
public class SampleEventsProcessor implements EventsProcessor { public void process(List<CloudTrailEvent> events) { int i = 0; for (CloudTrailEvent event : events) { System.out.println(String.format("Process event %d : %s", i++, event.getEventData())); } } }
实施 EventsProcessor
时,您将实施 process()
回调,AWSCloudTrailProcessingExecutor
使用该回调向您发送 CloudTrail 事件。事件在 CloudTrailClientEvent
对象的列表中提供。
CloudTrailClientEvent
对象提供您可用来读取 CloudTrail 事件和传输信息的 CloudTrailEvent
和 CloudTrailEventMetadata
。
该简单示例会输出传递到 SampleEventsProcessor
的每个事件的事件信息。在您自己的实现中,您可以按照所需方式处理日志。只要 AWSCloudTrailProcessingExecutor
有事件要发送并仍在运行,它就会继续将事件发送至您的 EventsProcessor
。
实例化和运行处理执行程序
在编写 EventsProcessor
并设置 CloudTrail Processing Library 的配置值(在属性文件中设置或通过使用 ClientConfiguration
类设置) 后,您可以使用这些元素来初始化并使用 AWSCloudTrailProcessingExecutor
。
使用 AWSCloudTrailProcessingExecutor
处理 CloudTrail 事件
-
实例化
AWSCloudTrailProcessingExecutor.Builder
对象。Builder
的构造函数采用一个EventsProcessor
对象和一个类路径属性文件名。 -
调用
Builder
的build()
工厂方法,以配置并获取AWSCloudTrailProcessingExecutor
对象。 -
使用
AWSCloudTrailProcessingExecutor
的start()
和stop()
方法开始和结束 CloudTrail 事件处理。
public class SampleApp { public static void main(String[] args) throws InterruptedException { AWSCloudTrailProcessingExecutor executor = new AWSCloudTrailProcessingExecutor.Builder(new SampleEventsProcessor(), "/myproject/cloudtrailprocessing.properties").build(); executor.start(); Thread.sleep(24 * 60 * 60 * 1000); // let it run for a while (optional) executor.stop(); // optional } }
高级主题
筛选要处理的事件
默认情况下,您的 Amazon SQS 队列的 S3 存储桶中的所有日志及其包含的所有事件均会发送到您的 EventsProcessor
。CloudTrail Processing Library 提供了可选接口,您可实施这些接口来筛选用于获取 CloudTrail 日志的源并筛选您想处理的事件。
SourceFilter
-
您可以实施
SourceFilter
接口,以选择是否要处理来自提供的源的日志。SourceFilter
声明单个调用方法filterSource()
,该方法接收一个CloudTrailSource
对象。要阻止源中的事件被处理,请从false
() 返回filterSource()
。CloudTrail Processing Library 会在对 Amazon SQS 队列上的日志执行库轮询后调用
filterSource()
方法。在库启动日志的事件筛选或处理之前会发生此事。以下是一个实施示例:
public class SampleSourceFilter implements SourceFilter{ private static final int MAX_RECEIVED_COUNT = 3; private static List<String> accountIDs ; static { accountIDs = new ArrayList<>(); accountIDs.add("123456789012"); accountIDs.add("234567890123"); } @Override public boolean filterSource(CloudTrailSource source) throws CallbackException { source = (SQSBasedSource) source; Map<String, String> sourceAttributes = source.getSourceAttributes(); String accountId = sourceAttributes.get( SourceAttributeKeys.ACCOUNT_ID.getAttributeKey()); String receivedCount = sourceAttributes.get( SourceAttributeKeys.APPROXIMATE_RECEIVE_COUNT.getAttributeKey()); int approximateReceivedCount = Integer.parseInt(receivedCount); return approximateReceivedCount <= MAX_RECEIVED_COUNT && accountIDs.contains(accountId); } }
如果您未提供自己的
SourceFilter
,则会使用DefaultSourceFilter
,这样将允许处理所有源 (它总是返回true
)。 EventFilter
-
您可以实施
EventFilter
接口,以选择是否要将 CloudTrail 事件发送到EventsProcessor
。EventFilter
声明单个调用方法filterEvent()
,该方法接收一个CloudTrailEvent
对象。要阻止事件被处理,请从false
() 返回filterEvent()
。CloudTrail Processing Library 会在对 Amazon SQS 队列上的日志执行库轮询后和执行源筛选后调用
filterEvent()
方法。这在库开始处理日志的事件之前发生。请参阅下面的实施示例:
public class SampleEventFilter implements EventFilter{ private static final String EC2_EVENTS = "ec2.amazonaws.com"; @Override public boolean filterEvent(CloudTrailClientEvent clientEvent) throws CallbackException { CloudTrailEvent event = clientEvent.getEvent(); String eventSource = event.getEventSource(); String eventName = event.getEventName(); return eventSource.equals(EC2_EVENTS) && eventName.startsWith("Delete"); } }
如果您未提供自己的
EventFilter
,则会使用DefaultEventFilter
,这样将允许处理所有事件 (它总是返回true
)。
处理数据事件
CloudTrail 处理数据事件时,无论是整数 (int
) 还是 float
(包含小数的数字),都会保留原始格式的数字。在数据事件的字段中包含整数的事件中,CloudTrail 历史上将这些数字处理为浮点数。目前,CloudTrail 通过保留原始格式来处理这些字段中的数字。
作为最佳实践,为了避免中断自动化,可以灵活地对待您用来处理或筛选 CloudTrail 数据事件的任何代码或自动化,并允许 int
和 float
格式的数字。为了获得最佳结果,请使用 1.4.0 或更高版本的 CloudTrail Processing Library。
以下示例代码段显示了 float
格式的数字 2.0
,用于数据事件的 ResponseParameters
数据块中的 desiredCount
参数。
"eventName": "CreateService", "awsRegion": "us-east-1", "sourceIPAddress": "000.00.00.00", "userAgent": "console.amazonaws.com", "requestParameters": { "clientToken": "EXAMPLE", "cluster": "default", "desiredCount": 2.0 ...
以下示例代码段显示了 int
格式的数字 2
,用于数据事件的 ResponseParameters
数据块中的 desiredCount
参数。
"eventName": "CreateService", "awsRegion": "us-east-1", "sourceIPAddress": "000.00.00.00", "userAgent": "console.amazonaws.com", "requestParameters": { "clientToken": "EXAMPLE", "cluster": "default", "desiredCount": 2 ...
报告进度
实施 ProgressReporter
接口以自定义 CloudTrail Processing Library 进度的报告。ProgressReporter
声明两种方法:reportStart()
和 reportEnd()
,在以下操作的开头和结尾将调用这两种方法:
-
轮询来自 Amazon SQS 的消息
-
分析来自 Amazon SQS 的消息
-
处理 CloudTrail 日志的 Amazon SQS 源
-
删除来自 Amazon SQS 的消息
-
下载 CloudTrail 日志文件
-
处理 CloudTrail 日志文件
这两种方法均会收到一个 ProgressStatus
对象,该对象包含有关已执行操作的信息。progressState
成员包含标识当前操作的 ProgressState
枚举的一个成员。此成员可以包含 progressInfo
成员中的其他信息。此外,您从 reportStart()
返回的任何对象都会传递给 reportEnd()
,以便您能够提供上下文信息,例如事件开始处理时的时间。
下面是一个实施示例,该示例提供了有关完成操作所需时间的信息:
public class SampleProgressReporter implements ProgressReporter { private static final Log logger = LogFactory.getLog(DefaultProgressReporter.class); @Override public Object reportStart(ProgressStatus status) { return new Date(); } @Override public void reportEnd(ProgressStatus status, Object startDate) { System.out.println(status.getProgressState().toString() + " is " + status.getProgressInfo().isSuccess() + " , and latency is " + Math.abs(((Date) startDate).getTime()-new Date().getTime()) + " milliseconds."); } }
如果您未实施自己的 ProgressReporter
,则将改用 DefaultExceptionHandler
(它会输出正在运行的状态的名称)。
处理错误
您可使用 ExceptionHandler
接口在日志处理期间发生异常时提供特殊处理。ExceptionHandler
声明单个调用方法 handleException()
,该方法将接收一个带有关于已发生异常的上下文的 ProcessingLibraryException
对象。
您可以使用传入的 ProcessingLibraryException
的 getStatus()
方法来查明发生异常时所执行的操作,并获取有关该操作的状态的附加信息。ProcessingLibraryException
派生自 Java 的标准 Exception
类,因此您也可以通过调用任一 Exception 方法来检索有关异常的信息。
请参阅下面的实施示例:
public class SampleExceptionHandler implements ExceptionHandler{ private static final Log logger = LogFactory.getLog(DefaultProgressReporter.class); @Override public void handleException(ProcessingLibraryException exception) { ProgressStatus status = exception.getStatus(); ProgressState state = status.getProgressState(); ProgressInfo info = status.getProgressInfo(); System.err.println(String.format( "Exception. Progress State: %s. Progress Information: %s.", state, info)); } }
如果您未提供自己的 ExceptionHandler
,则改用 DefaultExceptionHandler
(它会输出标准错误消息)。
注意
如果 deleteMessageUponFailure
参数为 true
,则 CloudTrail Processing Library 不会区分一般异常与处理错误,并可能会删除队列消息。
-
例如,使用
SourceFilter
按时间戳筛选消息。 -
但是,您没有访问可接收 CloudTrail 日志文件的 S3 存储桶所需的权限。由于您没有所需的权限,因此会引发
AmazonServiceException
。CloudTrail Processing Library 将此信息包含在CallBackException
中。 -
DefaultExceptionHandler
会将其记录为错误,但不会确定根本原因,这个原因就是您没有所需的权限。CloudTrail Processing Library 会将其视为处理错误,并删除该消息,即使该消息包含有效的 CloudTrail 日志文件也是如此。
如果您要用 SourceFilter
来筛选消息,请验证您的 ExceptionHandler
是否可将服务异常与处理错误区分开。
其他资源
有关 CloudTrail Processing Library 的更多信息,请参阅以下内容:
-
CloudTrail Processing Library
GitHub 项目,包括演示如何实施 CloudTrail Processing Library 的示例 代码。