在Amazon Glue交互式会话中使用流媒体操作 - Amazon 连接词
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在Amazon Glue交互式会话中使用流媒体操作

切换串流会话类型

使用 Amazon Glue 交互式会话配置魔法命令 %streaming 以定义您正在运行的任务并初始化串流交互式会话。

用于交互式开发的采样输入流式传输

我们为帮助增强交互式会话中的交Amazon Glue互体验而开发的一种工具是,在下GlueContext方添加了一种获取静态直播快照的新方法 DynamicFrame。 GlueContext允许您检查、交互和实施您的工作流程。

使用 GlueContext 类实例,您将能够找到方法 getSampleStreamingDynamicFrame。此方法要求的参数为:

  • dataFrame: Spark 直播 DataFrame

  • options:查看以下可用选项

可用选项包括:

  • windowSize: 这也称为微批持续时间。此参数将确定在触发前一批处理后串流查询的等待时间。此参数值必须小于 pollingTimeInMs

  • pollingTimeInMs:该方法将运行的总时长。它将触发至少一个微批处理,以从输入流式传输中获取样本记录。

  • recordPollingLimit:此参数可帮助您限制要从直播中轮询的记录总数。

  • (可选)您也可以使用 writeStreamFunction 将此自定义函数应用于每个记录采样函数。有关 Scala 和 Python 中的示例,请参阅以下内容。

Scala
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
Python
def sample_batch_function(batch_df, batch_id): //Optional but you can replace your own forEachBatch function here options = { "pollingTimeInMs": "10000", "windowSize": "5 seconds", } glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
注意

在采样的 DynFrame 为空时,可能是由以下几个原因造成的:

  • 流式传输源设置为“Latest(最新)”,并且在采样周期内没有摄入新数据。

  • 轮询时间不足以处理其摄入的记录。除非整个批处理处理完毕,否则数据不会显示。

在交互式会话中运行串流应用程序

在 Amazon Glue 交互式会话中,您可以运行 Amazon Glue 串流应用程序,就像您在 Amazon Glue 控制台中创建串流应用程序一样。由于交互式会话基于会话,因此在运行时遇到异常不会导致会话停止。目前,我们具有以迭代方式开发批处理函数的额外优势。例如:

def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})

我们在以上例子中包括了方法的无效用法,与退出整个应用程序的常规 Amazon Glue 任务不同,用户的编码上下文和定义都完全保留,并且会话仍然在运行。无需引导启动新集群和重新运行所有之前的转换。这使您可以专注于快速迭代批处理函数实施以获得理想的结果。

需要注意的是,交互式会话以阻塞方式评估每个语句,以便会话一次仅能执行一条语句。由于流式传输查询将始终连续且永不结束,具有有效流式传输查询的会话将无法处理任何后续语句,除非这些会话中断。您可以直接从 Jupyter Notebook 发出中断命令,我们的内核将为您处理取消。

以下列正在等待执行的语句序列为例:

Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely