将数据从 Amazon S3 摄取到 Timestream 以实现 InfluxDB 自动化 - Amazon Timestream
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

从2025年6月20日起,亚马逊Timestream版 LiveAnalytics 将不再向新客户开放。如果您想使用亚马逊 Timestream LiveAnalytics,请在该日期之前注册。现有客户可以继续照常使用该服务。有关更多信息,请参阅 Amazon Timestream 以了解 LiveAnalytics 可用性变更。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据从 Amazon S3 摄取到 Timestream 以实现 InfluxDB 自动化

在 Timestream for e LiveAnalytics xport 工具完成卸载过程后,自动化过程的下一步就开始了。这种自动化使用 InfluxDB的导入工具将数据传输到其专门的时间序列结构中。该过程转换了 Timestream 的数据模型,使其与 InfluxDB 的测量、标签和字段概念相匹配。最后,它使用 InfluxDB的线路协议有效地加载数据。

完成迁移的工作流程分为四个阶段:

  1. 使用 Timestream LiveAnalytics 导出工具卸载数据。

  2. 数据转换:使用 Amazon Athena 将 LiveAnalytics 数据的 Timestream 转换为 InfluxDB 线路协议格式(基于基数评估后定义的架构)。

  3. 数据提取:将线路协议数据集提取到 Influ xDB 实例的 Timestream 中。

  4. 验证:(可选)您可以验证是否已提取每个线路协议点(--add-validation-field true在数据转换步骤中需要)。

数据转换

为了进行数据转换,我们开发了一个脚本,使用Amazon Athena将 LiveAnalytics 导出的数据镶木地板格式的时间流转换为InfluxDB的线路协议格式。Amazon Athena 提供无服务器查询服务和一种经济实惠的方式,无需专用的计算资源即可转换大量时间序列数据。

脚本执行以下操作:

  • 将来自亚马逊 S3 存储桶 LiveAnalytics 的数据导出的 Timestream 加载到亚马逊 Athena 表中。

  • 执行数据映射,将存储在 Athena 表中的数据转换为线路协议,并将其存储在 S3 存储桶中。

数据映射

下表显示了如何将 LiveAnalytics 数据的 Timestream 映射到线路协议数据。

概念的时间 LiveAnalytics 流 线路协议概念

表名

测量

Dimensions

标签

度量名称

标签(可选)

措施

字段

时间

Timestamp

先决条件和安装

请参阅转换脚本的 README 中的 “先决条件” 和 “安装” 部分。

用法

要将存储在存储桶 example_s3_bucket 中的数据从 example_database 中的 LiveAnalytics 表的时间流转换为 example_table,请运行以下命令:

python3 transform.py \ --database-name example_database \ --tables example_table \ --s3-bucket-path example_s3_bucket \ --add-validation-field false

脚本完成后,

  • 在 Athena 中,将创建 example_database_example_table 表,其中包含数据的时间流。 LiveAnalytics

  • 在 Athena 中,将创建 lp_example_database_example_table 表,其中包含转换为线路协议点的数据的时间流。 LiveAnalytics

  • 在 S3 存储桶 example_s3_bucket 中,将在路径中存储线路协议 example_database/example_table/unload-<%Y-%m-%d-%H:%M:%S>/line-protocol-output数据。

建议

有关脚本最新用法的更多详细信息,请参阅转换脚本的自述文件,以及后续迁移步骤(例如验证)需要输出。如果为了提高基数而排除维度,请使用--dimensions-to-fields参数将特定维度更改为字段,从而调整架构以减少基数。

添加要验证的字段

有关如何添加用于验证的字段的信息,请参阅转换脚本自述文件中的 “添加用于验证的字段” 部分。

将数据提取到 InfluxDB 的 Timestream 中

InfluxDB 摄取脚本将压缩线路协议数据集提取到 InfluxDB 的 Timestream 中。包含 gzip 压缩行协议文件的目录作为命令行参数与摄取目标 InfluxDB 存储桶一起传入。该脚本旨在使用多处理一次提取多个文件,以利用InfluxDB和执行脚本的机器上的资源。

该脚本执行以下操作:

  • 提取压缩后的文件并将其提取到 InfluxDB 中。

  • 实现重试机制和错误处理。

  • 跟踪成功和失败的摄取以供恢复。

  • 优化从线路协议数据集读取时的 I/O 操作。

先决条件和安装

请参阅摄取脚本的 READ ME 中的 “先决条件和安装” 部分。 GitHub

数据准备

提取所需的压缩线路协议文件由数据转换脚本生成。请按照以下步骤准备数据:

  1. 设置一个具有足够存储空间的 EC2 实例,以容纳转换后的数据集。

  2. 将转换后的数据从 S3 存储桶同步到您的本地目录:

    aws s3 sync \ s3://your-bucket-name/path/to/transformed/data \ ./data_directory
  3. 确保您对数据目录中的所有文件具有读取权限。

  4. 运行以下摄取脚本,将数据提取到 InfluxDB 的 Timestream 中。

用法

python influxdb_ingestion.py <bucket_name> <data_directory> [options]

基本用法

python influxdb_ingestion.py my_bucket ./data_files

摄入率

我们已经对摄入率进行了一些测试。使用 C5N.9XL EC2 实例进行摄取测试,使用 10 个工作人员执行摄取脚本,并将大约 500 GB 的线路协议摄取到 8XL Timestream 用于 InfluxDB 实例:

  • 3K IOPS 15.86 GB/小时。

  • 12K IOPS 70.34 GB/小时。

  • 16K IOPS 71.28 GB/小时。

建议

  • 使用具有足够 CPU 内核的 EC2 实例来处理并行处理。

  • 确保实例有足够的存储空间来容纳整个转换后的数据集,并有额外的提取空间。

    • 一次提取的文件数等于脚本执行期间配置的工作程序数。

  • 将 EC2 实例放置在与 InfluxDB 实例相同的区域和可用区(如果可能),以最大限度地减少延迟。

  • 考虑使用针对网络操作进行了优化的实例类型,例如 C5N。

  • 如果需要较高的摄取速率,则建议InfluxDB实例的时间流至少使用1.2K IOPS。通过增加依赖于 Timestream 的 InfluxDB 实例大小的脚本的工作程序数量,可以获得其他优化。

有关更多信息,请参阅摄取脚本的自述文件。