Amazon IoT Analytics 不再向新客户提供。的现有客户 Amazon IoT Analytics 可以继续照常使用该服务。了解更多
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon Lambda 活动
lambda
活动可用于对消息执行复杂的处理。例如,您可使用来自外部 API 操作输出的数据来丰富消息,或者根据来自 Amazon DynamoDB 的逻辑筛选消息。但是,在进入数据存储之前,您无法使用此管道活动来添加其他消息或删除现有消息。
用于lambda
活动的 Amazon Lambda 函数必须接收并返回一组 JSON 对象。有关示例,请参阅 Lambda 函数示例 1。
要授权 Amazon IoT Analytics 调用您的 Lambda 函数,您必须添加一个策略。例如,运行以下 CLI 命令并将 exampleFunctionName
替换为您的 Lambda 函数的名称,将 123456789012
替换为您的 Amazon 账户 ID,然后使用给定 Lambda 函数调用管道的 Amazon 资源名称 (ARN)。
aws lambda add-permission --function-name
exampleFunctionName
--action lambda:InvokeFunction --statement-id iotanalytics --principal iotanalytics.amazonaws.com --source-account123456789012
--source-arn arn:aws:iotanalytics:us-east-1
:123456789012
:pipeline/examplePipeline
该命令将返回以下输出:
{ "Statement": "{\"Sid\":\"iotanalyticsa\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"iotanalytics.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:aws-region:aws-account:function:
exampleFunctionName
\",\"Condition\":{\"StringEquals\":{\"AWS:SourceAccount\":\"123456789012
\"},\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:iotanalytics:us-east-1
:123456789012
:pipeline/examplePipeline
\"}}}" }
有关更多信息,请参阅Amazon Lambda开发人员指南中的为Amazon Lambda使用基于资源的策略。
Lambda 函数示例 1
在该示例中,Lambda 函数根据原始消息中的数据添加信息。设备发布一条包含类似于以下负载的消息。
{ "thingid": "00001234abcd", "temperature": 26, "humidity": 29, "location": { "lat": 52.4332935, "lon": 13.231694 }, "ip": "192.168.178.54", "datetime": "2018-02-15T07:06:01" }
并且该设备具有以下管道定义。
{ "pipeline": { "activities": [ { "channel": { "channelName": "foobar_channel", "name": "foobar_channel_activity", "next": "lambda_foobar_activity" } }, { "lambda": { "lambdaName": "MyAnalyticsLambdaFunction", "batchSize": 5, "name": "lambda_foobar_activity", "next": "foobar_store_activity" } }, { "datastore": { "datastoreName": "foobar_datastore", "name": "foobar_store_activity" } } ], "name": "foobar_pipeline", "arn": "arn:aws:iotanalytics:eu-west-1:123456789012:pipeline/foobar_pipeline" } }
下面的 Lambda Python 函数 (MyAnalyticsLambdaFunction
) 向消息中添加 GMaps URL 和华氏温度。
import logging import sys # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) def c_to_f(c): return 9.0/5.0 * c + 32 def lambda_handler(event, context): logger.info("event before processing: {}".format(event)) maps_url = 'N/A' for e in event: #e['foo'] = 'addedByLambda' if 'location' in e: lat = e['location']['lat'] lon = e['location']['lon'] maps_url = "http://maps.google.com/maps?q={},{}".format(lat,lon) if 'temperature' in e: e['temperature_f'] = c_to_f(e['temperature']) logger.info("maps_url: {}".format(maps_url)) e['maps_url'] = maps_url logger.info("event after processing: {}".format(event)) return event
Lambda 函数示例 2
一种有用的方法是压缩并序列化消息负载,以降低传输和存储成本。在该第二个示例中,Lambda 函数假定消息负载表示已压缩并以字符串形式进行 Base64 编码(序列化)的 JSON 原始数据。它返回原始 JSON。
import base64 import gzip import json import logging import sys # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) def decode_to_bytes(e): return base64.b64decode(e) def decompress_to_string(binary_data): return gzip.decompress(binary_data).decode('utf-8') def lambda_handler(event, context): logger.info("event before processing: {}".format(event)) decompressed_data = [] for e in event: binary_data = decode_to_bytes(e) decompressed_string = decompress_to_string(binary_data) decompressed_data.append(json.loads(decompressed_string)) logger.info("event after processing: {}".format(decompressed_data)) return decompressed_data