创建 Lambda 函数以进行预处理 - SQL适用于应用程序的 Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:

1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。

2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建 Lambda 函数以进行预处理

在将记录提取到应用程序时,您的 Amazon Kinesis Data Analytics 应用程序可以使用 Lambda 函数预处理记录。Kinesis Data Analytics 在控制台上提供以下模板以作为数据预处理起点。

使用 Node.js 创建预处理 Lambda 函数

在 Kinesis Data Analytics 控制台上提供了以下模板以使用 Node.js 创建预处理 Lambda 函数:

Lambda 蓝图 语言和版本 描述
通用 Kinesis Data Analytics 输入处理 Node.js 6.10

Kinesis Data Analytics 记录预处理器,它将 JSON 或 CSV 记录作为输入接收,然后返回这些记录以及处理状态。使用此处理器作为自定义转换逻辑的起点。

压缩输入处理 Node.js 6.10 Kinesis Data Analytics 记录处理器,它接收压缩(GZIP 或 Deflate 压缩)JSON 或 CSV 记录以作为输入,并返回解压缩的记录以及处理状态。

使用 Python 创建预处理 Lambda 函数

在控制台上提供了以下模板以使用 Python 创建预处理 Lambda 函数:

Lambda 蓝图 语言和版本 描述
通用 Kinesis Analytics 输入处理 Python 2.7

Kinesis Data Analytics 记录预处理器,它将 JSON 或 CSV 记录作为输入接收,然后返回这些记录以及处理状态。使用此处理器作为自定义转换逻辑的起点。

KPL 输入处理 Python 2.7 Kinesis Data Analytics 记录处理器,它接收 JSON 或 CSV 记录的 Kinesis 创建器库 (KPL) 聚合以作为输入,并返回取消聚合的记录以及处理状态。

使用 Java 创建预处理 Lambda 函数

要使用 Java 创建 Lambda 函数以预处理记录,请使用 Java 事件类。

以下代码说明了一个使用 Java 的示例 Lambda 函数(用于预处理记录):

public class LambdaFunctionHandler implements RequestHandler<KinesisAnalyticsStreamsInputPreprocessingEvent, KinesisAnalyticsInputPreprocessingResponse> { @Override public KinesisAnalyticsInputPreprocessingResponse handleRequest( KinesisAnalyticsStreamsInputPreprocessingEvent event, Context context) { context.getLogger().log("InvocatonId is : " + event.invocationId); context.getLogger().log("StreamArn is : " + event.streamArn); context.getLogger().log("ApplicationArn is : " + event.applicationArn); List<KinesisAnalyticsInputPreprocessingResponse.Record> records = new ArrayList<KinesisAnalyticsInputPreprocessingResponse.Record>(); KinesisAnalyticsInputPreprocessingResponse response = new KinesisAnalyticsInputPreprocessingResponse(records); event.records.stream().forEach(record -> { context.getLogger().log("recordId is : " + record.recordId); context.getLogger().log("record aat is :" + record.kinesisStreamRecordMetadata.approximateArrivalTimestamp); // Add your record.data pre-processing logic here. // response.records.add(new Record(record.recordId, KinesisAnalyticsInputPreprocessingResult.Ok, <preprocessedrecordData>)); }); return response; } }

使用 .NET 创建预处理 Lambda 函数

要使用 .NET 创建 Lambda 函数以预处理记录,请使用 .NET 事件类。

以下代码说明了一个使用 C# 的示例 Lambda 函数(用于预处理记录):

public class Function { public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context) { context.Logger.LogLine($"InvocationId: {evnt.InvocationId}"); context.Logger.LogLine($"StreamArn: {evnt.StreamArn}"); context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}"); var response = new KinesisAnalyticsInputPreprocessingResponse { Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>() }; foreach (var record in evnt.Records) { context.Logger.LogLine($"\tRecordId: {record.RecordId}"); context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}"); context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}"); context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}"); context.Logger.LogLine($"\tData: {record.DecodeData()}"); // Add your record preprocessig logic here. var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record { RecordId = record.RecordId, Result = KinesisAnalyticsInputPreprocessingResponse.OK }; preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant()); response.Records.Add(preprocessedRecord); } return response; } }

有关使用 .NET 创建 Lambda 函数以进行预处理或作为目标的更多信息,请参阅 Amazon.Lambda.KinesisAnalyticsEvents