使用处理引擎插件扩展适用于 InfluxDB 的 Timestream - Amazon Timestream
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics,可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间,以实现实时分析。点击此处了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用处理引擎插件扩展适用于 InfluxDB 的 Timestream

处理引擎是一种嵌入式 Python 虚拟机,在 Amazon Timestream 的 InfluxDB 3 数据库内部运行。提供核心版和企业版两种版本。使您能够使用自定义 Python 代码扩展数据库,这些代码可以自动执行工作流程、转换数据以及创建自定义 API 端点。

处理引擎执行 Python 插件,以响应特定的数据库事件:

  • 数据写入:对进入数据库的数据进行处理和转换

  • 计划事件:按定义的时间间隔或特定的时间运行代码

  • HTTP 请求:暴露执行代码的自定义 API 端点

该引擎包含内存缓存,用于管理执行之间的状态,使您能够直接在数据库中构建具有状态的应用程序。

InfluxData 经过认证的插件

在发布时,InfluxDB 3 包括一组经过以下认证的预构建、完全可配置的插件: InfluxData

  • 数据转换:处理并丰富传入数据

  • 警报:根据数据阈值发送通知

  • 聚合:计算时间序列数据的统计数据

  • 系统监控:跟踪资源使用情况和运行状况指标

  • 集成:Connect 连接到外部服务和 APIs

这些经过认证的插件已准备就绪,可通过触发器参数进行配置以满足特定需求。

插件类型和触发器规格

插件类型 触发器规格 插件何时运行 使用案例
数据写入 table:<TABLE_NAME>all_tables 何时将数据写入表 数据转换、警报、派生指标
已安排 every:<DURATION>cron:<EXPRESSION> 在指定间隔 定期聚合、报告、运行状况检查
HTTP 请求 request:<REQUEST_PATH> 何时接收 HTTP 请求 自定义 APIs、网络挂钩、用户界面

创建触发器

触发器将插件连接到数据库事件,并定义其执行时间。使用 influxdb3 create trigger 命令。

要创建数据写入触发器,请执行以下操作:

# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor

要创建计划触发器,请执行以下操作:

# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report

要创建 HTTP 请求触发器,请执行以下操作

# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor

访问端点,网址为:https://your-cluster-endpoint:8086/api/v3/engine/webhook

配置触发器

向插件传递参数

使用触发器参数配置插件行为:

influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor

参数以字典形式传递到插件:

def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic

错误处理行为

配置触发器处理错误的方式:

# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor

异步执行

允许同时运行多个触发器实例:

influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor

管理触发器

要查看数据库的触发器,请执行以下操作:

# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN

写入触发器的表排除项

使用 all_tables 时,要在插件代码中筛选表,请执行以下操作:

influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor

插件实施如下:

def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables

分布式部署注意事项

在多节点部署中,根据节点角色配置插件:

插件类型 节点类型 原因
数据写入插件 摄取程序节点 在摄取点处理数据
HTTP 请求插件 查询器节点 处理 API 流量
计划插件 任何已配置的节点 可使用调度器在任何节点上运行

以下注意事项对于企业部署至关重要:

  • 确保所有相关节点保持相同的插件配置。

  • 将外部客户端(Grafana、控制面板)路由到查询器节点。

  • 确保插件在执行其触发器的节点上可用。

最佳实践

  • 插件配置

    • 使用触发器参数设置可配置值,而非硬编码。

    • 在插件中实施正确的错误处理。

    • 使用 influxdb3_local API 进行数据库操作。

  • 性能优化

    • 对于繁重的处理任务,使用异步执行。

    • 对筛选后的数据实施提前返回。

    • 尽量减少插件内的数据库查询。

  • 错误管理

    • 选择适当的错误行为(日志记录、重试或禁用)。

    • 通过系统表监控插件执行情况。

    • 在生产部署之前,请对插件进行全面测试。

  • 安全注意事项

    • 验证 HTTP 请求插件中的所有输入数据。

    • 使用安全方法存储敏感配置。

    • 将插件权限限制为仅执行必需操作。

监控插件执行情况

查询系统表以监控插件性能:

-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'

处理引擎提供一种扩展 InfluxDB 3 功能的强大方法,同时使数据处理逻辑接近数据,从而减少延迟并简化架构。

InfluxData 经过认证的插件

Amazon Timestream for InfluxDB 3 包含一整套经过认证的预构建插件,无需自定义开发即可扩展数据库功能。这些插件在启动时即可完全配置且随时可用,为数据处理、监控和警报提供高级功能。

要获取完整的文档和源代码,请访问InfluxData插件存储库

可用插件

异常检测插件

基于 MAD 的异常检测

  • 触发器类型:数据写入(实时)

  • 使用案例:流数据的实时异常值检测、传感器监控、质量控制。

  • GitHub: MAD 异常检测文档

工作原理:使用中位数绝对偏差 (MAD),为正常行为建立稳健的基线。随着新数据的到来,它会计算出每个点与中位数相差多少 MADs 点。超过阈值(k * MAD)的点会标记为异常。

主要特点:

  • 数据写入时的实时处理。

  • 保持内存中滑动窗口,以提高效率。

  • 基于计数的警报(例如,连续 5 次异常)。

  • 基于持续时间的警报(例如,持续 2 分钟的异常)。

  • 禁用翻转功能,以防止快速变化的值引发警报疲劳。

示例用法:

# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly

输出:在检测到异常时发送实时通知,包括字段名称、值和持续时间。

数据转换插件

基本转换

  • 触发器类型:计划、数据写入

  • 使用案例:数据标准化、单位转换、字段名称标准化、数据清理。

  • GitHub: 基本转换文档

工作原理:对字段名称和值应用一系列转换操作。可批量(计划)处理历史数据,也可在数据到达时对其进行转换(数据写入)。转换按指定顺序应用,从而支持复杂的数据管道。

主要特点:

  • 字段名称转换:蛇形命名法、删除空格、仅限字母数字。

  • 单位转换:温度、压力、长度、时间单位。

  • 支持正则表达式的自定义字符串替换。

  • 试运行模式,无需写入数据即可进行测试。

  • 历史数据的批量处理。

示例用法:

# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner

输出:使用转换后的数据创建新表,同时保留原始时间戳和标签。

降低取样频率取样器

  • 触发器类型:计划、HTTP

  • 使用案例:数据压缩、长期存储优化、创建摘要统计数据、性能提升。

  • GitHub: 向下采样器文档

工作原理:将高分辨率的时间序列数据聚合为低分辨率的摘要。例如,将 1 秒数据转换为 1 小时平均值。每个降低取样频率取样器点都包含有关原始压缩点数量和所覆盖时间范围的元数据。

主要特点:

  • 多种聚合函数:平均值、总和、最小值、最大值、中位数、导数。

  • 特定于字段的聚合(不同字段使用不同函数)。

  • 元数据跟踪(record_count、time_from、time_to)。

  • HTTP API,用于通过回填进行按需缩减采样。

  • 可配置的批量大小,适用于大型数据集。

示例用法:

# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'

输出:使用聚合值以及显示压缩点数和时间范围的元数据列,创建缩减采样数据。

监控和警报插件

状态更改监控

  • 触发器类型:计划、数据写入

  • 使用案例:状态监控、设备状态跟踪、过程监控、更改检测。

  • GitHub: 状态变更文档

工作原理:跟踪字段值随时间推移而出现的更改,并在更改数量超过配置的阈值时发出警报。可检测值的更改情况(不同的值)和特定的值条件(等于目标值)。包括稳定性检查,以防止噪声信号触发警报。

主要特点:

  • 基于计数的更改检测(例如,十分钟内出现五次更改)。

  • 基于持续时间的监控(例如,状态 = “错误”,持续五分钟)。

  • 用于降噪的状态更改窗口。

  • 具备独立阈值的多字段监控。

  • 可配置的稳定性要求。

示例用法:

# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor

输出:警报包括字段名称、检测到的更改数量、时间窗口以及相关标签值。

系统指标收集器

  • 触发器类型:计划

  • 使用案例:基础设施监控、性能基准、容量规划、资源跟踪。

  • GitHub: 系统指标文档

工作原理:使用 psutil 库,从运行 InfluxDB 的主机收集全面的系统指标。以可配置的间隔收集 CPU、内存、磁盘和网络统计数据。每种指标类型均可独立启用/禁用。

主要特点:

  • 每核 CPU 统计数据,包含负载平均值。

  • 内存使用情况,包含交换和页面错误。

  • 包含计算的 IOPS 和延迟的磁盘 I/O 指标。

  • 带错误跟踪的网络接口统计信息。

  • 可配置的指标收集(启用/禁用特定类型)。

  • 收集失败时自动重试。

示例用法:

# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics

输出:创建多个表(system_cpu、system_memory、system_disk_io 等),包含每个子系统的详细指标。

预测性分析插件

Prophet 预测

  • 触发器类型:计划、HTTP

  • 使用案例:需求预测、容量规划、趋势分析、季节性模式检测。

  • GitHub: Pro phet 预测文档

工作原理:使用 Facebook 的 Prophet 库,以构建时间序列预测模型。可根据历史数据训练模型,并生成对未来时间段的预测。模型可解释趋势、季节性、节假日和变化点。支持模型持久性,以实现一致的预测。

主要特点:

  • 自动季节性检测(每日、每周、每年)。

  • 节假日日历支持(内置和自定义)。

  • 趋势变化的转折点检测。

  • 模型持久性和版本控制。

  • 预测的置信区间。

  • 使用 MSRE 阈值进行验证。

示例用法:

# Train and forecast temperature data influxdb3 create trigger \ --database weather \ --plugin-filename "prophet_forecasting/prophet_forecasting.py" \ --trigger-spec "every:1d" \ --trigger-arguments 'measurement=temperature,field=value,window=90d,forecast_horizont=7d,tag_values="location:seattle",target_measurement=temperature_forecast,model_mode=train,unique_suffix=seattle_v1,seasonality_mode=additive' \ temp_forecast # Validate temperature predictions with MAE influxdb3 create trigger \ --database weather \ --plugin-filename "forecast_error_evaluator/forecast_error_evaluator.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'forecast_measurement=temp_forecast,actual_measurement=temp_actual,forecast_field=predicted,actual_field=temperature,error_metric=mae,error_thresholds=WARN-"2.0":ERROR-"5.0",window=6h,rounding_freq=5min,senders=discord' \ temp_forecast_check

输出:当预测误差超过阈值时发送通知,包括错误指标值以及受影响的时间范围。

常见配置模式

使用 TOML 配置文件

对于复杂配置,使用 TOML 文件而非内联参数:

# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector

串联插件

通过串联多个插件创建数据处理管道:

# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly

插件最佳实践

  • 保守起步:先设定较高的阈值和较长的窗口期,再根据观察到的模式进行调整。

  • 开发阶段测试:在生产部署之前,使用试运行模式和测试数据库。

  • 监控插件性能:检查系统表中的执行时间和资源使用情况。

  • 使用适当的触发器类型:选择计划触发器用于批量处理,选择数据写入触发器用于实时处理。

  • 明智地配置通知:使用严重性级别和防抖逻辑,避免警报疲劳。

  • 利用模型持久性:对于基于机器学习的插件,保存经过训练的模型以确保一致性。

  • 文档配置:使用描述性触发器名称,并维护配置文档。

监控插件执行情况

要监控插件性能,请执行以下操作:

-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';

常见问题故障排除

下表显示常见问题及可能的解决方案。

问题 解决方案
插件未触发 验证触发器是否已启用,检查 schedule/spec 语法
缺少通知 确认已安装通知器插件,请检查 webhook URLs
高内存使用量 缩减窗口大小,调整批处理间隔
不正确的转换 使用试运行模式,验证字段名称及数据类型
预测不准确 扩大训练数据窗口,调整季节性设置
警报过多 增加触发次数、添加防抖持续时间、调整阈值

这些经过认证的插件为常见的时间序列数据处理需求提供企业就绪功能,既无需进行自定义开发,又通过全面的配置选项保持灵活性。访问GitHub存储库以获取详细的文档、示例和更新。