Amazon Glue 串流交互式会话 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

Amazon Glue 串流交互式会话

切换串流会话类型

使用 Amazon Glue 交互式会话配置魔法命令 %streaming 以定义您正在运行的任务并初始化串流交互式会话。

用于交互式开发的采样输入流式传输

皆在帮助提升 Amazon Glue 交互式会话中交互式体验的一种派生工具是在 GlueContext 下添加一种新方法,以获取静态 DynamicFrame 中流式传输的快照。GlueContext 允许您检查、交互和实施工作流。

在交互式会话中,默认提供 GlueContext,使用此类实例,您将能够找到此方法 getSampleStreamingDynamicFrame。此方法要求的参数为:

  • dataFrame:Spark Streaming Dataframe

  • options:查看以下可用选项

可用选项包括:

  • windowSize:这也称为微批处理持续时间。此参数将确定在触发前一批处理后串流查询的等待时间。此参数值必须小于 pollingTimeInMs

  • pollingTimeInMs:方法将运行的总时间长度。它将触发至少一个微批处理,以从输入流式传输中获取样本记录。

  • recordPollingLimit:此参数帮助您限制从流式传输中轮询的记录的总数。

(可选)您也可以使用 writeStreamFunction 将此自定义函数应用于每个记录采样函数。有关 Scala 和 Python 中的示例,请参阅以下内容。

Scala
val jsonString: String = s"""{"pollingTimeInMs": "2000", "windowSize": "1 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString)) dynFrame.show()
Python
options = { "pollingTimeInMs": "5000", "windowSize": "10 seconds" } glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, None)
注意

在采样的 DynFrame 为空时,可能是由以下几个原因造成的:

  • 串流源设置为“最新”,并且在采样周期内没有摄入新数据。

  • 轮询时间不足以处理其摄入的记录。除非处理完毕整个批处理,否则数据不会显示。

在交互式会话中运行串流应用程序

在 Amazon Glue 交互式会话中,您可以运行 Amazon Glue 串流应用程序,就像您在 Amazon Glue 控制台中创建串流应用程序一样。由于交互式会话基于会话,因此在运行时遇到异常不会导致会话停止。目前,我们具有以迭代方式开发批处理函数的额外优势。例如:

def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})

我们在以上例子中包括了方法的无效用法,与退出整个应用程序的常规 Amazon Glue 任务不同,用户的编码上下文和定义都完全保留,并且会话仍然在运行。无需引导启动新集群和重新运行所有之前的转换。这使您可以专注于快速迭代批处理函数实施以获得理想的结果。

需要注意的是,交互式会话以阻塞方式评估每个语句,以便会话一次仅能执行一条语句。由于串流查询是连续的并且永远不会结束,因此具有活动的串流查询的会话将无法处理任何后续语句,除非这些会话中断。您可以直接从 Jupyter Notebook 发出中断命令,我们的内核将为您处理取消。

以下列正在等待执行的语句序列为例:

Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely