持久执行 SDK - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

持久执行 SDK

持久执行 SDK 是构建持久性函数的基础。它提供了为进度创建检查点、处理重试和管理执行流所需的基元。该 SDK 简化了检查点管理与回放的复杂性,使您能够编写具有自动容错功能的顺序代码。

该 SDK 适用于 JavaScript、TypeScript 和 Python。有关完整的 API 文档和示例,请参阅 GitHub 上的 JavaScript/TypeScript SDKPython SDK

DurableContext

该 SDK 为您的函数提供了一个可公开所有持久操作的 DurableContext 对象。此上下文取代了标准的 Lambda 上下文,并提供了创建检查点、管理执行流以及与外部系统进行协调的方法。

要使用该 SDK,请使用持久执行封装器封装您的 Lambda 处理程序:

TypeScript
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js'; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { // Your function receives DurableContext instead of Lambda context // Use context.step(), context.wait(), etc. return result; } );
Python
from aws_durable_execution_sdk_python import durable_execution, DurableContext @durable_execution def handler(event: dict, context: DurableContext): # Your function receives DurableContext # Use context.step(), context.wait(), etc. return result

封装器会拦截您的函数调用,加载任何现有的检查点日志,并提供管理重放和检查点的 DurableContext

该 SDK 的用途

该 SDK 承担三项关键责任,以实现持久执行:

检查点管理:当您的函数执行持久操作时,该 SDK 会自动创建检查点。每个检查点都记录操作类型、输入和结果。当您的函数完成一个步骤时,该 SDK 会保留检查点,然后再继续操作。这样可以确保您的函数在发生中断后可以从任何已完成的操作中恢复。

重放协调:当您的函数在暂停或中断后恢复时,该 SDK 会执行重放操作。它会从头开始运行您的代码,但会跳过已完成的操作,使用存储的检查点结果而不是重新执行它们。该 SDK 可确保重放操作具有确定性,如果输入和检查点日志相同,您的函数会生成相同的结果。

状态隔离:该 SDK 将执行状态与您的业务逻辑分开维护。每个持久执行都有自己的检查点日志,其他执行无法访问该日志。该 SDK 对静态检查点数据进行加密,并确保重放期间的状态保持一致。

检查点机制的工作原理

当您调用持久操作时,该 SDK 将遵循以下顺序:

  1. 检查现有检查点:该 SDK 会检查此操作是否已在之前的调用中完成。如果已存在检查点,该 SDK 将在不重新执行操作的情况下返回存储的结果。

  2. 执行操作:如果不存在检查点,该 SDK 将执行您的操作代码。对于步骤而言,这意味着要调用您的函数。对于等待而言,这意味着要调度恢复。

  3. 创建检查点:操作完成后,该 SDK 会将结果序列化并创建检查点。检查点包括操作类型、名称、输入、结果和时间戳。

  4. 保留检查点:该 SDK 会调用 Lambda 检查点 API 来保留检查点。这确保了在继续执行之前,检查点是持久的。

  5. 返回结果:该 SDK 将操作结果返回到您的代码,然后继续执行下一个操作。

此顺序确保了每当一项操作完成时,其结果都会被安全地存储下来。如果您的函数在任何时候被中断,该 SDK 能够重放至上一个已完成的检查点。

重放行为

当您的函数在暂停或中断后恢复时,该 SDK 会执行重放操作:

  1. 加载检查点日志:该 SDK 会从 Lambda 中检索此次执行的检查点日志。

  2. 从头开始运行:该 SDK 会从头开始调用您的处理程序函数,而不是从它暂停的地方调用。

  3. 跳过已完成的持久操作:当您的代码调用持久操作时,该 SDK 会对照检查点日志检查每个操作。对于已完成的持久操作,该 SDK 会直接返回已存储的结果,而不会执行操作代码。

    注意

    如果子上下文的结果大于最大检查点大小(256 KB),则该上下文的代码将在重放期间再次执行。这使得您能够根据在该上下文中运行的持久操作构建出较大的结果,这些结果将从检查点日志中进行查找。因此,必须确保只在该上下文中运行确定性代码。在使用包含较大结果的子上下文时,最佳做法是在步骤中执行长时间运行或非确定性的工作,而仅在上下文中执行整合结果的短时运行任务。

  4. 在中断点恢复:当该 SDK 执行到一个没有检查点的操作时,它会正常运行,并在完成持久操作后创建新的检查点。

这种重放机制要求您的代码必须具有确定性。在输入和检查点日志相同的情况下,您的函数必须按相同顺序执行持久操作调用。该 SDK 通过在重放过程中验证操作名称和类型是否与检查点日志匹配来强制执行此规则。

可用的持久操作

DurableContext 为不同的协调模式提供操作。每次持久操作都会自动创建检查点,从而确保您的函数能够从任何位置恢复运行。

Steps

使用自动检查点机制和重试功能执行业务逻辑。对于调用外部服务、进行计算或执行任何需要进行检查点记录的逻辑的操作,请遵循相应的步骤。该 SDK 会在步骤之前和之后创建一个检查点,存储结果以供重放。

TypeScript
const result = await context.step('process-payment', async () => { return await paymentService.charge(amount); });
Python
result = context.step( lambda _: payment_service.charge(amount), name='process-payment' )

步骤支持可配置的重试策略、执行语义(最多一次或至少一次)和自定义序列化。

等待

在指定的持续时间内暂停执行,且不消耗计算资源。SDK 会创建检查点、终止函数调用并调度恢复。当等待过程结束时,Lambda 会再次调用您的函数,并且 SDK 会重放到等待点,然后继续执行。

TypeScript
// Wait 1 hour without charges await context.wait({ seconds: 3600 });
Python
# Wait 1 hour without charges context.wait(3600)

回拨

回调使您的函数能够暂停并等待外部系统提供输入。创建回调时,该 SDK 会生成一个唯一的回调 ID 并创建一个检查点。然后,您的函数将暂停(终止调用),且不会产生计算费用。外部系统使用 SendDurableExecutionCallbackSuccessSendDurableExecutionCallbackFailure Lambda API 提交回调结果。提交回调后,Lambda 会再次调用您的函数,SDK 会重放至回调点,然后您的函数会继续使用回调结果执行。

SDK 提供了两种处理回调的方法:

createCallback:创建回调并同时返回 Promise 和回调 ID。您将回调 ID 发送到外部系统,该系统随后使用 Lambda API 提交结果。

TypeScript
const [promise, callbackId] = await context.createCallback('approval', { timeout: { hours: 24 } }); await sendApprovalRequest(callbackId, requestData); const approval = await promise;
Python
callback = context.create_callback( name='approval', config=CallbackConfig(timeout_seconds=86400) ) context.step( lambda _: send_approval_request(callback.callback_id), name='send_request' ) approval = callback.result()

waitForCallback:通过将回调的创建与提交合二为一,简化了回调处理过程。SDK 创建回调,使用回调 ID 执行您的提交函数,然后等待结果。

TypeScript
const result = await context.waitForCallback( 'external-api', async (callbackId, ctx) => { await submitToExternalAPI(callbackId, requestData); }, { timeout: { minutes: 30 } } );
Python
result = context.wait_for_callback( lambda callback_id: submit_to_external_api(callback_id, request_data), name='external-api', config=WaitForCallbackConfig(timeout_seconds=1800) )

配置超时,以防止函数无限期等待。如果回调超时,SDK 会抛出 CallbackError,此时您的函数可以处理超时的情况。对长时间运行的回调使用心跳超时,以检测外部系统停止响应的时间。

对于需要人工干预的工作流程、与外部系统的集成、webhook 的响应,或者任何必须暂停等待外部输入才能继续执行的场景,都应使用回调机制。

并行执行

使用可选的并发控制同时执行多个操作。该 SDK 负责管理并行执行过程,为每个操作创建检查点,并根据您的完成策略处理故障情况。

TypeScript
const results = await context.parallel([ async (ctx) => ctx.step('task1', async () => processTask1()), async (ctx) => ctx.step('task2', async () => processTask2()), async (ctx) => ctx.step('task3', async () => processTask3()) ]);
Python
results = context.parallel( lambda ctx: ctx.step(lambda _: process_task1(), name='task1'), lambda ctx: ctx.step(lambda _: process_task2(), name='task2'), lambda ctx: ctx.step(lambda _: process_task3(), name='task3') )

使用 parallel 来同时执行独立的操作。

Map

使用可选的并发控制对数组中的每个项目同时执行操作。该 SDK 负责管理并发执行过程,为每个操作创建检查点,并根据您的完成策略处理故障情况。

TypeScript
const results = await context.map(itemArray, async (ctx, item, index) => ctx.step('task', async () => processItem(item, index)) );
Python
results = context.map( item_array, lambda ctx, item, index: ctx.step( lambda _: process_item(item, index), name='task' ) )

使用 map 处理具有并发控制的数组。

子上下文

为分组操作创建隔离的执行上下文。子上下文具有自己的检查点日志,并且可以包含多个步骤、等待操作以及其他操作。该 SDK 将整个子上下文视为一个整体来进行重试和恢复操作。

利用子上下文来组织复杂的工作流程、实施子工作流程,或者将那些应该一同重试的操作进行隔离。

TypeScript
const result = await context.runInChildContext( 'batch-processing', async (childCtx) => { return await processBatch(childCtx, items); } );
Python
result = context.run_in_child_context( lambda child_ctx: process_batch(child_ctx, items), name='batch-processing' )

重放机制要求持久操作按照确定的顺序进行。通过使用多个子上下文,您可以实现多条工作流的并行执行,而确定性则在每个上下文中分别适用。这使您能够构建高性能的功能,而这些函数能够高效地利用多个 CPU 核心。

例如,假设我们启动了两个子上下文,分别称为 A 和 B。在初始调用时,这两个上下文内的步骤会按照这样的顺序执行,其中“A”步骤与“B”步骤同时进行:A1、B1、B2、A2、A3。重放时,时间会大大缩短,因为结果是从检查点日志中获取的,而且步骤的执行顺序与之前有所不同,即:B1、A1、A2、B2、A3。因为“A”步骤是按照正确的顺序(A1、A2、A3)被处理的,而“B”步骤也是按照正确的顺序(B1、B2)被处理的,所以确定性的需求得到了正确满足。

有条件等待

在两次尝试之间使用自动检查点机制对条件进行轮询。该 SDK 将执行您的检查函数,用结果创建一个检查点,根据您的策略等待,然后重复直到条件得到满足。

TypeScript
const result = await context.waitForCondition( async (state, ctx) => { const status = await checkJobStatus(state.jobId); return { ...state, status }; }, { initialState: { jobId: 'job-123', status: 'pending' }, waitStrategy: (state) => state.status === 'completed' ? { shouldContinue: false } : { shouldContinue: true, delay: { seconds: 30 } } } );
Python
result = context.wait_for_condition( lambda state, ctx: check_job_status(state['jobId']), config=WaitForConditionConfig( initial_state={'jobId': 'job-123', 'status': 'pending'}, wait_strategy=lambda state, attempt: {'should_continue': False} if state['status'] == 'completed' else {'should_continue': True, 'delay': 30} ) )

使用 waitForCondition 进行外部系统轮询、等待资源准备就绪或使用回退实现重试。

函数调用

调用另一个 Lambda 函数并等待其结果。该 SDK 会创建检查点,调用目标函数,并在调用完成后恢复您的函数。这使得函数组合和工作流程分解成为可能。

TypeScript
const result = await context.invoke( 'invoke-processor', 'arn:aws:lambda:us-east-1:123456789012:function:processor', { data: inputData } );
Python
result = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:processor', {'data': input_data}, name='invoke-processor' )

如何计量持久操作

您通过 DurableContext 调用的每个持久操作都会创建检查点来跟踪执行进度并存储状态数据。这些操作会根据其使用情况产生费用,而这些检查点中可能包含产生您的数据写入和保留成本的数据。存储的数据包括调用事件数据、从步骤中返回的有效载荷以及完成回调时传递的数据。了解持久操作的计量方式有助于您估算执行成本并优化工作流程。有关定价详情,请参阅 Lambda 定价页面

有效载荷大小是指持久操作所保存的已序列化数据的大小。数据以字节为单位计量,大小可能会因操作使用的序列化器而异。一次操作的有效载荷可以是操作成功完成后的实际结果,也可以是在操作失败时所生成的序列化错误对象。

基本操作

基本操作是持久性函数的基本构建块:

操作 检查点时间 操作次数 保存的数据
Execution Started 1 输入有效载荷大小
Execution 已完成(成功/失败/已停止) 0 输出有效载荷大小
步骤 # 重试/成功/失败 1 + N 次重试 每次尝试所返回的有效载荷大小
Wait Started 1 不适用
WaitForCondition 每次轮询尝试 1 + N 次轮询 每次轮询尝试所返回的有效载荷大小
调用级重试 Started 1 错误对象的有效载荷

回调操作

回调操作使您的函数能够暂停并等待外部系统提供输入。这些操作会在创建回调和完成回调时创建检查点:

操作 检查点时间 操作次数 保存的数据
CreateCallback Started 1 不适用
通过 API 调用完成回调 已完成 0 回调有效载荷
WaitForCallback Started 3 + N 次重试(上下文 + 回调 + 步骤) 提交步骤尝试所返回的有效载荷,再加上两个回调有效载荷的副本

复合操作

复合操作将多个持久操作组合在一起,以处理诸如并行执行、数组处理和嵌套上下文等复杂的协调模式:

操作 检查点时间 操作次数 保存的数据
Parallel Started 1 + N 个分支(1 个父上下文 + N 个子上下文) 每个分支返回的有效载荷大小的最多两份副本,外加每个分支的状态
Map Started 1 + N 个分支(1 个父上下文 + N 个子上下文) 每次迭代返回的有效载荷大小的最多两份副本,外加每次迭代的状态
Promise 助手 已完成 1 Promise 所返回的有效载荷大小
RunInChildContext 成功/失败 1 从子上下文中返回的有效载荷大小

对于上下文,例如来自 runInChildContext 或由复合操作在内部使用的上下文,将直接对小于 256KB 的结果进行检查点检查。较大的结果并不会被存储下来——相反,在重放时,它们会通过重新处理上下文的操作而被重新构建。