适用于 Java 的 AWS 开发工具包版本 2
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

异步编程

AWS SDK for Java 2.0 具有真正的非阻塞异步客户端,可以跨多个线程实现高并发度。AWS SDK for Java 1.11.x 具有异步客户端,该客户端是围绕线程池和阻塞同步客户端(未提供非阻塞 I/O 的所有优势)的包装程序。

同步方法会阻止执行您的线程,直到客户端接收到服务的响应。异步方法会立即返回,并控制调用的线程,而不必等待响应。

由于异步方法在收到响应之前返回,所以需要通过某种方法接收返回的响应。AWS SDK for Java 2.0 异步客户端方法将返回 CompletableFuture 对象,该对象可让您在响应准备就绪时访问响应。

非流式操作

对于非流式操作,异步方法调用类似于同步方法。但是,AWS SDK for Java 中的异步方法会返回 CompletableFuture 对象,其中包含之后 的异步操作的结果。

当结果可用时,使用要完成的操作调用 CompletableFuture whenComplete() 方法。CompletableFuture 实现 Future 接口,以便您还可以通过调用 get() 方法来获得响应对象。

以下示例演示一个调用 Amazon DynamoDB 函数以获取表列表的异步操作,该操作收到可包含 ListTablesResponse 对象的 CompletableFuture。在调用 whenComplete() 时定义的操作仅在异步调用完成时完成。

导入

import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import software.amazon.awssdk.utils.FunctionalUtils; import java.util.List; import java.util.concurrent.CompletableFuture;

代码

public class DynamoDBAsync { public static void main(String[] args) throws InterruptedException { // Creates a default async client with credentials and regions loaded from the environment DynamoDbAsyncClient client = DynamoDbAsyncClient.create(); CompletableFuture<ListTablesResponse> response = client.listTables(ListTablesRequest.builder() .build()); // Map the response to another CompletableFuture containing just the table names CompletableFuture<List<String>> tableNames = response.thenApply(ListTablesResponse::tableNames); // When future is complete (either successfully or in error) handle the response tableNames.whenComplete((tables, err) -> { try { if (tables != null) { tables.forEach(System.out::println); } else { // Handle error err.printStackTrace(); } } finally { // Lets the application shut down. Only close the client when you are completely done with it. client.close(); } }); tableNames.join(); } }

流式操作

对于流式操作,您必须提供 AsyncRequestBody 来以递增方式提供内容,或者提供 AsyncResponseTransformer 以接收和处理响应。

以下示例使用 PutObject 操作,将文件异步上传到 Amazon S3。

导入

import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.utils.FunctionalUtils; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

代码

public class S3AsyncOps { private static final String BUCKET = "sample-bucket"; private static final String KEY = "testfile.in"; public static void main(String[] args) { S3AsyncClient client = S3AsyncClient.create(); CompletableFuture<PutObjectResponse> future = client.putObject( PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) .build(), AsyncRequestBody.fromFile(Paths.get("myfile.in")) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("my response: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Lets the application shut down. Only close the client when you are completely done with it. client.close(); } }); future.join(); } }

以下示例使用 GetObject 操作,从 Amazon S3 异步获取文件。

导入

import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.utils.FunctionalUtils; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

代码

public class S3AsyncStreamOps { private static final String BUCKET = "sample-bucket"; private static final String KEY = "testfile.out"; public static void main(String[] args) { S3AsyncClient client = S3AsyncClient.create(); final CompletableFuture<GetObjectResponse> futureGet = client.getObject( GetObjectRequest.builder() .bucket(BUCKET) .key(KEY) .build(), AsyncResponseTransformer.toFile(Paths.get("myfile.out"))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println(resp); } else { // Handle error err.printStackTrace(); } } finally { // Lets the application shut down. Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }

高级操作

AWS SDK for Java 2.0使用 Netty 处理 I/O 线程,这是一种异步的事件驱动网络应用程序框架。AWS SDK for Java 2.0在 Netty 之后创建 ExecutorService,以完成从 HTTP 客户端请求返回到 Netty 客户端的 Futures。这种抽象化可以减少在客服人员选择停止或休眠线程时,应用程序中断异步处理的风险。默认情况下,为每个异步客户端生成 50 个线程,并在 ExecutorService 中的队列内管理。

创建异步客户端时,高级用户可在构建时使用以下选项指定其线程池大小。

代码

S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();

要优化性能,您可以管理自己的线程池执行程序,并在配置客户端时包括它。

ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10_000), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();

如果您偏向于完全不使用线程池,请使用 Runnable::run 来取代线程池执行程序的使用。

S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Runnable::run ) ) .build();