Amazon API Gateway
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

教程:在 API Gateway 中创建 REST API 作为 Amazon Kinesis 代理

此页面介绍了如何利用 AWS 类型的集成创建和配置 REST API 以访问 Kinesis。

注意

要将 API Gateway API 与 Kinesis 集成,您必须选择一个同时提供 API Gateway 和 Kinesis 服务的区域。有关区域可用性,请参阅区域和终端节点

为了进行说明,我们创建一个示例 API,以使客户端能够执行以下操作:

  1. 列出 Kinesis 中用户的可用流

  2. 创建、描述或删除指定流

  3. 从指定流读取数据记录或将数据记录写入指定流

为了完成上述任务,API 分别在各种资源上使用了多种方法来调用以下内容:

  1. Kinesis 中的 ListStreams 操作

  2. CreateStreamDescribeStreamDeleteStream 操作

  3. Kinesis 中的 GetRecordsPutRecords(包括 PutRecord)操作

具体来说,我们按如下所示构建 API:

  • 在 API 的 /streams 资源上使用 HTTP GET 方法,并将此方法与 Kinesis 中的 ListStreams 操作集成,以列出调用方账户中的流。

  • 在 API 的 /streams/{stream-name} 资源上使用 HTTP POST 方法,并将此方法与 Kinesis 中的 CreateStream 操作集成,以在调用方账户中创建指定流。

  • 在 API 的 /streams/{stream-name} 资源上使用 HTTP GET 方法,并将此方法与 Kinesis 中的 DescribeStream 操作集成,以描述调用方账户中的指定流。

  • 在 API 的 /streams/{stream-name} 资源上使用 HTTP DELETE 方法,并将此方法与 Kinesis 中的 DeleteStream 操作集成,以删除调用方账户中的流。

  • 在 API 的 /streams/{stream-name}/record 资源上使用 HTTP PUT 方法,并将此方法与 Kinesis 中的 PutRecord 操作集成。这使客户端能够向指定流添加一个数据记录。

  • 在 API 的 /streams/{stream-name}/records 资源上使用 HTTP PUT 方法,并将此方法与 Kinesis 中的 PutRecords 操作集成。这使客户端能够向指定流添加一个数据记录列表。

  • 在 API 的 /streams/{stream-name}/records 资源上使用 HTTP GET 方法,并将此方法与 Kinesis 中的 GetRecords 操作集成。这使客户端能够使用指定的分片迭代器在指定流中列出数据记录。分片迭代器指定了分片位置,可以从该位置开始按顺序读取数据记录。

  • 在 API 的 /streams/{stream-name}/sharditerator 资源上使用 HTTP GET 方法,并将此方法与 Kinesis 中的 GetShardIterator 操作集成。此辅助标记方法必须提供给 Kinesis 中的 ListStreams 操作。

您可以将此处提供的说明应用于其他 Kinesis 操作。有关 Kinesis 操作的完整列表,请参阅 Amazon Kinesis API 参考

您可以使用 API Gateway 导入 API 将示例 API 导入 API Gateway,而不是使用 API Gateway 控制台创建示例 API。有关如何使用 Import API 的信息,请参阅 将 REST API 导入 API Gateway 中

如果您没有 AWS 账户,请通过以下步骤创建一个账户。

注册 AWS

  1. 打开 http://www.amazonaws.cn/,然后选择 Create an AWS Account

  2. 按照屏幕上的说明进行操作。

为 API 创建 IAM 角色和策略以访问 Kinesis

要允许 API 调用 Kinesis 操作,您必须将适当的 IAM 策略附加到 IAM 角色。本部分介绍了如何验证和创建 (如果需要) 所需的 IAM 角色和策略。

要启用对 Kinesis 的只读访问权限,您可以使用 AmazonKinesisReadOnlyAccess 策略来允许调用 Kinesis 中的 Get*List*Describe* 操作。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:Get*", "kinesis:List*", "kinesis:Describe*" ], "Resource": "*" } ] }

此策略可作为 IAM 预配置的托管策略使用,其 ARN 为 arn:aws:iam::aws:policy/AmazonKinesisReadOnlyAccess

要在 Kinesis 中启用读写操作,您可以使用 AmazonKinesisFullAccess 策略。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "kinesis:*", "Resource": "*" } ] }

此策略还可作为 IAM 预配置的托管策略使用。其 ARN 为 arn:aws:iam::aws:policy/AmazonKinesisFullAccess

决定使用哪种 IAM 策略后,请将其附加到新的或现有的 IAM 角色。请确保 API Gateway 控制服务 (apigateway.amazonaws.com) 是角色的可信实体,并且可以担任执行角色 (sts:AssumeRole)。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "Service": "apigateway.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

如果您在 IAM 控制台中创建执行角色并选择 Amazon API Gateway 角色类型,此信任策略会自动附加。

记下执行角色的 ARN。您在创建 API 方法并设置其集成请求时,需要使用此 ARN。

开始创建 API 作为 Kinesis 代理

使用以下步骤在 API Gateway 控制台中创建 API。

创建 API 作为 Kinesis 的 AWS 服务代理

  1. 在 API Gateway 控制台中,选择创建 API

  2. 选择新 API

  3. API 名称中,键入 KinesisProxy。将其他字段保留默认值。

  4. 如果您愿意,可以在描述中键入描述。

  5. 选择 Create API (创建 API)

API 创建完成后,API Gateway 控制台将显示资源页面,该页面仅包含 API 的根 (/) 资源。

列出 Kinesis 中的流

Kinesis 支持使用以下 REST API 调用执行 ListStreams 操作:

POST /?Action=ListStreams HTTP/1.1 Host: kinesis.<region>.<domain> Content-Length: <PayloadSizeBytes> User-Agent: <UserAgentString> Content-Type: application/x-amz-json-1.1 Authorization: <AuthParams> X-Amz-Date: <Date> { ... }

在上述 REST API 请求中,已经在 Action 查询参数中指定了操作。或者,您可以在 X-Amz-Target 标头中指定操作:

POST / HTTP/1.1 Host: kinesis.<region>.<domain> Content-Length: <PayloadSizeBytes> User-Agent: <UserAgentString> Content-Type: application/x-amz-json-1.1 Authorization: <AuthParams> X-Amz-Date: <Date> X-Amz-Target: Kinesis_20131202.ListStreams { ... }

在本教程中,我们使用查询参数指定操作。

要在 API 中公开 Kinesis 操作,请将 /streams 资源添加到 API 的根中。然后,在此资源上设置 GET 方法,并将该方法与 Kinesis 的 ListStreams 操作集成。

以下过程介绍如何使用 API Gateway 控制台列出 Kinesis 流。

使用 API Gateway 控制台列出 Kinesis 流

  1. 选择 API 根资源。在操作中,选择 Create Resource (创建资源)

    Resource Name (资源名称) 中,键入 Streams,将 Resource Path (资源路径) 和其他字段保留默认设置,然后选择 Create Resource (创建资源)

  2. 选择 /Streams 资源。从操作中,选择 Create Method (创建方法),再从列表中选择 GET,然后选择对勾图标以完成方法创建。

    注意

    客户端调用的方法的 HTTP 动词可能不同于后端所需的集成的 HTTP 动词。我们在此处选择了 GET,因为直观来看,列出流是一个“读取”操作。

  3. 在方法的设置窗格中,选择 AWS 服务

    1. 对于 AWS 区域,选择一个区域(例如 us-east-1)。

    2. 对于 AWS 服务,选择 Kinesis

    3. AWS 子域留空。

    4. 对于 HTTP 方法,选择 POST

      注意

      我们在此处选择了 POST,因为 Kinesis 要求使用它来调用 ListStreams 操作。

    5. 对于 Action Type,选择 Use action name

    6. 对于 Action,键入 ListStreams

    7. 对于 Execution role (执行角色),键入执行角色的 ARN。

    8. 对于 Content Handling (内容处理),保留默认值 Passthrough (传递)

    9. 选择保存完成方法的初始设置。

    
                        为 Kinesis ListStreams 操作设置 Streams:GET 方法。
  4. 仍然在 Integration Request (集成请求) 窗格中,展开 HTTP Headers (HTTP 标头) 部分:

    1. 选择 Add header (添加标头)

    2. 名称列中,键入 Content-Type

    3. Mapped from (映射来源) 列中,键入 'application/x-amz-json-1.1'

    4. 选择对勾图标以保存设置。

    我们使用了请求参数映射将 Content-Type 标头设置为静态值 'application/x-amz-json-1.1',以告知 Kinesis 该输入是特定版本的 JSON。

  5. 展开 Body Mapping Templates (正文映射模板) 部分:

    1. 选择 Add mapping template (添加映射模板)

    2. 对于 Content-Type (内容-类型),键入 application/json

    3. 选择对勾图标以保存 Content-Type (内容-类型) 设置。在 Change passthrough behavior (更改传递行为) 中选择 Yes, secure this integration (是,锁定此集成)

    4. 在模板编辑器中键入 {}

    5. 选择保存按钮以保存映射模板。

    ListStreams 请求使用以下 JSON 格式的负载:

    { "ExclusiveStartStreamName": "string", "Limit": number }

    但这些属性为可选属性,为了使用默认值,我们在此选择了一个空的 JSON 负载。

    
                        为 Streams:GET 方法设置数据映射以调用 Kinesis ListStreams 操作。
  6. 在流传输资源上测试 GET 方法以调用 Kinesis 中的 ListStreams 操作:

    在 API Gateway 控制台中,选择资源窗格中的 /streams/GET 条目,选择测试调用选项,然后选择测试

    如果您已经在 Kinesis 中创建了两个分别名为“myStream”和“yourStream”的流,则成功的测试将返回一个包含以下负载的 200 OK 响应:

    { "HasMoreStreams": false, "StreamNames": [ "myStream", "yourStream" ] }

在 Kinesis 中创建、描述和删除流

在 Kinesis 中创建、描述和删除流分别需要作出以下 Kinesis REST API 请求:

POST /?Action=CreateStream HTTP/1.1 Host: kinesis.region.domain ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ShardCount": number, "StreamName": "string" }
POST /?Action=DescribeStream HTTP/1.1 Host: kinesis.region.domain ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ExclusiveStartShardId": "string", "Limit": number, "StreamName": "string" }
POST /?Action=DeleteStream HTTP/1.1 Host: kinesis.region.domain ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "StreamName":"string" }

我们可以构建 API 来接受必需的输入作为方法请求的 JSON 负载,并将负载传递到集成请求。但是,为了提供更多方法与集成请求之间以及方法与集成响应之间的数据映射示例,我们创建 API 的方式稍有不同。

我们在待指定的 GET 资源上公开 POSTDeleteStream HTTP 方法。我们使用 {stream-name} 路径变量作为流传输资源的占位符,并将这些 API 方法分别与 Kinesis 的 DescribeStreamCreateStreamDeleteStream 操作集成。我们要求客户端传递其他输入数据作为标头、查询参数或有效负载的方法请求。我们提供了集成数据所需的映射模板来转换请求负载。

在流传输资源上配置和测试 GET 方法

  1. 使用之前创建的 {stream-name} 资源下的 /streams 路径变量创建一个子资源。

    
                        创建 {stream-name} 资源
  2. 向此资源中添加 POSTGETDELETE HTTP 动词。

    在此资源上创建方法后,API 的结构将如下所示:

    
                        在流传输资源上创建 POST、GET 和 DELETE 方法
  3. 如下所示,设置 GET /streams/{stream-name} 方法以调用 Kinesis 中的 POST /?Action=DescribeStream 操作。

    
                        设置 GET-on-Stream 方法以调用 Kinesis 中的 DescribeStream 操作
  4. 将以下 Content-Type 标头映射添加到集成请求:

    Content-Type: 'x-amz-json-1.1'

    该任务将按照相同的过程为 GET /streams 方法设置请求参数映射。

  5. 添加以下正文映射模板,以将数据从 GET /streams/{stream-name} 方法请求映射到 POST /?Action=DescribeStream 集成请求:

    { "StreamName": "$input.params('stream-name')" }

    此映射模板使用方法请求的 stream-name 路径参数值为 Kinesis 的 DescribeStream 操作生成所需的集成请求负载。

  6. 测试 GET /stream/{stream-name} 方法以调用 Kinesis 中的 DescribeStream 操作:

    在 API Gateway 控制台的资源窗格中选择 /streams/{stream-name}/GET,选择测试开始测试,在 stream-name路径字段中键入现有 Kinesis 流的名称,然后选择测试。如果测试成功,将返回一个 200 OK 响应,其所含负载与以下内容类似:

    { "StreamDescription": { "HasMoreShards": false, "RetentionPeriodHours": 24, "Shards": [ { "HashKeyRange": { "EndingHashKey": "68056473384187692692674921486353642290", "StartingHashKey": "0" }, "SequenceNumberRange": { "StartingSequenceNumber": "49559266461454070523309915164834022007924120923395850242" }, "ShardId": "shardId-000000000000" }, ... { "HashKeyRange": { "EndingHashKey": "340282366920938463463374607431768211455", "StartingHashKey": "272225893536750770770699685945414569164" }, "SequenceNumberRange": { "StartingSequenceNumber": "49559266461543273504104037657400164881014714369419771970" }, "ShardId": "shardId-000000000004" } ], "StreamARN": "arn:aws:kinesis:us-east-1:12345678901:stream/myStream", "StreamName": "myStream", "StreamStatus": "ACTIVE" } }

    部署 API 后,您可以根据此 API 方法做出 REST 请求:

    GET https://your-api-id.execute-api.region.amazonaws.com/stage/streams/myStream HTTP/1.1 Host: your-api-id.execute-api.region.amazonaws.com Content-Type: application/json Authorization: ... X-Amz-Date: 20160323T194451Z

在流传输资源上配置和测试 POST 方法

  1. 设置 POST /streams/{stream-name} 方法以调用 Kinesis 中的 POST /?Action=CreateStream 操作。如果您将 GET /streams/{stream-name} 操作替换成 DescribeStream,该任务将按照相同的过程设置 CreateStream 方法。

  2. 将以下 Content-Type 标头映射添加到集成请求:

    Content-Type: 'x-amz-json-1.1'

    该任务将按照相同的过程为 GET /streams 方法设置请求参数映射。

  3. 添加以下正文映射模板,以将数据从 POST /streams/{stream-name} 方法请求映射到 POST /?Action=CreateStream 集成请求:

    { "ShardCount": #if($input.path('$.ShardCount') == '') 5 #else $input.path('$.ShardCount') #end, "StreamName": "$input.params('stream-name')" }

    在上述映射模板中,如果客户端未在方法请求负载中指定值,我们会将 ShardCount 设为固定值 5。

  4. 测试 POST /streams/{stream-name} 方法以在 Kinesis 中创建指定流:

    在 API Gateway 控制台的资源窗格中选择 /streams/{stream-name}/POST,选择测试开始测试,在 stream-name路径中键入现有 Kinesis 流的名称,然后选择测试。如果测试成功,将返回一个不含数据的 200 OK 响应。

    部署 API 后,您也可以针对流传输资源上的 POST 方法发出 REST API 请求,以调用 Kinesis 中的 CreateStream 操作:

    POST https://your-api-id.execute-api.region.amazonaws.com/stage/streams/yourStream HTTP/1.1 Host: your-api-id.execute-api.region.amazonaws.com Content-Type: application/json Authorization: ... X-Amz-Date: 20160323T194451Z { "ShardCount": 5 }

在流传输资源上配置和测试 DELETE 方法

  1. DELETE /streams/{stream-name} 方法设置为与 Kinesis 中的 POST /?Action=DeleteStream 操作集成。如果您将 GET /streams/{stream-name} 操作替换成 DescribeStream,该任务将按照相同的过程设置 DeleteStream 方法。

  2. 将以下 Content-Type 标头映射添加到集成请求:

    Content-Type: 'x-amz-json-1.1'

    该任务将按照相同的过程为 GET /streams 方法设置请求参数映射。

  3. 添加以下正文映射模板,以将数据从 DELETE /streams/{stream-name} 方法请求映射到 POST /?Action=DeleteStream 的相应集成请求:

    { "StreamName": "$input.params('stream-name')" }

    此映射模板将使用客户端提供的 URL 路径名称 DELETE /streams/{stream-name}stream-name 操作生成所需的输入。

  4. 测试 DELETE 方法以删除 Kinesis 中的指定流:

    在 API Gateway 控制台的资源窗格中选择 /streams/{stream-name}/DELETE 方法节点,选择测试开始测试,在 stream-name路径中键入现有 Kinesis 流的名称,然后选择测试。如果测试成功,将返回一个不含数据的 200 OK 响应。

    部署 API 后,您也可以针对流传输资源上的 DELETE 方法发出以下 REST API 请求,以调用 Kinesis 中的 DeleteStream 操作:

    DELETE https://your-api-id.execute-api.region.amazonaws.com/stage/streams/yourStream HTTP/1.1 Host: your-api-id.execute-api.region.amazonaws.com Content-Type: application/json Authorization: ... X-Amz-Date: 20160323T194451Z {}

从 Kinesis 中的流获取记录并向其添加记录

在 Kinesis 中创建流后,您可以将数据记录添加到流中,也可以从流中读取数据。添加数据记录包括调用 Kinesis 中的 PutRecordsPutRecord 操作。前者可向流添加多条记录,而后者可向流添加一条记录。

POST /?Action=PutRecords HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "Records": [ { "Data": blob, "ExplicitHashKey": "string", "PartitionKey": "string" } ], "StreamName": "string" }

或者

POST /?Action=PutRecord HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "Data": blob, "ExplicitHashKey": "string", "PartitionKey": "string", "SequenceNumberForOrdering": "string", "StreamName": "string" }

其中,StreamName 用于标识要添加记录的目标流。StreamNameDataPartitionKey 是必需的输入数据。在示例中,我们针对所有可选输入数据使用了默认值,并且不会在方法请求的输入中显式指定它们的值。

读取 Kinesis 中的数据相当于调用 GetRecords 操作:

POST /?Action=GetRecords HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ShardIterator": "string", "Limit": number }

其中,我们要从中获取记录的源流在必需的 ShardIterator 值中进行指定,如以下获取分片迭代器的 Kinesis 操作中所示:

POST /?Action=GetShardIterator HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ShardId": "string", "ShardIteratorType": "string", "StartingSequenceNumber": "string", "StreamName": "string" }

对于 GetRecordsPutRecords 操作,我们在附加到指定流传输资源 (GET) 的 PUT 资源上分别使用了 /records/{stream-name} 方法。同样,我们在 PutRecord 资源上使用了 PUT 操作作为 /record 方法。

由于 GetRecords 操作将 ShardIterator 值 (该值通过调用 GetShardIterator 辅助标记操作获得) 作为输入,我们在 GET 资源 (ShardIterator) 上使用了 /sharditerator 辅助标记方法。

下图显示了创建方法后资源的 API 结构:


                为 API 创建 Records:GET|PUT|PUT|GET 方法。

以下四个过程介绍了如何设置每个方法,如何将数据从方法请求映射到集成请求,以及如何测试方法。

要设置并测试 PUT /streams/{stream-name}/record 方法以调用 Kinesis 中的 PutRecord,请执行以下操作:

  1. 设置 PUT 方法,如下所示:

    
                        设置 PUT 方法以调用 Kinesis 中的 PutRecord 操作。
  2. 添加以下请求参数映射,以将 Content-Type 标头设置为集成请求中与 AWS 兼容的 JSON 版本:

    Content-Type: 'x-amz-json-1.1'

    该任务将按照相同的过程为 GET /streams 方法设置请求参数映射。

  3. 添加以下正文映射模板,以将数据从 PUT /streams/{stream-name}/record 方法请求映射到 POST /?Action=PutRecord 的相应集成请求:

    { "StreamName": "$input.params('stream-name')", "Data": "$util.base64Encode($input.json('$.Data'))", "PartitionKey": "$input.path('$.PartitionKey')" }

    此映射模板假定方法请求负载采用的是以下格式:

    { "Data": "some data", "PartitionKey": "some key" }

    此数据可通过以下 JSON 架构建模:

    { "$schema": "http://json-schema.org/draft-04/schema#", "title": "PutRecord proxy single-record payload", "type": "object", "properties": { "Data": { "type": "string" }, "PartitionKey": { "type": "string" } } }

    您可以创建一个模型以包含此架构,并使用该模型来帮助生成映射模板。但您也可以在不使用任何模型的情况下生成映射模板。

  4. 要测试 PUT /streams/{stream-name}/record 方法,请将 stream-name 路径变量设置为现有流的名称,提供所需格式的负载,然后提交方法请求。如果成功,将返回一个包含以下格式负载的 200 OK 响应:

    { "SequenceNumber": "49559409944537880850133345460169886593573102115167928386", "ShardId": "shardId-000000000004" }

要设置并测试 PUT /streams/{stream-name}/records 方法,以调用 Kinesis 中的 PutRecords,请执行以下操作:

  1. 设置 PUT /streams/{stream-name}/records 方法,如下所示:

    
                        为 Kinesis PutRecords 操作设置 PUT /streams/{stream-name}/records 方法。
  2. 添加以下请求参数映射,以将 Content-Type 标头设置为集成请求中与 AWS 兼容的 JSON 版本:

    Content-Type: 'x-amz-json-1.1'

    该任务将按照相同的过程为 GET /streams 方法设置请求参数映射。

  3. 添加以下正文映射模板,以将数据从 PUT /streams/{stream-name}/records 方法请求映射到 POST /?Action=PutRecords 的相应集成请求:

    { "StreamName": "$input.params('stream-name')", "Records": [ #foreach($elem in $input.path('$.records')) { "Data": "$util.base64Encode($elem.data)", "PartitionKey": "$elem.partition-key" }#if($foreach.hasNext),#end #end ] }

    此映射模板假设可以通过以下 JSON 架构为方法请求负载建模:

    { "$schema": "http://json-schema.org/draft-04/schema#", "title": "PutRecords proxy payload data", "type": "object", "properties": { "records": { "type": "array", "items": { "type": "object", "properties": { "data": { "type": "string" }, "partition-key": { "type": "string" } } } } } }

    您可以创建一个模型以包含此架构,并使用该模型来帮助生成映射模板。但您也可以在不使用任何模型的情况下生成映射模板。

    在本教程中,我们使用了两种稍有不同的负载格式来说明 API 开发人员可以选择向客户端公开或隐藏后端数据格式。一种格式用于 PUT /streams/{stream-name}/records 方法 (上文)。另一种格式用于 PUT /streams/{stream-name}/record 方法 (上一过程)。在生产环境中,您应该将两种格式保持一致。

  4. 要测试 PUT /streams/{stream-name}/records 方法,请将 stream-name 路径变量设置为现有流,提供以下负载,并提交方法请求。

    { "records": [ { "data": "some data", "partition-key": "some key" }, { "data": "some other data", "partition-key": "some key" } ] }

    如果成功,将返回一个包含类似以下输出的负载的 200 OK 响应:

    { "FailedRecordCount": 0, "Records": [ { "SequenceNumber": "49559409944537880850133345460167468741933742152373764162", "ShardId": "shardId-000000000004" }, { "SequenceNumber": "49559409944537880850133345460168677667753356781548470338", "ShardId": "shardId-000000000004" } ] }

要设置并测试 GET /streams/{stream-name}/sharditerator 方法,以调用 Kinesis 中的 GetShardIterator,请执行以下操作:

GET /streams/{stream-name}/sharditerator 方法为辅助标记方法,用于在调用 GET /streams/{stream-name}/records 方法之前获得必需的分片迭代器。

  1. GET /streams/{stream-name}/sharditerator 方法设置集成,如下所示:

    
                        设置 GET/streams/{stream-name}/sharditerator 方法。
  2. GetShardIterator 操作需要输入 ShardId 值。要传递客户端提供的 ShardId 值,我们将向方法请求添加一个 shard-id 查询参数,如下所示:

    
                        向 GET-on-ShardIterator 方法请求添加 shard-id 查询参数。

    在以下正文映射模板中,我们将 shard-id 查询参数值设置为 JSON 负载的 ShardId 属性值,以作为 Kinesis 中 GetShardIterator 操作的输入。

  3. 将正文映射模板配置为使用方法请求的 ShardIdStreamName 参数生成 GetShardIterator 操作所需的输入 (shard-idstream-name)。此外,映射模板还需将 ShardIteratorType 设置为 TRIM_HORIZON,并作为默认值。

    { "ShardId": "$input.params('shard-id')", "ShardIteratorType": "TRIM_HORIZON", "StreamName": "$input.params('stream-name')" }
  4. 使用 API Gateway 控制台中的测试选项,输入现有流名称作为 stream-name 路径变量值,将 shard-id Query string (查询字符串) 设置为现有 ShardId 值(例如 shard-000000000004),然后选择测试

    成功的响应负载与以下输出类似:

    { "ShardIterator": "AAAAAAAAAAFYVN3VlFy..." }

    记下此 ShardIterator 值。您需要使用此值来从流中获取记录。

要配置并测试 GET /streams/{stream-name}/records 方法,以调用 Kinesis 中的 GetRecords 操作,请执行以下操作:

  1. 设置 GET /streams/{stream-name}/records 方法,如下所示:

    
                        设置 GET/streams/{stream-name}/records 方法。
  2. GetRecords 操作需要输入 ShardIterator 值。要传递客户端提供的 ShardIterator 值,我们将向方法请求添加一个 Shard-Iterator 标头参数,如下所示:

    
                        将 Shard-Iterator 标头参数添加到 GET-on-Records 方法请求。
  3. 设置以下映射模板,将 Shard-Iterator 标头参数值映射到 JSON 负载的 ShardIterator 属性值,以调用 Kinesis 中的 GetRecords 操作。

    { "ShardIterator": "$input.params('Shard-Iterator')" }
  4. 使用 API Gateway 控制台中的测试选项,键入现有流名称作为 stream-name 路径变量值,将 Shard-Iterator 标头设置为从 GET /streams/{stream-name}/sharditerator 方法的测试运行中获得的 ShardIterator 值(上文),然后选择测试

    成功的响应负载与以下输出类似:

    { "MillisBehindLatest": 0, "NextShardIterator": "AAAAAAAAAAF...", "Records": [ ... ] }