终止支持通知: Amazon 将于 2025 年 12 月 15 日终止对的支持 Amazon IoT Analytics。2025 年 12 月 15 日之后,您将无法再访问 Amazon IoT Analytics 控制台或 Amazon IoT Analytics 资源。如需了解更多信息,请访问此支持Amazon IoT Analytics 终端。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon Lambda 活动
lambda
活动可用于对消息执行复杂的处理。例如,您可使用来自外部 API 操作输出的数据来丰富消息,或者根据来自 Amazon DynamoDB 的逻辑筛选消息。但是,在进入数据存储之前,您无法使用此管道活动来添加其他消息或删除现有消息。
lambda
活动中使用的 Amazon Lambda 函数必须接收并返回一个 JSON 对象数组。有关示例,请参阅Lambda 函数示例 1。
要授予调用 Lambda 函数的 Amazon IoT Analytics 权限,您必须添加策略。例如,运行以下 CLI 命令并exampleFunctionName
替换为您的 Lambda 函数的名称,替换为您的 Amazon 账户 ID,然后使用调123456789012
用给定 Lambda 函数的管道的亚马逊资源名称 (ARN)。
aws lambda add-permission --function-name
exampleFunctionName
--action lambda:InvokeFunction --statement-id iotanalytics --principal iotanalytics.amazonaws.com --source-account123456789012
--source-arn arn:aws:iotanalytics:us-east-1
:123456789012
:pipeline/examplePipeline
该命令将返回以下输出:
{ "Statement": "{\"Sid\":\"iotanalyticsa\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"iotanalytics.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:aws-region:aws-account:function:
exampleFunctionName
\",\"Condition\":{\"StringEquals\":{\"AWS:SourceAccount\":\"123456789012
\"},\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:iotanalytics:us-east-1
:123456789012
:pipeline/examplePipeline
\"}}}" }
有关更多信息,请参阅Amazon Lambda 开发人员指南中的为 Amazon Lambda使用基于资源的策略。
Lambda 函数示例 1
在该示例中,Lambda 函数根据原始消息中的数据添加信息。设备发布一条包含类似于以下负载的消息。
{ "thingid": "00001234abcd", "temperature": 26, "humidity": 29, "location": { "lat": 52.4332935, "lon": 13.231694 }, "ip": "192.168.178.54", "datetime": "2018-02-15T07:06:01" }
并且该设备具有以下管道定义。
{ "pipeline": { "activities": [ { "channel": { "channelName": "foobar_channel", "name": "foobar_channel_activity", "next": "lambda_foobar_activity" } }, { "lambda": { "lambdaName": "MyAnalyticsLambdaFunction", "batchSize": 5, "name": "lambda_foobar_activity", "next": "foobar_store_activity" } }, { "datastore": { "datastoreName": "foobar_datastore", "name": "foobar_store_activity" } } ], "name": "foobar_pipeline", "arn": "arn:aws:iotanalytics:eu-west-1:123456789012:pipeline/foobar_pipeline" } }
以下 Lambda Python 函数 (MyAnalyticsLambdaFunction
) 将 GMaps 网址和温度(以华氏度为单位)添加到消息中。
import logging import sys # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) def c_to_f(c): return 9.0/5.0 * c + 32 def lambda_handler(event, context): logger.info("event before processing: {}".format(event)) maps_url = 'N/A' for e in event: #e['foo'] = 'addedByLambda' if 'location' in e: lat = e['location']['lat'] lon = e['location']['lon'] maps_url = "http://maps.google.com/maps?q={},{}".format(lat,lon) if 'temperature' in e: e['temperature_f'] = c_to_f(e['temperature']) logger.info("maps_url: {}".format(maps_url)) e['maps_url'] = maps_url logger.info("event after processing: {}".format(event)) return event
Lambda 函数示例 2
一种有用的方法是压缩并序列化消息负载,以降低传输和存储成本。在该第二个示例中,Lambda 函数假定消息负载表示已压缩并以字符串形式进行 Base64 编码(序列化)的 JSON 原始数据。它返回原始 JSON。
import base64 import gzip import json import logging import sys # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) def decode_to_bytes(e): return base64.b64decode(e) def decompress_to_string(binary_data): return gzip.decompress(binary_data).decode('utf-8') def lambda_handler(event, context): logger.info("event before processing: {}".format(event)) decompressed_data = [] for e in event: binary_data = decode_to_bytes(e) decompressed_string = decompress_to_string(binary_data) decompressed_data.append(json.loads(decompressed_string)) logger.info("event after processing: {}".format(decompressed_data)) return decompressed_data