要获得与亚马逊 Timestream 类似的功能 LiveAnalytics,可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间,以实现实时分析。点击此处了解更多信息。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
从 Timestream for InfluxDB 3 中查询数据
Amazon Timestream for InfluxDB 3 提供多个查询 API 和协议,以检索时间序列数据。该平台通过 HTTP 和 Flight(gRPC)协议同时支持 SQL 和 InfluxQL 查询语言,为不同使用案例和客户端偏好提供灵活选择。
查询方法概述
InfluxDB 3 支持以下查询方法:
-
原生 v3 HTTP API:通过 REST 端点的 SQL 和 InfluxQL 查询。
-
influxdb3 CLI:用于交互式查询的命令行界面。
-
Flight+gRPC 协议:适用于客户端库的高性能二进制协议。
-
v1 兼容性 API:用于向后兼容的传统 InfluxQL 查询。
使用 v3 HTTP 查询 API
v3 API 为 SQL 和 InfluxQL 查询提供专用端点,同时支持 GET 和 POST 方法。
SQL 查询(/api/v3/query_sql)
GET 请求示例:
curl --get "https://your-cluster-endpoint:8086/api/v3/query_sql" \ --header "Authorization: Bearer YOUR_TOKEN" \ --data-urlencode "db=DATABASE_NAME" \ --data-urlencode "q=SELECT * FROM home WHERE time >= now() - INTERVAL '1 day'"
POST 请求示例(对于复杂查询):
curl "https://your-cluster-endpoint:8086/api/v3/query_sql" \ --header "Authorization: Bearer YOUR_TOKEN" \ --json '{ "db": "DATABASE_NAME", "q": "SELECT * FROM home WHERE room = '\''Kitchen'\'' AND temp > 20", "format": "jsonl" }'
InfluxQL 查询(/api/v3/query_influxql)
对于 InfluxQL 查询,请执行以下操作:
curl --get "https://your-cluster-endpoint:8086/api/v3/query_influxql" \ --header "Authorization: Bearer YOUR_TOKEN" \ --data-urlencode "db=DATABASE_NAME" \ --data-urlencode "q=SELECT mean(temp) FROM home WHERE time >= now() - 1d GROUP BY room"
查询参数
您可以使用以下查询参数:
| 参数 | 描述 | 必填? |
|---|---|---|
db
|
数据库名称 | 是 |
q
|
查询字符串(SQL 或 InfluxQL) | 是 |
format
|
响应格式(json、jsonl、csv、pretty、parquet) | 否(默认值:json) |
params
|
参数化查询的参数 | 否 |
使用 Influxdb3 CLI
InfluxDB 3 命令行界面(CLI)通过 influxdb3 命令进行调用。该界面提供一种交互式数据查询方式,支持多种输出格式。
以下显示基本查询:
influxdb3 query \ --host "your-cluster-endpoint:8086" \ --token "YOUR_TOKEN" \ --database "DATABASE_NAME" \ "SELECT * FROM home WHERE time >= now() - INTERVAL '1 day'"
以下显示使用不同输出格式的查询:
# JSON output influxdb3 query \ --database "DATABASE_NAME" \ --format json \ "SELECT * FROM home LIMIT 10" # CSV output influxdb3 query \ --database "DATABASE_NAME" \ --format csv \ "SELECT * FROM home LIMIT 10" # Parquet file output influxdb3 query \ --database "DATABASE_NAME" \ --format parquet \ --output results.parquet \ "SELECT * FROM home"
支持以下输出格式:
-
pretty(默认):人类可读的表格格式。
-
json:标准 JSON 数组。
-
jsonl:JSON 行(适合流式传输)。
-
csv:逗号分隔的值。
-
parquet:二进制列式格式(需要文件输出)。
使用 v1 兼容性 API
对于传统的 InfluxQL 查询,使用 /query 端点:
curl --get "https://your-cluster-endpoint:8086/query" \ --header "Authorization: Bearer YOUR_TOKEN" \ --data-urlencode "db=DATABASE_NAME" \ --data-urlencode "q=SELECT * FROM home" \ --data-urlencode "epoch=ms"
v3 HTTP 查询 API 的身份验证选项
-
持有者令牌:
Authorization: Bearer TOKEN -
基本身份验证:
--user "any:TOKEN" -
查询参数:
?p=TOKEN
v3 HTTP 查询 API 的响应格式
-
JSON(默认)
-
CSV(带
Accept: application/csv标头)
SQL 查询示例
以下显示基本查询:
-- Select specific fields with time filter SELECT temp, humidity, room FROM home WHERE time >= now() - INTERVAL '7 days' AND room = 'Kitchen' ORDER BY time DESC -- Show all tables in database SHOW TABLES -- Show columns in a table SHOW COLUMNS FROM home
以下显示聚合查询:
-- Aggregate by tags SELECT room, AVG(temp) as avg_temp, MAX(humidity) as max_humidity FROM home WHERE time >= now() - INTERVAL '24 hours' GROUP BY room -- Time-based aggregation using DATE_BIN SELECT DATE_BIN(INTERVAL '1 hour', time) as time, AVG(temp) as avg_temp, COUNT(*) as reading_count FROM home GROUP BY 1 ORDER BY time DESC
以下显示高级 SQL 功能:
-- Parameterized queries (via API) SELECT * FROM home WHERE room = $room AND temp > $min_temp AND time >= $start_time -- Gap filling with interpolation SELECT date_bin_gapfill(INTERVAL '5 minutes', time) as time, room, interpolate(avg(temp)) as temp FROM home WHERE time >= '2025-01-01T00:00:00Z' AND time <= '2025-01-01T12:00:00Z' GROUP BY 1, room -- Type casting SELECT temp::INTEGER as temp_int, CAST(humidity AS VARCHAR) as humidity_str FROM home
以下显示 InfluxQL 查询示例:
-- Basic query with time filter SELECT * FROM home WHERE time >= now() - 1d -- Aggregation with GROUP BY time SELECT MEAN(temp), MAX(humidity) FROM home WHERE time >= now() - 7d GROUP BY time(1h), room -- Using selector functions SELECT FIRST(temp), LAST(temp) FROM home WHERE time >= now() - 1h GROUP BY room
以下显示如何查询系统表以了解数据库结构和监控性能:
-- View all tables with schema information SELECT * FROM information_schema.tables -- View column details for a specific table SELECT * FROM information_schema.columns WHERE table_name = 'home' -- Monitor recent queries SELECT * FROM system.queries ORDER BY issue_time DESC LIMIT 10 -- Check cache configurations SELECT * FROM system.last_caches SELECT * FROM system.distinct_caches
查询性能最佳实践
-
使用时间筛选器:始终包括时间范围筛选器,以限制扫描的数据。
-
利用索引:设计查询,以有效使用标签筛选器。
-
选择适当的输出格式:
-
使用 jsonl 流式传输大型结果。
-
使用 parquet 进行数据导出和分析。
-
使用 csv 实现电子表格兼容性。
-
-
优化聚合:使用 DATE_BIN 进行基于时间的分组。
-
使用参数化查询:防止注入攻击以及提高可重用性。
-
监控查询性能:检查 system.queries 表是否存在缓慢查询。
支持客户端库查询
InfluxDB 3 客户端库使用 Flight+gRPC 协议,以实现最佳性能。以下 Python 代码示例演示如何实现此目标。
from influxdb3 import InfluxDBClient3 client = InfluxDBClient3( host="your-cluster-endpoint:8086", token="YOUR_TOKEN", database="DATABASE_NAME" ) # SQL query sql_result = client.query("SELECT * FROM home WHERE room = 'Kitchen'") # InfluxQL query influxql_result = client.query( "SELECT MEAN(temp) FROM home WHERE time >= now() - 1h GROUP BY room", language="influxql" )
比较 SQL 和 InfluxQL 的查询功能
下表比较 SQL 和 InfluxQL 的查询功能:
| 功能 | SQL | InfluxQL |
|---|---|---|
| 联接 | 支持 | 不支持 |
| 开窗函数 | 全面支持 | 有限 |
| 子查询。 | 支持 | 不支持 |
| 时间函数 |
DATE_BIN, INTERVAL
|
time()分组 |
| 参数化查询 | 原生支持 | 不支持 |
| 填充缺失值 |
date_bin_gapfill()
|
fill() 函数 |
通过了解这些查询功能并根据使用案例选择合适的方法,您可以有效地从 Timestream for InfluxDB 3 中检索和分析时间序列数据。
SQL 优势:
-
功能齐全的 SQL 实施,支持复杂查询。
-
支持联接、并集和窗口函数。
-
对于具有 SQL 背景的用户而言,这是熟悉的语法。
-
更广泛的分析能力。
InfluxQL 优势:
-
专为时间序列数据设计。
-
更简洁的常见时间序列操作语法。
-
向后兼容性,适用于从 InfluxDB v1 迁移的用户。
-
专用时间序列函数。
查询优化功能
InfluxDB 3 提供多种优化功能,以提高特定使用案例的查询性能。这些功能通过内存缓存和自定义索引策略,为频繁访问的数据模式提供亚毫秒级响应时间。
最后一个值缓存(LVC)
最后一个值缓存(LVC)将指定字段最新的 N 个值存储在内存中,使需要最新数据点的查询能够实现低于 10 毫秒的响应时间。此功能在核心版和企业版中均可用。
LVC 工作原理
LVC 为每个唯一键列组合(通常为标签)维护一个内存表,其中存储最近的值。例如,使用传感器数据:
Table: home ├── Tags: room, wall └── Fields: temp, humidity, co LVC Configuration: - Key columns: room, wall - Value columns: temp, humidity, co - Count: 4 (last 4 values)
缓存存储:
| 房间 | 墙壁 | temp | 湿度 | co | 时间 |
|---|---|---|---|---|---|
| 厨房 | 东部 | 22.7 | 36.5 | 26 | 2025-01-26T20:00:00Z |
| 厨房 | 东部 | 22.7 | 36.0 | 9 | 2025-01-26T17:00:00Z |
| 厨房 | 东部 | 22.7 | 36.2 | 3 | 2025-01-26T15:00:00Z |
| 厨房 | 东部 | 22.7 | 36.1 | 0 | 2025-01-26T10:00:00Z |
| 客厅 | 北部 | 22.2 | 36.4 | 17 | 2025-01-26T20:00:00Z |
| ... | ... | ... | ... | ... | ... |
创建 LVC
使用以下命令创建 LVC:
influxdb3 create last_cache \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table home \ --key-columns room,wall \ --value-columns temp,hum,co \ --count 5 \ --ttl 30mins \ homeLastCache
查询 LVC
使用以下命令查询 LVC:
-- Query the last cached values SELECT * FROM last_cache('home', 'homeLastCache') -- Filter specific series SELECT * FROM last_cache('home', 'homeLastCache') WHERE room = 'Kitchen'
LVC 最佳实践
-
管理基数:仅将基本标签作为键列,以最大限度减少内存使用量。
-
优化值计数:在查询需求和内存消耗之间实现平衡。
-
考虑 TTL:为缓存条目设置适当的生存时间。
-
显示器内存:缓存大小 =(key_column_cardinality × count × value_columns)。
唯一值缓存(DVC)
唯一值缓存(DVC)维护内存中指定列的唯一值,将元数据查询响应时间缩短至 30 毫秒以内。核心版和企业版均可用。
DVC 工作原理
DVC 存储指定列的所有唯一值组合,特别适合需列出可用标签值或元数据的查询。
位置数据缓存示例:
| country | county | city |
|---|---|---|
| 奥地利 | 萨尔斯堡 | 萨尔斯堡 |
| 奥地利 | 维也纳 | 维也纳 |
| 比利时 | 安特卫普 | 安特卫普 |
| 比利时 | 西弗兰德斯 | 布鲁日 |
| 捷克共和国 | 布拉格 | 布拉格 |
创建 DVC
使用以下命令创建 DVC:
influxdb3 create distinct_cache \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table wind_data \ --columns country,county,city \ --max-cardinality 10000 \ --max-age 24h \ windDistinctCache
查询 DVC
使用以下命令查询 DVC:
-- Get all distinct values SELECT * FROM distinct_cache('wind_data', 'windDistinctCache') -- Get distinct countries SELECT DISTINCT country FROM distinct_cache('wind_data', 'windDistinctCache')
DVC 最佳实践
-
设置基数限制:定义最大唯一值组合以控制内存。
-
配置最大年龄:自动清除过期值。
-
缓存策略列:重点缓存元数据查询中频繁使用的列。
-
显示器缓存大小:基数越高意味着需要更多内存。
文件索引仅在企业版中可用
仅在企业版中可用。文件索引允许自定义数据在存储中的索引方式,从而显著提升单系列查询性能。
默认索引与自定义索引
默认索引:InfluxDB 对所有标签及时间进行索引。
自定义索引:仅对与查询相关的列进行索引。
示例优化:
Schema columns: country, state_province, county, city, postal_code Query patterns: Always filter by country, state_province, city Custom index: time, country, state_province, city (skip county, postal_code)
以下展示如何创建自定义文件索引
influxdb3 create file_index \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table wind_data \ country,state_province,city
以下展示如何删除文件索引:
influxdb3 delete file_index \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table wind_data
文件索引注意事项
-
需要压缩:索引在数据压缩(gen2+)期间构建。
-
仅限字符串列:可同时索引标签和字符串字段。
-
查询模式分析:创建自定义索引之前,先分析工作负载。
-
单系列优化:最适合针对特定系列的查询。
管理缓存信息
使用系统表查看缓存配置和统计信息:
-- View Last Value Caches SELECT * FROM system.last_caches -- View Distinct Value Caches SELECT * FROM system.distinct_caches -- Check cache memory usage SELECT cache_name, table_name, key_columns, value_count, memory_size_bytes FROM system.last_caches
性能优化策略
根据查询模式选择恰当的优化功能:
| 查询模式 | 推荐的功能 | 预期性能 |
|---|---|---|
| 各系列的最新值 | 最后一个值缓存 | <10ms |
| 可用标签值 | 唯一值缓存 | <30ms |
| 单系列查询 | 文件索引(企业版) | 显著改进 |
| 时间范围聚合 | 标准索引 | 基准性能 |
内存和资源注意事项
缓存内存公式:
-
LVC:内存 = key_cardinality × value_count × value_columns × data_size
-
DVC:内存 = distinct_combinations × column_count × data_size
最佳实践
-
从小型缓存开始,并监控内存使用情况
-
使用 TTL 设置,限制缓存增长
-
仅缓存经常查询的数据
-
通过系统表监控缓存命中率
-
对于企业版:将缓存与自定义文件索引相结合,实现最佳性能
通过合理运用这些优化功能,您可以在有效管理资源消耗的同时,显著提升特定查询模式的性能。