

# 采用持久性函数的事件源映射
<a name="durable-invoking-esm"></a>

持久性函数适用于所有的 Lambda 事件源映射。为持久性函数配置事件源映射，方法与为标准函数配置事件源映射相同。事件源映射会自动轮询事件源（例如 Amazon SQS、Kinesis 和 DynamoDB Streams），并使用批量记录调用函数。

事件源映射对于那些处理具有复杂多步骤工作流程的流或队列的持久性函数来说非常有用。例如，您可以创建一个持久性函数，用于通过重试、外部 API 调用以及人工审批等功能处理 Amazon SQS 消息。

## 事件源映射如何调用持久性函数
<a name="durable-esm-invocation-behavior"></a>

事件源映射同步调用持久性函数，以等待完整的持久执行完成，然后再处理下一批或将记录标记为已处理。如果持久执行的时间超过 15 分钟，则执行超时并失败。事件源映射收到超时异常，并根据其重试配置处理该异常。

## 15 分钟执行限制
<a name="durable-esm-duration-limit"></a>

当通过事件源映射调用持久性函数时，整个持久执行时间不能超过 15 分钟。此限制适用于从开始到完成的整个持久执行，而不仅仅是单个函数调用。

这个 15 分钟的限制与 Lambda 函数的超时（同样为最长 15 分钟）是相互独立的。函数超时用于控制每次单独调用的运行时长，而持久执行超时则控制从执行开始到完成的总耗时。

**应用场景示例：**
+ **有效：**持久性函数通过三个步骤来处理 Amazon SQS 消息，每个步骤都需要 2 分钟，然后会再等待 5 分钟，之后才会完成最后一个步骤。总执行时间：11 分钟。这之所以能奏效，是因为整个过程不超过 15 分钟。
+ **无效：**持久性函数处理 Amazon SQS 消息，在 2 分钟内完成初始处理，然后会等待 20 分钟以接收外部回调，之后再完成整个流程。总执行时间：22 分钟。这超出了 15 分钟的限制并且会失败。
+ **无效：**持久性函数处理一个 Kinesis 记录，其在各步骤之间的多个等待操作总时长为 30 分钟。尽管每次单独的调用都能迅速完成，但整个执行时间却超过了 15 分钟。

**重要**  
在使用事件源映射时，请将持久执行超时时间设置为 15 分钟或更短，否则创建事件源映射的操作将会失败。如果您的工作流程需要较长的执行时间，请使用下面所描述的中介函数模式。

## 配置事件源映射
<a name="durable-esm-configuration"></a>

使用 Lambda 控制台、Amazon CLI 或 Amazon SDK 为持久性函数配置事件源映射。所有标准事件源映射属性都适用于持久性函数：

```
aws lambda create-event-source-mapping \
  --function-name arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1 \
  --event-source-arn arn:aws:sqs:us-east-1:123456789012:my-queue \
  --batch-size 10 \
  --maximum-batching-window-in-seconds 5
```

为持久性函数配置事件源映射时，请记住使用限定的 ARN（带有版本号或别名）。

## 使用事件源映射进行错误处理
<a name="durable-esm-error-handling"></a>

事件源映射提供内置的错误处理功能，可与持久性函数配合使用：
+ **重试行为：**如果初始调用失败，则事件源映射将根据其重试配置进行重试操作。根据您的要求配置最大重试尝试次数和重试间隔时间。
+ **死信队列：**配置死信队列，以捕获所有重试尝试后仍失败的记录。这能够防止信息丢失，并允许对出现故障的记录进行人工检查。
+ **部分批处理失败：**对于 Amazon SQS 和 Kinesis，使用部分批处理故障报告来单独处理记录，并且仅重试失败的记录。
+ **出错时二分：**对于 Kinesis 和 DynamoDB Streams，启用错误时二分以拆分失败的批次并隔离有问题的记录。

**注意**  
持久性函数支持死信队列（DLQ）用于错误处理，但不支持 Lambda 目标。配置 DLQ 以从失败的调用中捕获记录。

有关事件源映射错误处理的完整信息，请参阅[事件源映射](invocation-eventsourcemapping.md)。

## 为长时间运行的工作流程使用中介函数
<a name="durable-esm-intermediary-function"></a>

如果您的工作流程需要超过 15 分钟才能完成，那么可以在事件源映射和持久性函数之间使用一个中介标准 Lambda 函数。中介函数接收来自事件源映射的事件并异步调用持久性函数，从而消除 15 分钟的执行限制。

这种模式将事件源映射的同步调用模式与持久性函数的长时间运行执行模型分离开来。事件源映射调用中介函数，该函数在开始持久执行后会迅速返回。然后，持久性函数可以根据需要独立运行（最长 1 年）。

### 架构
<a name="durable-esm-intermediary-architecture"></a>

中介函数模式使用三个组件：

1. **事件源映射：**轮询事件源（Amazon SQS、Kinesis、DynamoDB Streams），并与批量记录同步调用中介函数。

1. **中介函数：**一种标准的 Lambda 函数，它会从事件源映射接收事件，对数据进行验证和转换（如果需要的话），然后异步调用持久性函数。此函数执行迅速（通常在 1 秒以内完成），并会将控制权交还给事件源映射。

1. **持久性函数：**以复杂、多步骤的逻辑来处理事件，该逻辑能够持续运行较长时间。异步调用，因此不受 15 分钟限制的约束。

### 实施
<a name="durable-esm-intermediary-implementation"></a>

中介函数接收来自事件源映射的整个事件并异步调用持久性函数。使用执行名称参数以确保幂等性执行开始，从而避免在事件源映射重试时出现重复处理的情况：

------
#### [ TypeScript ]

```
import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda';
import { SQSEvent } from 'aws-lambda';
import { createHash } from 'crypto';

const lambda = new LambdaClient({});

export const handler = async (event: SQSEvent) => {
  // Invoke durable function asynchronously with execution name
  await lambda.send(new InvokeCommand({
    FunctionName: 'arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1',
    InvocationType: 'Event',
    Payload: JSON.stringify({
      executionName: event.Name,
      event: event
    })
  }));
  
  return { statusCode: 200 };
};
```

------
#### [ Python ]

```
import boto3
import json
import hashlib

lambda_client = boto3.client('lambda')

def handler(event, context):  
    # Invoke durable function asynchronously with execution name
    lambda_client.invoke(
        FunctionName='arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1',
        InvocationType='Event',
        Payload=json.dumps({
            'executionName': execution_name,
            'event': event["name"]
        })
    )
    
    return {'statusCode': 200}
```

------

对于中介函数本身的幂等性，请使用 [Powertools for Amazon Lambda](https://docs.amazonaws.cn//powertools/) 来防止在事件源映射重试中介函数时重复调用持久性函数。

持久性函数会接收带有执行名称的有效载荷，并对所有涉及长时间运行逻辑的记录进行处理：

------
#### [ TypeScript ]

```
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';

export const handler = withDurableExecution(
  async (payload: any, context: DurableContext) => {
    const sqsEvent = payload.event;
    
    // Process each record with complex, multi-step logic
    const results = await context.map(
      sqsEvent.Records,
      async (ctx, record) => {
        const validated = await ctx.step('validate', async () => {
          return validateOrder(JSON.parse(record.body));
        });
        
        // Wait for external approval (could take hours or days)
        const approval = await ctx.waitForCallback(
          'approval',
          async (callbackId) => {
            await requestApproval(callbackId, validated);
          },
          { timeout: { hours: 48 } }
        );
        
        // Complete processing
        return await ctx.step('complete', async () => {
          return completeOrder(validated, approval);
        });
      }
    );
    
    return { statusCode: 200, processed: results.getResults().length };
  }
);
```

------
#### [ Python ]

```
from aws_durable_execution_sdk_python import durable_execution, DurableContext
from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig
from collections.abc import Sequence
import json

def validate_order(order_data: dict) -> dict:
    """Validate order data - always passes."""
    return order_data

def request_approval(callback_id: str, validated_order: dict) -> None:
    """Request approval for the order - always passes."""
    pass

def complete_order(validated_order: dict, approval_result: str) -> dict:
    """Complete the order processing - always passes."""
    return validated_order

@durable_execution
def lambda_handler(payload, context: DurableContext):
    sqs_event = payload['event']

    def process_record(
        ctx: DurableContext, 
        record: dict, 
        index: int, 
        items: Sequence[dict]
    ) -> dict:
        validated = ctx.step(
            lambda _: validate_order(json.loads(record['body'])),
            name=f'validate-{index}'
        )

        approval = ctx.wait_for_callback(
            submitter=lambda callback_id, wait_ctx: request_approval(callback_id, validated),
            name=f'approval-{index}',
            config=WaitForCallbackConfig(timeout=Duration.from_seconds(172800))
        )

        return ctx.step(
            lambda _: complete_order(validated, approval),
            name=f'complete-{index}'
        )

    results = context.map(
        inputs=sqs_event['Records'],
        func=process_record,
        name='process-records'
    )

    return {
        'statusCode': 200, 
        'started': results.started_count,
        'completed': results.success_count,
        'failed': results.failure_count,
        'total': results.total_count
    }
```

------

### 重要注意事项
<a name="durable-esm-intermediary-tradeoffs"></a>

这种模式通过将事件源映射与持久执行相分离，消除了 15 分钟的执行限制。中介函数在启动持久执行后会立即返回，从而使得事件源映射能够继续进行处理。然后，持久性函数可以根据需要长时间独立运行。

中介函数在调用持久性函数时才视为成功，而非在持久执行完成时才算成功。如果持久性执行后续失败，事件源映射将不会进行重试，因为其已经成功处理了该批次。在持久性函数中实施错误处理，并为失败的执行配置死信队列。

使用执行名称参数确保幂等性执行开始。如果事件源映射重试中介函数，则持久性函数将不会开始重复执行，因为执行名称已经存在。

## 支持的事件源
<a name="durable-esm-supported-sources"></a>

持久性函数支持使用事件源映射的所有 Lambda 事件源：
+ Amazon SQS 队列（标准和 FIFO）
+ Kinesis Streams
+ DynamoDB Streams
+ Amazon Managed Streaming for Apache Kafka (Amazon MSK)
+ 自行管理的 Apache Kafka
+ Amazon MQ（ActiveMQ 和 RabbitMQ）
+ Amazon DocumentDB 更改流

在调用持久性函数时，所有事件源类型都受到 15 分钟的持久执行时间限制的约束。