本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon Lambda活动
您可以使用lambda
活动对消息执行复杂处理。例如,您可以使用来自外部 API 操作输出的数据来丰富消息,或者根据来自 Amazon DynamoDB 的逻辑筛选消息。但是,在进入数据存储之前,您不能使用此管道活动来添加其他消息或删除现有消息。
这些区域有:Amazon Lambda在 a 中使用的函数lambda
活动必须接收并返回一个 JSON 对象数组。有关示例,请参阅 Lambda 函数示例 1。
授权Amazon IoT Analytics具有权限以调用 Lambda 函数,您必须添加策略。例如,运行以下 CLI 命令并替换为exampleFunctionName
将 Lambda 函数的名称替换为123456789012
用你的Amazon账户 ID,并使用调用给定 Lambda 函数的管道的 Amazon 资源名称 (ARN)。
aws lambda add-permission --function-name
exampleFunctionName
--action lambda:InvokeFunction --statement-id iotanalytics --principal iotanalytics.amazonaws.com --source-account123456789012
--source-arn arn:aws:iotanalytics:us-east-1
:123456789012
:pipeline/examplePipeline
该命令将返回以下:
{ "Statement": "{\"Sid\":\"iotanalyticsa\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"iotanalytics.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:aws-region:aws-account:function:
exampleFunctionName
\",\"Condition\":{\"StringEquals\":{\"AWS:SourceAccount\":\"123456789012
\"},\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:iotanalytics:us-east-1
:123456789012
:pipeline/examplePipeline
\"}}}" }
有关更多信息,请参阅使用 的基于资源的策略Amazon Lambda在Amazon Lambda开发人员指南.
Lambda 函数示例 1
在此示例中,Lambda 函数根据原始消息中的数据添加信息。设备发布带有负载的消息,类似于以下示例。
{ "thingid": "00001234abcd", "temperature": 26, "humidity": 29, "location": { "lat": 52.4332935, "lon": 13.231694 }, "ip": "192.168.178.54", "datetime": "2018-02-15T07:06:01" }
并且该设备具有以下管道定义。
{ "pipeline": { "activities": [ { "channel": { "channelName": "foobar_channel", "name": "foobar_channel_activity", "next": "lambda_foobar_activity" } }, { "lambda": { "lambdaName": "MyAnalyticsLambdaFunction", "batchSize": 5, "name": "lambda_foobar_activity", "next": "foobar_store_activity" } }, { "datastore": { "datastoreName": "foobar_datastore", "name": "foobar_store_activity" } } ], "name": "foobar_pipeline", "arn": "arn:aws:iotanalytics:eu-west-1:123456789012:pipeline/foobar_pipeline" } }
以下 Lambda Python 函数 (MyAnalyticsLambdaFunction
) 将 GMaps URL 和温度(以华氏度为单位)添加到消息中。
import logging import sys # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) def c_to_f(c): return 9.0/5.0 * c + 32 def lambda_handler(event, context): logger.info("event before processing: {}".format(event)) maps_url = 'N/A' for e in event: #e['foo'] = 'addedByLambda' if 'location' in e: lat = e['location']['lat'] lon = e['location']['lon'] maps_url = "http://maps.google.com/maps?q={},{}".format(lat,lon) if 'temperature' in e: e['temperature_f'] = c_to_f(e['temperature']) logger.info("maps_url: {}".format(maps_url)) e['maps_url'] = maps_url logger.info("event after processing: {}".format(event)) return event
Lambda 函数示例 2
一种有用的方法是压缩并序列化消息负载,以降低传输和存储成本。在第二个示例中,Lambda 函数假设消息负载代表一个 JSON 原件,该原件经过压缩,然后以 base64 编码(序列化)为字符串。它返回原始的 JSON。
import base64 import gzip import json import logging import sys # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) def decode_to_bytes(e): return base64.b64decode(e) def decompress_to_string(binary_data): return gzip.decompress(binary_data).decode('utf-8') def lambda_handler(event, context): logger.info("event before processing: {}".format(event)) decompressed_data = [] for e in event: binary_data = decode_to_bytes(e) decompressed_string = decompress_to_string(binary_data) decompressed_data.append(json.loads(decompressed_string)) logger.info("event after processing: {}".format(decompressed_data)) return decompressed_data