创建 Lambda 函数以进行预处理 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

创建 Lambda 函数以进行预处理

在将记录提取到应用程序时,您的 Amazon Kinesis Data Analytics 应用程序可以使用 Lambda 函数预处理记录。Kinesis Data Analytics 在控制台上提供以下模板以作为数据预处理起点。

使用 Node.js 创建预处理 Lambda 函数

在 Kinesis Data Analytics 控制台上提供了以下模板以使用 Node.js 创建预处理 Lambda 函数:

Lambda 蓝图 语言和版本 Description
常规 Kinesis Data Analytics 输入处理 Node.js 6.10 *

Kinesis Data Analytics 记录预处理器,它将 JSON 或 CSV 记录作为输入接收,然后返回这些记录以及处理状态。使用此处理器作为自定义转换逻辑的起点。

压缩输入处理 Node.js 6.10 * Kinesis Data Analytics 记录处理器,它接收压缩(GZIP 或 Deflate 压缩)JSON 或 CSV 记录以作为输入,并返回解压缩的记录以及处理状态。

使用 Python 创建预处理 Lambda 函数

在控制台上提供了以下模板以使用 Python 创建预处理 Lambda 函数:

Lambda 蓝图 语言和版本 Description
通用 Kinesis Analytics 输入处理 Python 2.7

Kinesis Data Analytics 记录预处理器,它将 JSON 或 CSV 记录作为输入接收,然后返回这些记录以及处理状态。使用此处理器作为自定义转换逻辑的起点。

KPL 输入处理 Python 2.7 Kinesis Data Analytics 记录处理器,它接收 JSON 或 CSV 记录的 Kinesis 创建者库 (KPL) 聚合以作为输入,并返回取消聚合的记录以及处理状态。

使用 Java 创建预处理 Lambda 函数

要使用 Java 创建 Lambda 函数以预处理记录,请使用 Java 事件类。

以下代码说明了一个使用 Java 的示例 Lambda 函数(用于预处理记录):

public class LambdaFunctionHandler implements RequestHandler<KinesisAnalyticsStreamsInputPreprocessingEvent, KinesisAnalyticsInputPreprocessingResponse> { @Override public KinesisAnalyticsInputPreprocessingResponse handleRequest( KinesisAnalyticsStreamsInputPreprocessingEvent event, Context context) { context.getLogger().log("InvocatonId is : " + event.invocationId); context.getLogger().log("StreamArn is : " + event.streamArn); context.getLogger().log("ApplicationArn is : " + event.applicationArn); List<KinesisAnalyticsInputPreprocessingResponse.Record> records = new ArrayList<KinesisAnalyticsInputPreprocessingResponse.Record>(); KinesisAnalyticsInputPreprocessingResponse response = new KinesisAnalyticsInputPreprocessingResponse(records); event.records.stream().forEach(record -> { context.getLogger().log("recordId is : " + record.recordId); context.getLogger().log("record aat is :" + record.kinesisStreamRecordMetadata.approximateArrivalTimestamp); // Add your record.data pre-processing logic here. // response.records.add(new Record(record.recordId, KinesisAnalyticsInputPreprocessingResult.Ok, <preprocessedrecordData>)); }); return response; } }

使用 .NET 创建预处理 Lambda 函数

要使用 .NET 创建 Lambda 函数以预处理记录,请使用 .NET 事件类。

以下代码说明了一个使用 C# 的示例 Lambda 函数(用于预处理记录):

public class Function { public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context) { context.Logger.LogLine($"InvocationId: {evnt.InvocationId}"); context.Logger.LogLine($"StreamArn: {evnt.StreamArn}"); context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}"); var response = new KinesisAnalyticsInputPreprocessingResponse { Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>() }; foreach (var record in evnt.Records) { context.Logger.LogLine($"\tRecordId: {record.RecordId}"); context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}"); context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}"); context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}"); context.Logger.LogLine($"\tData: {record.DecodeData()}"); // Add your record preprocessig logic here. var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record { RecordId = record.RecordId, Result = KinesisAnalyticsInputPreprocessingResponse.OK }; preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant()); response.Records.Add(preprocessedRecord); } return response; } }

有关使用 .NET 创建 Lambda 函数以进行预处理或作为目标的更多信息,请参阅 Amazon.Lambda.KinesisAnalyticsEvents.。