Amazon Kinesis Data Streams
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

创建流

请使用以下步骤创建您的 Kinesis stream。

构建 Kinesis Data Streams 客户端

您必须先构建一个客户端对象,然后才能处理 Kinesis stream。以下 Java 代码实例化一个客户端生成器,并使用它来设置区域、凭据和客户端配置。然后,它会构建一个客户端对象。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis client = clientBuilder.build();

有关更多信息,请参阅 AWS General Reference 中的 Kinesis Data Streams 区域和终端节点

创建流

现在您已创建 Kinesis Data Streams 客户端,接下来可创建要处理的流。可通过 Kinesis Data Streams 控制台或以编程方式来创建流。要以编程方式创建流,请实例化 CreateStreamRequest 对象并指定流名称以及流要使用的分片数量。

CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName ); createStreamRequest.setShardCount( myStreamSize );

流名称用于标识流。此名称的使用范围限定在应用程序使用的 AWS 账户内。它还受区域限制。也就是说,两个不同 AWS 账户中的两个流可具有相同的名称,不同区域的相同 AWS 账户中的两个流可具有相同的名称,但相同区域的相同账户中的两个流不能具有相同的名称。

流的吞吐量是分片数量的函数;配置更大的吞吐量需要更多分片。更多的分片还会增加 AWS 向流收取的费用。有关计算您的应用程序适当的分片数量的更多信息,请参阅 确定 Kinesis Stream 的初始大小

在配置 createStreamRequest 对象之后,通过对客户端调用 createStream 方法来创建流。在调用 createStream 之后,应等待流达到 ACTIVE 状态,然后再对流执行任何操作。要查看流的状态,请调用 describeStream 方法。但是,如果流不存在,describeStream 将引发异常。因此,请将 describeStream 调用包括在 try/catch 块中。

client.createStream( createStreamRequest ); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); long startTime = System.currentTimeMillis(); long endTime = startTime + ( 10 * 60 * 1000 ); while ( System.currentTimeMillis() < endTime ) { try { Thread.sleep(20 * 1000); } catch ( Exception e ) {} try { DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest ); String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); if ( streamStatus.equals( "ACTIVE" ) ) { break; } // // sleep for one second // try { Thread.sleep( 1000 ); } catch ( Exception e ) {} } catch ( ResourceNotFoundException e ) {} } if ( System.currentTimeMillis() >= endTime ) { throw new RuntimeException( "Stream " + myStreamName + " never went active" ); }