Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
教程:将 Lambda 与 Kinesis Data Streams 结合使用
在本教程中,您将创建 Lambda 函数来处理 Amazon Kinesis 数据流中的事件。
-
自定义应用程序将记录写入流。
-
Amazon Lambda 轮询流并在检测到流中的新记录时调用 Lambda 函数。
-
Amazon Lambda 通过代入您在创建 Lambda 函数时指定的执行角色来运行 Lambda 函数。
先决条件
本教程假设您对 Lambda 基本操作和 Lambda 控制台有一定了解。如果您还没有了解,请按照 使用控制台创建 Lambda 函数 中的说明创建您的第一个 Lambda 函数。
要完成以下步骤,您需要 Amazon CLI 版本 2。在单独的数据块中列出了命令和预期输出:
aws --version
您应看到以下输出:
aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2
对于长命令,使用转义字符 (\
) 将命令拆分为多行。
在 Linux 和 macOS 中,可使用您首选的 shell 和程序包管理器。
在 Windows 中,操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令(例如 zip
)。安装 Windows Subsystem for Linux,获取 Ubuntu 和 Bash 与 Windows 集成的版本。本指南中的示例 CLI 命令使用 Linux 格式。如果您使用的是 Windows CLI,则必须重新格式化包含内联 JSON 文档的命令。
创建执行角色
创建执行角色,向您的函数授予访问 Amazon 资源的权限。
创建执行角色
-
在 IAM 控制台中,打开 Roles(角色)页面。
-
选择创建角色。
-
创建具有以下属性的角色。
-
Trusted entity(可信任的实体)– Amazon Lambda。
-
Permissions(权限)– AWSLambdaKinesisExecutionRole。
-
Role name(角色名称)– lambda-kinesis-role
。
AWSLambdaKinesisExecutionRole 策略具有该函数从 Kinesis 中读取项目并将日志写入 CloudWatch Logs 所需的权限。
创建函数
创建一个处理您的 Kinesis 消息的 Lambda 函数。函数代码会将 Kinesis 记录的事件 ID 和事件数据记录到 CloudWatch Logs 中。
本教程使用 Node.js 18.x 运行时系统,但我们还提供了其他运行时系统语言的示例代码。您可以选择以下框中的选项卡,查看适用于您感兴趣的运行时系统的代码。您将在此步骤中使用的 JavaScript 代码是 JavaScript 选项卡中显示的第一个示例。
- .NET
-
- Amazon SDK for .NET
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
通过 .NET 将 Kinesis 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegrationSampleCode;
public class Function
{
// Powertools Logger requires an environment variables against your function
// POWERTOOLS_SERVICE_NAME
[Logging(LogEvent = true)]
public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
{
if (evnt.Records.Count == 0)
{
Logger.LogInformation("Empty Kinesis Event received");
return;
}
foreach (var record in evnt.Records)
{
try
{
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
}
catch (Exception ex)
{
Logger.LogError($"An error occurred {ex.Message}");
throw;
}
}
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
}
private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
{
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async work
return data;
}
}
- Go
-
- 适用于 Go V2 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Go 将 Kinesis 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"log"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
if len(kinesisEvent.Records) == 0 {
log.Printf("empty Kinesis event received")
return nil
}
for _, record := range kinesisEvent.Records {
log.Printf("processed Kinesis event with EventId: %v", record.EventID)
recordDataBytes := record.Kinesis.Data
recordDataText := string(recordDataBytes)
log.Printf("record data: %v", recordDataText)
// TODO: Do interesting work based on the new data
}
log.Printf("successfully processed %v records", len(kinesisEvent.Records))
return nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK for Java 2.x
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Java 将 Kinesis 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
public class Handler implements RequestHandler<KinesisEvent, Void> {
@Override
public Void handleRequest(final KinesisEvent event, final Context context) {
LambdaLogger logger = context.getLogger();
if (event.getRecords().isEmpty()) {
logger.log("Empty Kinesis Event received");
return null;
}
for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
try {
logger.log("Processed Event with EventId: "+record.getEventID());
String data = new String(record.getKinesis().getData().array());
logger.log("Data:"+ data);
// TODO: Do interesting work based on the new data
}
catch (Exception ex) {
logger.log("An error occurred:"+ex.getMessage());
throw ex;
}
}
logger.log("Successfully processed:"+event.getRecords().size()+" records");
return null;
}
}
- JavaScript
-
- 适用于 JavaScript 的 SDK(v3)
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
通过 JavaScript 将 Kinesis 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
for (const record of event.Records) {
try {
console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
console.log(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
console.error(`An error occurred ${err}`);
throw err;
}
}
console.log(`Successfully processed ${event.Records.length} records.`);
};
async function getRecordDataAsync(payload) {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
通过 TypeScript 将 Kinesis 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
KinesisStreamEvent,
Context,
KinesisStreamHandler,
KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";
const logger = new Logger({
logLevel: "INFO",
serviceName: "kinesis-stream-handler-sample",
});
export const functionHandler: KinesisStreamHandler = async (
event: KinesisStreamEvent,
context: Context
): Promise<void> => {
for (const record of event.Records) {
try {
logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
logger.info(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
logger.error(`An error occurred ${err}`);
throw err;
}
logger.info(`Successfully processed ${event.Records.length} records.`);
}
};
async function getRecordDataAsync(
payload: KinesisStreamRecordPayload
): Promise<string> {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
- PHP
-
- 适用于 PHP 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
通过 PHP 将 Kinesis 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends KinesisHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleKinesis(KinesisEvent $event, Context $context): void
{
$this->logger->info("Processing records");
$records = $event->getRecords();
foreach ($records as $record) {
$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data
// Any exception thrown will be logged and the invocation will be marked as failed
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK for Python (Boto3)
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
使用 Python 将 Kinesis 事件与 Lambda 结合使用。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):
for record in event['Records']:
try:
print(f"Processed Kinesis Event - EventID: {record['eventID']}")
record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
print(f"Record Data: {record_data}")
# TODO: Do interesting work based on the new data
except Exception as e:
print(f"An error occurred {e}")
raise e
print(f"Successfully processed {len(event['Records'])} records.")
- Ruby
-
- 适用于 Ruby 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
通过 Ruby 将 Kinesis 事件与 Lambda 结合使用。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'
def lambda_handler(event:, context:)
event['Records'].each do |record|
begin
puts "Processed Kinesis Event - EventID: #{record['eventID']}"
record_data = get_record_data_async(record['kinesis'])
puts "Record Data: #{record_data}"
# TODO: Do interesting work based on the new data
rescue => err
$stderr.puts "An error occurred #{err}"
raise err
end
end
puts "Successfully processed #{event['Records'].length} records."
end
def get_record_data_async(payload)
data = Base64.decode64(payload['data']).force_encoding('UTF-8')
# Placeholder for actual async work
# You can use Ruby's asynchronous programming tools like async/await or fibers here.
return data
end
- Rust
-
- 适用于 Rust 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
通过 Rust 将 Kinesis 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
if event.payload.records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(());
}
event.payload.records.iter().for_each(|record| {
tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());
let record_data = std::str::from_utf8(&record.kinesis.data);
match record_data {
Ok(data) => {
// log the record data
tracing::info!("Data: {}", data);
}
Err(e) => {
tracing::error!("Error: {}", e);
}
}
});
tracing::info!(
"Successfully processed {} records",
event.payload.records.len()
);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
创建函数
-
为项目创建一个目录,然后切换到该目录。
mkdir kinesis-tutorial
cd kinesis-tutorial
-
将示例 JavaScript 代码复制到名为 index.js
的新文件中。
-
创建部署程序包。
zip function.zip index.js
-
使用 create-function
命令创建 Lambda 函数。
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \
--role arn:aws:iam::111122223333
:role/lambda-kinesis-role
测试 Lambda 函数
使用 invoke
Amazon Lambda CLI 命令和示例 Kinesis 事件手动调用 Lambda 函数。
测试 Lambda 函数
-
将以下 JSON 复制到文件中并将其保存为 input.txt
。
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
}
]
}
-
使用 invoke
命令将事件发送到该函数。
aws lambda invoke --function-name ProcessKinesisRecords \
--cli-binary-format raw-in-base64-out \
--payload file://input.txt outputfile.txt
如果使用 cli-binary-format 版本 2,则 Amazon CLI 选项是必需的。要将其设为默认设置,请运行 aws configure set cli-binary-format raw-in-base64-out
。有关更多信息,请参阅版本 2 的 Amazon Command Line Interface 用户指南中的 Amazon CLI 支持的全局命令行选项。
响应将保存到 out.txt
中。
使用 create-stream
命令创建流。
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
运行下面的 describe-stream
命令以获取流 ARN。
aws kinesis describe-stream --stream-name lambda-stream
您应看到以下输出:
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920746074317682119384634633455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
}
}
],
"StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
"StreamName": "lambda-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": 1544828156.0
}
}
您将使用下一步中的流 ARN 来将该流关联到您的 Lambda 函数。
运行以下 Amazon CLI add-event-source
命令。
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
记下映射 ID 以供将来使用。您可以通过运行 list-event-source-mappings
命令获取事件源映射的列表。
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
在该响应中,您可以验证状态值是否为 enabled
。可以禁用事件源映射,以临时暂停轮询而不丢失任何记录。
要测试事件源映射,请将事件记录添加到 Kinesis 流中。--data
值是一个字符串,CLI 先将其编码为 base64 字符串,然后才发送到 Kinesis。您可以多次运行同一命令来向流中添加多条记录。
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
Lambda 使用执行角色来读取来自流的记录。然后它将调用 Lambda 函数,批量传递记录。该函数解码每条记录中的数据并记录它,将输出发送到 CloudWatch Logs 中。在 CloudWatch 控制台中查看这些日志。
清除资源
除非您想要保留为本教程创建的资源,否则可立即将其删除。通过删除您不再使用的 Amazon 资源,可防止您的 Amazon Web Services 账户 产生不必要的费用。
删除执行角色
-
打开 IAM 控制台的角色页面。
-
选择您创建的执行角色。
-
选择删除。
-
在文本输入字段中输入角色名称,然后选择 Delete(删除)。
删除 Lambda 函数
-
打开 Lamba 控制台的 Functions(函数)页面。
-
选择您创建的函数。
-
依次选择操作和删除。
-
在文本输入字段中键入 delete
,然后选择 Delete(删除)。