本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用异步编程
这些 Amazon SDK for Java 2.x 特点是支持非阻塞 I/O 的异步客户端,可在几个线程之间实现高并发性。但是,不能保证总的非阻塞 I/O。在某些情况下,异步客户端可能会执行阻塞调用,例如凭据检索、使用Amazon 签名版本 4 (Sigv4) 进行请求签名或端点发现。
同步方法会阻止执行您的线程,直到客户端接收到服务的响应。异步方法会立即返回,并控制调用的线程,而不必等待响应。
由于异步方法在收到响应之前返回,所以需要通过某种方法在响应准备就绪时接收响应。2.x 中 Amazon SDK for Java 返回CompletableFuture 对象中的异步客户端方法,允许您在响应准备就绪时访问响应。
使用异步客户端 APIs
异步客户端方法的签名与同步客户端方法的签名相同,但是异步方法返回的CompletableFutureCompletionException
。
你可以用来获得结果的一种方法是将一个whenComplete()
方法链接到方法调用CompletableFuture
返回的SDK方法上。该whenComplete()
方法接收结果或类型为 Throwable 对象的类型,CompletionException
具体取决于异步调用的完成方式。在结果返回whenComplete()
到调用代码之前,您可以向提供操作来处理或检查结果。
如果要返回SDK方法返回的对象以外的内容,请改用该handle()
方法。该handle()
方法采用的参数与相同whenComplete()
,但您可以处理结果并返回对象。
要等待异步链完成并检索完成结果,可以调用join()
方法。如果该Throwable
对象未在链中处理,则该join()
方法会抛出一个未选中的CompletionException
,用于包装原始异常。您可以使用访问原始例外CompletionException#getCause()
。您也可以调用该CompletableFuture#get()
方法来获取完成结果。但是,该get()
方法可能会抛出已检查的异常。
以下示例显示了如何使用 DynamoDB 异步listTables()
客户端的方法的两种变体。传递给的操作whenComplete()
仅记录成功的响应,而handle()
版本则提取表名列表并返回列表。在这两种情况下,如果异步链中生成错误,则会重新抛出错误,以便客户端代码有机会对其进行处理。
导入
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import java.util.List; import java.util.concurrent.CompletableFuture;
代码
使用异步方法处理流式传输
对于流式传输内容的异步方法,必须提供AsyncRequestBody
以下示例使用操作的 Amazon S3 异步形式将文件异步上传到。PutObject
导入
import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;
代码
/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncOps <bucketName> <key> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " key - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String key = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); PutObjectRequest objectRequest = PutObjectRequest.builder() .bucket(bucketName) .key(key) .build(); // Put the object into the bucket CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest, AsyncRequestBody.fromFile(Paths.get(path)) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object uploaded. Details: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); future.join(); } }
以下示例使用GetObject
操作的异步形式从 Amazon S3 中获取文件。
导入
import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;
代码
/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncStreamOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " objectKey - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String objectKey = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest, AsyncResponseTransformer.toFile(Paths.get(path))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object downloaded. Details: "+resp); } else { err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }
配置高级异步选项
Amazon SDK for Java 2.x 使用 NettyExecutorService
落后的 Netty,以完成从HTTP客户端请求返回到 Netty 客户端的期货。这种抽象化可以减少在客服人员选择停止或休眠线程时,应用程序中断异步处理的风险。默认情况下,每个异步客户端都会根据处理器数量创建一个线程池,并管理 ExecutorService
队列中的任务。
在构建异步客户端ExecutorService
时,您可以指定具体的JDK实现。以下代码段创建了一个ExecutorService
具有固定线程数的。
代码
S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();
要优化性能,您可以管理自己的线程池执行器,并在配置客户端时将其包括在内。
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(
<custom_value>
), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();