使用异步编程 - Amazon SDK for Java 2.x
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用异步编程

这些 Amazon SDK for Java 2.x 功能支持非阻塞 I/O 的异步客户端,可在几个线程之间实现高并发性。但是,不能保证总的非阻塞 I/O。在某些情况下,异步客户端可能会执行阻塞调用,例如凭据检索、使用Amazon 签名版本 4 (Sigv4) 进行请求签名或端点发现。

同步方法会阻止执行您的线程,直到客户端接收到服务的响应。异步方法会立即返回,并控制调用的线程,而不必等待响应。

由于异步方法在收到响应之前返回,所以需要通过某种方法在响应准备就绪时接收响应。2.x 中 Amazon SDK for Java 返回CompletableFuture 对象中的异步客户端方法,允许您在响应准备就绪时访问响应。

非流式操作

对于非流式操作,异步方法调用类似于同步方法。但是,中的异步方法会 Amazon SDK for Java 返回一个包含将来异步操作结果的CompletableFuture对象。

当结果可用时,使用要完成的操作调用 CompletableFuture whenComplete() 方法。CompletableFuture 实现 Future 接口,以便您还可以通过调用 get() 方法来获得响应对象。

以下是一个异步操作的示例,该操作调用一个 Amazon DynamoDB 函数来获取表列表,接收CompletableFuture可以容纳ListTablesResponse对象的。在调用 whenComplete() 时定义的操作仅在异步调用完成时完成。

导入

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import java.util.List; import java.util.concurrent.CompletableFuture;

代码

public class DynamoDBAsyncListTables { public static void main(String[] args) throws InterruptedException { // Create the DynamoDbAsyncClient object Region region = Region.US_EAST_1; DynamoDbAsyncClient client = DynamoDbAsyncClient.builder() .region(region) .build(); listTables(client); } public static void listTables(DynamoDbAsyncClient client) { CompletableFuture<ListTablesResponse> response = client.listTables(ListTablesRequest.builder() .build()); // Map the response to another CompletableFuture containing just the table names CompletableFuture<List<String>> tableNames = response.thenApply(ListTablesResponse::tableNames); // When future is complete (either successfully or in error) handle the response tableNames.whenComplete((tables, err) -> { try { if (tables != null) { tables.forEach(System.out::println); } else { // Handle error err.printStackTrace(); } } finally { // Lets the application shut down. Only close the client when you are completely done with it. client.close(); } }); tableNames.join(); } }

以下代码示例说明如何使用异步客户端从表中检索项目。调用getItem的方法, DynamoDbAsyncClient 然后向其传递一个包含所需项目的表名和主键值的GetItemRequest对象。这通常是您传递此操作所需数据的方式。在此示例中,请注意传递了一个字符串值。

导入

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;

代码

public static void getItem(DynamoDbAsyncClient client, String tableName, String key, String keyVal) { HashMap<String, AttributeValue> keyToGet = new HashMap<String, AttributeValue>(); keyToGet.put(key, AttributeValue.builder() .s(keyVal).build()); try { // Create a GetItemRequest instance GetItemRequest request = GetItemRequest.builder() .key(keyToGet) .tableName(tableName) .build(); // Invoke the DynamoDbAsyncClient object's getItem java.util.Collection<AttributeValue> returnedItem = client.getItem(request).join().item().values(); // Convert Set to Map Map<String, AttributeValue> map = returnedItem.stream().collect(Collectors.toMap(AttributeValue::s, s->s)); Set<String> keys = map.keySet(); for (String sinKey : keys) { System.out.format("%s: %s\n", sinKey, map.get(sinKey).toString()); } } catch (DynamoDbException e) { System.err.println(e.getMessage()); System.exit(1); }

请参阅上的完整示例 GitHub。

流式操作

对于流媒体操作,您必须提供AsyncRequestBody以增量方式提供内容,或者提供AsyncResponseTransformer以接收和处理响应。

以下示例使用操作将文件 Amazon S3 异步上传到。PutObject

导入

import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

代码

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncOps <bucketName> <key> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " key - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String key = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); PutObjectRequest objectRequest = PutObjectRequest.builder() .bucket(bucketName) .key(key) .build(); // Put the object into the bucket CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest, AsyncRequestBody.fromFile(Paths.get(path)) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object uploaded. Details: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); future.join(); } }

以下示例使用GetObject操作从 Amazon S3 异步获取文件。

导入

import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

代码

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncStreamOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " objectKey - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String objectKey = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest, AsyncResponseTransformer.toFile(Paths.get(path))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object downloaded. Details: "+resp); } else { err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }

高级操作

Amazon SDK for Java 2.x 使用 Netty(一种异步事件驱动的网络应用程序框架)来处理 I/O 线程。 Amazon SDK for Java 2.x 在 Netty 之后创建 ExecutorService,以完成从 HTTP 客户端请求返回到 Netty 客户端的 future。这种抽象化可以减少在客服人员选择停止或休眠线程时,应用程序中断异步处理的风险。默认情况下,每个异步客户端都会根据处理器数量创建一个线程池,并管理 ExecutorService 队列中的任务。

创建异步客户端时,高级用户可在构建时使用以下选项指定其线程池大小。

代码

S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();

要优化性能,您可以管理自己的线程池执行程序,并在配置客户端时包括它。

ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(<custom_value>), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();