示例 函数代码
要处理来自 Amazon Kinesis 的事件,请遍历事件对象中包含的记录,并对每个对象中包含的 Base64 编码的数据进行解码。
此页面上的代码不支持聚合记录。您可以在 Kinesis Producer Library 配置中禁用聚合,也可以使用 Kinesis Record Aggregation 库
示例代码具有以下语言。
Node.js 12.x
以下示例代码接收 Kinesis 事件输入并对其所包含的消息进行处理。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。
例 index.js
console.log('Loading function'); exports.handler = function(event, context) { //console.log(JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { // Kinesis data is base64 encoded so decode here var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded payload:', payload); }); };
压缩示例代码以创建部署程序包。有关说明,请参阅 使用 .zip 文件归档部署 Node.js Lambda 函数。
Java 11
以下是将 Kinesis 事件记录数据作为输入接收并对其进行处理的示例 Java 代码。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。
在代码中,recordHandler
是处理程序。该处理程序使用了在 KinesisEvent
库中定义的预定义 aws-lambda-java-events
类。
例 ProcessKinesisEvents.java
package example; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, Void>{ @Override public Void handleRequest(KinesisEvent event, Context context) { for(KinesisEventRecord rec : event.getRecords()) { System.out.println(new String(rec.getKinesis().getData().array())); } return null; } }
如果该处理程序正常返回并且没有异常,则 Lambda 认为输入的记录批次得到成功处理并开始读取流中的新记录。如果该处理程序引发异常,则 Lambda 认为输入的记录批次未得到处理,并用相同的记录批次再次调用该函数。
Dependencies
aws-lambda-java-core
aws-lambda-java-events
aws-java-sdk
使用 Lambda 库依赖项构建代码,创建部署程序包。有关说明,请参阅 使用 .zip 或 JAR 文件归档部署 Java Lambda 函数。
C#
以下是将 Kinesis 事件记录数据作为输入接收并对其进行处理的示例 C# 代码。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。
在代码中,HandleKinesisRecord
是处理程序。该处理程序使用了在 KinesisEvent
库中定义的预定义 Amazon.Lambda.KinesisEvents
类。
例 ProcessingKinesisEvents.cs
using System; using System.IO; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; namespace KinesisStreams { public class KinesisSample { [LambdaSerializer(typeof(JsonSerializer))] public void HandleKinesisRecord(KinesisEvent kinesisEvent) { Console.WriteLine($"Beginning to process {kinesisEvent.Records.Count} records..."); foreach (var record in kinesisEvent.Records) { Console.WriteLine($"Event ID: {record.EventId}"); Console.WriteLine($"Event Name: {record.EventName}"); string recordData = GetRecordContents(record.Kinesis); Console.WriteLine($"Record Data:"); Console.WriteLine(recordData); } Console.WriteLine("Stream processing complete."); } private string GetRecordContents(KinesisEvent.Record streamRecord) { using (var reader = new StreamReader(streamRecord.Data, Encoding.ASCII)) { return reader.ReadToEnd(); } } } }
用上述示例替换 .NET Core 项目中的 Program.cs
。有关说明,请参阅 使用 .zip 文件归档部署 C# Lambda 函数。
Python 3
以下是将 Kinesis 事件记录数据作为输入接收并对其进行处理的示例 Python 代码。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。
例 ProcessKinesisRecords.py
from __future__ import print_function #import json import base64 def lambda_handler(event, context): for record in event['Records']: #Kinesis data is base64 encoded so decode here payload=base64.b64decode(record["kinesis"]["data"]) print("Decoded payload: " + str(payload))
压缩示例代码以创建部署程序包。有关说明,请参阅 使用 .zip 文件归档部署 Python Lambda 函数。
Go
以下是将 Kinesis 事件记录数据作为输入接收并对其进行处理的示例 Go 代码。为了展示这个过程,代码会将一些传入的事件数据写入 CloudWatch Logs。
例 ProcessKinesisRecords.go
import ( "strings" "github.com/aws/aws-lambda-go/events" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) { for _, record := range kinesisEvent.Records { kinesisRecord := record.Kinesis dataBytes := kinesisRecord.Data dataText := string(dataBytes) fmt.Printf("%s Data = %s \n", record.EventName, dataText) } }
通过 go build
生成可执行文件并创建部署程序包。有关说明,请参阅使用 .zip 文件归档部署 Go Lambda 函数。