将数据从 Amazon S3 摄取到适用于 InfluxDB 的 Timestream 自动化系统 - Amazon Timestream
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics,可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间,以实现实时分析。点击此处了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据从 Amazon S3 摄取到适用于 InfluxDB 的 Timestream 自动化系统

在 Timestream for e LiveAnalytics xport 工具完成卸载过程后,自动化过程的下一步就开始了。此自动化使用 InfluxDB 的导入工具,将数据传输到其专用的时间序列结构中。该过程对 Timestream 的数据模型进行转换,使其匹配 InfluxDB 的测量值、标签和字段概念。最后,该过程使用 InfluxDB 的行协议高效加载数据。

完成迁移的工作流程分为四大阶段:

  1. 使用 Timestream LiveAnalytics 导出工具卸载数据。

  2. 数据转换:使用 Amazon Athena 将 LiveAnalytics 数据的 Timestream 转换为 InfluxDB 线路协议格式(基于基数评估后定义的架构)。

  3. 数据摄取:将行协议数据集摄取到适用于 InfluxDB 的 Timestream 实例中。

  4. 验证:(可选)您可以验证是否已摄取每个行协议点(数据转换步骤中需要 --add-validation-field true)。

数据转换

为了进行数据转换,我们开发了一个脚本,使用Amazon Athena将 LiveAnalytics 导出的数据镶木地板格式的时间流转换为InfluxDB的线路协议格式。Amazon Athena 提供无服务器查询服务,并以经济高效的方式处理海量时间序列数据,无需专用计算资源。

脚本执行以下操作:

  • 将来自亚马逊 S3 存储桶 LiveAnalytics 的数据导出的 Timestream 加载到亚马逊 Athena 表中。

  • 执行数据映射,将存储在 Athena 表中的数据转换为行协议格式,并将其存储在 S3 存储桶中。

数据映射

下表显示了如何将 LiveAnalytics 数据的 Timestream 映射到线路协议数据。

概念的时间 LiveAnalytics 流 行协议概念

表名

测量

Dimensions

标签

度量名称

标签(可选)

措施

字段

时间

Timestamp

先决条件和安装

请参阅转换脚本的自述文件中的“先决条件”和“安装”部分。

用法

要将存储在存储桶 example_s3_bucket 中的数据从 example_database 中的 LiveAnalytics 表的时间流转换为 example_table,请运行以下命令:

python3 transform.py \ --database-name example_database \ --tables example_table \ --s3-bucket-path example_s3_bucket \ --add-validation-field false

脚本完成后:

  • 在 Athena 中,将创建 example_database_example_table 表,其中包含数据的时间流。 LiveAnalytics

  • 在 Athena 中,将创建 lp_example_database_example_table 表,其中包含转换为线路协议点的数据的时间流。 LiveAnalytics

  • 在 S3 存储桶 example_s3_bucket 中,将在路径 example_database/example_table/unload-<%Y-%m-%d-%H:%M:%S>/line-protocol-output 中存储行协议数据。

建议

请参阅转换脚本的自述文件,了解脚本的最新用法详情,以及后续迁移步骤(例如验证)需要这些输出。如果为提高基数而排除维度,请使用 --dimensions-to-fields 参数将特定维度更改为字段,从而调整架构以降低基数。

添加用于验证的字段

有关如何添加用于验证的字段的信息,请参阅转换脚本自述文件中的添加用于验证的字段部分。

将数据摄取到适用于 InfluxDB 的 Timestream

InfluxDB 摄取脚本将压缩的行协议数据集摄取到适用于 InfluxDB 的 Timestream 中。包含 gzip 压缩行协议文件的目录作为命令行参数与摄取目标 InfluxDB 存储桶一起传递。该脚本采用多进程设计,可同时处理多个文件,从而充分利用 InfluxDB 及执行脚本的计算机资源。

脚本执行以下操作:

  • 提取压缩后的文件并将其摄取到 InfluxDB 中。

  • 实施重试机制和错误处理。

  • 跟踪成功和失败的摄取以便恢复。

  • 优化从线路协议数据集读取时的 I/O 操作。

先决条件和安装

请参阅摄取脚本的 READ ME 中的 “先决条件和安装” 部分。 GitHub

数据准备

摄取所需的压缩行协议文件由数据转换脚本生成。按照以下步骤准备数据:

  1. 设置一个具有足够存储空间的 EC2 实例,以容纳转换后的数据集。

  2. 将转换后的数据从 S3 存储桶同步到本地目录:

    aws s3 sync \ s3://your-bucket-name/path/to/transformed/data \ ./data_directory
  3. 确保您对数据目录中的所有文件都具有读取权限。

  4. 运行以下摄取脚本,将数据摄取到适用于 InfluxDB 的 Timestream 中。

用法

python influxdb_ingestion.py <bucket_name> <data_directory> [options]

基本用法

python influxdb_ingestion.py my_bucket ./data_files

摄取率

我们已针对摄取率进行了一些测试。使用 C5N.9XL EC2 实例进行摄取测试,使用 10 个工作人员执行摄取脚本,并将大约 500 GB 的线路协议摄取到 8XL Timestream 用于 InfluxDB 实例:

  • 3K IOPS 15.86GB/小时。

  • 12K IOPS 70.34GB/小时。

  • 16K IOPS 71.28GB/小时。

建议

  • 使用具有足够 CPU 内核的 EC2 实例来处理并行处理。

  • 确保实例有足够的存储空间可容纳整个转换后的数据集,并预留额外空间用于提取操作。

    • 每次提取的文件数量等于脚本执行期间配置的工作线程数。

  • 将 EC2 实例放置在与 InfluxDB 实例相同的区域和可用区(如果可能),以最大限度地减少延迟。

  • 考虑使用针对网络操作进行优化的实例类型,例如 C5N。

  • 如果需要满足高吞吐量需求,建议为适用于 InfluxDB 的 Timestream 实例配置至少 12K IOPS。通过增加依赖适用于 InfluxDB 的 Timestream 实例大小的脚本的工作线程数量,可实现额外的优化效果。

有关更多信息,请参阅提取脚本的自述文件