对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
为应用程序目标创建 &LAM; 函数
您的 Kinesis Data Analytics 应用程序 Amazon Lambda 可以使用函数作为输出。 提供了一些模板以创建用作应用程序目标的 函数。可以将这些模板作为应用程序输出后处理的起点。
使用 Node.js 创建 &LAM; 函数目标
在控制台上提供了以下模板以使用 Node.js 创建目标 函数:
&LAM; 作为输出蓝图 | 语言和版本 | 描述 |
---|---|---|
kinesis-analytics-output |
Node.js 12.x | 将输出记录从 应用程序传输到自定义目标。 |
使用 Python 创建 &LAM; 函数目标
在控制台上提供了以下模板以使用 Python 创建目标 函数:
&LAM; 作为输出蓝图 | 语言和版本 | 描述 |
---|---|---|
kinesis-analytics-output-sns |
Python 2.7 | 将 Kinesis Data Analytics 应用程序的输出记录传送到 Amazon SNS。 |
kinesis-analytics-output-ddb |
Python 2.7 | 将 Kinesis Data Analytics 应用程序的输出记录传送到 Amazon DynamoDB。 |
使用 Java 创建 &LAM; 函数目标
要使用 Java 创建目标 &LAM; 函数,请使用 Java 事件
以下代码说明了一个使用 Java 的示例目标 &LAM; 函数:
public class LambdaFunctionHandler implements RequestHandler<KinesisAnalyticsOutputDeliveryEvent, KinesisAnalyticsOutputDeliveryResponse> { @Override public KinesisAnalyticsOutputDeliveryResponse handleRequest(KinesisAnalyticsOutputDeliveryEvent event, Context context) { context.getLogger().log("InvocatonId is : " + event.invocationId); context.getLogger().log("ApplicationArn is : " + event.applicationArn); List<KinesisAnalyticsOutputDeliveryResponse.Record> records = new ArrayList<KinesisAnalyticsOutputDeliveryResponse.Record>(); KinesisAnalyticsOutputDeliveryResponse response = new KinesisAnalyticsOutputDeliveryResponse(records); event.records.stream().forEach(record -> { context.getLogger().log("recordId is : " + record.recordId); context.getLogger().log("record retryHint is :" + record.lambdaDeliveryRecordMetadata.retryHint); // Add logic here to transform and send the record to final destination of your choice. response.records.add(new Record(record.recordId, KinesisAnalyticsOutputDeliveryResponse.Result.Ok)); }); return response; } }
使用 .NET 创建 &LAM; 函数目标
要使用 .NET 创建目标 &LAM; 函数,请使用 .NET 事件
以下代码说明了一个使用 C# 的示例目标 &LAM; 函数:
public class Function { public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context) { context.Logger.LogLine($"InvocationId: {evnt.InvocationId}"); context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}"); var response = new KinesisAnalyticsOutputDeliveryResponse { Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>() }; foreach (var record in evnt.Records) { context.Logger.LogLine($"\tRecordId: {record.RecordId}"); context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}"); context.Logger.LogLine($"\tData: {record.DecodeData()}"); // Add logic here to send to the record to final destination of your choice. var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record { RecordId = record.RecordId, Result = KinesisAnalyticsOutputDeliveryResponse.OK }; response.Records.Add(deliveredRecord); } return response; } }
有关使用 .NET 创建 Lambda 函数以进行预处理或作为目标的更多信息,请参阅Amazon.Lambda.KinesisAnalyticsEvents