创建流 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建流

使用以下步骤创建 Kinesis 数据流。

构建 Kinesis Data Streams 客户端

必须先构建客户端对象,然后才能处理 Kinesis 数据流。以下 Java 代码实例化一个客户端生成器,并使用它来设置区域、凭据和客户端配置。然后,它会构建一个客户端对象。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis client = clientBuilder.build();

有关更多信息,请参阅 。Kinesis Data Streams 区域和终端中的Amazon一般参考

创建流

现在您已创建 Kinesis Data Streams 客户端,接下来可创建要处理的流,您可使用 Kinesis 数据流控制台或以编程方式来完成此操作。要以编程方式创建流,请实例化 CreateStreamRequest 对象并指定流名称以及流要使用的分片数量。

CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName ); createStreamRequest.setShardCount( myStreamSize );

流名称用于标识流。此名称的使用范围限定在Amazon应用程序使用的帐户。它还受区域限制。也就是说,两个不同的流Amazon帐户可以具有相同的名称,并且Amazon账户中的相同账户,但位于两个不同区域的相同账户中的两个流可具有相同的名称,但不能具有相同的名称。

流的吞吐量是分片数量的函数;配置更大的吞吐量需要更多分片。更多的分片还会增加Amazon流的费用。有关计算适合您的应用程序的分片数量的更多信息,请参阅确定 Kinesis Data Streams 的初始大小

在配置 createStreamRequest 对象之后,通过对客户端调用 createStream 方法来创建流。在调用 createStream 之后,应等待流达到 ACTIVE 状态,然后再对流执行任何操作。要查看流的状态,请调用 describeStream 方法。但是,如果流不存在,describeStream 将引发异常。因此,请将 describeStream 调用包括在 try/catch 块中。

client.createStream( createStreamRequest ); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); long startTime = System.currentTimeMillis(); long endTime = startTime + ( 10 * 60 * 1000 ); while ( System.currentTimeMillis() < endTime ) { try { Thread.sleep(20 * 1000); } catch ( Exception e ) {} try { DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest ); String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); if ( streamStatus.equals( "ACTIVE" ) ) { break; } // // sleep for one second // try { Thread.sleep( 1000 ); } catch ( Exception e ) {} } catch ( ResourceNotFoundException e ) {} } if ( System.currentTimeMillis() >= endTime ) { throw new RuntimeException( "Stream " + myStreamName + " never went active" ); }