从特征组创建数据集 - Amazon SageMaker
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

从特征组创建数据集

在离线存储中创建 Feature Store 特征组后,您可以选择使用以下方法来获取数据:

  • 使用 Amazon SageMaker Python SDK

  • 在 Amazon Athena 中运行 SQL 查询

重要

Feature Store 要求在 Amazon Glue 数据目录中注册数据。默认情况下,Feature Store 会在您创建特征组时自动生成 Amazon Glue 数据目录。

为离线存储创建特征组并填充数据后,您可以通过运行查询或通过使用 SDK 联接存储在离线存储中的不同特征组数据来创建数据集。也可以将特征组联接到单个 pandas 数据框。您可以使用 Amazon Athena 编写和执行 SQL 查询。

注意

为确保数据最新,可以将 Amazon Glue 爬网程序设置为按计划运行。

要设置 Amazon Glue 爬网程序,请指定 IAM 角色,供爬网程序用于访问离线存储的 Amazon S3 存储桶。有关更多信息,请参阅创建 IAM 角色

有关如何使用 Amazon Glue 和 Athena 构建用于模型训练和推理的训练数据集的更多信息,请参阅将 Feature Store 与 SDK for Python (Boto3) 结合使用

使用 Amazon SageMaker Python SDK 从特征组中获取数据

您可以使用 Feature Store API 从特征组创建数据集。数据科学家通过从离线存储中的一个或多个特征组中检索机器学习特征数据,创建用于训练的机器学习数据集。使用 create_dataset() 函数创建数据集。您可以使用 SDK 执行以下操作:

  • 从多个特征组创建数据集。

  • 从特征组和 pandas 数据框创建数据集。

默认情况下,Feature Store 不包含您已从数据集中删除的记录,也不包含重复的记录。重复记录在事件时间列中具有记录 ID 和时间戳值。

使用 SDK 创建数据集之前,必须启动 SageMaker 会话。使用以下代码启动会话。

import boto3 from sagemaker.session import Session from sagemaker.feature_store.feature_store import FeatureStore region = boto3.Session().region_name boto_session = boto3.Session(region_name=region) sagemaker_client = boto_session.client( service_name="sagemaker", region_name=region ) featurestore_runtime = boto_session.client( service_name="sagemaker-featurestore-runtime",region_name=region ) feature_store_session = Session( boto_session=boto_session, sagemaker_client=sagemaker_client, sagemaker_featurestore_runtime_client=featurestore_runtime, ) feature_store = FeatureStore(feature_store_session)

以下代码显示了从多个特征组创建数据集的示例。以下代码片段使用了示例特征组“base_fg_name”、“first_fg_name”和“second_fg_name”,它们可能在 Feature Store 中不存在或具有相同的架构。建议将这些特征组替换为 Feature Store 中存在的特征组。有关如何创建特征组的信息,请参阅步骤 3:创建特征组

from sagemaker.feature_store.feature_group import FeatureGroup s3_bucket_name = "offline-store-sdk-test" base_fg_name = "base_fg_name" base_fg = FeatureGroup(name=base_fg_name, sagemaker_session=feature_store_session) first_fg_name = "first_fg_name" first_fg = FeatureGroup(name=first_fg_name, sagemaker_session=feature_store_session) second_fg_name = "second_fg_name" second_fg = FeatureGroup(name=second_fg_name, sagemaker_session=feature_store_session) feature_store = FeatureStore(feature_store_session) builder = feature_store.create_dataset( base=base_fg, output_path=f"s3://{DOC-EXAMPLE-BUCKET1}", ).with_feature_group(first_fg ).with_feature_group(second_fg, "base_id", ["base_feature_1"])

以下代码显示了从多个特征组和 pandas 数据框创建数据集的示例。

base_data = [[1, 187512346.0, 123, 128], [2, 187512347.0, 168, 258], [3, 187512348.0, 125, 184], [1, 187512349.0, 195, 206]] base_data_df = pd.DataFrame( base_data, columns=["base_id", "base_time", "base_feature_1", "base_feature_2"] ) builder = feature_store.create_dataset( base=base_data_df, event_time_identifier_feature_name='base_time', record_identifier_feature_name='base_id', output_path=f"s3://{s3_bucket_name}" ).with_feature_group(first_fg ).with_feature_group(second_fg, "base_id", ["base_feature_1"])

Feature Store API 为您提供了 create_dataset 函数的辅助标记方法。您可以使用它们执行以下操作:

  • 从多个特征组创建数据集。

  • 从多个特征组和一个 pandas 数据框创建数据集。

  • 从单个特征组和一个 pandas 数据框创建数据集。

  • 使用时间点精确联接创建数据集,其中联接的特征组中的记录按顺序排列。

  • 使用重复的记录创建数据集,而不是遵循函数的默认行为。

  • 使用已删除的记录创建数据集,而不是遵循函数的默认行为。

  • 为您指定的时间段创建数据集。

  • 将数据集另存为 CSV 文件。

  • 将数据集另存为 pandas 数据框。

基本 特征组是联接的一个重要概念。基本特征组是有其他特征组或 pandas 数据框与之联接的特征组。对于每个数据集

您可以将以下可选方法添加到 create_dataset 函数,以配置创建数据集的方式:

  • with_feature_group - 使用基本特征组中的记录标识符和目标特征名称在基本特征组和另一个特征组之间执行内部联接。下面提供了有关您指定的参数的信息:

    • feature_group - 要联接的特征组。

    • target_feature_name_in_base - 基本特征组中用作联接键的特征的名称。其他特征组中的记录标识符是 Feature Store 在联接中使用的其他键。

    • included_feature_names - 表示基本特征组特征名称的字符串列表。您还可以使用该字段指定要包含在数据集中的特征。

    • feature_name_in_target - 可选字符串,表示目标特征组中要与基本特征组中的目标特征进行比较的特征。

    • join_comparator - 可选 JoinComparatorEnum,表示将基本特征组中的目标特征与目标特征组中的特征联接时使用的比较器。默认情况下,这些 JoinComparatorEnum 值可以是 GREATER_THANGREATER_THAN_OR_EQUAL_TOLESS_THANLESS_THAN_OR_EQUAL_TONOT_EQUAL_TOEQUALS

    • join_type - 可选 JoinTypeEnum,表示基本特征组和目标特征组之间的联接类型。默认情况下,这些 JoinTypeEnum 值可以是 LEFT_JOINRIGHT_JOINFULL_JOINCROSS_JOININNER_JOIN

  • with_event_time_range - 使用您指定的事件时间范围创建数据集。

  • as_of - 创建不超过您指定的时间戳的数据集。例如,如果指定 datetime(2021, 11, 28, 23, 55, 59, 342380) 作为值,则会创建一个截至 2021 年 11 月 28 日的数据集。

  • point_time_accurate_join - 创建一个数据集,其中基本特征组的所有事件时间值都小于您要联接的特征组或 pandas 数据帧的所有事件时间值。

  • include_duplicated_records - 在特征组中保留重复的值。

  • include_deleted_records - 在特征组中保留已删除的值。

  • with_number_of_recent_records_by_record_identifier - 您指定的一个整数,用于确定数据集内显示了多少条最近记录。

  • with_number_of_records_by_record_identifier - 一个整数,表示数据集内显示了多少条记录。

配置数据集后,您可以使用以下方法之一指定输出:

  • to_csv_file - 将数据集另存为 CSV 文件。

  • to_dataframe - 将数据集另存为 pandas 数据框。

您可以检索特定时间段之后的数据。以下代码检索某个时间戳之后的数据。

fg1 = FeatureGroup("example-feature-group-1") feature_store.create_dataset( base=fg1, output_path="s3://example-S3-path" ).with_number_of_records_from_query_results(5).to_csv_file()

您还可以检索特定时间段内的数据。您可以使用以下代码来获取特定时间范围内的数据:

fg1 = FeatureGroup("fg1") feature_store.create_dataset( base=fg1, output_path="example-S3-path" ).with_event_time_range( datetime(2021, 11, 28, 23, 55, 59, 342380), datetime(2020, 11, 28, 23, 55, 59, 342380) ).to_csv_file() #example time range specified in datetime functions

您可能想将多个特征组联接到一个 pandas 数据框,其中特征组的事件时间值不迟于数据框的事件时间。使用以下代码作为模板来帮助执行联接。

fg1 = FeatureGroup("fg1") fg2 = FeatureGroup("fg2") events = [['2020-02-01T08:30:00Z', 6, 1], ['2020-02-02T10:15:30Z', 5, 2], ['2020-02-03T13:20:59Z', 1, 3], ['2021-01-01T00:00:00Z', 1, 4]] df = pd.DataFrame(events, columns=['event_time', 'customer-id', 'title-id']) feature_store.create_dataset( base=df, event_time_identifier_feature_name='event_time', record_identifier_feature_name='customer_id', output_path="s3://example-S3-path" ).with_feature_group(fg1, "customer-id" ).with_feature_group(fg2, "title-id" ).point_in_time_accurate_join( ).to_csv_file()

您还可以检索特定时间段之后的数据。以下代码检索 as_of 方法中时间戳指定的时间之后的数据。

fg1 = FeatureGroup("fg1") feature_store.create_dataset( base=fg1, output_path="s3://example-s3-file-path" ).as_of(datetime(2021, 11, 28, 23, 55, 59, 342380) ).to_csv_file() # example datetime values

Amazon Athena 示例查询

您可以在 Amazon Athena 中编写查询,以便从特征组创建数据集。您还可以编写查询,从特征组和单个 pandas 数据框创建数据集。

交互式探索

此查询选择前 1000 条记录。 

SELECT * FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName> LIMIT 1000

没有重复项的最新快照

此查询选择最新的非重复记录。

SELECT * FROM     (SELECT *,          row_number()         OVER (PARTITION BY <RecordIdentiferFeatureName>     ORDER BY  <EventTimeFeatureName> desc, Api_Invocation_Time DESC, write_time DESC) AS row_num     FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>) WHERE row_num = 1;

离线存储中没有重复项和已删除记录的最新快照

此查询会筛选出所有已删除记录,并从离线存储中选择非重复记录。 

SELECT * FROM     (SELECT *,          row_number()         OVER (PARTITION BY <RecordIdentiferFeatureName>     ORDER BY  <EventTimeFeatureName> desc, Api_Invocation_Time DESC, write_time DESC) AS row_num     FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>) WHERE row_num = 1 and  NOT is_deleted;

离线存储中没有重复项和已删除记录的时间旅行

此查询会筛选出所有已删除记录,并选择特定时间点的非重复记录。

SELECT * FROM     (SELECT *,          row_number()         OVER (PARTITION BY <RecordIdentiferFeatureName>     ORDER BY  <EventTimeFeatureName> desc, Api_Invocation_Time DESC, write_time DESC) AS row_num     FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>     where <EventTimeFeatureName> <= timestamp '<timestamp>')     -- replace timestamp '<timestamp>' with just <timestamp>  if EventTimeFeature is of type fractional WHERE row_num = 1 and NOT is_deleted