本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Amazon CLI 执行基本 Kinesis 数据流操作。
本节介绍如何通过从命令行对执行 Kinesis 本操作。Amazon CLI. 确保您熟悉Amazon Kinesis Data Streams 术语和概念中讨论的概念。
注意
在创建流后,将象征性地向您的账户收取 Kinesis Data Streams 使用费,因为 Kinesis Data Streams 没有获得Amazon免费套餐。当您完成本教程时,请删除Amazon用于停止产生费用的资源。有关更多信息,请参阅 第 4 步:清除。
第 1 步:创建 流
您的第一步是创建一个流并验证它是否已创建成功。使用以下命令创建一个名为“Foo”的流:
aws kinesis create-stream --stream-name Foo
接下来,发出以下命令以检查流的创建进度:
aws kinesis describe-stream-summary --stream-name Foo
您应获得类似于以下示例的输出:
{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }
在此示例中,流的状态为 CREATING,这表示它还未完全做好使用准备。在几分钟后再次检查,您应看到类似于以下示例的输出:
{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }
此输出包含您在本教程中无需关注的信息。目前,您需要重点关注的是 "StreamStatus": "ACTIVE"
(告知您流已做好使用准备)和有关您请求的单个分片的信息。您还可以通过使用 list-streams
命令验证您的新流是否存在,如下所示:
aws kinesis list-streams
输出:
{
"StreamNames": [
"Foo"
]
}
第 2 步:设置记录
既然您已经拥有活动的流,您便已做好放置一些数据的准备。在本教程中,您将使用最简单的命令 put-record
,该命令会将一个包含文本“testdata”的数据记录放入流中:
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
如果成功,此命令将生成类似于以下示例的输出:
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
}
恭喜,您刚刚已将数据添加到流!接下来您将了解如何从流中获取数据。
第 3 步:获取记录
GetShard迭代器
您需要先为您感兴趣的分片获取分片迭代器,然后才能从流中获取数据。分片迭代器表示使用者(在本例中为 get-record
命令)要从中读取数据的流和分片的位置。您将使用 get-shard-iterator
命令,如下所示:
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
请记住,aws kinesis
命令后面有一个 Kinesis Data Streams API,因此如果您对显示的任何参数感兴趣,都可以在GetShardIterator
API 参考主题。执行成功将产生与以下示例类似的输出(水平滚动可查看完整输出):
{
"ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg="
}
看起来像随机字符的长字符串就是分片迭代器(您的字符串将与此不同)。您需要将分片迭代器复制/粘贴到接下来显示的 get 命令中。分片迭代器的有效生命周期为 300 秒,应该足以让您将分片迭代器复制/粘贴到下一个命令中。请注意,在将分片迭代器粘贴到写一个命令之前,您需要从中删除所有换行符。如果您收到分片迭代器不再有效的错误消息,只需再次执行 get-shard-iterator
命令。
GetRecords
这些区域有:get-records
命令从流中获取数据,然后它解析为调用GetRecords
在 Kinesis Data Streams API 中。分片迭代器指定了分片中的一个位置,您希望从该位置开始按顺序读取数据记录。如果迭代器指向的分片中的部分没有可用的记录,GetRecords
将返回空白列表。请注意,可能需要进行多次调用才能到达分片中包含记录的部分。
在以下 get-records
命令示例中(水平滚动可查看完整命令):
aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=
如果您是从 Unix 类型的命令处理器(如 bash)运行本教程,则可以使用嵌套命令自动执行分片迭代器的获取,如下所示(水平滚动可查看完整命令):
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR
如果你是从支持的系统上运行本教程PowerShell,您可以使用如下所示的命令自动执行分片迭代器的获取(水平滚动可查看完整命令):
aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])
get-records
命令的成功结果将从您在获取分片迭代器时指定的分片的流中请求记录,如以下示例所示(水平滚动可查看完整输出):
{
"Records":[ {
"Data":"dGVzdGRhdGE=",
"PartitionKey":"123”,
"ApproximateArrivalTimestamp": 1.441215410867E9,
"SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
} ],
"MillisBehindLatest":24000,
"NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
请注意,get-records
在上面被描述为请求,这意味着即使您的流中有记录,您可能也会收到零个或零个以上的记录,并且任何返回的记录都无法表示当前您的流中的所有记录。这是完全正常的,并且生产代码只会以适当的时间间隔轮询流中的记录(此轮询速度因您的特定应用程序设计要求而异)。
在本教程的这一部分你可能会注意到你的记录的第一件事是数据似乎是垃圾 —— 而不是明文testdata
我们发送的。这归因于 put-record
使用 Base64 编码支持您发送二进制数据的方式。但是,Kinesis Data Streams 支持Amazon CLI不提供 Base64解码中因为 Base64 解码到打印到 stdout 的原始二进制内容可能会导致某些平台和终端上的不良行为和潜在的安全问题。如果您使用 Base64 解码程序(例如,https://www.base64decode.org/dGVzdGRhdGE=
进行手动解码,您将看到它实际上是 testdata
。这对本教程来说已足够,在实践中,Amazon CLI 很少用于使用数据,更多时候是用于监控流的状态和获取信息,如前面所示(describe-stream
和 list-streams
)。将来的教程将向您展示如何使用 Kinesis 客户端库 (KCL) 构建生产质量的使用者应用程序,KCL 将会为您处理 Base64。有关 KCL 的更多信息,请参阅使用 KCL 开发具有共享吞吐量的自定义使用者.
get-records
并非总是会返回在流/分片中指定的所有记录。当出现这种情况时,请使用最后一个结果中的 NextShardIterator
获取下一组记录。因此,如果更多数据正在被放入流中(正常情况下在生产应用程序中),您每次都可以使用 get-records
持续轮询数据。但是,如果您在 300 秒的分片迭代器生命周期内未使用下一个分片迭代器调用 get-records
,则会收到一条错误消息,并且需要使用 get-shard-iterator
命令来获取新的分片迭代器。
此输出中还提供了 MillisBehindLatest
,它是从流的末端响应 GetRecords 操作的毫秒数,指示使用者落后当前时间多远。零值指示正进行记录处理,此时没有新的记录要处理。在本教程中,如果您一边阅读教程一边操作,则可能会看到这个数值非常大。默认情况下,数据记录在流中保留 24 小时以等待您检索它们。此时间范围称为保留期,可以配置为最多 365 天。
请注意,一个成功的 get-records
结果总是有一个 NextShardIterator
,即使目前流中没有更多记录。这是一个假定创建器在任何给定时间内正在将更多记录放入流中的轮询模型。虽然您可编写自己的轮询例程,但如果您使用之前提到的 KCL 开发使用者应用程序,则系统将会为您执行此轮询。
如果您调用 get-records
,直到您正在提取的流和分片中没有更多记录,您将看到带有空白记录的输出,类似于以下示例(水平滚动可查看完整输出):
{
"Records": [],
"NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk="
}
第 4 步:清除
最后,如前所述,您希望删除您的流以释放资源和避免您的账户产生意外费用。在实践中,每当您创建了不会使用的流时,请执行此操作,因为费用是按流量计算的,无论您是否在使用流放入和获取数据。清除命令很简单:
aws kinesis delete-stream --stream-name Foo
成功之后不会生成输出,因此您可能希望使用 describe-stream
来检查删除进度:
aws kinesis describe-stream-summary --stream-name Foo
如果您在执行删除命令后立即执行此命令,您可能会看到其中的输出部分类似于以下示例:
{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",
在流完全删除后,describe-stream
将生成“未找到”错误:
A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation:
Stream Foo under account 123456789012 not found.