Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
Lambda 持久性函数的最佳实践
持久性函数使用基于重放的执行模型,该模型需要的模式与传统 Lambda 函数不同。遵循这些最佳实践,以构建可靠、经济高效的工作流。
编写确定性代码
在重放期间,您的函数将从头开始运行,并且必须遵循与原始运行相同的执行路径。持久操作之外的代码必须是确定性的,以在输入相同的情况下产生相同的结果。
将非确定性操作封装在多个步骤中:
随机数生成和 UUID
当前时间或时间戳
外部 API 调用和数据库查询
文件系统操作
- TypeScript
-
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';
import { randomUUID } from 'crypto';
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
// Generate transaction ID inside a step
const transactionId = await context.step('generate-transaction-id', async () => {
return randomUUID();
});
// Use the same ID throughout execution, even during replay
const payment = await context.step('process-payment', async () => {
return processPayment(event.amount, transactionId);
});
return { statusCode: 200, transactionId, payment };
}
);
- Python
-
from aws_durable_execution_sdk_python import durable_execution, DurableContext
import uuid
@durable_execution
def handler(event, context: DurableContext):
# Generate transaction ID inside a step
transaction_id = context.step(
lambda _: str(uuid.uuid4()),
name='generate-transaction-id'
)
# Use the same ID throughout execution, even during replay
payment = context.step(
lambda _: process_payment(event['amount'], transaction_id),
name='process-payment'
)
return {'statusCode': 200, 'transactionId': transaction_id, 'payment': payment}
重要提示
不要使用全局变量或闭包来在各个步骤之间共享状态。通过返回值传递数据。全局状态在重放期间中断,因为步骤返回了缓存的结果,但全局变量重置。
避免闭包突变:在闭包中捕获的变量可能会在重放期间丢失突变。步骤会返回缓存的结果,但步骤之外的变量更新不会重放。
- TypeScript
-
// ❌ WRONG: Mutations lost on replay
export const handler = withDurableExecution(async (event, context) => {
let total = 0;
for (const item of items) {
await context.step(async () => {
total += item.price; // ⚠️ Mutation lost on replay!
return saveItem(item);
});
}
return { total }; // Inconsistent value!
});
// ✅ CORRECT: Accumulate with return values
export const handler = withDurableExecution(async (event, context) => {
let total = 0;
for (const item of items) {
total = await context.step(async () => {
const newTotal = total + item.price;
await saveItem(item);
return newTotal; // Return updated value
});
}
return { total }; // Consistent!
});
// ✅ EVEN BETTER: Use map for parallel processing
export const handler = withDurableExecution(async (event, context) => {
const results = await context.map(
items,
async (ctx, item) => {
await ctx.step(async () => saveItem(item));
return item.price;
}
);
const total = results.getResults().reduce((sum, price) => sum + price, 0);
return { total };
});
- Python
-
# ❌ WRONG: Mutations lost on replay
@durable_execution
def handler(event, context: DurableContext):
total = 0
for item in items:
context.step(
lambda _: save_item_and_mutate(item, total), # ⚠️ Mutation lost on replay!
name=f'save-item-{item["id"]}'
)
return {'total': total} # Inconsistent value!
# ✅ CORRECT: Accumulate with return values
@durable_execution
def handler(event, context: DurableContext):
total = 0
for item in items:
total = context.step(
lambda _: save_item_and_return_total(item, total),
name=f'save-item-{item["id"]}'
)
return {'total': total} # Consistent!
# ✅ EVEN BETTER: Use map for parallel processing
@durable_execution
def handler(event, context: DurableContext):
def process_item(ctx, item):
ctx.step(lambda _: save_item(item))
return item['price']
results = context.map(items, process_item)
total = sum(results.get_results())
return {'total': total}
进行幂等性设计
由于重试或重放,操作可能会执行多次。非幂等性操作会导致重复的副作用,例如向客户收取两次费用或发送多封电子邮件。
使用幂等性令牌:在步骤中生成令牌,并将其与外部 API 调用结合使用,以防止重复操作。
- TypeScript
-
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
// Generate idempotency token once
const idempotencyToken = await context.step('generate-idempotency-token', async () => {
return crypto.randomUUID();
});
// Use token to prevent duplicate charges
const charge = await context.step('charge-payment', async () => {
return paymentService.charge({
amount: event.amount,
cardToken: event.cardToken,
idempotencyKey: idempotencyToken
});
});
return { statusCode: 200, charge };
}
);
- Python
-
from aws_durable_execution_sdk_python import durable_execution, DurableContext
import uuid
@durable_execution
def handler(event, context: DurableContext):
# Generate idempotency token once
idempotency_token = context.step(
lambda _: str(uuid.uuid4()),
name='generate-idempotency-token'
)
# Use token to prevent duplicate charges
def charge_payment(_):
return payment_service.charge(
amount=event['amount'],
card_token=event['cardToken'],
idempotency_key=idempotency_token
)
charge = context.step(charge_payment, name='charge-payment')
return {'statusCode': 200, 'charge': charge}
使用最多一次语义:对于绝不能重复的关键操作(财务交易、库存扣减),请配置最多一次的执行模式。
- TypeScript
-
// Critical operation that must not duplicate
await context.step('deduct-inventory', async () => {
return inventoryService.deduct(event.productId, event.quantity);
}, {
executionMode: 'AT_MOST_ONCE_PER_RETRY'
});
- Python
-
# Critical operation that must not duplicate
context.step(
lambda _: inventory_service.deduct(event['productId'], event['quantity']),
name='deduct-inventory',
config=StepConfig(execution_mode='AT_MOST_ONCE_PER_RETRY')
)
数据库幂等性:使用写入前检查模式、条件更新或更新插入操作来防止出现重复记录。
高效管理状态
每个检查点都会将状态保存到持久存储中。大型状态对象会增加成本、减慢检查点创建速度,并影响性能。仅存储重要的工作流协调数据。
尽量精简状态数据:
- TypeScript
-
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
// Store only the order ID, not the full order object
const orderId = event.orderId;
// Fetch data within each step as needed
await context.step('validate-order', async () => {
const order = await orderService.getOrder(orderId);
return validateOrder(order);
});
await context.step('process-payment', async () => {
const order = await orderService.getOrder(orderId);
return processPayment(order);
});
return { statusCode: 200, orderId };
}
);
- Python
-
from aws_durable_execution_sdk_python import durable_execution, DurableContext
@durable_execution
def handler(event, context: DurableContext):
# Store only the order ID, not the full order object
order_id = event['orderId']
# Fetch data within each step as needed
context.step(
lambda _: validate_order(order_service.get_order(order_id)),
name='validate-order'
)
context.step(
lambda _: process_payment(order_service.get_order(order_id)),
name='process-payment'
)
return {'statusCode': 200, 'orderId': order_id}
设计有效的步骤
步骤是持久性函数中的基本工作单位。精心设计的步骤使工作流程更易于理解、调试和维护。
步骤设计原则:
使用描述性名称:使用类似 validate-order 的名称而非 step1,会让日志和错误更容易理解
保持名称固定不变:不要使用带有时间戳或随机值的动态名称。步骤名称必须是确定性的,以便进行重放
合理控制步骤粒度:将复杂的操作分解为重点明确的步骤,但要避免过细的小步骤,因为这些小步骤会增加检查点的开销
对相关操作进行分组:那些应该同时成功或失败的操作应归入同一步骤中
高效使用等待操作
等待操作会暂停执行,但不会消耗资源或产生费用。使用它们而不是让 Lambda 保持运行。
基于时间的等待:将 context.wait() 用于延迟,而不是 setTimeout 或 sleep。
外部回调:在等待外部系统时使用 context.waitForCallback()。请务必设置超时,以避免出现无限制的等待情况。
轮询:将 context.waitForCondition() 与指数回退一起使用,在不给外部服务造成过大负担的情况下对其进行轮询。
- TypeScript
-
// Wait 24 hours without cost
await context.wait({ seconds: 86400 });
// Wait for external callback with timeout
const result = await context.waitForCallback(
'external-job',
async (callbackId) => {
await externalService.submitJob({
data: event.data,
webhookUrl: `https://api.example.com/callbacks/${callbackId}`
});
},
{ timeout: { seconds: 3600 } }
);
- Python
-
# Wait 24 hours without cost
context.wait(86400)
# Wait for external callback with timeout
result = context.wait_for_callback(
lambda callback_id: external_service.submit_job(
data=event['data'],
webhook_url=f'https://api.example.com/callbacks/{callback_id}'
),
name='external-job',
config=WaitForCallbackConfig(timeout_seconds=3600)
)
其他注意事项
错误处理:重试网络超时和速率限制这类暂时出现的故障。不要对永久性故障进行重试操作,例如输入无效或身份验证错误。使用适当的最大尝试次数和回退率配置重试策略。有关详细示例,请参阅错误处理和重试。
性能:通过存储引用而不是完整有效载荷,最大限度地减少检查点的大小。使用 context.parallel() 和 context.map() 来同时执行独立的操作。批量执行相关操作以减少检查点开销。
版本控制:通过使用版本号或别名来调用函数,以便将执行操作固定在特定的代码版本上。确保新的代码版本可以处理旧版本的状态。不要以中断重放的方式重命名步骤或更改其行为。
序列化:使用与 JSON 兼容的类型来处理操作的输入和结果。将日期转换为 ISO 字符串,将自定义对象转换为普通对象,然后再将其传递给持久操作。
监控:启用带有执行 ID 和步骤名称的结构化日志记录。设置 CloudWatch 警报,以了解错误率和执行持续时间。使用跟踪来识别瓶颈。有关详细指导,请参阅监控和调试。
测试:测试成功路径、错误处理和重放行为。测试回调和等待的超时场景。使用本地测试来缩短迭代时间。有关详细指导,请参阅测试持久性函数。
需要避免的常见错误:不要嵌套 context.step() 调用,使用子上下文替代之。将非确定性操作封装在多个步骤中。始终为回调设置超时。在保持步骤粒度的同时减少检查点开销。将引用信息而非大型对象存储在状态中。
其他资源