数据源和提取 - Amazon SageMaker
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

数据源和提取

可通过多种方式将数据引入 中Amazon SageMaker Feature Store。Feature Store 为数据提取提供单个名为 的 API 调用PutRecord,该调用使您能够批量提取数据或从流式传输源提取数据。您还可以使用 Amazon SageMaker Data Wrangler 设计 功能,然后将您的功能提取到 中Feature Store。

流提取

您可以使用流式传输源(例如 Kafka 或 Kinesis )作为数据源,其中的功能是从该数据源提取的,并直接馈送到在线功能存储以进行训练、推理或功能创建。可以通过调用同步 PutRecord API 调用将记录推送到功能存储。由于这是一个同步 API 调用,因此,它允许在单个 API 调用中推送少量更新。这使您能够在检测到更新后立即保持特征值的新鲜度高并发布值。这些功能也称为流式处理功能。

Data Wrangler 与 Feature Store

Data Wrangler 是 的一项功能Studio,它提供端到端解决方案来导入、准备、转换、特征化和分析数据Data Wrangler。利用 ,您可以设计您的功能并将它们提取到功能存储中。 

在 中Studio,在与 交互后Data Wrangler,选择 Export (导出) 选项卡,选择 Export Step 导出步骤),然后选择 Feature Store (功能存储),如以下屏幕截图所示。这会导出一个 Jupyter 笔记本,其中包含其中的所有源代码,用于创建一个Feature Store功能组,以将您的功能从 Data Wrangler 添加到脱机或联机功能存储。

创建功能组后,您还可以选择并联接多个功能组中的数据,以便在 中创建新的设计功能Data Wrangler,然后将数据集导出到 S3 存储桶。 

有关如何导出到 的更多信息Feature Store,请参阅导出到 SageMaker Feature Store。

Athena 和 AWS Glue (带 ) Feature Store

在脱机Feature Store功能存储中创建功能组后,您可以选择在Athena目录AWS Glue上使用 Amazon 运行查询。这需要在数据目录中使用在 中自动为您注册的其他目录详细信息注册数据Feature Store。换句话说,在创建功能组并且您可以将其关闭时Feature Store, 会自动构建 AWS Glue 数据目录。当您想要通过执行 SQL 查询然后训练模型以进行推理来构建数据集时,这尤其有用。 

在创建您的 并在离线存储中填充数据FeatureStore后,您将能够编写 SQL 查询以联接存储在不同 的离线存储中的数据FeatureGroups。 为此,您可以使用 Amazon Athena 编写和执行 SQL 查询。您可以将AWS Glue爬网程序设置为按计划运行,以确保您的目录始终是最新的。 

如果要执行此操作,请定义爬AWS Glue网程序可用于访问脱机存储的 S3 存储桶的角色。有关更多信息,请参阅创建 IAM 角色。 

有关如何使用 AWS Glue 和 Athena 为模型训练和推理构建训练数据集的更多信息,请参阅构建训练数据集:创建功能组

示例Athena查询

下面我们提供了一些示例查询,这些查询充当模板,供您使用 快速编写查询Athena。 

交互式探究

此查询选择前 1000 条记录。 

SELECT * FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName> LIMIT 1000

最新快照(无重复项)

此查询选择最新的非重复记录。

SELECT * FROM     (SELECT *,          row_number()         OVER (PARTITION BY <RecordIdentiferFeatureName>     ORDER BY  <EventTimeFeatureName> desc, Api_Invocation_Time DESC, write_time DESC) AS row_num     FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>) WHERE row_num = 1;

离线存储中无重复项和删除的记录的最新快照

此查询从脱机存储中选择非重复记录并删除记录。 

SELECT * FROM     (SELECT *,          row_number()         OVER (PARTITION BY <RecordIdentiferFeatureName>     ORDER BY  <EventTimeFeatureName> desc, Api_Invocation_Time DESC, write_time DESC) AS row_num     FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>) WHERE row_num = 1 and  NOT is_deleted;

Time (时间) 在离线商店中没有重复和删除的记录时执行

此查询从特定时间点选择非重复记录和删除的记录。您需要指定时间戳字符串。

SELECT * FROM     (SELECT *,          row_number()         OVER (PARTITION BY <RecordIdentiferFeatureName>     ORDER BY  <EventTimeFeatureName> desc, Api_Invocation_Time DESC, write_time DESC) AS row_num     FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>     where <EventTimeFeatureName> <= timestamp '<timestamp>')     -- replace timestamp '<timestamp>' with just <timestamp>  if EventTimeFeature is of type fractional WHERE row_num = 1 and NOT is_deleted