经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:
1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。
2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
创建 Lambda 函数以进行预处理
在将记录提取到应用程序时,您的 Amazon Kinesis Data Analytics 应用程序可以使用 Lambda 函数预处理记录。Kinesis Data Analytics 在控制台上提供以下模板以作为数据预处理起点。
使用 Node.js 创建预处理 Lambda 函数
在 Kinesis Data Analytics 控制台上提供了以下模板以使用 Node.js 创建预处理 Lambda 函数:
Lambda 蓝图 | 语言和版本 | 描述 |
---|---|---|
通用 Kinesis Data Analytics 输入处理 | Node.js 6.10 |
Kinesis Data Analytics 记录预处理器,它将 JSON 或 CSV 记录作为输入接收,然后返回这些记录以及处理状态。使用此处理器作为自定义转换逻辑的起点。 |
压缩输入处理 | Node.js 6.10 | Kinesis Data Analytics 记录处理器,它接收压缩(GZIP 或 Deflate 压缩)JSON 或 CSV 记录以作为输入,并返回解压缩的记录以及处理状态。 |
使用 Python 创建预处理 Lambda 函数
在控制台上提供了以下模板以使用 Python 创建预处理 Lambda 函数:
Lambda 蓝图 | 语言和版本 | 描述 |
---|---|---|
通用 Kinesis Analytics 输入处理 | Python 2.7 |
Kinesis Data Analytics 记录预处理器,它将 JSON 或 CSV 记录作为输入接收,然后返回这些记录以及处理状态。使用此处理器作为自定义转换逻辑的起点。 |
KPL 输入处理 | Python 2.7 | Kinesis Data Analytics 记录处理器,它接收 JSON 或 CSV 记录的 Kinesis 创建器库 (KPL) 聚合以作为输入,并返回取消聚合的记录以及处理状态。 |
使用 Java 创建预处理 Lambda 函数
要使用 Java 创建 Lambda 函数以预处理记录,请使用 Java 事件
以下代码说明了一个使用 Java 的示例 Lambda 函数(用于预处理记录):
public class LambdaFunctionHandler implements RequestHandler<KinesisAnalyticsStreamsInputPreprocessingEvent, KinesisAnalyticsInputPreprocessingResponse> { @Override public KinesisAnalyticsInputPreprocessingResponse handleRequest( KinesisAnalyticsStreamsInputPreprocessingEvent event, Context context) { context.getLogger().log("InvocatonId is : " + event.invocationId); context.getLogger().log("StreamArn is : " + event.streamArn); context.getLogger().log("ApplicationArn is : " + event.applicationArn); List<KinesisAnalyticsInputPreprocessingResponse.Record> records = new ArrayList<KinesisAnalyticsInputPreprocessingResponse.Record>(); KinesisAnalyticsInputPreprocessingResponse response = new KinesisAnalyticsInputPreprocessingResponse(records); event.records.stream().forEach(record -> { context.getLogger().log("recordId is : " + record.recordId); context.getLogger().log("record aat is :" + record.kinesisStreamRecordMetadata.approximateArrivalTimestamp); // Add your record.data pre-processing logic here. // response.records.add(new Record(record.recordId, KinesisAnalyticsInputPreprocessingResult.Ok, <preprocessedrecordData>)); }); return response; } }
使用 .NET 创建预处理 Lambda 函数
要使用 .NET 创建 Lambda 函数以预处理记录,请使用 .NET 事件
以下代码说明了一个使用 C# 的示例 Lambda 函数(用于预处理记录):
public class Function { public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context) { context.Logger.LogLine($"InvocationId: {evnt.InvocationId}"); context.Logger.LogLine($"StreamArn: {evnt.StreamArn}"); context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}"); var response = new KinesisAnalyticsInputPreprocessingResponse { Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>() }; foreach (var record in evnt.Records) { context.Logger.LogLine($"\tRecordId: {record.RecordId}"); context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}"); context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}"); context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}"); context.Logger.LogLine($"\tData: {record.DecodeData()}"); // Add your record preprocessig logic here. var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record { RecordId = record.RecordId, Result = KinesisAnalyticsInputPreprocessingResponse.OK }; preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant()); response.Records.Add(preprocessedRecord); } return response; } }
有关使用 .NET 创建 Lambda 函数以进行预处理或作为目标的更多信息,请参阅 Amazon.Lambda.KinesisAnalyticsEvents