第 2 步。实施转换逻辑 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

第 2 步。实施转换逻辑

注意

自定义视觉转换仅支持 Python 脚本。不支持 Scala。

要添加实现 .json 配置文件定义的函数的代码,建议将 Python 文件放在与 .json 文件相同的位置,名称相同但扩展名为“.py”。Amazon Glue Studio 自动将 .json 和 .py 文件配对,因此您无需在配置文件中指定 Python 文件的路径。

在 Python 文件中,添加已声明的函数,配置命名参数,并将其注册到 DynamicFrame 中使用。以下是 Python 文件的示例:

from awsglue import DynamicFrame # self refers to the DynamicFrame to transform, # the parameter names must match the ones defined in the config # if it's optional, need to provide a default value def myTransform(self, email, phone, age=None, gender="", country="", promotion=False): resulting_dynf = # do some transformation on self return resulting_dynf DynamicFrame.myTransform = myTransform

建议使用 Amazon Glue 笔记本作为开发和测试 python 代码的最快方法。请参阅开始在 Amazon Glue Studio 中使用笔记本

为了说明如何实现转换逻辑,以下示例中的自定义视觉转换是一种转换,用于过滤传入数据,仅保留与美国特定州相关的数据。.json 文件包含作为 custom_filter_statefunctionName 的参数和两个参数(类型为 “str”的“state”和“colName”)。

示例配置 .json 文件是:

{ "name": "custom_filter_state", "displayName": "Filter State", "description": "A simple example to filter the data to keep only the state indicated.", "functionName": "custom_filter_state", "parameters": [ { "name": "colName", "displayName": "Column name", "type": "str", "description": "Name of the column in the data that holds the state postal code" }, { "name": "state", "displayName": "State postal code", "type": "str", "description": "The postal code of the state whole rows to keep" } ] }
在 Python 中实现配套脚本
  1. 启动 Amazon Glue 笔记本并运行为启动会话提供的初始单元。运行初始单元会创建所需的基本组件。

  2. 创建一个执行示例中描述的过滤的函数并将其注册到 DynamicFrame。复制下面的代码并粘贴到 Amazon Glue 笔记本的单元中。

    from awsglue import DynamicFrame def custom_filter_state(self, colName, state): return self.filter(lambda row: row[colName] == state) DynamicFrame.custom_filter_state = custom_filter_state
  3. 创建或加载示例数据,以测试同一个单元或新单元中的代码。如果您在新单元中添加示例数据,请不要忘记运行该单元。例如:

    # A few of rows of sample data to test data_sample = [ {"state": "CA", "count": 4}, {"state": "NY", "count": 2}, {"state": "WA", "count": 3} ] df1 = glueContext.sparkSession.sparkContext.parallelize(data_sample).toDF() dynf1 = DynamicFrame.fromDF(df1, glueContext, None)
  4. 测试使用不同的参数验证“custom_filter_state”:

    
                        屏幕截图显示了 Amazon Glue 笔记本中的一个单元,其参数传递给 dynamicFrame.show 函数。
  5. 运行多个测试后,使用 .py 扩展名保存代码,并使用镜像 .json 文件名的名称命名 .py 文件。.py 和 .json 文件应位于同一个转换文件夹中。

    复制以下代码并将其粘贴到文件中,并使用 .py 文件扩展名对其进行重命名。

    from awsglue import DynamicFrame def custom_filter_state(self, colName, state): return self.filter(lambda row: row[colName] == state) DynamicFrame.custom_filter_state = custom_filter_state
  6. 在 Amazon Glue Studio 中,打开可视化作业,然后从可用 Transforms(转换)列表中选择该转换,将其添加到该作业中。

    要在 Python 脚本代码中重用此转换,请在作业中的“引用文件路径”下将 Amazon S3 路径添加到 .py 文件,然后在脚本中,将 python 文件的名称(不带扩展名)添加到文件顶部,将其导入。例如:import <文件名(不带扩展名)>