示例函数代码 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

示例函数代码

要处理来自 Amazon Kinesis 的事件,请遍历事件对象中包含的记录,并对每个对象中包含的 Base64 编码的数据进行解码。

注意

此页面上的代码不支持聚合记录。您可以在 Kinesis Producer Library 配置中禁用聚合,也可以使用 Kinesis Record Aggregation 库来取消聚合记录。

示例代码具有以下语言。

Node.js 12.x

以下示例代码接收 Kinesis 事件输入并对其所包含的消息进行处理。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。

例 index.js

console.log('Loading function'); exports.handler = function(event, context) { //console.log(JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { // Kinesis data is base64 encoded so decode here var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded payload:', payload); }); };

压缩示例代码以创建部署程序包。有关说明,请参阅使用 .zip 文件存档部署 Node.js Lambda 函数

Java 11

以下是将 Kinesis 事件记录数据作为输入接收并对其进行处理的示例 Java 代码。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。

在代码中,recordHandler 是处理程序。该处理程序使用了在 aws-lambda-java-events 库中定义的预定义 KinesisEvent 类。

例 ProcessKinesisEvents.java

package example; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, Void>{ @Override public Void handleRequest(KinesisEvent event, Context context) { for(KinesisEventRecord rec : event.getRecords()) { System.out.println(new String(rec.getKinesis().getData().array())); } return null; } }

如果该处理程序正常返回并且没有异常,则 Lambda 认为输入的记录批次得到成功处理并开始读取流中的新记录。如果该处理程序引发异常,则 Lambda 认为输入的记录批次未得到处理,并用相同的记录批次再次调用该函数。

附属物

  • aws-lambda-java-core

  • aws-lambda-java-events

  • aws-java-sdk

使用 Lambda 库依赖项构建代码以创建部署程序包。有关说明,请参阅使用 .zip 或 JAR 文件存档部署 Java Lambda 函数

C#

以下是将 Kinesis 事件记录数据作为输入接收并对其进行处理的示例 C# 代码。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。

在代码中,HandleKinesisRecord 是处理程序。该处理程序使用了在 Amazon.Lambda.KinesisEvents 库中定义的预定义 KinesisEvent 类。

例 ProcessingKinesisEvents.cs

using System; using System.IO; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; namespace KinesisStreams { public class KinesisSample { [LambdaSerializer(typeof(JsonSerializer))] public void HandleKinesisRecord(KinesisEvent kinesisEvent) { Console.WriteLine($"Beginning to process {kinesisEvent.Records.Count} records..."); foreach (var record in kinesisEvent.Records) { Console.WriteLine($"Event ID: {record.EventId}"); Console.WriteLine($"Event Name: {record.EventName}"); string recordData = GetRecordContents(record.Kinesis); Console.WriteLine($"Record Data:"); Console.WriteLine(recordData); } Console.WriteLine("Stream processing complete."); } private string GetRecordContents(KinesisEvent.Record streamRecord) { using (var reader = new StreamReader(streamRecord.Data, Encoding.ASCII)) { return reader.ReadToEnd(); } } } }

使用以上示例替换.NET Core 中的 Program.cs有关说明,请参阅使用 .zip 文件存档部署 C# Lambda 函数

Python 3

以下是将 Kinesis 事件记录数据作为输入接收并对其进行处理的示例 Python 代码。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。

例 ProcessKinesisRecords.py

from __future__ import print_function #import json import base64 def lambda_handler(event, context): for record in event['Records']: #Kinesis data is base64 encoded so decode here payload=base64.b64decode(record["kinesis"]["data"]) print("Decoded payload: " + str(payload))

压缩示例代码以创建部署程序包。有关说明,请参阅使用 .zip 文件存档部署 Python Lambda 函数

转到

以下是将 Kinesis 事件记录数据作为输入接收并对其进行处理的示例 Go 代码。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。

例 ProcessKinesisRecords.go

import ( "strings" "github.com/aws/aws-lambda-go/events" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) { for _, record := range kinesisEvent.Records { kinesisRecord := record.Kinesis dataBytes := kinesisRecord.Data dataText := string(dataBytes) fmt.Printf("%s Data = %s \n", record.EventName, dataText) } }

使用 go build 构建可执行文件并创建部署程序包。有关说明,请参阅使用 .zip 文件存档部署 Go Lambda 函数