AWS IoT
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 Amazon AWS 入门

AWS IoT 规则

规则使您的设备能够与 AWS 服务交互。基于 MQTT 主题流分析规则并执行操作。您可以使用规则来支持如下任务:

  • 补充或筛选从设备接收的数据。

  • 将从设备接收的数据写入 Amazon DynamoDB 数据库。

  • 将文件保存到 Amazon S3。

  • 使用 Amazon SNS 向所有用户发送推送通知。

  • 将数据发布到 Amazon SQS 队列。

  • 调用 Lambda 函数来提取数据。

  • 使用 Amazon Kinesis 处理来自大量设备的消息。

  • 将数据发送到 Amazon Elasticsearch Service。

  • 捕获 CloudWatch 指标。

  • 更改 CloudWatch 警报。

  • 将 MQTT 消息中的数据发送到 Amazon Machine Learning,以便根据 Amazon ML 模型进行预测。

  • 向 Salesforce IoT 输入流发送消息。

  • 将消息数据发送到 AWS IoT Analytics 通道。

  • 开始执行 Step Functions 状态机。

您必须先授予 AWS IoT 代表您访问 AWS 资源的权限,然后它才能够执行这些操作。执行这些操作时,即产生了您所使用的 AWS 服务的标准费用。

授予 AWS IoT 所需的访问权限

您可以使用 IAM 角色来控制每个规则可以访问的 AWS 资源。在创建规则之前,您必须创建一个 IAM 角色,并对其应用允许访问所需 AWS 资源的策略。执行规则时,AWS IoT 将担任该角色。

创建 IAM 角色 (AWS CLI)

  1. 将以下信任策略文档(用于授予 AWS IoT 担任相应角色的权限)保存到名为 iot-role-trust.json 的文件:

    { "Version":"2012-10-17", "Statement":[{ "Effect": "Allow", "Principal": { "Service": "iot.amazonaws.com" }, "Action": "sts:AssumeRole" }] }

    使用 create-role 命令创建 IAM 角色,并指定 iot-role-trust.json 文件:

    aws iam create-role --role-name my-iot-role --assume-role-policy-document file://iot-role-trust.json

    此命令的输出如下所示:

    { "Role": { "AssumeRolePolicyDocument": "url-encoded-json", "RoleId": "AKIAIOSFODNN7EXAMPLE", "CreateDate": "2015-09-30T18:43:32.821Z", "RoleName": "my-iot-role", "Path": "/", "Arn": "arn:aws:iam::123456789012:role/my-iot-role" } }
  2. 将以下 JSON 保存到名为 iot-policy.json 的文件中。

    { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": "dynamodb:*", "Resource": "*" }] }

    该 JSON 是授予 AWS IoT 管理员对 DynamoDB 的访问权限的示例策略文档。

    使用 create-policy 命令授予 AWS IoT 在担任该角色后访问您的 AWS 资源的权限,并传入 iot-policy.json 文件:

    aws iam create-policy --policy-name my-iot-policy --policy-document file://my-iot-policy.json

    有关如何在 AWS IoT 策略中授予对 AWS 服务的访问权限的更多信息,请参阅创建 AWS IoT 规则

    create-policy 命令的输出中包含该策略的 ARN。您需要将该策略附加到角色。

    { "Policy": { "PolicyName": "my-iot-policy", "CreateDate": "2015-09-30T19:31:18.620Z", "AttachmentCount": 0, "IsAttachable": true, "PolicyId": "ZXR6A36LTYANPAI7NJ5UV", "DefaultVersionId": "v1", "Path": "/", "Arn": "arn:aws:iam::123456789012:policy/my-iot-policy", "UpdateDate": "2015-09-30T19:31:18.620Z" } }
  3. 使用 attach-role-policy 命令将您的策略附加到角色:

    aws iam attach-role-policy --role-name my-iot-role --policy-arn "arn:aws:iam::123456789012:policy/my-iot-policy"

传递角色权限

规则定义的一部分是 IAM 角色,该角色授予对规则操作中指定的资源的访问权限。一旦触发规则操作,规则引擎会代入该角色。必须在与规则相同的 AWS 账户中定义该角色。

在创建或替换规则时,您实际上将角色提交到规则引擎中。执行这项操作的用户需要具有 iam:PassRole 权限。为确保拥有该权限,您需要创建一个策略以便授予 iam:PassRole 权限,并将其附加到您的 IAM 用户。以下策略介绍了如何向角色提供 iam:PassRole 权限。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "Stmt1", "Effect": "Allow", "Action": [ "iam:PassRole" ], "Resource": [ "arn:aws:iam::123456789012:role/myRole" ] } ] }

在此策略示例中,将 iam:PassRole 权限授予了角色 myRole。该角色使用角色的 ARN 指定。您必须将此策略附加到您的 IAM 用户或用户所属的角色。有关更多信息,请参阅使用管理的策略

注意

Lambda 函数使用基于资源的策略,该策略直接附加至 Lambda 函数本身。创建调用 Lambda 函数的规则时,您未传递角色,因此创建规则的用户无需 iam:PassRole 权限。有关 Lambda 函数授权的更多信息,请参阅使用资源策略授予权限

创建 AWS IoT 规则

您可以配置规则以从连接的设备路由数据。规则包括以下部分:

规则名称

规则的名称。

可选说明

规则的文字说明。

SQL 语句

一种简化的 SQL 语法,用于筛选接收的 MQTT 主题相关消息并向其他位置推送数据。有关更多信息,请参阅 AWS IoT SQL 参考

SQL 版本

评估规则时使用的 SQL 规则引擎的版本。尽管该属性是可选的,但我们强烈建议您指定 SQL 版本。如果未设置该属性,将使用默认值 2015-10-08

一个或多个操作

执行规则时 AWS IoT 执行的操作。例如,您可以将数据插入 DynamoDB 表、将数据写入 Amazon S3 存储桶、发布至 Amazon SNS 主题或调用 Lambda 函数。

错误操作

AWS IoT 在无法执行规则的操作时执行的操作。

当您创建规则时,请注意发布到主题的数据量。如果您创建的规则包含通配符主题模式,它们可能与您的大部分消息匹配,并且您可能需要增加目标操作使用的 AWS 资源容量。另外,如果您创建包含通配符主题模式的重新发布规则,最终可能获得一个造成无限循环的循环规则。

注意

创建和更新规则是管理员级操作。有权创建或更新规则的所有用户都能够访问规则处理的数据。

创建规则 (AWS CLI)

使用 create-topic-rule 命令创建规则:

aws iot create-topic-rule --rule-name my-rule --topic-rule-payload file://my-rule.json

下面是一个负载文件示例,其中包含将发送至 iot/test 主题的所有消息插入指定 DynamoDB 表的规则。SQL 语句筛选信息,角色 ARN 授予 AWS IoT 写入 DynamoDB 表的权限。

{ "sql": "SELECT * FROM 'iot/test'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [{ "dynamoDB": { "tableName": "my-dynamodb-table", "roleArn": "arn:aws:iam::123456789012:role/my-iot-role", "hashKeyField": "topic", "hashKeyValue": "${topic(2)}", "rangeKeyField": "timestamp", "rangeKeyValue": "${timestamp()}" } }] }

下面是一个负载文件示例,其中包含将发送至 iot/test 主题的所有消息插入指定 S3 存储桶的规则。SQL 语句筛选消息,角色 ARN 授予 AWS IoT 写入 Amazon S3 存储桶的权限。

{ "awsIotSqlVersion": "2016-03-23", "sql": "SELECT * FROM 'iot/test'", "ruleDisabled": false, "actions": [ { "s3": { "roleArn": "arn:aws:iam::123456789012:role/aws_iot_s3", "bucketName": "my-bucket", "key": "myS3Key" } } ] }

下面的负载文件示例包含将数据推送至 Amazon ES 的规则:

{ "sql":"SELECT *, timestamp() as timestamp FROM 'iot/test'", "ruleDisabled":false, "awsIotSqlVersion": "2016-03-23", "actions":[ { "elasticsearch":{ "roleArn":"arn:aws:iam::123456789012:role/aws_iot_es", "endpoint":"https://my-endpoint", "index":"my-index", "type":"my-type", "id":"${newuuid()}" } } ] }

下面的负载文件示例包含调用 Lambda 函数的规则:

{ "sql": "expression", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [{ "lambda": { "functionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-lambda-function" } }] }

下面的负载文件示例包含发布至 Amazon SNS 主题的规则:

{ "sql": "expression", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [{ "sns": { "targetArn": "arn:aws:sns:us-west-2:123456789012:my-sns-topic", "roleArn": "arn:aws:iam::123456789012:role/my-iot-role" } }] }

下面的负载文件示例包含在不同 MQTT 主题上重新发布的规则:

{ "sql": "expression", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [{ "republish": { "topic": "my-mqtt-topic", "roleArn": "arn:aws:iam::123456789012:role/my-iot-role" } }] }

下面的负载文件示例包含将数据推送至 Amazon Kinesis Data Firehose 流的规则:

{ "sql": "SELECT * FROM 'my-topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [{ "firehose": { "roleArn": ""arn:aws:iam::123456789012:role/my-iot-role", "deliveryStreamName": "my-stream-name" } }] }

在下面的负载文件示例中,具有采用 Amazon Machine Learning machinelearning_predict 函数重新发布至某个主题(如果 MQTT 负载中的数据分类为 1)的规则。

{ "sql": "SELECT * FROM 'iot/test' where machinelearning_predict('my-model', 'arn:aws:iam::123456789012:role/my-iot-aml-role', *).predictedLabel=1", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [{ "republish": { "roleArn": "arn:aws:iam::123456789012:role/my-iot-role", "topic": "my-mqtt-topic" } }] }

下面是一个示例负载文件,该文件包含将消息发布到 Salesforce IoT Cloud 输入流的规则。

{ "sql": "expression", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [{ "salesforce": { "token": "ABCDEFGHI123456789abcdefghi123456789", "url": "https://ingestion-cluster-id.my-env.sfdcnow.com/streams/stream-id/connection-id/my-event" } }] }

下面是一个负载文件示例,其中包含一个可以开始执行 Step Functions 状态机的规则。

{ "sql": "expression", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [{ "stepFunctions": { "stateMachineName": "myCoolStateMachine", "executionNamePrefix": "coolRunning", "roleArn": "arn:aws:iam::123456789012:role/my-iot-role" } }] }

查看您的规则

使用 list-topic-rules 命令列出您的规则:

aws iot list-topic-rules

使用 get-topic-rule 命令获取有关规则的信息:

aws iot get-topic-rule --rule-name my-rule

SQL 版本

AWS IoT 规则引擎使用一种类似 SQL 的语法从 MQTT 消息选择数据。SQL 语句基于 SQL 版本进行解释,该版本由描述此规则的 JSON 文档中的 awsIotSqlVersion 属性指定。有关 JSON 规则文档结构的更多信息,请参阅创建规则。借助 awsIotSqlVersion 属性,您可以指定想要使用的 AWS IoT SQL 规则引擎版本。当部署新版本时,您可继续使用较旧的版本或更改规则以使用新版本。您当前的规则将继续使用创建时所用的版本。

以下 JSON 示例介绍了如何使用 awsIotSqlVersion 属性指定 SQL 版本:

{ "sql": "expression", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [{ "republish": { "topic": "my-mqtt-topic", "roleArn": "arn:aws:iam::123456789012:role/my-iot-role" } }] }

当前支持的版本包括:

  • 2015-10-08,2015 年 10 月 8 日构建的 SQL 原始版本。

  • 2016-03-23,2016 年 3 月 23 日构建的 SQL 版本。

  • beta,最新的 SQL 测试版本。使用此版本可能会给您的规则带来破坏性更改。

2016-03-23 SQL 规则引擎版本中的新增功能

  • 针对选择嵌套 JSON 对象的修复程序。

  • 针对阵列查询的修复程序。

  • 对象间查询支持。

  • 支持将阵列作为顶级对象输出。

  • 添加 encode(value, encodingScheme) 函数,该函数可应用于 JSON 和非 JSON 格式数据。

对象间查询

此功能允许您查询 JSON 对象中的属性。例如,给定了以下 MQTT 消息:

{ "e": [ { "n": "temperature", "u": "Cel", "t": 1234, "v":22.5 }, { "n": "light", "u": "lm", "t": 1235, "v":135 }, { "n": "acidity", "u": "pH", "t": 1235, "v":7 } ] }

以及以下规则:

SELECT (SELECT v FROM e WHERE n = 'temperature') as temperature FROM 'my/topic'

该规则生成以下输出:

{"temperature": [{"v":22.5}]}

使用相同的 MQTT 消息,并给定一个略复杂的规则,如:

SELECT get((SELECT v FROM e WHERE n = 'temperature'),1).v as temperature FROM 'topic'

该规则生成以下输出:

{"temperature":22.5}

Array 作为顶级对象输出

此功能允许规则将阵列作为顶级对象返回。例如,给定了以下 MQTT 消息:

{ "a": {"b":"c"}, "arr":[1,2,3,4] }

以及以下规则:

SELECT VALUE arr FROM 'topic'

该规则生成以下输出:

[1,2,3,4]

对函数进行编码

根据指定的编码方案,将负载(可能是非 JSON 数据)编码为字符串表示形式。

排查规则问题

如果遇到规则问题,应启用 CloudWatch Logs。通过分析您的日志,您可以确定问题是否与授权相关,或者是否为诸如 WHERE 子句状态不匹配的问题。有关使用 Amazon CloudWatch Logs 的更多信息,请参阅设置 CloudWatchLogs

规则错误处理

当 AWS IoT 从设备接收消息时,规则引擎会检查该消息是否匹配规则。如果匹配,将评估规则的 SQL 语句,然后调用规则的操作并向操作传递 SQL 语句的结果。

如果在调用操作时出现问题,则在为规则指定了错误操作的情况下,规则引擎将调用错误操作。例如,在以下情况下可能会发生此操作:

  • 规则没有权限访问 Amazon S3 存储桶。

  • 用户错误导致超过 DynamoDB 预配置吞吐量。

错误操作消息格式

对于每个规则和每条消息,会生成一条消息。例如,如果同一个规则中的两个规则操作失败,则错误操作将收到包含这些错误的一条消息。

错误操作消息如下所示:

{ "ruleName": "TestAction", "topic": "testme/action", "cloudwatchTraceId": "7e146a2c-95b5-6caf-98b9-50e3969734c7", "clientId": "iotconsole-1511213971966-0", "base64OriginalPayload": "ewogICJtZXNzYWdlIjogIkhlbGxvIHZyb20gQVdTIElvVCBjb25zb2xlIgp9", "failures": [ { "failedAction": "S3Action", "failedResource": "us-east-1-s3-verify-user", "errorMessage": "Failed to put S3 object. The error received was The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Request ID: 9DF5416B9B47B9AF; S3 Extended Request ID: yMah1cwPhqTH267QLPhTKeVPKJB8BO5ndBHzOmWtxLTM6uAvwYYuqieAKyb6qRPTxP1tHXCoR4Y=). Message arrived on: error/action, Action: s3, Bucket: us-east-1-s3-verify-user, Key: \"aaa\". Value of x-amz-id-2: yMah1cwPhqTH267QLPhTKeVPKJB8BO5ndBHzOmWtxLTM6uAvwYYuqieAKyb6qRPTxP1tHXCoR4Y=" } ] }
ruleName

触发错误操作的规则的名称。

topic

收到原始消息的主题。

cloudwatchTraceId

CloudWatch 中错误日志的唯一标识。

clientId

消息发布程序的客户端 ID。

base64OriginalPayload

base64 编码的原始消息负载。

失败
failedAction

无法完成的操作的名称,例如“S3Action”。

failedResource

资源的名称,例如 S3 存储桶的名称。

errorMessage

错误的描述和说明。

错误操作示例

下面是一个具有添加的错误操作的规则示例。以下规则具有将消息数据写入 DynamoDB 表的操作,以及将数据写入 Amazon S3 存储桶的错误操作:

{ "sql" : "SELECT * FROM ..." "actions" : [{ "dynamoDB" : { "table" : "PoorlyConfiguredTable", "hashKeyField" : "AConstantString", "hashKeyValue" : "AHashKey"}} ], "errorAction" : { "s3" : { "roleArn": "arn:aws:iam::123456789012:role/aws_iot_s3", "bucket" : "message-processing-errors", "key" : "${replace(topic(), '/', '-') + '-' + timestamp() + '-' + newuuid()}" }} }

您可以在错误操作的 SQL 语句中使用任意函数或替换,但外部函数除外 (例如,get_thing_shadowaws_lambdamachinelearning_predict)。

有关规则以及如何指定错误操作的更多信息,请参阅创建 AWS IoT 规则

有关使用 CloudWatch 监控规则成功还是失败的更多信息,请参阅AWS IoT 指标与维度

删除规则

用完规则后可以将其删除。

删除规则 (AWS CLI)

使用 delete-topic-rule 命令删除规则:

aws iot delete-topic-rule --rule-name my-rule