本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 CloudTrail 处理库
CloudTrail 处理库是一个 Java 库,它提供了一种简单的处理方法 Amazon CloudTrail 日志。您可以提供有关 CloudTrail SQS队列的配置详细信息并编写代码来处理事件。剩下的就交给 CloudTrail 处理库了。它会轮询您的 Amazon 队SQS列,读取和解析队列消息,下载 CloudTrail 日志文件,解析日志文件中的事件,并将事件作为 Java 对象传递到您的代码。
CloudTrail 处理库具有高度的可扩展性和容错能力。它可以并行处理日志文件,以便您可以根据需要处理任意数量的日志。它会处理与网络超时和无法访问的资源相关的网络故障。
以下主题向您展示如何使用 CloudTrail 处理库来处理 Java 项目中的 CloudTrail 日志。
该库是作为Apache许可的开源项目提供的,可在以下网址获得:。 GitHub https://github.com/aws/aws-cloudtrail-processing-library
最低要求
要使用 CloudTrail 处理库,必须具备以下条件:
处理 CloudTrail 日志
要在 Java 应用程序中处理 CloudTrail 日志,请执行以下操作:
将 CloudTrail 处理库添加到您的项目中
要使用 CloudTrail 处理库,请将其添加到 Java 项目的类路径中。
将库添加到 Apache Ant 项目
将 CloudTrail 处理库添加到 Apache Ant 项目中
-
从以下地址下载或克隆 CloudTrail 处理库源代码 GitHub:
-
从源代码生成.jar 文件,如以下所README
述: mvn clean install -Dgpg.skip=true
-
将生成的 .jar 文件复制到您的项目中,并将它添加到您项目的
build.xml
文件中。例如:<classpath> <pathelement path="${classpath}"/> <pathelement location="lib/aws-cloudtrail-processing-library-1.6.1.jar"/> </classpath>
将库添加到 Apache Maven 项目
CloudTrail 处理库可用于 Apache Mavenpom.xml
文件中编写单个依赖项。
将 CloudTrail 处理库添加到 Maven 项目
-
打开您的 Maven 项目的
pom.xml
文件,并添加如下依赖项:<dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-cloudtrail-processing-library</artifactId> <version>1.6.1</version> </dependency>
将库添加到 Eclipse 项目
将 CloudTrail 处理库添加到 Eclipse 项目中
-
从以下地址下载或克隆 CloudTrail 处理库源代码 GitHub:
-
从源代码生成.jar 文件,如以下所README
述: mvn clean install -Dgpg.skip=true
-
将生成的 aws-cloudtrail-processing-library -1.6.1.jar 复制到项目中的某个目录中(通常)。
lib
-
在 Eclipse Project Explorer 中右键单击您的项目名称,选择 Build Path,然后选择 Configure。
-
在 Java Build Path 窗口中,选择 Libraries 选项卡。
-
选择添加JARs... 然后导航到你复制的路径 aws-cloudtrail-processing-library -1.6.1.jar。
-
选择 OK 以完成将
.jar
添加到您的项目的过程。
将库添加到 IntelliJ 项目
将 CloudTrail 处理库添加到 IntelliJ 项目中
-
从以下地址下载或克隆 CloudTrail 处理库源代码 GitHub:
-
从源代码生成.jar 文件,如以下所README
述: mvn clean install -Dgpg.skip=true
-
从 File 中,选择 Project Structure。
-
选择 Modules,然后选择 Dependencies。
-
选择 + JARS 或 “目录”,然后转到您构建的路径
aws-cloudtrail-processing-library-1.6.1.jar
。 -
选择 Apply,然后选择 OK 以完成将
.jar
添加到您的项目的过程。
配置 CloudTrail 处理库
您可以通过创建在运行时加载的类路径属性文件来配置 CloudTrail 处理库,也可以手动创建ClientConfiguration
对象并设置选项。
提供属性文件
您可以编写向您的应用程序提供配置选项的类路径属性文件。以下示例文件显示了您可设置的选项:
# AWS access key. (Required) accessKey = your_access_key # AWS secret key. (Required) secretKey = your_secret_key # The SQS URL used to pull CloudTrail notification from. (Required) sqsUrl = your_sqs_queue_url # The SQS end point specific to a region. sqsRegion = us-east-1 # A period of time during which Amazon SQS prevents other consuming components # from receiving and processing that message. visibilityTimeout = 60 # The S3 region to use. s3Region = us-east-1 # Number of threads used to download S3 files in parallel. Callbacks can be # invoked from any thread. threadCount = 1 # The time allowed, in seconds, for threads to shut down after # AWSCloudTrailEventProcessingExecutor.stop() is called. If they are still # running beyond this time, they will be forcibly terminated. threadTerminationDelaySeconds = 60 # The maximum number of AWSCloudTrailClientEvents sent to a single invocation # of processEvents(). maxEventsPerEmit = 10 # Whether to include raw event information in CloudTrailDeliveryInfo. enableRawEventInfo = false # Whether to delete SQS message when the CloudTrail Processing Library is unable to process the notification. deleteMessageUponFailure = false
以下参数为必需参数:
-
sqsUrl
— 提供URL从中提取 CloudTrail通知的方法。如果不指定此值,则AWSCloudTrailProcessingExecutor
会引发IllegalStateException
。 -
accessKey
— 您的账户的唯一标识符,例如AKIAIOSFODNN7EXAMPLE。 -
secretKey
— 您的账户的唯一标识符,例如 wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY.
accessKey
和secretKey
参数为您提供 Amazon
库的凭证,以便图书馆可以访问 Amazon 代表你。
其他参数的默认值由库设置。有关更多信息,请参阅 。Amazon CloudTrail 处理库参考。
创建一个 ClientConfiguration
您可以通过在 ClientConfiguration
对象上设置选项来提供针对 AWSCloudTrailProcessingExecutor
的选项,而不是在类路径属性中设置选项,如下面的示例所示:
ClientConfiguration basicConfig = new ClientConfiguration( "http://sqs.us-east-1.amazonaws.com/123456789012/queue2", new DefaultAWSCredentialsProviderChain()); basicConfig.setEnableRawEventInfo(true); basicConfig.setThreadCount(4); basicConfig.setnEventsPerEmit(20);
实施事件处理器
要处理 CloudTrail 日志,必须实现接收 CloudTrail 日志数据的。EventsProcessor
以下是一个实施示例:
public class SampleEventsProcessor implements EventsProcessor { public void process(List<CloudTrailEvent> events) { int i = 0; for (CloudTrailEvent event : events) { System.out.println(String.format("Process event %d : %s", i++, event.getEventData())); } } }
在实现时EventsProcessor
,您需要实现AWSCloudTrailProcessingExecutor
用于向您发送 CloudTrail 事件的process()
回调。事件在 CloudTrailClientEvent
对象的列表中提供。
该CloudTrailClientEvent
对象提供了一个CloudTrailEvent
和CloudTrailEventMetadata
,你可以用它来读取 CloudTrail 事件和交付信息。
该简单示例会输出传递到 SampleEventsProcessor
的每个事件的事件信息。在您自己的实现中,您可以按照所需方式处理日志。只要 AWSCloudTrailProcessingExecutor
有事件要发送并仍在运行,它就会继续将事件发送至您的 EventsProcessor
。
实例化和运行处理执行程序
在您编写EventsProcessor
并设置 CloudTrail 处理库的配置值(在属性文件中或通过使用ClientConfiguration
类)之后,您可以使用这些元素来初始化和使用AWSCloudTrailProcessingExecutor
。
用于AWSCloudTrailProcessingExecutor
处理 CloudTrail 事件
-
实例化
AWSCloudTrailProcessingExecutor.Builder
对象。Builder
的构造函数采用一个EventsProcessor
对象和一个类路径属性文件名。 -
调用
Builder
的build()
工厂方法,以配置并获取AWSCloudTrailProcessingExecutor
对象。 -
使用
start()
和AWSCloudTrailProcessingExecutor
stop()
方法开始和结束 CloudTrail 事件处理。
public class SampleApp { public static void main(String[] args) throws InterruptedException { AWSCloudTrailProcessingExecutor executor = new AWSCloudTrailProcessingExecutor.Builder(new SampleEventsProcessor(), "/myproject/cloudtrailprocessing.properties").build(); executor.start(); Thread.sleep(24 * 60 * 60 * 1000); // let it run for a while (optional) executor.stop(); // optional } }
高级主题
筛选要处理的事件
默认情况下,Amazon SQS 队列的 S3 存储桶中的所有日志及其包含的所有事件都将发送到您的EventsProcessor
。P CloudTrail rocessing Library 提供了可选接口,您可以实现这些接口来筛选用于获取 CloudTrail 日志的源并筛选您有兴趣处理的事件。
SourceFilter
-
您可以实施
SourceFilter
接口,以选择是否要处理来自提供的源的日志。SourceFilter
声明单个调用方法filterSource()
,该方法接收一个CloudTrailSource
对象。要阻止源中的事件被处理,请从false
() 返回filterSource()
。在库轮询 Amazon SQS 队列中的日志之后, CloudTrail 处理库会调用该
filterSource()
方法。在库启动日志的事件筛选或处理之前会发生此事。以下是一个实施示例:
public class SampleSourceFilter implements SourceFilter{ private static final int MAX_RECEIVED_COUNT = 3; private static List<String> accountIDs ; static { accountIDs = new ArrayList<>(); accountIDs.add("123456789012"); accountIDs.add("234567890123"); } @Override public boolean filterSource(CloudTrailSource source) throws CallbackException { source = (SQSBasedSource) source; Map<String, String> sourceAttributes = source.getSourceAttributes(); String accountId = sourceAttributes.get( SourceAttributeKeys.ACCOUNT_ID.getAttributeKey()); String receivedCount = sourceAttributes.get( SourceAttributeKeys.APPROXIMATE_RECEIVE_COUNT.getAttributeKey()); int approximateReceivedCount = Integer.parseInt(receivedCount); return approximateReceivedCount <= MAX_RECEIVED_COUNT && accountIDs.contains(accountId); } }
如果您未提供自己的
SourceFilter
,则会使用DefaultSourceFilter
,这样将允许处理所有源 (它总是返回true
)。 EventFilter
-
您可以实现
EventFilter
接口来选择是否向您的发送 CloudTrail事件EventsProcessor
。EventFilter
声明一个接收CloudTrailEvent
对象的filterEvent()
单个回调方法。要阻止事件被处理,请从false
() 返回filterEvent()
。CloudTrail 处理库会在库轮询 Amazon SQS 队列中的日志以及筛选源代码之后调用该
filterEvent()
方法。这在库开始处理日志的事件之前发生。请参阅下面的实施示例:
public class SampleEventFilter implements EventFilter{ private static final String EC2_EVENTS = "ec2.amazonaws.com"; @Override public boolean filterEvent(CloudTrailClientEvent clientEvent) throws CallbackException { CloudTrailEvent event = clientEvent.getEvent(); String eventSource = event.getEventSource(); String eventName = event.getEventName(); return eventSource.equals(EC2_EVENTS) && eventName.startsWith("Delete"); } }
如果您未提供自己的
EventFilter
,则会使用DefaultEventFilter
,这样将允许处理所有事件 (它总是返回true
)。
处理数据事件
CloudTrail 处理数据事件时,它会保留原始格式的数字,无论是整数 (int
) 还是float
(包含十进制的数字)。在数据事件字段中包含整数的事件中, CloudTrail 历史上会将这些数字作为浮点数进行处理。目前,通过保留这些字段的原始格式来 CloudTrail 处理这些字段中的数字。
作为最佳实践,为避免破坏自动化,请灵活使用任何用于处理或筛选 CloudTrail 数据事件的代码或自动化,并允许两者兼int
而有float
格式的数字。为获得最佳效果,请使用 CloudTrail 处理库 1.4.0 或更高版本。
以下示例代码段显示了 float
格式的数字 2.0
,用于数据事件的 ResponseParameters
数据块中的 desiredCount
参数。
"eventName": "CreateService", "awsRegion": "us-east-1", "sourceIPAddress": "000.00.00.00", "userAgent": "console.amazonaws.com", "requestParameters": { "clientToken": "EXAMPLE", "cluster": "default", "desiredCount": 2.0 ...
以下示例代码段显示了 int
格式的数字 2
,用于数据事件的 ResponseParameters
数据块中的 desiredCount
参数。
"eventName": "CreateService", "awsRegion": "us-east-1", "sourceIPAddress": "000.00.00.00", "userAgent": "console.amazonaws.com", "requestParameters": { "clientToken": "EXAMPLE", "cluster": "default", "desiredCount": 2 ...
报告进度
实现ProgressReporter
接口以自定义 CloudTrail 处理库进度报告。 ProgressReporter
声明了两个方法:reportStart()
和reportEnd()
,它们在以下操作的开头和结尾处被调用:
-
轮询来自亚马逊的消息 SQS
-
正在解析来自亚马逊的消息 SQS
-
正在处理 Amazon SQS 来源的 CloudTrail 日志
-
从亚马逊删除消息 SQS
-
正在下载 CloudTrail 日志文件
-
处理 CloudTrail 日志文件
这两种方法均会收到一个 ProgressStatus
对象,该对象包含有关已执行操作的信息。progressState
成员包含标识当前操作的 ProgressState
枚举的一个成员。此成员可以包含 progressInfo
成员中的其他信息。此外,您从 reportStart()
返回的任何对象都会传递给 reportEnd()
,以便您能够提供上下文信息,例如事件开始处理时的时间。
下面是一个实施示例,该示例提供了有关完成操作所需时间的信息:
public class SampleProgressReporter implements ProgressReporter { private static final Log logger = LogFactory.getLog(DefaultProgressReporter.class); @Override public Object reportStart(ProgressStatus status) { return new Date(); } @Override public void reportEnd(ProgressStatus status, Object startDate) { System.out.println(status.getProgressState().toString() + " is " + status.getProgressInfo().isSuccess() + " , and latency is " + Math.abs(((Date) startDate).getTime()-new Date().getTime()) + " milliseconds."); } }
如果您未实施自己的 ProgressReporter
,则将改用 DefaultExceptionHandler
(它会输出正在运行的状态的名称)。
处理错误
您可使用 ExceptionHandler
接口在日志处理期间发生异常时提供特殊处理。ExceptionHandler
声明单个调用方法 handleException()
,该方法将接收一个带有关于已发生异常的上下文的 ProcessingLibraryException
对象。
您可以使用传入的 ProcessingLibraryException
的 getStatus()
方法来查明发生异常时所执行的操作,并获取有关该操作的状态的附加信息。ProcessingLibraryException
派生自 Java 的标准 Exception
类,因此您也可以通过调用任一 Exception 方法来检索有关异常的信息。
请参阅下面的实施示例:
public class SampleExceptionHandler implements ExceptionHandler{ private static final Log logger = LogFactory.getLog(DefaultProgressReporter.class); @Override public void handleException(ProcessingLibraryException exception) { ProgressStatus status = exception.getStatus(); ProgressState state = status.getProgressState(); ProgressInfo info = status.getProgressInfo(); System.err.println(String.format( "Exception. Progress State: %s. Progress Information: %s.", state, info)); } }
如果您未提供自己的 ExceptionHandler
,则改用 DefaultExceptionHandler
(它会输出标准错误消息)。
注意
如果deleteMessageUponFailure
参数为true
,则 CloudTrail 处理库不区分一般异常和处理错误,并且可能会删除队列消息。
-
例如,使用
SourceFilter
按时间戳筛选消息。 -
但是,您没有访问接收 CloudTrail 日志文件的 S3 存储桶所需的权限。由于您没有所需的权限,因此会引发
AmazonServiceException
。 CloudTrail 处理库将其封装在CallBackException
. -
DefaultExceptionHandler
会将其记录为错误,但不会确定根本原因,这个原因就是您没有所需的权限。 CloudTrail 处理库将此视为处理错误并删除该消息,即使该消息包含有效的 CloudTrail 日志文件也是如此。
如果您要用 SourceFilter
来筛选消息,请验证您的 ExceptionHandler
是否可将服务异常与处理错误区分开。
其他 资源
有关 CloudTrail 处理库的更多信息,请参阅以下内容:
-
CloudTrail 处理库
GitHub 项目,其中包括演示如何实现 CloudTrail 处理库应用程序的示例 代码。