Extend Timestream for InfluxDB with processing engine plugins - Amazon Timestream
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

For similar capabilities to Amazon Timestream for LiveAnalytics, consider Amazon Timestream for InfluxDB. It offers simplified data ingestion and single-digit millisecond query response times for real-time analytics. Learn more here.

Extend Timestream for InfluxDB with processing engine plugins

The Processing engine is an embedded Python virtual machine that runs inside your InfluxDB 3 database in Amazon Timestream. It's available in both Core and Enterprise editions. It enables you to extend your database with custom Python code that can automate workflows, transform data, and create custom API endpoints.

The Processing engine executes Python plugins in response to specific database events:

  • Data writes: Process and transform data as it enters the database

  • Scheduled events: Run code at defined intervals or specific times

  • HTTP requests: Expose custom API endpoints that execute your code

The engine includes an in-memory cache for managing state between executions, enabling you to build stateful applications directly within your database.

InfluxData certified plugins

At launch, InfluxDB 3 includes a set of pre-built, fully configurable plugins certified by InfluxData:

  • Data transformation: Process and enrich incoming data

  • Alerting: Send notifications based on data thresholds

  • Aggregation: Calculate statistics on time-series data

  • System monitoring: Track resource usage and health metrics

  • Integration: Connect to external services and APIs

These certified plugins are ready to use and can be configured through trigger arguments to meet your specific requirements.

Plugin types and trigger specifications

Plugin Type Trigger Specification When Plugin Runs Use Cases
Data write table:<TABLE_NAME> or all_tables When data is written to tables Data transformation, alerting, derived metrics
Scheduled every:<DURATION> or cron:<EXPRESSION> At specified intervals Periodic aggregation, reports, health checks
HTTP request request:<REQUEST_PATH> When HTTP requests are received Custom APIs, webhooks, user interfaces

Create triggers

Triggers connect plugins to database events and define when they execute. Use the influxdb3 create trigger command.

To create a data write trigger:

# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor

To create a scheduled trigger:

# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report

To create an HTTP request trigger:

# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor

Access the endpoint at: https://your-cluster-endpoint:8086/api/v3/engine/webhook

Configure triggers

Passing arguments to plugins

Configure plugin behavior using trigger arguments:

influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor

Arguments are passed to the plugin as a dictionary:

def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic

Error handling behavior

Configure how triggers handle errors:

# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor

Asynchronous execution

Allow multiple trigger instances to run simultaneously:

influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor

Manage triggers

To view triggers for a database:

# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN

Table exclusion for write triggers

To filter tables within your plugin code when using all_tables:

influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor

The plugin implementation is as follows:

def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables

Distributed deployment considerations

In multi-node deployments, configure plugins based on node roles:

Plugin Type Node Type Reason
Data write plugins Ingester nodes Process data at ingestion point
HTTP request plugins Querier nodes Handle API traffic
Scheduled plugins Any configured node Can run on any node with scheduler

The following considerations are important for enterprise deployments:

  • Maintain identical plugin configurations across all relevant nodes.

  • Route external clients (Grafana, dashboards) to querier nodes.

  • Ensure plugins are available on nodes where their triggers execute.

Best practices

  • Plugin configuration

    • Use trigger arguments for configurable values instead of hardcoding.

    • Implement proper error handling within plugins.

    • Use the influxdb3_local API for database operations.

  • Performance optimization

    • Use asynchronous execution for heavy processing tasks.

    • Implement early returns for filtered data.

    • Minimize database queries within plugins.

  • Error management

    • Choose appropriate error behavior (log, retry, or disable).

    • Monitor plugin execution through system tables.

    • Test plugins thoroughly before production deployment.

  • Security considerations

    • Validate all input data in HTTP request plugins.

    • Use secure methods for storing sensitive configuration.

    • Limit plugin permissions to required operations only.

Monitor plugin execution

Query system tables to monitor plugin performance:

-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'

The Processing engine provides a powerful way to extend InfluxDB 3 functionality while keeping your data processing logic close to your data, reducing latency and simplifying your architecture.

InfluxData certified plugins

Amazon Timestream for InfluxDB 3 includes a comprehensive set of pre-built, certified plugins that extend database functionality without requiring custom development. These plugins are fully configurable and ready to use at launch, providing advanced capabilities for data processing, monitoring, and alerting.

For complete documentation and source code, visit the InfluxData Plugins Repository.

Available plugins

Anomaly detection plugins

MAD-based anomaly detection

  • Trigger types: Data write (real-time)

  • Use cases: Real-time outlier detection for streaming data, sensor monitoring, quality control.

  • GitHubMAD Anomaly Detection Documentation

How it works: Uses Median Absolute Deviation (MAD) to establish a robust baseline for normal behavior. As new data arrives, it calculates how many MADs away from the median each point is. Points exceeding the threshold (k * MAD) are flagged as anomalies.

Key features:

  • Real-time processing as data is written.

  • Maintains in-memory sliding windows for efficiency.

  • Count-based alerts (e.g., 5 consecutive anomalies).

  • Duration-based alerts (e.g., anomaly for 2 minutes).

  • Flip suppression to prevent alert fatigue from rapidly changing values.

Example usage:

# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly

Output: Sends real-time notifications when anomalies are detected, including the field name, value, and duration.

Data transformation plugins

Basic transformation

  • Trigger types: Scheduled, Data write

  • Use cases: Data standardization, unit conversions, field name normalization, data cleaning.

  • GitHubBasic Transformation Documentation

How it works: Applies a chain of transformations to field names and values. Can process historical data in batches (scheduled) or transform data as it arrives (data write). Transformations are applied in the order specified, allowing complex data pipelines.

Key features:

  • Field name transformations: snake_case, remove spaces, alphanumeric only.

  • Unit conversions: Temperature, pressure, length, time units.

  • Custom string replacements with regex support.

  • Dry-run mode for testing without writing data.

  • Batch processing for historical data.

Example usage:

# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner

Output: Creates a new table with transformed data, preserving original timestamps and tags.

Downsampler

  • Trigger types: Scheduled, HTTP

  • Use cases: Data reduction, long-term storage optimization, creating summary statistics, performance improvement.

  • GitHubDownsampler Documentation

How it works: Aggregates high-resolution time-series data into lower-resolution summaries. For example, converts 1-second data into 1-hour averages. Each downsampled point includes metadata about the number of original points compressed and the time range covered.

Key features:

  • Multiple aggregation functions: avg, sum, min, max, median, derivative.

  • Field-specific aggregations (different functions for different fields).

  • Metadata tracking (record_count, time_from, time_to).

  • HTTP API for on-demand downsampling with backfill.

  • Configurable batch sizes for large datasets.

Example usage:

# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'

Output: Creates downsampled data with aggregated values plus metadata columns showing the number of points compressed and time range.

Monitoring and alerting plugins

State change monitor

  • Trigger types: Scheduled, Data write

  • Use cases: Status monitoring, equipment state tracking, process monitoring, change detection.

  • GitHubState Change Documentation

How it works: Tracks field value changes over time and alerts when the number of changes exceeds configured thresholds. Can detect both value changes (different values) and specific value conditions (equals a target value). Includes stability checks to prevent alerts from noisy signals.

Key features:

  • Count-based change detection (e.g., five changes in ten minutes).

  • Duration-based monitoring (e.g., status = "error" for five minutes).

  • State change window for noise reduction.

  • Multi-field monitoring with independent thresholds.

  • Configurable stability requirements.

Example usage:

# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor

Output: Alerts include the field name, number of changes detected, time window, and relevant tag values.

System metrics collector

  • Trigger types: Scheduled

  • Use cases: Infrastructure monitoring, performance baselines, capacity planning, resource tracking.

  • GitHubSystem Metrics Documentation

How it works: Uses the psutil library to collect comprehensive system metrics from the host running InfluxDB. Collects CPU, memory, disk, and network statistics at configurable intervals. Each metric type can be enabled/disabled independently.

Key features:

  • Per-core CPU statistics with load averages.

  • Memory usage including swap and page faults.

  • Disk I/O metrics with calculated IOPS and latency.

  • Network interface statistics with error tracking.

  • Configurable metric collection (enable/disable specific types).

  • Automatic retry on collection failures.

Example usage:

# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics

Output: Creates multiple tables (system_cpu, system_memory, system_disk_io, etc.) with detailed metrics for each subsystem.

Predictive analytics plugins

Prophet forecasting

  • Trigger types: Scheduled, HTTP

  • Use cases: Demand forecasting, capacity planning, trend analysis, seasonal pattern detection.

  • GitHubProphet Forecasting Documentation

How it works: Uses Facebook's Prophet library to build time-series forecasting models. Can train models on historical data and generate predictions for future time periods. Models account for trends, seasonality, holidays, and changepoints. Supports model persistence for consistent predictions.

Key features:

  • Automatic seasonality detection (daily, weekly, yearly).

  • Holiday calendar support (built-in and custom).

  • Changepoint detection for trend shifts.

  • Model persistence and versioning.

  • Confidence intervals for predictions.

  • Validation with MSRE thresholds.

Example usage:

# Train and forecast temperature data influxdb3 create trigger \ --database weather \ --plugin-filename "prophet_forecasting/prophet_forecasting.py" \ --trigger-spec "every:1d" \ --trigger-arguments 'measurement=temperature,field=value,window=90d,forecast_horizont=7d,tag_values="location:seattle",target_measurement=temperature_forecast,model_mode=train,unique_suffix=seattle_v1,seasonality_mode=additive' \ temp_forecast # Validate temperature predictions with MAE influxdb3 create trigger \ --database weather \ --plugin-filename "forecast_error_evaluator/forecast_error_evaluator.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'forecast_measurement=temp_forecast,actual_measurement=temp_actual,forecast_field=predicted,actual_field=temperature,error_metric=mae,error_thresholds=WARN-"2.0":ERROR-"5.0",window=6h,rounding_freq=5min,senders=discord' \ temp_forecast_check

Output: Sends notifications when forecast error exceeds thresholds, including the error metric value and affected time range.

Common configuration patterns

Using TOML configuration files

For complex configurations, use TOML files instead of inline arguments:

# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector

Chaining plugins

Create data processing pipelines by chaining multiple plugins:

# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly

Best practices for plugins

  • Start conservative – Begin with higher thresholds and longer windows, then adjust based on observed patterns.

  • Test in development – Use dry-run modes and test databases before production deployment.

  • Monitor plugin performance – Check execution times and resource usage in system tables.

  • Use appropriate trigger types – Choose scheduled for batch processing, data write for real-time.

  • Configure notifications wisely – Use severity levels and debounce logic to prevent alert fatigue.

  • Leverage model persistence – For ML-based plugins, save trained models for consistency.

  • Document configurations – Use descriptive trigger names and maintain configuration documentation.

Monitor plugin execution

To monitor plugin performance:

-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';

Troubleshoot common issues

The following table shows common issues and possible solutions.

Issue Solution
Plugin not triggering Verify trigger is enabled, check schedule/spec syntax
Missing notifications Confirm Notifier plugin installed, check webhook URLs
High memory usage Reduce window sizes, adjust batch processing intervals
Incorrect transformations Use dry-run mode, verify field names and data types
Forecast inaccuracy Increase training data window, adjust seasonality settings
Too many alerts Increase trigger counts, add debounce duration, adjust thresholds

These certified plugins provide enterprise-ready functionality for common time-series data processing needs, eliminating the need for custom development while maintaining flexibility through comprehensive configuration options. Visit the GitHub repository for detailed documentation, examples, and updates.