AWS Lambda 活动 - AWS IoT Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

AWS Lambda 活动

您可以使用 lambda 活动,对消息执行更复杂的处理。例如,您可以利用外部API输出的数据丰富消息,或者根据来自 Amazon DynamoDB. 不过,您可以使用此活动来执行任何类型的基于消息的处理,包括筛选哪些消息将存储在数据存储中。

的 AWS Lambda 此活动中使用的函数必须接收并返回JSON对象阵列。在以下示例中, Lambda 函数修改,然后返回其 event 参数。

batchSize 决定您 Lambda 函数在每个调用上接收。设定时,请记住 Lambda 函数的最大超时为900秒。所以 Lambda 函数必须能够在不到900秒的时间内处理批处理中的所有消息。

{ "lambda": { "name": "MyLambdaActivity", "lambdaName": "mylambda", "batchSize": 10, "next": "MyDatastoreActivity" } }

您必须添加策略才能授予 AWS IoT Analytics 可调用您的 Lambda 功能。运行以下命令并替换 function-name 名称 Lambda 功能。

aws lambda add-permission --function-name function-name --action lambda:InvokeFunction --statement-id iotanalytics --principal iotanalytics.amazonaws.com

命令返回以下内容:

{ "Statement": "{\"Sid\":\"iotanalytics\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"iotanalytics.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:aws-region:aws-region:function:mfunction-name\"}" }

有关更多信息,请参阅 使用基于资源的策略 AWS LambdaAWS Lambda Developer Guide.

Lambda 函数示例1

在本示例中, Lambda 函数根据原始消息中的数据添加附加信息。设备发布具有类似于以下示例的有效载荷的消息。

{ "thingid": "00001234abcd", "temperature": 26, "humidity": 29, "location": { "lat": 52.4332935, "lon": 13.231694 }, "ip": "192.168.178.54", "datetime": "2018-02-15T07:06:01" }

设备具有以下管道定义。

{ "pipeline": { "activities": [ { "channel": { "channelName": "foobar_channel", "name": "foobar_channel_activity", "next": "lambda_foobar_activity" } }, { "lambda": { "lambdaName": "MyAnalyticsLambdaFunction", "batchSize": 5, "name": "lambda_foobar_activity", "next": "foobar_store_activity" } }, { "datastore": { "datastoreName": "foobar_datastore", "name": "foobar_store_activity" } } ], "name": "foobar_pipeline", "arn": "arn:aws:iotanalytics:eu-west-1:123456789012:pipeline/foobar_pipeline" } }

以下 Lambda Python功能(MyAnalyticsLambdaFunction)将GMapsURL和温度(以华氏为单位)添加到消息。

import logging import sys # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) def c_to_f(c): return 9.0/5.0 * c + 32 def lambda_handler(event, context): logger.info("event before processing: {}".format(event)) maps_url = 'N/A' for e in event: #e['foo'] = 'addedByLambda' if 'location' in e: lat = e['location']['lat'] lon = e['location']['lon'] maps_url = "http://maps.google.com/maps?q={},{}".format(lat,lon) if 'temperature' in e: e['temperature_f'] = c_to_f(e['temperature']) logger.info("maps_url: {}".format(maps_url)) e['maps_url'] = maps_url logger.info("event after processing: {}".format(event)) return event

Lambda 函数示例2

一种有用的方法是压缩并序列化消息负载,以降低传输和存储成本。在第二个示例中, Lambda 函数假定消息有效负载代表JSON原始负载,已压缩该原始负载,然后base64编码(序列化)为字符串。它返回原始JSON。

import base64 import gzip import json import logging import sys # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) def decode_to_bytes(e): return base64.b64decode(e) def decompress_to_string(binary_data): return gzip.decompress(binary_data).decode('utf-8') def lambda_handler(event, context): logger.info("event before processing: {}".format(event)) decompressed_data = [] for e in event: binary_data = decode_to_bytes(e) decompressed_string = decompress_to_string(binary_data) decompressed_data.append(json.loads(decompressed_string)) logger.info("event after processing: {}".format(decompressed_data)) return decompressed_data