使用异步编程 Amazon SDK for Java 2.x - Amazon SDK for Java 2.x
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用异步编程 Amazon SDK for Java 2.x

这些 Amazon SDK for Java 2.x 功能具有非阻塞 I/O 支持的异步客户端,可在几个线程之间实现高并发性。但是,不能保证完全 I/O 不阻塞。在某些情况下,异步客户端可能会执行阻塞调用,例如凭据检索、使用Amazon 签名版本 4 (Sigv4) 进行请求签名或端点发现。

同步方法会阻止执行您的线程,直到客户端接收到服务的响应。异步方法会立即返回,并控制调用的线程,而不必等待响应。

由于异步方法在收到响应之前返回,所以需要通过某种方法在响应准备就绪时接收响应。2.x 中 适用于 Java 的 Amazon SDK 返回CompletableFuture 对象中的异步客户端方法,允许您在响应准备就绪时访问响应。

使用异步客户端 APIs

异步客户端方法的签名与同步客户端方法的签名相同,但是异步方法返回的CompletableFuture对象包含将来异步操作的结果。如果在执行 SDK 的异步方法时引发错误,则错误将引发为CompletionException

您可以用来获取结果的一种方法是将一个whenComplete()方法链接到 SDK 方法调用CompletableFuture返回的方法上。该whenComplete()方法接收结果或类型为 Throwable 对象的类型,CompletionException具体取决于异步调用的完成方式。在结果返回whenComplete()到调用代码之前,您可以向提供操作来处理或检查结果。

如果要返回 SDK 方法返回的对象以外的内容,请改用该handle()方法。该handle()方法采用的参数与相同whenComplete(),但您可以处理结果并返回对象。

要等待异步链完成并检索完成结果,可以调用join()方法。如果该Throwable对象未在链中处理,则该join()方法会抛出一个未选中的CompletionException,用于包装原始异常。您可以使用访问原始例外CompletionException#getCause()。您也可以调用该CompletableFuture#get()方法来获取完成结果。但是,该get()方法可能会抛出已检查的异常。

以下示例显示了如何使用 DynamoDB 异步listTables()客户端的方法的两种变体。传递给的操作whenComplete()仅记录成功的响应,而handle()版本则提取表名列表并返回列表。在这两种情况下,如果异步链中生成错误,则会重新抛出错误,以便客户端代码有机会对其进行处理。

导入

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import java.util.List; import java.util.concurrent.CompletableFuture;

代码

whenComplete() variation
public class DynamoDbAsyncListTables { public static void main(String[] args) { Region region = Region.US_EAST_1; DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); try { ListTablesResponse listTablesResponse = listTablesWhenComplete(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException. if (listTablesResponse.hasTableNames()){ System.out.println("Table exist in this region: " + region.id()); } } catch (RuntimeException e) { // Handle as needed. Here we simply print out the class names. System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'. System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'. } } public static CompletableFuture<ListTablesResponse> listTablesWhenComplete(DynamoDbAsyncClient client) { return client.listTables(ListTablesRequest.builder().build()) .whenComplete((listTablesResponse, throwable) -> { if (listTablesResponse != null) { // Consume the response. System.out.println("The SDK's listTables method completed successfully."); } else { RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException. // The SDK throws only RuntimeExceptions, so this is a safe cast. System.out.println(cause.getMessage()); // Log error here, but rethrow so the calling code can handle as needed. throw cause; } }); }
handle() variation
public class DynamoDbAsyncListTables { public static void main(String[] args) { Region region = Region.US_EAST_1; DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); try { List<String> tableNames = listTablesHandle(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException. tableNames.forEach(System.out::println); } catch (RuntimeException e) { // Handle as needed. Here we simply print out the class names. System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'. System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'. } } public static CompletableFuture<List<String>> listTablesHandle(DynamoDbAsyncClient client) { return client.listTables(ListTablesRequest.builder().build()) .handle((listTablesResponse, throwable) -> { if (listTablesResponse != null) { return listTablesResponse.tableNames(); // Return the list of table names. } else { RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException. // The SDK throws only RuntimeExceptions, so this is a safe cast. System.out.println(cause.getMessage()); // Log error here, but rethrow so the calling code can handle as needed. throw cause; } }); } }

使用异步方法处理流式传输

对于流式传输内容的异步方法,必须提供AsyncRequestBody以增量方式提供内容,或者提供AsyncResponseTransformer用于接收和处理响应。

以下示例使用操作的 Amazon S3 异步形式将文件异步上传到。PutObject

导入

import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

代码

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncOps <bucketName> <key> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " key - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String key = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); PutObjectRequest objectRequest = PutObjectRequest.builder() .bucket(bucketName) .key(key) .build(); // Put the object into the bucket CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest, AsyncRequestBody.fromFile(Paths.get(path)) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object uploaded. Details: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); future.join(); } }

以下示例使用GetObject操作的异步形式从 Amazon S3 中获取文件。

导入

import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

代码

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncStreamOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " objectKey - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String objectKey = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest, AsyncResponseTransformer.toFile(Paths.get(path))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object downloaded. Details: "+resp); } else { err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }

配置高级异步选项

适用于 Java 的 Amazon SDK 2.x 使用 Netty(一种异步事件驱动的网络应用程序框架)来处理线程。 I/O 适用于 Java 的 Amazon SDK 2.x 在 Netty 之后创建 ExecutorService,以完成从 HTTP 客户端请求返回到 Netty 客户端的 future。这种抽象化可以减少在客服人员选择停止或休眠线程时,应用程序中断异步处理的风险。默认情况下,每个异步客户端都会根据处理器数量创建一个线程池,并管理 ExecutorService 队列中的任务。

在构建异步客户端ExecutorService时,可以指定特定的 JDK 实现。以下代码段创建了一个ExecutorService具有固定线程数的。

代码

S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();

要优化性能,您可以管理自己的线程池执行器,并在配置客户端时将其包括在内。

ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(<custom_value>), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();