本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在Amazon Glue交互式会话中使用流媒体操作
切换串流会话类型
使用 Amazon Glue 交互式会话配置魔法命令 %streaming
以定义您正在运行的任务并初始化串流交互式会话。
用于交互式开发的采样输入流式传输
我们为帮助增强交互式会话中的交Amazon Glue互体验而开发的一种工具是,在下GlueContext
方添加了一种获取静态直播快照的新方法 DynamicFrame。 GlueContext
允许您检查、交互和实施您的工作流程。
使用 GlueContext
类实例,您将能够找到方法 getSampleStreamingDynamicFrame
。此方法要求的参数为:
-
dataFrame
: Spark 直播 DataFrame -
options
:查看以下可用选项
可用选项包括:
-
windowSize: 这也称为微批持续时间。此参数将确定在触发前一批处理后串流查询的等待时间。此参数值必须小于
pollingTimeInMs
。 -
pollingTimeInMs:该方法将运行的总时长。它将触发至少一个微批处理,以从输入流式传输中获取样本记录。
-
recordPollingLimit:此参数可帮助您限制要从直播中轮询的记录总数。
-
(可选)您也可以使用
writeStreamFunction
将此自定义函数应用于每个记录采样函数。有关 Scala 和 Python 中的示例,请参阅以下内容。
注意
在采样的 DynFrame
为空时,可能是由以下几个原因造成的:
-
流式传输源设置为“Latest(最新)”,并且在采样周期内没有摄入新数据。
-
轮询时间不足以处理其摄入的记录。除非整个批处理处理完毕,否则数据不会显示。
在交互式会话中运行串流应用程序
在 Amazon Glue 交互式会话中,您可以运行 Amazon Glue 串流应用程序,就像您在 Amazon Glue 控制台中创建串流应用程序一样。由于交互式会话基于会话,因此在运行时遇到异常不会导致会话停止。目前,我们具有以迭代方式开发批处理函数的额外优势。例如:
def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
我们在以上例子中包括了方法的无效用法,与退出整个应用程序的常规 Amazon Glue 任务不同,用户的编码上下文和定义都完全保留,并且会话仍然在运行。无需引导启动新集群和重新运行所有之前的转换。这使您可以专注于快速迭代批处理函数实施以获得理想的结果。
需要注意的是,交互式会话以阻塞方式评估每个语句,以便会话一次仅能执行一条语句。由于流式传输查询将始终连续且永不结束,具有有效流式传输查询的会话将无法处理任何后续语句,除非这些会话中断。您可以直接从 Jupyter Notebook 发出中断命令,我们的内核将为您处理取消。
以下列正在等待执行的语句序列为例:
Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely