Lambda 持久性函数的最佳实践 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Lambda 持久性函数的最佳实践

持久性函数使用基于重放的执行模型,该模型需要的模式与传统 Lambda 函数不同。遵循这些最佳实践,以构建可靠、经济高效的工作流。

编写确定性代码

在重放期间,您的函数将从头开始运行,并且必须遵循与原始运行相同的执行路径。持久操作之外的代码必须是确定性的,以在输入相同的情况下产生相同的结果。

将非确定性操作封装在多个步骤中:

  • 随机数生成和 UUID

  • 当前时间或时间戳

  • 外部 API 调用和数据库查询

  • 文件系统操作

TypeScript
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js'; import { randomUUID } from 'crypto'; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { // Generate transaction ID inside a step const transactionId = await context.step('generate-transaction-id', async () => { return randomUUID(); }); // Use the same ID throughout execution, even during replay const payment = await context.step('process-payment', async () => { return processPayment(event.amount, transactionId); }); return { statusCode: 200, transactionId, payment }; } );
Python
from aws_durable_execution_sdk_python import durable_execution, DurableContext import uuid @durable_execution def handler(event, context: DurableContext): # Generate transaction ID inside a step transaction_id = context.step( lambda _: str(uuid.uuid4()), name='generate-transaction-id' ) # Use the same ID throughout execution, even during replay payment = context.step( lambda _: process_payment(event['amount'], transaction_id), name='process-payment' ) return {'statusCode': 200, 'transactionId': transaction_id, 'payment': payment}
重要提示

不要使用全局变量或闭包来在各个步骤之间共享状态。通过返回值传递数据。全局状态在重放期间中断,因为步骤返回了缓存的结果,但全局变量重置。

避免闭包突变:在闭包中捕获的变量可能会在重放期间丢失突变。步骤会返回缓存的结果,但步骤之外的变量更新不会重放。

TypeScript
// ❌ WRONG: Mutations lost on replay export const handler = withDurableExecution(async (event, context) => { let total = 0; for (const item of items) { await context.step(async () => { total += item.price; // ⚠️ Mutation lost on replay! return saveItem(item); }); } return { total }; // Inconsistent value! }); // ✅ CORRECT: Accumulate with return values export const handler = withDurableExecution(async (event, context) => { let total = 0; for (const item of items) { total = await context.step(async () => { const newTotal = total + item.price; await saveItem(item); return newTotal; // Return updated value }); } return { total }; // Consistent! }); // ✅ EVEN BETTER: Use map for parallel processing export const handler = withDurableExecution(async (event, context) => { const results = await context.map( items, async (ctx, item) => { await ctx.step(async () => saveItem(item)); return item.price; } ); const total = results.getResults().reduce((sum, price) => sum + price, 0); return { total }; });
Python
# ❌ WRONG: Mutations lost on replay @durable_execution def handler(event, context: DurableContext): total = 0 for item in items: context.step( lambda _: save_item_and_mutate(item, total), # ⚠️ Mutation lost on replay! name=f'save-item-{item["id"]}' ) return {'total': total} # Inconsistent value! # ✅ CORRECT: Accumulate with return values @durable_execution def handler(event, context: DurableContext): total = 0 for item in items: total = context.step( lambda _: save_item_and_return_total(item, total), name=f'save-item-{item["id"]}' ) return {'total': total} # Consistent! # ✅ EVEN BETTER: Use map for parallel processing @durable_execution def handler(event, context: DurableContext): def process_item(ctx, item): ctx.step(lambda _: save_item(item)) return item['price'] results = context.map(items, process_item) total = sum(results.get_results()) return {'total': total}

进行幂等性设计

由于重试或重放,操作可能会执行多次。非幂等性操作会导致重复的副作用,例如向客户收取两次费用或发送多封电子邮件。

使用幂等性令牌:在步骤中生成令牌,并将其与外部 API 调用结合使用,以防止重复操作。

TypeScript
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js'; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { // Generate idempotency token once const idempotencyToken = await context.step('generate-idempotency-token', async () => { return crypto.randomUUID(); }); // Use token to prevent duplicate charges const charge = await context.step('charge-payment', async () => { return paymentService.charge({ amount: event.amount, cardToken: event.cardToken, idempotencyKey: idempotencyToken }); }); return { statusCode: 200, charge }; } );
Python
from aws_durable_execution_sdk_python import durable_execution, DurableContext import uuid @durable_execution def handler(event, context: DurableContext): # Generate idempotency token once idempotency_token = context.step( lambda _: str(uuid.uuid4()), name='generate-idempotency-token' ) # Use token to prevent duplicate charges def charge_payment(_): return payment_service.charge( amount=event['amount'], card_token=event['cardToken'], idempotency_key=idempotency_token ) charge = context.step(charge_payment, name='charge-payment') return {'statusCode': 200, 'charge': charge}

使用最多一次语义:对于绝不能重复的关键操作(财务交易、库存扣减),请配置最多一次的执行模式。

TypeScript
// Critical operation that must not duplicate await context.step('deduct-inventory', async () => { return inventoryService.deduct(event.productId, event.quantity); }, { executionMode: 'AT_MOST_ONCE_PER_RETRY' });
Python
# Critical operation that must not duplicate context.step( lambda _: inventory_service.deduct(event['productId'], event['quantity']), name='deduct-inventory', config=StepConfig(execution_mode='AT_MOST_ONCE_PER_RETRY') )

数据库幂等性:使用写入前检查模式、条件更新或更新插入操作来防止出现重复记录。

高效管理状态

每个检查点都会将状态保存到持久存储中。大型状态对象会增加成本、减慢检查点创建速度,并影响性能。仅存储重要的工作流协调数据。

尽量精简状态数据:

  • 存储 ID 和引用,而不是完整对象

  • 根据需要在步骤内获取详细数据

  • 使用 Amazon S3 或 DynamoDB 处理大型数据,在状态中传递引用

  • 避免在各步骤之间传递大量有效载荷

TypeScript
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js'; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { // Store only the order ID, not the full order object const orderId = event.orderId; // Fetch data within each step as needed await context.step('validate-order', async () => { const order = await orderService.getOrder(orderId); return validateOrder(order); }); await context.step('process-payment', async () => { const order = await orderService.getOrder(orderId); return processPayment(order); }); return { statusCode: 200, orderId }; } );
Python
from aws_durable_execution_sdk_python import durable_execution, DurableContext @durable_execution def handler(event, context: DurableContext): # Store only the order ID, not the full order object order_id = event['orderId'] # Fetch data within each step as needed context.step( lambda _: validate_order(order_service.get_order(order_id)), name='validate-order' ) context.step( lambda _: process_payment(order_service.get_order(order_id)), name='process-payment' ) return {'statusCode': 200, 'orderId': order_id}

设计有效的步骤

步骤是持久性函数中的基本工作单位。精心设计的步骤使工作流程更易于理解、调试和维护。

步骤设计原则:

  • 使用描述性名称:使用类似 validate-order 的名称而非 step1,会让日志和错误更容易理解

  • 保持名称固定不变:不要使用带有时间戳或随机值的动态名称。步骤名称必须是确定性的,以便进行重放

  • 合理控制步骤粒度:将复杂的操作分解为重点明确的步骤,但要避免过细的小步骤,因为这些小步骤会增加检查点的开销

  • 对相关操作进行分组:那些应该同时成功或失败的操作应归入同一步骤中

高效使用等待操作

等待操作会暂停执行,但不会消耗资源或产生费用。使用它们而不是让 Lambda 保持运行。

基于时间的等待:context.wait() 用于延迟,而不是 setTimeoutsleep

外部回调:在等待外部系统时使用 context.waitForCallback()。请务必设置超时,以避免出现无限制的等待情况。

轮询:context.waitForCondition() 与指数回退一起使用,在不给外部服务造成过大负担的情况下对其进行轮询。

TypeScript
// Wait 24 hours without cost await context.wait({ seconds: 86400 }); // Wait for external callback with timeout const result = await context.waitForCallback( 'external-job', async (callbackId) => { await externalService.submitJob({ data: event.data, webhookUrl: `https://api.example.com/callbacks/${callbackId}` }); }, { timeout: { seconds: 3600 } } );
Python
# Wait 24 hours without cost context.wait(86400) # Wait for external callback with timeout result = context.wait_for_callback( lambda callback_id: external_service.submit_job( data=event['data'], webhook_url=f'https://api.example.com/callbacks/{callback_id}' ), name='external-job', config=WaitForCallbackConfig(timeout_seconds=3600) )

其他注意事项

错误处理:重试网络超时和速率限制这类暂时出现的故障。不要对永久性故障进行重试操作,例如输入无效或身份验证错误。使用适当的最大尝试次数和回退率配置重试策略。有关详细示例,请参阅错误处理和重试

性能:通过存储引用而不是完整有效载荷,最大限度地减少检查点的大小。使用 context.parallel()context.map() 来同时执行独立的操作。批量执行相关操作以减少检查点开销。

版本控制:通过使用版本号或别名来调用函数,以便将执行操作固定在特定的代码版本上。确保新的代码版本可以处理旧版本的状态。不要以中断重放的方式重命名步骤或更改其行为。

序列化:使用与 JSON 兼容的类型来处理操作的输入和结果。将日期转换为 ISO 字符串,将自定义对象转换为普通对象,然后再将其传递给持久操作。

监控:启用带有执行 ID 和步骤名称的结构化日志记录。设置 CloudWatch 警报,以了解错误率和执行持续时间。使用跟踪来识别瓶颈。有关详细指导,请参阅监控和调试

测试:测试成功路径、错误处理和重放行为。测试回调和等待的超时场景。使用本地测试来缩短迭代时间。有关详细指导,请参阅测试持久性函数

需要避免的常见错误:不要嵌套 context.step() 调用,使用子上下文替代之。将非确定性操作封装在多个步骤中。始终为回调设置超时。在保持步骤粒度的同时减少检查点开销。将引用信息而非大型对象存储在状态中。

其他资源