

# 在 Lambda 中实现有状态的 Kinesis Data Streams 处理
<a name="services-kinesis-windows"></a>

Lambda 函数可以运行连续流处理应用程序。流表示通过您的应用程序持续流动的无边界数据。要分析这种不断更新的输入中的信息，可以使用按时间定义的窗口来限制包含的记录。

滚动窗口是定期打开和关闭的不同窗口。预设情况下，Lambda 调用是无状态的，在没有外部数据库的情况下，无法使用它们跨多次连续调用处理数据。但是，有了滚动窗口后，您可以在不同调用中保持状态。此状态包含之前为当前窗口处理的消息的汇总结果。您的状态最多可以是每个分片 1MB。如果超过该大小，Lambda 将提前终止窗口。

流中的每条记录都属于特定窗口。Lambda 将至少处理每条记录一次，但不保证每条记录只处理一次。在极少数情况下（例如错误处理），某些记录可能会被多次处理。第一次处理记录时始终按顺序处理。如果多次处理记录，则可能会不按顺序处理。

## 聚合和处理
<a name="streams-tumbling-processing"></a>

系统将调用您的用户托管函数以便聚合和处理该聚合的最终结果。Lambda 汇总在该窗口中接收的所有记录。您可以分多个批次接收这些记录，每个批次都作为单独的调用。每次调用都会收到一个状态。因此，当使用滚动窗口时，Lambda 函数响应必须包含 `state` 属性。如果响应不包含 `state` 属性，Lambda 会将其视作失败的调用。为了满足该条件，您的函数可以返回一个具有以下 JSON 形状的 `TimeWindowEventResponse` 对象：

**Example `TimeWindowEventResponse` 值**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**注意**  
对于 Java 函数，我们建议使用 `Map<String, String>` 来表示状态。

在窗口末尾，标志 `isFinalInvokeForWindow` 被设置 `true`，以表示这是最终状态，并且已准备好进行处理。处理完成后，窗口完成，最终调用完成，然后状态将被删除。

在窗口结束时，Lambda 会对针对聚合结果的操作应用最终处理。您的最终处理将同步调用。成功调用后，函数会检查序列号并继续进行流处理。如果调用失败，则您的 Lambda 函数将暂停进一步处理，直到成功调用为止。

**Example KinesisTimeWindowEvent**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1607497475.000
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
        }
    ],
    "window": {
        "start": "2020-12-09T07:04:00Z",
        "end": "2020-12-09T07:06:00Z"
    },
    "state": {
        "1": 282,
        "2": 715
    },
    "shardId": "shardId-000000000006",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## 配置
<a name="streams-tumbling-config"></a>

您可以在创建或更新事件源映射时配置滚动窗口。要配置翻转窗口，请以秒为单位进行指定（[TumblingWindowInSeconds](https://docs.amazonaws.cn/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)）。以下示例 Amazon Command Line Interface (Amazon CLI) 命令会创建一个滚动窗口为 120 秒的流式事件源映射。为聚合和处理定义的 Lambda 函数被命名为 `tumbling-window-example-function`。

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds {{120}}
```

Lambda 根据记录插入到流的时间来确定滚动窗口的边界。所有记录都有一个大致的时间戳，供 Lambda 在确定边界时使用。

滚动窗口聚合不支持重新分片。当分片结束时，Lambda 认为当前窗口已关闭，并且任何子分片都将以全新状态启动自己的窗口。如果没有向当前窗口添加任何新记录，则 Lambda 会等待最多 2 分钟，然后假定该窗口已结束。这有助于确保函数读取当前窗口中的所有记录，即使这些记录是间歇性添加的。

滚动窗口完全支持现有的重试策略 `maxRetryAttempts` 和 `maxRecordAge`。

**Example Handler.py – 聚合和处理**  
以下 Python 函数演示了如何聚合然后处理您的最终状态：  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```