从 Timestream for InfluxDB 3 中查询数据 - Amazon Timestream
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics,可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间,以实现实时分析。点击此处了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

从 Timestream for InfluxDB 3 中查询数据

Amazon Timestream for InfluxDB 3 提供多个查询 API 和协议,以检索时间序列数据。该平台通过 HTTP 和 Flight(gRPC)协议同时支持 SQL 和 InfluxQL 查询语言,为不同使用案例和客户端偏好提供灵活选择。

查询方法概述

InfluxDB 3 支持以下查询方法:

  • 原生 v3 HTTP API:通过 REST 端点的 SQL 和 InfluxQL 查询。

  • influxdb3 CLI:用于交互式查询的命令行界面。

  • Flight+gRPC 协议:适用于客户端库的高性能二进制协议。

  • v1 兼容性 API:用于向后兼容的传统 InfluxQL 查询。

使用 v3 HTTP 查询 API

v3 API 为 SQL 和 InfluxQL 查询提供专用端点,同时支持 GET 和 POST 方法。

SQL 查询(/api/v3/query_sql

GET 请求示例:

curl --get "https://your-cluster-endpoint:8086/api/v3/query_sql" \ --header "Authorization: Bearer YOUR_TOKEN" \ --data-urlencode "db=DATABASE_NAME" \ --data-urlencode "q=SELECT * FROM home WHERE time >= now() - INTERVAL '1 day'"

POST 请求示例(对于复杂查询):

curl "https://your-cluster-endpoint:8086/api/v3/query_sql" \ --header "Authorization: Bearer YOUR_TOKEN" \ --json '{ "db": "DATABASE_NAME", "q": "SELECT * FROM home WHERE room = '\''Kitchen'\'' AND temp > 20", "format": "jsonl" }'

InfluxQL 查询(/api/v3/query_influxql

对于 InfluxQL 查询,请执行以下操作:

curl --get "https://your-cluster-endpoint:8086/api/v3/query_influxql" \ --header "Authorization: Bearer YOUR_TOKEN" \ --data-urlencode "db=DATABASE_NAME" \ --data-urlencode "q=SELECT mean(temp) FROM home WHERE time >= now() - 1d GROUP BY room"

查询参数

您可以使用以下查询参数:

参数 描述 必填
db 数据库名称
q 查询字符串(SQL 或 InfluxQL)
format 响应格式(json、jsonl、csv、pretty、parquet) 否(默认值:json)
params 参数化查询的参数

使用 Influxdb3 CLI

InfluxDB 3 命令行界面(CLI)通过 influxdb3 命令进行调用。该界面提供一种交互式数据查询方式,支持多种输出格式。

以下显示基本查询:

influxdb3 query \ --host "your-cluster-endpoint:8086" \ --token "YOUR_TOKEN" \ --database "DATABASE_NAME" \ "SELECT * FROM home WHERE time >= now() - INTERVAL '1 day'"

以下显示使用不同输出格式的查询:

# JSON output influxdb3 query \ --database "DATABASE_NAME" \ --format json \ "SELECT * FROM home LIMIT 10" # CSV output influxdb3 query \ --database "DATABASE_NAME" \ --format csv \ "SELECT * FROM home LIMIT 10" # Parquet file output influxdb3 query \ --database "DATABASE_NAME" \ --format parquet \ --output results.parquet \ "SELECT * FROM home"

支持以下输出格式:

  • pretty(默认):人类可读的表格格式。

  • json:标准 JSON 数组。

  • jsonl:JSON 行(适合流式传输)。

  • csv:逗号分隔的值。

  • parquet:二进制列式格式(需要文件输出)。

使用 v1 兼容性 API

对于传统的 InfluxQL 查询,使用 /query 端点:

curl --get "https://your-cluster-endpoint:8086/query" \ --header "Authorization: Bearer YOUR_TOKEN" \ --data-urlencode "db=DATABASE_NAME" \ --data-urlencode "q=SELECT * FROM home" \ --data-urlencode "epoch=ms"

v3 HTTP 查询 API 的身份验证选项

  • 持有者令牌:Authorization: Bearer TOKEN

  • 基本身份验证:--user "any:TOKEN"

  • 查询参数:?p=TOKEN

v3 HTTP 查询 API 的响应格式

  • JSON(默认)

  • CSV(带 Accept: application/csv 标头)

SQL 查询示例

以下显示基本查询:

-- Select specific fields with time filter SELECT temp, humidity, room FROM home WHERE time >= now() - INTERVAL '7 days' AND room = 'Kitchen' ORDER BY time DESC -- Show all tables in database SHOW TABLES -- Show columns in a table SHOW COLUMNS FROM home

以下显示聚合查询:

-- Aggregate by tags SELECT room, AVG(temp) as avg_temp, MAX(humidity) as max_humidity FROM home WHERE time >= now() - INTERVAL '24 hours' GROUP BY room -- Time-based aggregation using DATE_BIN SELECT DATE_BIN(INTERVAL '1 hour', time) as time, AVG(temp) as avg_temp, COUNT(*) as reading_count FROM home GROUP BY 1 ORDER BY time DESC

以下显示高级 SQL 功能:

-- Parameterized queries (via API) SELECT * FROM home WHERE room = $room AND temp > $min_temp AND time >= $start_time -- Gap filling with interpolation SELECT date_bin_gapfill(INTERVAL '5 minutes', time) as time, room, interpolate(avg(temp)) as temp FROM home WHERE time >= '2025-01-01T00:00:00Z' AND time <= '2025-01-01T12:00:00Z' GROUP BY 1, room -- Type casting SELECT temp::INTEGER as temp_int, CAST(humidity AS VARCHAR) as humidity_str FROM home

以下显示 InfluxQL 查询示例:

-- Basic query with time filter SELECT * FROM home WHERE time >= now() - 1d -- Aggregation with GROUP BY time SELECT MEAN(temp), MAX(humidity) FROM home WHERE time >= now() - 7d GROUP BY time(1h), room -- Using selector functions SELECT FIRST(temp), LAST(temp) FROM home WHERE time >= now() - 1h GROUP BY room

以下显示如何查询系统表以了解数据库结构和监控性能:

-- View all tables with schema information SELECT * FROM information_schema.tables -- View column details for a specific table SELECT * FROM information_schema.columns WHERE table_name = 'home' -- Monitor recent queries SELECT * FROM system.queries ORDER BY issue_time DESC LIMIT 10 -- Check cache configurations SELECT * FROM system.last_caches SELECT * FROM system.distinct_caches

查询性能最佳实践

  • 使用时间筛选器:始终包括时间范围筛选器,以限制扫描的数据。

  • 利用索引:设计查询,以有效使用标签筛选器。

  • 选择适当的输出格式

    • 使用 jsonl 流式传输大型结果。

    • 使用 parquet 进行数据导出和分析。

    • 使用 csv 实现电子表格兼容性。

  • 优化聚合:使用 DATE_BIN 进行基于时间的分组。

  • 使用参数化查询:防止注入攻击以及提高可重用性。

  • 监控查询性能:检查 system.queries 表是否存在缓慢查询。

支持客户端库查询

InfluxDB 3 客户端库使用 Flight+gRPC 协议,以实现最佳性能。以下 Python 代码示例演示如何实现此目标。

from influxdb3 import InfluxDBClient3 client = InfluxDBClient3( host="your-cluster-endpoint:8086", token="YOUR_TOKEN", database="DATABASE_NAME" ) # SQL query sql_result = client.query("SELECT * FROM home WHERE room = 'Kitchen'") # InfluxQL query influxql_result = client.query( "SELECT MEAN(temp) FROM home WHERE time >= now() - 1h GROUP BY room", language="influxql" )

比较 SQL 和 InfluxQL 的查询功能

下表比较 SQL 和 InfluxQL 的查询功能:

功能 SQL InfluxQL
联接 支持 不支持
开窗函数 全面支持 有限
子查询 支持 不支持
时间函数 DATE_BIN, INTERVAL time()分组
参数化查询 原生支持 不支持
填充缺失值 date_bin_gapfill() fill() 函数

通过了解这些查询功能并根据使用案例选择合适的方法,您可以有效地从 Timestream for InfluxDB 3 中检索和分析时间序列数据。

SQL 优势:

  • 功能齐全的 SQL 实施,支持复杂查询。

  • 支持联接、并集和窗口函数。

  • 对于具有 SQL 背景的用户而言,这是熟悉的语法。

  • 更广泛的分析能力。

InfluxQL 优势:

  • 专为时间序列数据设计。

  • 更简洁的常见时间序列操作语法。

  • 向后兼容性,适用于从 InfluxDB v1 迁移的用户。

  • 专用时间序列函数。

查询优化功能

InfluxDB 3 提供多种优化功能,以提高特定使用案例的查询性能。这些功能通过内存缓存和自定义索引策略,为频繁访问的数据模式提供亚毫秒级响应时间。

最后一个值缓存(LVC)

最后一个值缓存(LVC)将指定字段最新的 N 个值存储在内存中,使需要最新数据点的查询能够实现低于 10 毫秒的响应时间。此功能在核心版和企业版中均可用。

LVC 工作原理

LVC 为每个唯一键列组合(通常为标签)维护一个内存表,其中存储最近的值。例如,使用传感器数据:

Table: home ├── Tags: room, wall └── Fields: temp, humidity, co LVC Configuration: - Key columns: room, wall - Value columns: temp, humidity, co - Count: 4 (last 4 values)

缓存存储:

房间 墙壁 temp 湿度 co 时间
厨房 东部 22.7 36.5 26 2025-01-26T20:00:00Z
厨房 东部 22.7 36.0 9 2025-01-26T17:00:00Z
厨房 东部 22.7 36.2 3 2025-01-26T15:00:00Z
厨房 东部 22.7 36.1 0 2025-01-26T10:00:00Z
客厅 北部 22.2 36.4 17 2025-01-26T20:00:00Z
... ... ... ... ... ...

创建 LVC

使用以下命令创建 LVC:

influxdb3 create last_cache \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table home \ --key-columns room,wall \ --value-columns temp,hum,co \ --count 5 \ --ttl 30mins \ homeLastCache

查询 LVC

使用以下命令查询 LVC:

-- Query the last cached values SELECT * FROM last_cache('home', 'homeLastCache') -- Filter specific series SELECT * FROM last_cache('home', 'homeLastCache') WHERE room = 'Kitchen'

LVC 最佳实践

  • 管理基数:仅将基本标签作为键列,以最大限度减少内存使用量。

  • 优化值计数:在查询需求和内存消耗之间实现平衡。

  • 考虑 TTL:为缓存条目设置适当的生存时间。

  • 显示器内存:缓存大小 =(key_column_cardinality × count × value_columns)。

唯一值缓存(DVC)

唯一值缓存(DVC)维护内存中指定列的唯一值,将元数据查询响应时间缩短至 30 毫秒以内。核心版和企业版均可用。

DVC 工作原理

DVC 存储指定列的所有唯一值组合,特别适合需列出可用标签值或元数据的查询。

位置数据缓存示例:

country county city
奥地利 萨尔斯堡 萨尔斯堡
奥地利 维也纳 维也纳
比利时 安特卫普 安特卫普
比利时 西弗兰德斯 布鲁日
捷克共和国 布拉格 布拉格

创建 DVC

使用以下命令创建 DVC:

influxdb3 create distinct_cache \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table wind_data \ --columns country,county,city \ --max-cardinality 10000 \ --max-age 24h \ windDistinctCache

查询 DVC

使用以下命令查询 DVC:

-- Get all distinct values SELECT * FROM distinct_cache('wind_data', 'windDistinctCache') -- Get distinct countries SELECT DISTINCT country FROM distinct_cache('wind_data', 'windDistinctCache')

DVC 最佳实践

  • 设置基数限制:定义最大唯一值组合以控制内存。

  • 配置最大年龄:自动清除过期值。

  • 缓存策略列:重点缓存元数据查询中频繁使用的列。

  • 显示器缓存大小:基数越高意味着需要更多内存。

文件索引仅在企业版中可用

仅在企业版中可用。文件索引允许自定义数据在存储中的索引方式,从而显著提升单系列查询性能。

默认索引与自定义索引

默认索引:InfluxDB 对所有标签及时间进行索引。 

自定义索引:仅对与查询相关的列进行索引。

示例优化:

Schema columns: country, state_province, county, city, postal_code Query patterns: Always filter by country, state_province, city Custom index: time, country, state_province, city (skip county, postal_code)

以下展示如何创建自定义文件索引

influxdb3 create file_index \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table wind_data \ country,state_province,city

以下展示如何删除文件索引:

influxdb3 delete file_index \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table wind_data

文件索引注意事项

  • 需要压缩:索引在数据压缩(gen2+)期间构建。

  • 仅限字符串列:可同时索引标签和字符串字段。

  • 查询模式分析:创建自定义索引之前,先分析工作负载。

  • 单系列优化:最适合针对特定系列的查询。

管理缓存信息

使用系统表查看缓存配置和统计信息:

-- View Last Value Caches SELECT * FROM system.last_caches -- View Distinct Value Caches SELECT * FROM system.distinct_caches -- Check cache memory usage SELECT cache_name, table_name, key_columns, value_count, memory_size_bytes FROM system.last_caches

性能优化策略

根据查询模式选择恰当的优化功能:

查询模式 推荐的功能 预期性能
各系列的最新值 最后一个值缓存 <10ms
可用标签值 唯一值缓存 <30ms
单系列查询 文件索引(企业版) 显著改进
时间范围聚合 标准索引 基准性能

内存和资源注意事项

缓存内存公式:

  • LVC:内存 = key_cardinality × value_count × value_columns × data_size

  • DVC:内存 = distinct_combinations × column_count × data_size

最佳实践

  • 从小型缓存开始,并监控内存使用情况

  • 使用 TTL 设置,限制缓存增长

  • 仅缓存经常查询的数据

  • 通过系统表监控缓存命中率

  • 对于企业版:将缓存与自定义文件索引相结合,实现最佳性能

通过合理运用这些优化功能,您可以在有效管理资源消耗的同时,显著提升特定查询模式的性能。