通过 API 创建直播 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

通过 API 创建直播

使用以下步骤创建 Kinesis 数据流。

构建 Kinesis Data Streams 客户端

必须先构建客户端对象,然后才能使用 Kinesis 数据流。以下 Java 代码实例化一个客户端生成器,并使用它来设置区域、凭据和客户端配置。然后,它会构建一个客户端对象。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis client = clientBuilder.build();

有关更多信息,请参阅中的 Kinesis Data Streams 区域和端点Amazon Web Services 一般参考

创建流

现在,您已经创建了 Kinesis Data Streams 客户端,您可以创建要使用的流,您可以使用 Kinesis Data Streams 控制台或以编程方式完成此操作。要以编程方式创建流,请实例化一个CreateStreamRequest对象并指定该流的名称以及(如果您想使用预配置模式)该流要使用的分片数量。

  • 按需

    CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName );
  • 已配置

    CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName ); createStreamRequest.setShardCount( myStreamSize );

流名称用于标识流。该名称的范围限于应用程序使用的Amazon帐户。它还受区域限制。也就是说,两个不同Amazon账户中的两个流可以有相同的名称,同一个Amazon账户中的两个直播可以有相同的名称,但两个不同区域中的两个流可以有相同的名称,但不能有同一个账户和同一区域中的两个流。

流的吞吐量是分片数量的函数;配置更大的吞吐量需要更多分片。分片越多,直播的Amazon费用也会增加。有关计算适合您的应用程序的分片数量的更多信息,请参阅选择数据流容量模式

在配置 createStreamRequest 对象之后,通过对客户端调用 createStream 方法来创建流。在调用 createStream 之后,应等待流达到 ACTIVE 状态,然后再对流执行任何操作。要查看流的状态,请调用 describeStream 方法。但是,如果流不存在,describeStream 将引发异常。因此,请将 describeStream 调用包括在 try/catch 块中。

client.createStream( createStreamRequest ); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); long startTime = System.currentTimeMillis(); long endTime = startTime + ( 10 * 60 * 1000 ); while ( System.currentTimeMillis() < endTime ) { try { Thread.sleep(20 * 1000); } catch ( Exception e ) {} try { DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest ); String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); if ( streamStatus.equals( "ACTIVE" ) ) { break; } // // sleep for one second // try { Thread.sleep( 1000 ); } catch ( Exception e ) {} } catch ( ResourceNotFoundException e ) {} } if ( System.currentTimeMillis() >= endTime ) { throw new RuntimeException( "Stream " + myStreamName + " never went active" ); }