Amazon Kinesis Data Streams
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 Amazon AWS 入门

执行基本流操作

本节介绍如何通过 AWS CLI 从命令行对 Kinesis data stream 执行基本操作。确保您熟悉 Kinesis Data Streams 主要概念教程:使用 Amazon Kinesis Data Streams 可视化 Web 流量中讨论的概念。

注意

在创建流后,将象征性地向您的账户收取 Kinesis Data Streams 使用费,因为 Kinesis Data Streams 没有获得 AWS 免费套餐的资格。当您完成本教程时,请删除 AWS 资源以停止产生费用。有关更多信息,请参阅 步骤 4:清除

第 1 步:创建流

您的第一步是创建一个流并验证它是否已创建成功。使用以下命令创建一个名为“Foo”的流:

aws kinesis create-stream --stream-name Foo --shard-count 1

参数 --shard-count 是必需的,并且在本教程的这一部分中,您将在您的流中使用一个分片。接下来,发出以下命令以检查流的创建进度:

aws kinesis describe-stream --stream-name Foo

您应获得类似于以下示例的输出:

{ "StreamDescription": { "StreamStatus": "CREATING", "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:account-id:stream/Foo", "Shards": [] } }

在此示例中,流的状态为 CREATING,这表示它还未完全做好使用准备。在几分钟后再次检查,您应看到类似于以下示例的输出:

{ "StreamDescription": { "StreamStatus": "ACTIVE", "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:account-id:stream/Foo", "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "EndingHashKey": "170141183460469231731687303715884105727", "StartingHashKey": "0" }, "SequenceNumberRange": { "StartingSequenceNumber": "49546986683135544286507457935754639466300920667981217794" } } ] } }

此输出包含您在本教程中无需关注的信息。目前,您需要重点关注的是 "StreamStatus": "ACTIVE"(告知您流已做好使用准备)和有关您请求的单个分片的信息。您还可以通过使用 list-streams 命令验证您的新流是否存在,如下所示:

aws kinesis list-streams

输出:

{ "StreamNames": [ "Foo" ] }

步骤 2:放置记录

既然您已经拥有活动的流,您便已做好放置一些数据的准备。在本教程中,您将使用最简单的命令 put-record,该命令会将一个包含文本“testdata”的数据记录放入流中:

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

如果成功,此命令将生成类似于以下示例的输出:

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

恭喜,您刚刚已将数据添加到流!接下来您将了解如何从流中获取数据。

步骤 3:获取记录

您需要先为您感兴趣的分片获取分片迭代器,然后才能从流中获取数据。分片迭代器表示使用者(在本例中为 get-record 命令)要从中读取数据的流和分片的位置。您将使用 get-shard-iterator 命令,如下所示:

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

请记住,aws kinesis 命令后面有一个 Kinesis Data Streams API,因此如果您对显示的任何参数感兴趣,都可以在 GetShardIterator API 参考主题中阅读有关它们的信息。执行成功将产生与以下示例类似的输出(水平滚动可查看完整输出):

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

看起来像随机字符的长字符串就是分片迭代器(您的字符串将与此不同)。您需要将分片迭代器复制/粘贴到接下来显示的 get 命令中。分片迭代器的有效生命周期为 300 秒,应该足以让您将分片迭代器复制/粘贴到下一个命令中。请注意,在将分片迭代器粘贴到写一个命令之前,您需要从中删除所有换行符。如果您收到分片迭代器不再有效的错误消息,只需再次执行 get-shard-iterator 命令。

get-records 命令从流中获取数据,并解析为对 Kinesis Data Streams API 中的 GetRecords 的调用。分片迭代器指定了分片中的一个位置,您希望从该位置开始按顺序读取数据记录。如果迭代器指向的分片中的部分没有可用的记录,GetRecords 将返回空白列表。请注意,可能需要进行多次调用才能到达分片中包含记录的部分。

在以下 get-records 命令示例中(水平滚动可查看完整命令):

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

如果您是从 Unix 类型的命令处理器(如 bash)运行本教程,则可以使用嵌套命令自动执行分片迭代器的获取,如下所示(水平滚动可查看完整命令):

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

如果您从支持 PowerShell 的系统运行此教程,则可以使用如下所示的命令自动获取分片迭代器 (水平滚动可查看完整命令):

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

get-records 命令的成功结果将从您在获取分片迭代器时指定的分片的流中请求记录,如以下示例所示(水平滚动可查看完整输出):

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

请注意,get-records 在上面被描述为请求,这意味着即使您的流中有记录,您可能也会收到零个或零个以上的记录,并且任何返回的记录都无法表示当前您的流中的所有记录。这是完全正常的,并且生产代码只会以适当的时间间隔轮询流中的记录(此轮询速度因您的特定应用程序设计要求而异)。

在本教程的这一部分,您可能首先注意到的与您的记录的有关的事情是:数据似乎是垃圾代码;它不是我们发送的明文 testdata。这归因于 put-record 使用 Base64 编码支持您发送二进制数据的方式。但是,AWS CLI 中的 Kinesis Data Streams 支持未提供 Base64 解码,因为对输出到 stdout 的原始二进制内容的 Base64 解码在某些平台和终端上可能会导致非预期的行为和潜在的安全问题。如果您使用 Base64 解码程序(例如,https://www.base64decode.org/)对 dGVzdGRhdGE= 进行手动解码,您将看到它实际上是 testdata。这对本教程来说已足够,在实践中,AWS CLI 很少用于使用数据,更多时候是用于监控流的状态和获取信息,如前面所示(describe-streamlist-streams)。将来的教程将向您展示如何使用 Kinesis 客户端库 (KCL) 构建生产质量的使用者应用程序,KCL 将会为您处理 Base64。有关 KCL 的更多信息,请参阅使用 Kinesis Client Library 1.x 开发使用者

get-records 并非总是会返回在流/分片中指定的所有记录。当出现这种情况时,请使用最后一个结果中的 NextShardIterator 获取下一组记录。因此,如果更多数据正在被放入流中(正常情况下在生产应用程序中),您每次都可以使用 get-records 持续轮询数据。但是,如果您在 300 秒的分片迭代器生命周期内未使用下一个分片迭代器调用 get-records,则会收到一条错误消息,并且需要使用 get-shard-iterator 命令来获取新的分片迭代器。

此输出中还提供了 MillisBehindLatest,它是从流的末端响应 GetRecords 操作的毫秒数,指示使用者落后当前时间多远。零值指示正进行记录处理,此时没有新的记录要处理。在本教程中,如果您一边阅读教程一边操作,则可能会看到这个数值非常大。这不是问题,数据记录将会在流中保留 24 小时以等待您进行检索。此时间范围称为保留期,可以配置为最多 168 小时(7 天)。

请注意,一个成功的 get-records 结果总是有一个 NextShardIterator,即使目前流中没有更多记录。这是一个假定创建器在任何给定时间内正在将更多记录放入流中的轮询模型。虽然您可编写自己的轮询例程,但如果您使用之前提到的 KCL 开发使用者应用程序,则系统将会为您执行此轮询。

如果您调用 get-records,直到您正在提取的流和分片中没有更多记录,您将看到带有空白记录的输出,类似于以下示例(水平滚动可查看完整输出):

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

步骤 4:清除

最后,如前所述,您希望删除您的流以释放资源和避免您的账户产生意外费用。在实践中,每当您创建了不会使用的流时,请执行此操作,因为费用是按流量计算的,无论您是否在使用流放入和获取数据。清除命令很简单:

aws kinesis delete-stream --stream-name Foo

成功之后不会生成输出,因此您可能希望使用 describe-stream 来检查删除进度:

aws kinesis describe-stream --stream-name Foo

如果您在执行删除命令后立即执行此命令,您可能会看到类似于以下示例的输出:

{ "StreamDescription": { "StreamStatus": "DELETING", "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:account-id:stream/Foo", "Shards": [] } }

在流完全删除后,describe-stream 将生成“未找到”错误:

A client error (ResourceNotFoundException) occurred when calling the DescribeStream operation: Stream Foo under account 112233445566 not found.