Amazon Kinesis - Amazon Timestream
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics,可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间,以实现实时分析。点击此处了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon Kinesis

您可以将数据从 Kinesis Data Streams 发送到 Timestream LiveAnalytics ,以便使用 Apache Flink 托管服务的 Timestream 示例数据连接器。有关更多信息,请参阅 Apache Flink 的 适用于 Apache Flink 的亚马逊托管服务

使用 EventBridge 管道将 Kinesis 数据发送到 Timestream

你可以使用 Pip EventBridge es 将数据从 Kinesis 流发送到 Amazon Time LiveAnalytics stream 作为表格。

Pipes 旨在在支持的源和目标之间 point-to-point进行集成,并支持高级转换和扩展。在开发事件驱动型架构时,管道可减少对专业知识和集成代码的需求。要设置管道,请选择源、添加可选筛选、定义可选富集,然后为事件数据选择目标。

源向管道发送事件, EventBridge 管道过滤匹配的事件并将其路由到目标。

这种集成使您能够利用时间序列数据分析功能 Timestream的强大功能,同时简化您的数据摄取管道。

将 Pip EventBridge es 与配合使用 Timestream 具有以下好处:

  • 实时数据摄取:将数据从 Kinesis 直接流式传输到 Timestream LiveAnalytics,从而实现实时分析和监控。

  • 无缝集成:利用 EventBridge 管道管理数据流,无需复杂的自定义集成。

  • 增强的筛选和转换:在 Kinesis 记录存储之前对其进行筛选或转换 Timestream ,以满足您的特定数据处理要求。

  • 可扩展性:利用内置的并行和批处理功能,处理高吞吐量数据流,并确保高效的数据处理。

配置

要设置 Pi EventBridge pe 以将数据从 Kinesis 流式传输到 Timestream,请按照以下步骤操作:

  1. 创建 Kinesis 流

    确保您有活跃的 Kinesis 数据流,可用于摄取数据。

  2. 创建 Timestream 数据库和表

    设置存储数据的 Timestream 数据库和表。

  3. 配置 EventBridge 管道:

    • 来源:选择 Kinesis 流作为来源。

    • 目标:选择 Timestream 作为目标。

    • 批处理设置:定义批处理窗口及批处理大小,以优化数据处理并减少延迟。

重要

设置管道时,我们建议通过摄取少量记录以测试所有配置是否正确。请注意,成功创建管道并不能保证管道配置正确,也不能保证数据流畅无误。可能存在运行时错误,例如表不正确、动态路径参数不正确或应用映射后的 Timestream 记录无效,这些错误将在实际数据流过管道时被发现。

以下配置决定数据摄取的速率:

  • BatchSize:将发送到 Timestream 的批次的最大大小。 LiveAnalytics范围:0-100。建议将此值保持为 100,以获得最大吞吐量。

  • MaximumBatchingWindowInSeconds:在将批次发送到 Timestream 作为目标之前,等待填充 batchSize 的最长时间。 LiveAnalytics 根据传入事件的速率,此配置将决定摄取的延迟,建议将此值保持 Timestream 在 < 10 秒,以保持近乎实时地向其发送数据。

  • ParallelizationFactor:每个分片中要同时处理的批次数。建议使用最大值 10,以实现最大吞吐量和近乎实时的摄取。

    如果您的数据流被多个目标读取,请使用增强型扇出功能为管道提供专属消费者,从而实现高吞吐量。有关更多信息,请参阅Kinesis Data Streams 用户指南中的使用 Kinesis Data Streams API 开发增强型扇出消费者

注意

每个账户可实现的最大吞吐量受限于并发管道执行数

以下配置可防止数据丢失:

  • DeadLetterConfig:建议始终进行配置, DeadLetterConfig 以避免 LiveAnalytics 由于用户错误而无法将事件提取到 Timestream 时丢失任何数据。

使用以下配置设置优化管道性能,这有助于防止记录导致性能下降或堵塞。

  • MaximumRecordAgeInSeconds: 超过此日期的记录将不予处理,并将直接移至 DLQ。我们建议将此值设置为不高于目标 Timestream 表配置的内存存储保留期。

  • MaximumRetryAttempts:记录发送到 DeadLetterQueue之前重试记录的次数。建议将此项配置为 10。这应该能够帮助解决任何暂时性问题,对于持续存在的问题,记录将被移至直播的其余部分 DeadLetterQueue 并解除封锁。

  • OnPartialBatchItemFailure:对于支持部分批处理的来源,我们建议您启用此功能并将其配置为 AUTOMATIC_BISECT,以便在 DLQ 之前对失败的记录进行额外重试。 dropping/sending

配置示例

以下是如何配置 Pi EventBridge pe 以将数据从 Kinesis 流传输到表的 Timestream 示例:

例 IAM 的政策更新 Timestream
JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "timestream:WriteRecords" ], "Resource": [ "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table" ] }, { "Effect": "Allow", "Action": [ "timestream:DescribeEndpoints" ], "Resource": "*" } ] }
例 Kinesis 流配置
{ "Source": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream", "SourceParameters": { "KinesisStreamParameters": { "BatchSize": 100, "DeadLetterConfig": { "Arn": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue" }, "MaximumBatchingWindowInSeconds": 5, "MaximumRecordAgeInSeconds": 1800, "MaximumRetryAttempts": 10, "StartingPosition": "LATEST", "OnPartialBatchItemFailure": "AUTOMATIC_BISECT" } } }
例 Timestream 目标配置
{ "Target": "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table", "TargetParameters": { "TimestreamParameters": { "DimensionMappings": [ { "DimensionName": "sensor_id", "DimensionValue": "$.data.device_id", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_type", "DimensionValue": "$.data.sensor_type", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_location", "DimensionValue": "$.data.sensor_loc", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": [ { "MultiMeasureName": "readings", "MultiMeasureAttributeMappings": [ { "MultiMeasureAttributeName": "temperature", "MeasureValue": "$.data.temperature", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "humidity", "MeasureValue": "$.data.humidity", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "pressure", "MeasureValue": "$.data.pressure", "MeasureValueType": "DOUBLE" } ] } ], "SingleMeasureMappings": [], "TimeFieldType": "TIMESTAMP_FORMAT", "TimestampFormat": "yyyy-MM-dd HH:mm:ss.SSS", "TimeValue": "$.data.time", "VersionValue": "$.approximateArrivalTimestamp" } } }

事件转换

EventBridge 管道允许您在数据到达之前对其进行转换 Timestream。您可以定义转换规则来修改传入的 Kinesis 记录,例如更改字段名称。

假设您的 Kinesis 直播包含温度和湿度数据。在将这些字段插入之前,您可以使用 EventBridge 转换来重命名这些字段 Timestream。

最佳实践

批处理和缓冲

  • 配置批处理窗口和大小,以在写入延迟和处理效率之间取得平衡。

  • 使用批处理窗口在处理前积累足够的数据,从而减少频繁进行小规模批处理的开销。

并行处理

利用该ParallelizationFactor设置来提高并发度,特别是对于高吞吐量流。这确保每个分片的多批次数据能够同时进行处理。

数据转换

利用 Pip EventBridge es 的转换功能对记录进行筛选和增强,然后再将其存储到其中 Timestream。这有助于确保数据与分析要求保持一致。

安全性

  • 确保用于 Pip EventBridge es 的 IAM 角色具有读取 Kinesis 和写入所需的权限 Timestream。

  • 使用加密和访问控制措施,以保护传输中数据和静态数据。

调试失败

  • 自动禁用管道

    如果目标不存在或存在权限问题,管道将在约 2 小时后自动禁用

  • 限制

    管道具备自动后退并重试的能力,直至节流有所减弱。

  • 启用日志

    建议您启用错误级别的日志,并包含执行数据,以便更深入地分析失败原因。如果出现任何故障,这些日志将包含 request/response 发送/接收的发件人。 Timestream这有助于您了解相关的错误,并在修复错误后重新处理记录(如有需要)。

监控

建议您设置以下方面的警报,以检测数据流的任何问题:

  • 源中记录的最长存留时间

    • GetRecords.IteratorAgeMilliseconds

  • 管道中的故障指标

    • ExecutionFailed

    • TargetStageFailed

  • Timestream 写入 API 错误

    • UserErrors

有关其他监控指标,请参阅《EventBridge 用户指南》中的监控 EventBridge