适用于 Java 的 AWS 开发工具包
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

订阅 Amazon Kinesis 数据流

以下示例向您演示如何使用 subscribeToShard 方法检索和处理 Amazon Kinesis Data Streams 中的数据。Kinesis Data Streams 现在采用增强的扇出功能和低延迟 HTTP/2 数据检索 API,同时让开发人员能够更易于在同一 Kinesis 数据流上运行多个低延迟、高性能的应用程序。

设置

首先,创建一个异步 Kinesis 客户端和一个 SubscribeToShardRequest 对象。这些对象用于以下每个示例中以订阅 Kinesis 事件。

导入

import java.net.URI; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import software.amazon.awssdk.utils.AttributeMap;

代码

KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId("shardId-000000000000") .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();

使用 Builder 接口

您可以使用 builder 方法来简化创建 SubscribeToShardResponseHandler 的过程。

使用生成器,您可以通过方法调用设置每个生命周期回调,而非实施完整的接口。

代码

private static CompletableFuture<Void> responseHandlerBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .onComplete(() -> System.out.println("All records stream successfully")) // Must supply some type of subscriber .subscriber(e -> System.out.println("Received event - " + e)) .build(); return client.subscribeToShard(request, responseHandler); }

要对发布者进行更多控制,您可以使用 publisherTransformer 方法自定义发布者。

代码

private static CompletableFuture<Void> responseHandlerBuilder_PublisherTransformer(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .publisherTransformer(p -> p.filter(e -> e instanceof SubscribeToShardEvent).limit(100)) .subscriber(e -> System.out.println("Received event - " + e)) .build(); return client.subscribeToShard(request, responseHandler); }

请参阅 GitHub 上的完整示例

使用自定义响应处理程序

要完全控制订阅者和发布者,请实施 SubscribeToShardResponseHandler 接口。

在本示例中,您实施 onEventStream 方法,此方法允许您对发布者具有完全访问权限。此示例演示如何将发布者转换为事件记录以供订阅者打印。

代码

private static CompletableFuture<Void> responseHandlerBuilder_Classic(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = new SubscribeToShardResponseHandler() { @Override public void responseReceived(SubscribeToShardResponse response) { System.out.println("Receieved initial response"); } @Override public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) { publisher // Filter to only SubscribeToShardEvents .filter(SubscribeToShardEvent.class) // Flat map into a publisher of just records .flatMapIterable(SubscribeToShardEvent::records) // Limit to 1000 total records .limit(1000) // Batch records into lists of 25 .buffer(25) // Print out each record batch .subscribe(batch -> System.out.println("Record Batch - " + batch)); } @Override public void complete() { System.out.println("All records stream successfully"); } @Override public void exceptionOccurred(Throwable throwable) { System.err.println("Error during stream - " + throwable.getMessage()); } }; return client.subscribeToShard(request, responseHandler); }

请参阅 GitHub 上的完整示例

使用 Visitor 接口

您可以使用 Visitor 对象订阅您有兴趣观看的特定事件。

代码

private static CompletableFuture<Void> responseHandlerBuilder_VisitorBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = SubscribeToShardResponseHandler.Visitor .builder() .onSubscribeToShardEvent(e -> System.out.println("Received subscribe to shard event " + e)) .build(); SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); }

请参阅 GitHub 上的完整示例

使用自定义订阅者

您也可以实施您自己的自定义订阅者以订阅流。

此代码段显示了一个示例订阅者。

代码

private static class MySubscriber implements Subscriber<SubscribeToShardEventStream> { private Subscription subscription; private AtomicInteger eventCount = new AtomicInteger(0); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(SubscribeToShardEventStream shardSubscriptionEventStream) { System.out.println("Received event " + shardSubscriptionEventStream); if (eventCount.incrementAndGet() >= 100) { // You can cancel the subscription at any time if you wish to stop receiving events. subscription.cancel(); } subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.println("Error occurred while stream - " + throwable.getMessage()); } @Override public void onComplete() { System.out.println("Finished streaming all events"); } }

您可以将自定义订阅者传递到 subscribe 方法,这与预览示例类似。下面的代码段显示了此示例。

代码

private static CompletableFuture<Void> responseHandlerBuilder_Subscriber(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(MySubscriber::new) .build(); return client.subscribeToShard(request, responseHandler); }

请参阅 GitHub 上的完整示例

使用第三方库

您可以使用其他第三方库,而不是实现自定义订阅者。本示例演示了如何使用 RxJava 实现,但您可以使用实现反应式流接口的任何库。有关该库的更多信息,请参阅 Github 上的 RxJava 维基页面

要使用该库,请将其作为依赖项添加。如果您使用 Maven,示例将显示要使用的 POM 代码段。

POM 条目

<target>1.8</target> </configuration> </plugin> </plugins> </build>

导入

import java.net.URI; import java.util.concurrent.CompletableFuture; import io.reactivex.Flowable; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.StartingPosition; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import software.amazon.awssdk.utils.AttributeMap;

此示例在 onEventStream 生命周期方法中使用 RxJava。这样,您就对发布者具有完全访问权限,这可用于创建 Rx Flowable。

代码

SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .onEventStream(p -> Flowable.fromPublisher(p) .ofType(SubscribeToShardEvent.class) .flatMapIterable(SubscribeToShardEvent::records) .limit(1000) .buffer(25) .subscribe(e -> System.out.println("Record batch = " + e))) .build();

您也可以使用带有 publisherTransformer 发布者的 Flowable 方法。您必须使 Flowable 发布者适应 SdkPublisher,如以下示例所示。

代码

SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .publisherTransformer(p -> SdkPublisher.adapt(Flowable.fromPublisher(p).limit(100))) .build();

请参阅 GitHub 上的完整示例

更多信息