

# Amazon Glue 流式处理概念
<a name="glue-streaming-concepts"></a>

 以下部分介绍了 Amazon Glue 流式处理概念。

**Topics**
+ [Amazon Glue 流式处理作业剖析](#glue-streaming-anatomy)

## Amazon Glue 流式处理作业剖析
<a name="glue-streaming-anatomy"></a>

 Amazon Glue 流式处理作业以 Spark 流式处理范式运行，并利用 Spark 框架的结构化流。流式处理作业以特定的时间间隔不断轮询流式处理数据源，以微批次的形式获取记录。以下各节探讨了 Amazon Glue 流式处理作业的不同部分。

![屏幕截图显示了一个 Amazon CloudWatch 监控日志（对于上述示例为 Amazon Glue），并查看了所需的执行程序数量（橙色线），然后根据该数量扩缩了执行程序（蓝色线），而无需手动调整。](http://docs.amazonaws.cn/glue/latest/dg/images/glue-streaming-anatomy.png)


### forEachBatch
<a name="glue-streaming-anatomy-batch"></a>

 `forEachBatch` 方法是 Amazon Glue 流式处理作业运行的入口点。Amazon Glue 流式处理作业使用 `forEachBatch` 方法轮询数据，其功能类似于迭代器，在流式处理作业的生命周期中保持活动状态，定期轮询流式处理数据源以获取新数据，并以微批次处理最新数据。

```
glueContext.forEachBatch(
    frame=dataFrame_AmazonKinesis_node1696872487972,    
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
    },
)
```

 配置 `forEachBatch` 的 `frame` 属性以指定流式处理数据源。在此示例中，在创建作业期间在空白画布中创建的源节点将填充作业的默认 DataFrame。将 `batch_function` 属性设置为您决定为每个微批次操作调用的 `function`。必须定义一个函数来处理传入数据的批量转换。

### 来源
<a name="glue-streaming-anatomy-source"></a>

 在 `processBatch` 函数的第一步中，程序会验证您定义为 `forEachBatch` 帧属性的 DataFrame 记录数。该程序会在非空 DataFrame 上附加摄取时间戳。`data_frame.count()>0` 子句决定了最新的微批次是否为空，是否可以进一步处理。

```
def processBatch(data_frame, batchId):
    if data_frame.count() >0:       
        AmazonKinesis_node1696872487972 = DynamicFrame.fromDF(
            glueContext.add_ingestion_time_columns(data_frame, "hour"),
            glueContext,
            "from_data_frame",
        )
```

### Mapping
<a name="glue-streaming-anatomy-mapping"></a>

 该程序的下一部分是应用映射。通过 spark DataFrame 的 `Mapping.apply` 方法，围绕数据元素定义转换规则。通常，您可以对源数据列进行重命名、更改数据类型或应用自定义函数，然后将其映射到目标列。

```
    #Script generated for node ChangeSchema        
    ChangeSchema_node16986872679326 = ApplyMapping.apply(
        frame =  AmazonKinesis_node1696872487972,
        mappings = [
            ("eventtime", "string", "eventtime", "string"),
            ("manufacturer", "string", "manufacturer", "string"),
            ("minutevolume", "long", "minutevolume", "int"),
            ("o2stats", "long", "OxygenSaturation", "int"),
            ("pressurecontrol", "long", "pressurecontrol", "int"),
            ("serialnumber", "string", "serialnumber", "string"),
            ("ventilatorid", "long", "ventilatorid", "long"),
            ("ingest_year", "string", "ingest_year", "string"),
            ("ingest_month", "string", "ingest_month", "string"),
            ("ingest_day", "string", "ingest_day", "string"),
            ("ingest_hour", "string", "ingest_hour", "string"),
        ],
        transformation_ctx="ChangeSchema_node16986872679326",
    )
        )
```

### sink
<a name="glue-streaming-anatomy-sink"></a>

 在这一部分，来自流式处理数据源的传入数据集存储在目标位置。在本示例中，我们将数据写入 Amazon S3 位置。`AmazonS3_node_path` 属性详细信息是根据您在画布上创建作业期间使用的设置预先填充的。您可以根据自己的用例设置 `updateBehavior`，然后决定不更新数据目录表，或者创建数据目录并在后续运行中更新数据目录架构，或者创建目录表但在后续运行时不更新架构定义。

 `partitionKeys` 属性定义了存储分区选项。默认行为是根据源部分中提供的 `ingestion_time_columns` 对数据进行分区。`compression` 属性允许您设置在目标写入期间应用的压缩算法。您可以选择将 Snappy、LZO 或 GZIP 设置为压缩技术。`enableUpdateCatalog` 属性控制是否需要更新 Amazon Glue 目录表。此属性的可用选项为 `True` 或 `False`。

```
    #Script generated for node Amazon S3        
    AmazonS3_node1696872743449 = glueContext.getSink(
        path =  AmazonS3_node1696872743449_path,
        connection_type = "s3",
        updateBehavior = "UPDATE_IN_DATABASE",
        partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"],
        compression = "snappy",
        enableUpdateCatalog = True,
        transformation_ctx = "AmazonS3_node1696872743449",
    )
```

### Amazon Glue 目录接收器
<a name="glue-streaming-anatomy-catalog-sink"></a>

 作业的这一部分控制 Amazon Glue 目录表更新行为。根据您的 Amazon Glue 目录数据库名称和与您设计的 Amazon Glue 作业关联的表名称设置 `catalogDatabase` 和 `catalogTableName` 属性。您可以通过 `setFormat` 属性定义目标数据的文件格式。在本示例中，我们将以 parquet 格式存储数据。

 参考本教程设置并运行 Amazon Glue 流式处理作业后，在 Amazon Kinesis Data Streams 生成的流式处理数据将以 parquet 格式存储在 Amazon S3 位置，并快速压缩。成功运行流式处理作业后，您将能够通过 Amazon Athena 查询数据。

```
       
    AmazonS3_node1696872743449 = setCatalogInfo(
        catalogDatabase =  "demo", catalogTableName = "demo_stream_transform_result"
    )
    AmazonS3_node1696872743449.setFormat("glueparquet")
    AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326")
    )
```