Migration procedure for detector models in Amazon IoT Events - Amazon IoT Events

End of support notice: On May 20, 2026, Amazon will end support for Amazon IoT Events. After May 20, 2026, you will no longer be able to access the Amazon IoT Events console or Amazon IoT Events resources. For more information, see Amazon IoT Events end of support.

Migration procedure for detector models in Amazon IoT Events

This section describes alternative solutions that deliver similar detector model functionality as you migrate away from Amazon IoT Events.

You can migrate data that you currently ingest through Amazon IoT Core rules to a combination of other Amazon services. Instead of ingesting data through the BatchPutMessage API, you route the data to an Amazon IoT Core MQTT topic.

This migration approach leverages Amazon IoT Core MQTT topics as the entry point for your IoT data, replacing the direct input to Amazon IoT Events. MQTT topics are chosen for several key reasons. They offer broad compatibility with IoT devices due to MQTT's widespread use in the industry. These topics can handle high volumes of messages from numerous devices, ensuring scalability. They also provide flexibility in routing and filtering messages based on content or device type. Additionally, Amazon IoT Core MQTT topics integrate seamlessly with other Amazon services, facilitating the migration process.

Data flows from MQTT topics into an architecture combining Amazon Kinesis Data Streams, an Amazon Lambda function, an Amazon DynamoDB table, and Amazon EventBridge schedules. This combination of services replicates and enhances the functionality previously provided by Amazon IoT Events, offering you more flexibility and control over your IoT data processing pipeline.

Comparing architectures

The current Amazon IoT Events architecture ingests data through an Amazon IoT Core rule and the BatchPutMessage API. This architecture uses Amazon IoT Core for data ingestion and event publishing, with messages routed through Amazon IoT Events inputs to detector models that define the state logic. An IAM role manages the necessary permissions.

The new solution maintains Amazon IoT Core for data ingestion (now with dedicated input and output MQTT topics). It introduces Kinesis Data Streams for data partitioning and an evaluator Lambda function for state logic. Device states are now stored in a DynamoDB table, and an enhanced IAM role manages permissions across these services.

| Purpose | Solution | Differences |
| --- | --- | --- |
| Data ingestion – Receives data from IoT devices | Amazon IoT Core | Now requires two distinct MQTT topics: one for ingesting device data and another for publishing output events |
| Message direction – Routes incoming messages to appropriate services | Amazon IoT Core message routing rule | Maintains same routing functionality but now directs messages to Kinesis Data Streams instead of Amazon IoT Events |
| Data processing – Handles and organizes incoming data streams | Kinesis Data Streams | Replaces Amazon IoT Events input functionality, providing data ingestion with device ID partitioning for message processing |
| Logic evaluation – Processes state changes and triggers actions | Evaluator Lambda | Replaces Amazon IoT Events detector model, providing customizable state logic evaluation through code instead of visual workflow |
| State management – Maintains device states | DynamoDB table | New component that provides persistent storage of device states, replacing internal Amazon IoT Events state management |
| Security – Manages service permissions | IAM role | Updated permissions now include access to Kinesis Data Streams, DynamoDB, and EventBridge in addition to existing Amazon IoT Core permissions |
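
The division of work in the replacement architecture can be modeled in memory as a minimal sketch. Here a Map stands in for the DynamoDB table and a plain function stands in for the evaluator Lambda. The names stateStore and evaluate, the device IDs, and the 70-degree threshold are assumptions made for illustration and are not part of any Amazon API.

```javascript
// In-memory stand-in for the DynamoDB state table (illustrative assumption).
const stateStore = new Map();

// Stand-in for the evaluator Lambda: load state, apply logic, save state.
function evaluate(message) {
  const { instanceId, temperature } = message;
  // A device with no stored state starts in 'normal', like a new detector instance.
  const previous = stateStore.get(instanceId) ?? { stateName: 'normal' };
  let stateName = previous.stateName;
  const outputs = [];

  if (stateName === 'normal' && temperature > 70) {
    stateName = 'alarm';
    outputs.push({ topic: instanceId, state: 'alarm detected' });
  } else if (stateName === 'alarm' && temperature < 70) {
    stateName = 'normal';
    outputs.push({ topic: instanceId, state: 'alarm cleared' });
  }

  stateStore.set(instanceId, { stateName });
  // In the real pipeline these outputs would be published to the output MQTT topic.
  return outputs;
}

// Each device keeps its own state; messages are evaluated in arrival order.
const published = [
  { instanceId: 'device-1', temperature: 65 },
  { instanceId: 'device-1', temperature: 78 },
  { instanceId: 'device-2', temperature: 60 },
  { instanceId: 'device-1', temperature: 68 },
].flatMap((message) => evaluate(message));

console.log(published);
```

Keeping state outside the function is what lets any Lambda invocation pick up where the previous one left off, which is the role the DynamoDB table plays in the full solution.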

Step 1: (Optional) export Amazon IoT Events detector model configurations

Before creating new resources, export your Amazon IoT Events detector model definitions. These contain your event processing logic and can serve as a historical reference for implementing your new solution.

Console

Using the Amazon IoT Events console in the Amazon Web Services Management Console, perform the following steps to export your detector model configurations:

To export detector models using the Amazon Web Services Management Console
  1. Log in to the Amazon IoT Events console.

  2. In the left navigation pane, choose Detector models.

  3. Select the detector model to export.

  4. Choose Export. Read the information message regarding the output and then choose Export again.

  5. Repeat the process for each detector model that you want to export.

A file containing a JSON output of your detector model is added to your browser's download folder. You can optionally save each detector model configuration to preserve historical data.

Amazon CLI

Using the Amazon CLI, run the following commands to export your detector model configurations:

To export detector models using Amazon CLI
  1. List all detector models in your account:

    aws iotevents list-detector-models
  2. For each detector model, export its configuration by running:

    aws iotevents describe-detector-model \
        --detector-model-name your-detector-model-name
  3. Save the output for each detector model.
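
When you port the saved definitions to code, it can help to flatten each one into a list of states, events, and transitions. The following is a minimal sketch that assumes the JSON shape returned by describe-detector-model (a detectorModel object containing detectorModelDefinition). The function name summarizeDetectorModel and the sample data are hypothetical.

```javascript
// Summarizes a detector model definition so each state and event can be ported by hand.
// Assumes the response shape of `aws iotevents describe-detector-model`.
function summarizeDetectorModel(response) {
  const definition = response.detectorModel.detectorModelDefinition;
  const states = definition.states.map((state) => {
    const onInput = state.onInput ?? {};
    return {
      stateName: state.stateName,
      events: (onInput.events ?? []).map((e) => ({
        name: e.eventName,
        condition: e.condition,
      })),
      transitions: (onInput.transitionEvents ?? []).map((t) => ({
        name: t.eventName,
        condition: t.condition,
        nextState: t.nextState,
      })),
    };
  });
  return { initialState: definition.initialStateName, states };
}

// Hypothetical, trimmed-down export used only to demonstrate the function.
const sampleExport = {
  detectorModel: {
    detectorModelDefinition: {
      initialStateName: 'normal',
      states: [
        {
          stateName: 'normal',
          onInput: {
            transitionEvents: [
              {
                eventName: 'overheat',
                condition: '$input.sensor.temperature > 70',
                nextState: 'alarm',
              },
            ],
          },
        },
        { stateName: 'alarm', onInput: { events: [], transitionEvents: [] } },
      ],
    },
  },
};

const summary = summarizeDetectorModel(sampleExport);
console.log(JSON.stringify(summary, null, 2));
```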

Step 2: Create an IAM role

Create an IAM role to provide permissions to replicate the functionality of Amazon IoT Events. The role in this example grants access to DynamoDB for state management, EventBridge for scheduling, Kinesis Data Streams for data ingestion, Amazon IoT Core for publishing messages, and CloudWatch for logging. Together, these services work as a replacement for Amazon IoT Events.

  1. Create an IAM role with the following permissions. For more detailed instructions on creating an IAM role, see Create a role to delegate permissions to an Amazon service in the IAM User Guide.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DynamoDBAccess",
                "Effect": "Allow",
                "Action": [
                    "dynamodb:GetItem",
                    "dynamodb:PutItem",
                    "dynamodb:UpdateItem",
                    "dynamodb:DeleteItem",
                    "dynamodb:Query",
                    "dynamodb:Scan"
                ],
                "Resource": "arn:aws-cn:dynamodb:your-region:your-account-id:table/your-table-name"
            },
            {
                "Sid": "SchedulerAccess",
                "Effect": "Allow",
                "Action": [
                    "scheduler:CreateSchedule",
                    "scheduler:DeleteSchedule"
                ],
                "Resource": "arn:aws-cn:scheduler:your-region:your-account-id:schedule/*"
            },
            {
                "Sid": "KinesisAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:DescribeStream",
                    "kinesis:ListStreams"
                ],
                "Resource": "arn:aws-cn:kinesis:your-region:your-account-id:stream/*"
            },
            {
                "Sid": "IoTPublishAccess",
                "Effect": "Allow",
                "Action": "iot:Publish",
                "Resource": "arn:aws-cn:iot:your-region:your-account-id:topic/*"
            },
            {
                "Effect": "Allow",
                "Action": "logs:CreateLogGroup",
                "Resource": "arn:aws-cn:logs:your-region:your-account-id:*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": [
                    "arn:aws-cn:logs:your-region:your-account-id:log-group:/aws/lambda/your-lambda:*"
                ]
            }
        ]
    }
  2. Add the following IAM role trust policy. A trust policy allows the specified Amazon services to assume the IAM role so that they can perform the necessary actions. For more detailed instructions on creating an IAM trust policy, see Create a role using custom trust policies in the IAM User Guide.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "scheduler.amazonaws.com",
                        "lambda.amazonaws.com",
                        "iot.amazonaws.com"
                    ]
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
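
The policies in this step rely on ARNs that follow the pattern arn:partition:service:region:account-id:resource. As a minimal sketch of how the placeholders fit together, the helper below assembles ARNs for the China partition. The function name buildArn, the Region cn-north-1, and the account ID 111122223333 are illustrative assumptions.

```javascript
// Builds an ARN string of the form arn:partition:service:region:account-id:resource.
// Global services such as IAM leave the region segment empty.
function buildArn({ partition = 'aws-cn', service, region = '', accountId = '', resource }) {
  if (!service || !resource) {
    throw new Error('service and resource are required');
  }
  return ['arn', partition, service, region, accountId, resource].join(':');
}

// Regional resource: the state table referenced by the DynamoDB statement.
const tableArn = buildArn({
  service: 'dynamodb',
  region: 'cn-north-1',
  accountId: '111122223333',
  resource: 'table/your-table-name',
});

// Global resource: an IAM role ARN has no region, hence the double colon.
const roleArn = buildArn({
  service: 'iam',
  accountId: '111122223333',
  resource: 'role/service-role/your-iam-role',
});

console.log(tableArn);
console.log(roleArn);
```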

Step 3: Create Amazon Kinesis Data Streams

Create Amazon Kinesis Data Streams using the Amazon Web Services Management Console or Amazon CLI.

Console

To create a Kinesis data stream using the Amazon Web Services Management Console, follow the procedure found on the Create a data stream page in the Amazon Kinesis Data Streams Developer Guide.

Adjust the shard count based on your device count and message payload size.

Amazon CLI

Using Amazon CLI, create Amazon Kinesis Data Streams to ingest and partition the data from your devices.

Kinesis Data Streams is used in this migration to replace the data ingestion functionality of Amazon IoT Events. It provides a scalable and efficient way to collect, process, and analyze real-time streaming data from your IoT devices, while providing flexible data handling and integration with other Amazon services.

aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region

Adjust the shard count based on your device count and message payload size.
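
As a rough starting point for choosing a shard count, you can estimate it from the expected write load. The sketch below assumes the commonly documented per-shard write limits of 1,000 records per second and 1 MB per second. Verify these against the current Kinesis Data Streams quotas for your Region. The function name and the sample numbers are hypothetical.

```javascript
// Assumed per-shard write limits; confirm against current service quotas.
const MAX_RECORDS_PER_SHARD_PER_SEC = 1000;
const MAX_BYTES_PER_SHARD_PER_SEC = 1024 * 1024;

// Returns the smallest shard count that satisfies both write limits.
function estimateShardCount({ deviceCount, messagesPerDevicePerSecond, averagePayloadBytes }) {
  const recordsPerSecond = deviceCount * messagesPerDevicePerSecond;
  const bytesPerSecond = recordsPerSecond * averagePayloadBytes;
  const byRecords = Math.ceil(recordsPerSecond / MAX_RECORDS_PER_SHARD_PER_SEC);
  const byBytes = Math.ceil(bytesPerSecond / MAX_BYTES_PER_SHARD_PER_SEC);
  return Math.max(1, byRecords, byBytes);
}

// Many small messages: the record-count limit dominates.
const chattyFleet = estimateShardCount({
  deviceCount: 5000,
  messagesPerDevicePerSecond: 1,
  averagePayloadBytes: 512,
});

// Few large messages: the byte-rate limit dominates.
const heavyFleet = estimateShardCount({
  deviceCount: 100,
  messagesPerDevicePerSecond: 1,
  averagePayloadBytes: 30000,
});

console.log({ chattyFleet, heavyFleet });
```

The estimate ignores traffic spikes and any timer messages that EventBridge schedules write to the same stream, so treat the result as a lower bound.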

Step 4: Create or update the MQTT message routing rule

You can create a new MQTT message routing rule or update an existing rule.

Console
  1. Determine if you need a new MQTT message routing rule or if you can update an existing rule.

  2. Open the Amazon IoT Core console.

  3. In the navigation pane, under Manage, choose Message routing, and then choose Rules.

  4. Choose Create rule.

  5. On the Specify rule properties page, for Rule name, enter a name for the Amazon IoT Core rule. For Rule description - optional, enter a description that identifies the rule as processing events and forwarding them to Kinesis Data Streams.

  6. On the Configure SQL statement page, enter the following for the SQL statement, replacing your-topic-name with the MQTT topic that your devices publish to: SELECT * FROM 'your-topic-name'. Then choose Next.

  7. On the Attach rule actions page, under Rule actions, choose kinesis.

  8. For the stream, choose your Kinesis stream. For the partition key, enter your-instance-id. For the IAM role, select the appropriate role, and then choose Add rule action.

For more information, see Creating Amazon IoT rules to route device data to other services.

Amazon CLI
  1. Create a JSON file with the following contents. This JSON configuration file defines an Amazon IoT Core rule that selects all messages from a topic and forwards them to the specified Kinesis stream, using the instance ID as the partition key.

    {
        "sql": "SELECT * FROM 'your-topic-name'",
        "description": "Rule to process events and forward to Kinesis Data Streams",
        "actions": [
            {
                "kinesis": {
                    "streamName": "your-kinesis-stream-name",
                    "roleArn": "arn:aws-cn:iam::your-account-id:role/service-role/your-iam-role",
                    "partitionKey": "${your-instance-id}"
                }
            }
        ],
        "ruleDisabled": false,
        "awsIotSqlVersion": "2016-03-23"
    }
  2. Create the MQTT topic rule using the Amazon CLI. This step creates an Amazon IoT Core topic rule from the configuration in the JSON file that you created in the previous step.

    aws iot create-topic-rule \
        --rule-name "your-iot-core-rule" \
        --topic-rule-payload file://your-file-name.json
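
The partition key in the rule action is a substitution template that Amazon IoT Core resolves against each message, so that all messages from one device land on the same shard and are processed in order. The following sketch imitates only the simplest case, a template that names a top-level payload field. The function resolvePartitionKey and the field name instanceId are assumptions for illustration and do not reproduce the full template syntax.

```javascript
// Resolves a very small subset of substitution templates: ${fieldName} only.
// Real rule templates support richer expressions; this is a simplification.
function resolvePartitionKey(template, message) {
  return template.replace(/\$\{(\w+)\}/g, (_, field) => String(message[field] ?? ''));
}

const template = '${instanceId}';
const first = { instanceId: 'device-1', payload: '{"temperature":65}' };
const second = { instanceId: 'device-1', payload: '{"temperature":78}' };
const other = { instanceId: 'device-2', payload: '{"temperature":60}' };

// Messages from the same device resolve to the same partition key.
console.log(resolvePartitionKey(template, first));
console.log(resolvePartitionKey(template, second));
console.log(resolvePartitionKey(template, other));
```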

Step 5: Get the endpoint for the destination MQTT topic

Get the Amazon IoT Core endpoint that your solution uses to publish outgoing messages to the destination MQTT topic, replacing the functionality previously handled by Amazon IoT Events. The endpoint is unique to your Amazon account and Region.

Console
  1. Open the Amazon IoT Core console.

  2. In the Connect section on the left navigation panel, choose Domain configuration.

  3. Choose the iot:Data-ATS domain configuration to open the configuration's detail page.

  4. Copy the Domain name value. This value is the endpoint. Save the endpoint value because you'll need it in later steps.

Amazon CLI

Run the following command to get the Amazon IoT Core endpoint for publishing outgoing messages for your account.

aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region
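
The command prints a JSON document with an endpointAddress field. As a minimal sketch, the helper below turns that output into the HTTPS URL form that the Lambda function in a later step passes to its publishing client. The function name and the sample address are hypothetical.

```javascript
// Converts the JSON printed by `aws iot describe-endpoint` into an HTTPS endpoint URL.
function endpointUrlFromCliOutput(cliOutput) {
  const { endpointAddress } = JSON.parse(cliOutput);
  if (typeof endpointAddress !== 'string' || endpointAddress.length === 0) {
    throw new Error('endpointAddress missing from describe-endpoint output');
  }
  return `https://${endpointAddress}`;
}

// Hypothetical output; your address is unique to your account and Region.
const sampleOutput = '{"endpointAddress": "abc123example-ats.iot.cn-north-1.amazonaws.com.cn"}';
const endpointUrl = endpointUrlFromCliOutput(sampleOutput);
console.log(endpointUrl);
```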

Step 6: Create an Amazon DynamoDB table

An Amazon DynamoDB table replaces the state management functionality of Amazon IoT Events, providing a scalable and flexible way to persist and manage the state of your devices and the detector model logic in your new solution architecture.

Console

Create an Amazon DynamoDB table to persist the state of the detector models. For more information, see Create a table in DynamoDB in the Amazon DynamoDB Developer Guide.

Use the following for the table details:

  • For Table name, enter a table name of your choosing.

  • For Partition key, enter your own instance ID.

  • For Table settings, you can use the Default settings.

Amazon CLI

Run the following command to create a DynamoDB table.

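# The original command ends with a dangling line continuation. The --billing-mode
# option below is an assumption added so that the command is complete; use
# --provisioned-throughput instead if you prefer provisioned capacity.
aws dynamodb create-table \
    --table-name your-table-name \
    --attribute-definitions AttributeName=your-instance-id,AttributeType=S \
    --key-schema AttributeName=your-instance-id,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST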
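
Each device gets one item in this table, keyed by its instance ID, with the serialized state stored as a string attribute. The sketch below shows that item shape and a round trip through it, using the attribute names InstanceId and state that the Lambda example in this guide reads and writes. The helper names toItem and fromItem are hypothetical, and the key attribute name must match the partition key you chose for your table.

```javascript
// Converts an instance ID and a state object into a DynamoDB attribute-value item.
function toItem(instanceId, state) {
  return {
    InstanceId: { S: String(instanceId) },
    state: { S: JSON.stringify(state) },
  };
}

// Reads the state back; returns undefined when no item has been stored yet.
function fromItem(item) {
  if (!item || !item.state) {
    return undefined;
  }
  return JSON.parse(item.state.S);
}

const item = toItem('device-1', {
  stateName: 'alarm',
  variables: {},
  inputs: { temperature: 78 },
});
const restored = fromItem(item);

console.log(item);
console.log(restored);
```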

Step 7: Create an Amazon Lambda function (console)

The Lambda function serves as the core processing engine, replacing the detector model evaluation logic of Amazon IoT Events. In this example, the function integrates with other Amazon services to handle incoming data, manage state, and trigger actions based on your defined rules.

Create a Lambda function with the Node.js runtime:

  1. Open the Amazon Lambda console.

  2. Choose Create function.

  3. For Function name, enter a name.

  4. For Runtime, select Node.js 22.x.

  5. In the Change default execution role dropdown, choose Use an existing role, and then select the IAM role that you created in Step 2.

  6. Choose Create function.

  7. After the function is created, on the Code tab, paste the following code example. Replace the hard-coded constants, such as the your-destination-endpoint endpoint, with your own values.

import { DynamoDBClient, GetItemCommand, PutItemCommand } from '@aws-sdk/client-dynamodb';
import { IoTDataPlaneClient, PublishCommand } from '@aws-sdk/client-iot-data-plane';
import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from '@aws-sdk/client-scheduler';

//// External clients and constants (replace the placeholder values with your own)
// Name of the DynamoDB table from Step 6. The 'InstanceId' key used below must
// match the partition key attribute name of that table.
const STATE_TABLE_NAME = 'your-table-name';
const scheduler = new SchedulerClient({});
const iot = new IoTDataPlaneClient({
    endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/'
});
const ddb = new DynamoDBClient({});

// Single source for schedule names so that setTimer and clearTimer always agree
const scheduleNameFor = (instanceId) => `your-schedule-name-${instanceId}-schedule`;

//// Lambda handler function
export const handler = async (event) => {
    console.log('Incoming event:', JSON.stringify(event, null, 2));

    if (!event.Records) {
        throw new Error('No records found in event');
    }

    const processedRecords = [];

    for (const record of event.Records) {
        try {
            if (record.eventSource !== 'aws:kinesis') {
                console.log(`Skipping non-Kinesis record from ${record.eventSource}`);
                continue;
            }

            // Kinesis delivers the record data base64-encoded
            const payload = record.kinesis.data;
            const decodedData = Buffer.from(payload, 'base64').toString();
            console.log('decoded payload is ', decodedData);

            const output = await handleDecodedData(decodedData);

            // Add additional processing logic here
            const processedData = {
                output,
                sequenceNumber: record.kinesis.sequenceNumber,
                partitionKey: record.kinesis.partitionKey,
                timestamp: record.kinesis.approximateArrivalTimestamp
            };
            processedRecords.push(processedData);
        } catch (error) {
            console.error('Error processing record:', error);
            console.error('Failed record:', record);
            // Decide whether to throw error or continue processing other records
            // throw error; // Uncomment to stop processing on first error
        }
    }

    return {
        statusCode: 200,
        body: JSON.stringify({
            message: 'Processing complete',
            processedCount: processedRecords.length,
            records: processedRecords
        })
    };
};

// Helper function to handle decoded data
async function handleDecodedData(payload) {
    try {
        // Parse the decoded data
        const parsedData = JSON.parse(payload);

        // Extract instanceId
        const instanceId = parsedData.instanceId;

        // Parse the input field (a JSON string nested inside the message)
        const inputData = JSON.parse(parsedData.payload);
        const temperature = inputData.temperature;
        console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature);

        await iotEvents.process(instanceId, inputData);

        return {
            instanceId,
            temperature,
            // Add any other fields you want to return
            rawInput: inputData
        };
    } catch (error) {
        console.error('Error handling decoded data:', error);
        throw error;
    }
}

//// Classes for declaring/defining the state machine
class CurrentState {
    constructor(instanceId, stateName, variables, inputs) {
        this.stateName = stateName;
        this.variables = variables;
        this.inputs = inputs;
        this.instanceId = instanceId;
    }

    static async load(instanceId) {
        console.log(`Loading state for id ${instanceId}`);
        try {
            const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({
                TableName: STATE_TABLE_NAME,
                Key: { 'InstanceId': { S: `${instanceId}` } }
            }));
            const { stateName, variables, inputs } = JSON.parse(stateContent);
            return new CurrentState(instanceId, stateName, variables, inputs);
        } catch (e) {
            // A missing item (first message from a device) also ends up here
            console.log(`No state for id ${instanceId}: ${e}`);
            return undefined;
        }
    }

    static async save(instanceId, state) {
        console.log(`Saving state for id ${instanceId}`);
        await ddb.send(new PutItemCommand({
            TableName: STATE_TABLE_NAME,
            Item: {
                'InstanceId': { S: `${instanceId}` },
                'state': { S: state }
            }
        }));
    }

    setVariable(name, value) {
        this.variables[name] = value;
    }

    changeState(stateName) {
        console.log(`Changing state from ${this.stateName} to ${stateName}`);
        this.stateName = stateName;
    }

    async setTimer(instanceId, frequencyInMinutes, payload) {
        console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`);
        // The schedule input is passed as a plain JSON string
        const timerInput = JSON.stringify(payload);
        console.log(timerInput);
        const scheduleParams = {
            Name: scheduleNameFor(instanceId),
            FlexibleTimeWindow: { Mode: 'OFF' },
            ScheduleExpression: `rate(${frequencyInMinutes} minutes)`,
            Target: {
                Arn: 'arn:aws-cn:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name',
                RoleArn: 'arn:aws-cn:iam::your-account-id:role/service-role/your-iam-role',
                Input: timerInput,
                KinesisParameters: { PartitionKey: instanceId },
                RetryPolicy: { MaximumRetryAttempts: 3 }
            }
        };
        const command = new CreateScheduleCommand(scheduleParams);
        console.log(`Sending command to set timer ${JSON.stringify(command)}`);
        await scheduler.send(command);
    }

    async clearTimer(instanceId) {
        console.log(`Cleaning timer ${instanceId}`);
        const command = new DeleteScheduleCommand({ Name: scheduleNameFor(instanceId) });
        await scheduler.send(command);
    }

    async executeAction(actionType, actionPayload) {
        console.log(`Will execute the ${actionType} with payload ${actionPayload}`);
        await iot.send(new PublishCommand({
            topic: `${this.instanceId}`,
            payload: actionPayload,
            qos: 0
        }));
    }

    setInput(value) {
        this.inputs = { ...this.inputs, ...value };
    }

    input(name) {
        return this.inputs[name];
    }
}

class IoTEvents {
    constructor(initialState) {
        this.initialState = initialState;
        this.states = {};
    }

    state(name) {
        const state = new IoTEventsState();
        this.states[name] = state;
        return state;
    }

    async process(instanceId, input) {
        let currentState = await CurrentState.load(instanceId)
            || new CurrentState(instanceId, this.initialState, {}, {});
        currentState.setInput(input);
        console.log(`With inputs as: ${JSON.stringify(currentState)}`);
        const state = this.states[currentState.stateName];
        currentState = await state.evaluate(currentState);
        console.log(`With output as: ${JSON.stringify(currentState)}`);
        await CurrentState.save(instanceId, JSON.stringify(currentState));
    }
}

class Event {
    constructor(condition, action) {
        this.condition = condition;
        this.action = action;
    }
}

class IoTEventsState {
    constructor() {
        this.eventsList = [];
    }

    events(eventListArg) {
        this.eventsList.push(...eventListArg);
        return this;
    }

    async evaluate(currentState) {
        for (const e of this.eventsList) {
            console.log(`Evaluating event ${e.condition}`);
            if (e.condition(currentState)) {
                console.log(`Event condition met`);
                // Execute any action as defined in the detector model definition
                await e.action(currentState);
            }
        }
        return currentState;
    }
}

////// Detector model definitions - replace with your own definitions
const processAlarmStateEvent = new Event(
    (currentState) => currentState.input('temperature') < 70,
    async (currentState) => {
        currentState.changeState('normal');
        await currentState.clearTimer(currentState.instanceId);
        await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`);
    }
);

const processTimerEvent = new Event(
    (currentState) => {
        const source = currentState.input('source');
        console.log(`Evaluating timer event with source ${source}`);
        const stateFromTimer = currentState.input('currentState');
        const booleanOutput = (typeof source === 'string' &&
            source.toLowerCase() === 'timer' &&
            // Check that the state carried in the timer payload is not 'normal'
            stateFromTimer !== undefined &&
            stateFromTimer !== null &&
            stateFromTimer.toLowerCase() !== 'normal');
        console.log(`Timer event evaluated as ${booleanOutput}`);
        return booleanOutput;
    },
    async (currentState) => {
        await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`);
    }
);

const processNormalEvent = new Event(
    (currentState) => currentState.input('temperature') > 70,
    async (currentState) => {
        currentState.changeState('alarm');
        await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`);
        await currentState.setTimer(currentState.instanceId, 5, {
            instanceId: currentState.instanceId,
            payload: JSON.stringify({ currentState: 'alarm', source: 'timer' })
        });
    }
);

const iotEvents = new IoTEvents('normal');
iotEvents
    .state('normal')
    .events([processNormalEvent]);
iotEvents
    .state('alarm')
    .events([processAlarmStateEvent, processTimerEvent]);

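Before wiring the function to a live stream, it can be useful to build a synthetic event with the same shape that Lambda receives from Kinesis. The following is a minimal sketch: makeKinesisEvent and decodeRecord are hypothetical helpers, and the event contains only the record fields that the handler in this guide reads.

```javascript
// Builds a Lambda-style Kinesis event from plain message objects.
// Only the fields read by the handler in this guide are included.
function makeKinesisEvent(messages) {
  return {
    Records: messages.map((message, index) => ({
      eventSource: 'aws:kinesis',
      kinesis: {
        partitionKey: message.instanceId,
        sequenceNumber: String(index),
        approximateArrivalTimestamp: Date.now() / 1000,
        // Record data arrives base64-encoded
        data: Buffer.from(JSON.stringify(message)).toString('base64'),
      },
    })),
  };
}

// Mirrors the two decoding steps: base64 to JSON, then the nested payload string.
function decodeRecord(record) {
  const outer = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString());
  return { instanceId: outer.instanceId, input: JSON.parse(outer.payload) };
}

const testEvent = makeKinesisEvent([
  { instanceId: 'device-1', payload: JSON.stringify({ temperature: 78 }) },
]);
const decoded = decodeRecord(testEvent.Records[0]);
console.log(decoded);
```

An event built this way can be pasted into a Lambda console test event after serializing it with JSON.stringify.
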
Step 8: Add an Amazon Kinesis Data Streams trigger

Add a Kinesis Data Streams trigger to the Lambda function using the Amazon Web Services Management Console or Amazon CLI.

Adding a Kinesis Data Streams trigger to your Lambda function establishes the connection between your data ingestion pipeline and your processing logic, letting it automatically evaluate incoming IoT data streams and react to events in real-time, similar to how Amazon IoT Events processes inputs.

Console

For more information, see Create an event source mapping to invoke a Lambda function in the Amazon Lambda Developer Guide.

Use the following for the event source mapping details:

Amazon CLI

Run the following command to create the Lambda function trigger.

aws lambda create-event-source-mapping \
    --function-name your-lambda-name \
    --event-source-arn arn:aws-cn:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \
    --batch-size 10 \
    --starting-position LATEST \
    --region your-region
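
The batch size sets the maximum number of records that Lambda passes to the function in one invocation. The sketch below is a simplified model of that grouping for records read from a single shard. The helper name toBatches is hypothetical, and actual invocations can carry fewer records depending on arrival timing and payload size.

```javascript
// Splits an ordered list of records into batches of at most batchSize records.
function toBatches(records, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < records.length; i += batchSize) {
    batches.push(records.slice(i, i + batchSize));
  }
  return batches;
}

// 25 records with a batch size of 10 yield batches of 10, 10, and 5 records.
const records = Array.from({ length: 25 }, (_, i) => ({ sequenceNumber: String(i) }));
const batchSizes = toBatches(records, 10).map((batch) => batch.length);
console.log(batchSizes);
```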

Step 9: Test data ingestion and output functionality (Amazon CLI)

Publish a payload to the MQTT topic based on what you defined in your detector model. The following is an example payload to the MQTT topic your-topic-name to test an implementation.

{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }

You should see an MQTT message published to a topic with the following (or similar) content:

{ "state": "alarm detected, timer started" }
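
The test message nests one JSON document inside another: the payload field is itself a JSON string, which is why the quotes around temperature are escaped. As a minimal sketch, the hypothetical helper below builds such a message so that the escaping does not have to be written by hand.

```javascript
// Builds the test message; the reading is serialized first, then embedded as a string.
function buildTestMessage(instanceId, reading) {
  return JSON.stringify({ instanceId, payload: JSON.stringify(reading) });
}

const message = buildTestMessage('your-instance-id', { temperature: 78 });
console.log(message);

// Decoding reverses the two steps, as the Lambda function does.
const outer = JSON.parse(message);
const reading = JSON.parse(outer.payload);
console.log(reading.temperature > 70 ? 'expect an alarm output' : 'expect no output');
```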