为应用程序目标创建 Lambda 函数 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

为应用程序目标创建 Lambda 函数

您的 Kinesis Data Analytics 应用程序可以将 AWS Lambda 函数作为输出。Kinesis Data Analytics 提供了一些模板,以便创建 Lambda 函数以作为应用程序目标。可以将这些模板作为应用程序输出后处理的起点。

使用 Node.js 创建 Lambda 函数目标

在控制台上提供了以下模板以使用 Node.js 创建目标 Lambda 函数:

将 Lambda 作为输出蓝图 语言和版本 Description
kinesis-analytics-output Node.js 12.x 将输出记录从 Kinesis Data Analytics 应用程序传输到自定义目标。

使用 Python 创建 Lambda 函数目标

在控制台上提供了以下模板以使用 Python 创建目标 Lambda 函数:

将 Lambda 作为输出蓝图 语言和版本 Description
kinesis-analytics-output-sns Python 2.7 将输出记录从 Kinesis Data Analytics 应用程序传输到 Amazon SNS。
kinesis-analytics-output-ddb Python 2.7 将输出记录从 Kinesis Data Analytics 应用程序传输到 Amazon DynamoDB。

使用 Java 创建 Lambda 函数目标

要使用 Java 创建目标 Lambda 函数,请使用 Java 事件类。

以下代码说明了一个使用 Java 的示例目标 Lambda 函数:

public class LambdaFunctionHandler implements RequestHandler<KinesisAnalyticsOutputDeliveryEvent, KinesisAnalyticsOutputDeliveryResponse> { @Override public KinesisAnalyticsOutputDeliveryResponse handleRequest(KinesisAnalyticsOutputDeliveryEvent event, Context context) { context.getLogger().log("InvocatonId is : " + event.invocationId); context.getLogger().log("ApplicationArn is : " + event.applicationArn); List<KinesisAnalyticsOutputDeliveryResponse.Record> records = new ArrayList<KinesisAnalyticsOutputDeliveryResponse.Record>(); KinesisAnalyticsOutputDeliveryResponse response = new KinesisAnalyticsOutputDeliveryResponse(records); event.records.stream().forEach(record -> { context.getLogger().log("recordId is : " + record.recordId); context.getLogger().log("record retryHint is :" + record.lambdaDeliveryRecordMetadata.retryHint); // Add logic here to transform and send the record to final destination of your choice. response.records.add(new Record(record.recordId, KinesisAnalyticsOutputDeliveryResponse.Result.Ok)); }); return response; } }

使用 .NET 创建 Lambda 函数目标

要使用 .NET 创建目标 Lambda 函数,请使用 .NET 事件类。

以下代码说明了一个使用 C# 的示例目标 Lambda 函数:

public class Function { public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context) { context.Logger.LogLine($"InvocationId: {evnt.InvocationId}"); context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}"); var response = new KinesisAnalyticsOutputDeliveryResponse { Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>() }; foreach (var record in evnt.Records) { context.Logger.LogLine($"\tRecordId: {record.RecordId}"); context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}"); context.Logger.LogLine($"\tData: {record.DecodeData()}"); // Add logic here to send to the record to final destination of your choice. var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record { RecordId = record.RecordId, Result = KinesisAnalyticsOutputDeliveryResponse.OK }; response.Records.Add(deliveredRecord); } return response; } }

有关使用 .NET 创建 Lambda 函数以进行预处理或作为目标的更多信息,请参阅 Amazon.Lambda.KinesisAnalyticsEvents.。