“使用 Feature Store 进行欺诈检测”示例笔记本
本页上的示例代码是指使用 Amazon SageMaker Feature Store 进行欺诈检测
步骤 1:设置 Feature Store
要开始使用 Feature Store,请创建 SageMaker 会话、boto3 会话和 Feature Store 会话。此外,还设置要用于特征的 Amazon S3 存储桶。这是您的离线存储。以下代码使用 SageMaker 默认存储桶并为其添加自定义前缀。
注意
用于运行笔记本的角色必须附加以下托管策略:AmazonSageMakerFullAccess
和 AmazonSageMakerFeatureStoreAccess
。有关向 IAM 角色添加策略的信息,请参阅向您的 IAM 角色添加策略。
import boto3 import sagemaker from sagemaker.session import Session sagemaker_session = sagemaker.Session() region = sagemaker_session.boto_region_name boto_session = boto3.Session(region_name=region) role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() prefix = 'sagemaker-featurestore' offline_feature_store_bucket = 's3://{}/{}'.format(default_bucket, prefix) sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region) featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region) feature_store_session = Session( boto_session=boto_session, sagemaker_client=sagemaker_client, sagemaker_featurestore_runtime_client=featurestore_runtime )
步骤 2:加载数据集并将数据分区到特征组中
将数据加载到每个特征的数据框中。设置特征组后,即可使用这些数据框。在欺诈检测示例中,您可以在以下代码中看到这些步骤。
import numpy as np import pandas as pd import matplotlib.pyplot as plt import io s3_client = boto3.client(service_name='s3', region_name=region) fraud_detection_bucket_name = 'sagemaker-featurestore-fraud-detection' identity_file_key = 'sampled_identity.csv' transaction_file_key = 'sampled_transactions.csv' identity_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=identity_file_key) transaction_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=transaction_file_key) identity_data = pd.read_csv(io.BytesIO(identity_data_object['Body'].read())) transaction_data = pd.read_csv(io.BytesIO(transaction_data_object['Body'].read())) identity_data = identity_data.round(5) transaction_data = transaction_data.round(5) identity_data = identity_data.fillna(0) transaction_data = transaction_data.fillna(0) # Feature transformations for this dataset are applied before ingestion into FeatureStore. # One hot encode card4, card6 encoded_card_bank = pd.get_dummies(transaction_data['card4'], prefix = 'card_bank') encoded_card_type = pd.get_dummies(transaction_data['card6'], prefix = 'card_type') transformed_transaction_data = pd.concat([transaction_data, encoded_card_type, encoded_card_bank], axis=1) transformed_transaction_data = transformed_transaction_data.rename(columns={"card_bank_american express": "card_bank_american_express"})
步骤 3:设置特征组
设置特征组时,需要使用唯一名称自定义特征名称,并使用 FeatureGroup
类设置每个特征组。
from sagemaker.feature_store.feature_group import FeatureGroup feature_group_name = "some string for a name" feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
例如,在欺诈检测示例中,两个特征组分别是 identity
和 transaction
。在下面的代码中,您可以看到如何使用时间戳自定义名称,然后通过传入名称和会话来设置每个组。
import time from time import gmtime, strftime, sleep from sagemaker.feature_store.feature_group import FeatureGroup identity_feature_group_name = 'identity-feature-group-' + strftime('%d-%H-%M-%S', gmtime()) transaction_feature_group_name = 'transaction-feature-group-' + strftime('%d-%H-%M-%S', gmtime()) identity_feature_group = FeatureGroup(name=identity_feature_group_name, sagemaker_session=feature_store_session) transaction_feature_group = FeatureGroup(name=transaction_feature_group_name, sagemaker_session=feature_store_session)
步骤 4:设置记录标识符和事件时间特征
在此步骤中,您将指定记录标识符名称和事件时间特征名称。该名称与数据中相应特征的列相对应。例如,在欺诈检测示例中,相关列是 TransactionID
。当没有可用的时间戳时,可以将 EventTime
附加到您的数据中。在下面的代码中,您可以看到如何设置这些变量,然后将 EventTime
附加到两个特征的数据中。
record_identifier_name = "TransactionID" event_time_feature_name = "EventTime" current_time_sec = int(round(time.time())) identity_data[event_time_feature_name] = pd.Series([current_time_sec]*len(identity_data), dtype="float64") transformed_transaction_data[event_time_feature_name] = pd.Series([current_time_sec]*len(transaction_data), dtype="float64")
步骤 5:加载特征定义
现在,您可以通过传递包含特征数据的数据框来加载特征定义。在以下欺诈检测示例的代码中,身份特征和事务特征均通过使用 load_feature_definitions
加载,此函数会自动检测每列数据的数据类型。对于使用架构而非自动检测的开发人员,请参阅从 Data Wrangler 导出特征组示例,其中的代码显示了如何加载架构、映射架构并将架构添加为 FeatureDefinition
以用于创建 FeatureGroup
。此示例还介绍了 boto3 实现,您可以用它来代替 SageMaker Python SDK。
identity_feature_group.load_feature_definitions(data_frame=identity_data); # output is suppressed transaction_feature_group.load_feature_definitions(data_frame=transformed_transaction_data); # output is suppressed
步骤 6:创建特征组
在此步骤中,您使用 create
函数创建特征组。以下代码显示了所有可用的参数。默认情况下不会创建在线存储,因此如果要启用它,您必须将其设置为 True
。s3_uri
是离线存储的 S3 存储桶位置。
# create a FeatureGroup feature_group.create( description = "Some info about the feature group", feature_group_name = feature_group_name, record_identifier_name = record_identifier_name, event_time_feature_name = event_time_feature_name, feature_definitions = feature_definitions, role_arn = role, s3_uri = offline_feature_store_bucket, enable_online_store = True, online_store_kms_key_id = None, offline_store_kms_key_id = None, disable_glue_table_creation = False, data_catalog_config = None, tags = ["tag1","tag2"])
欺诈检测示例中的以下代码显示了为两个特征组中的每一个创建的最小 create
调用。
identity_feature_group.create( s3_uri=offline_feature_store_bucket, record_identifier_name=record_identifier_name, event_time_feature_name=event_time_feature_name, role_arn=role, enable_online_store=True ) transaction_feature_group.create( s3_uri=offline_feature_store_bucket, record_identifier_name=record_identifier_name, event_time_feature_name=event_time_feature_name, role_arn=role, enable_online_store=True )
创建特征组时,需要一些时间来加载数据,并且您需要等到特征组创建之后才能使用它。您可以使用以下方法来检查状态。
status = feature_group.describe().get("FeatureGroupStatus")
创建特征组时,您会收到 Creating
作为响应。成功完成此步骤后,响应为 Created
。其他可能的状态是 CreateFailed
、Deleting
或 DeleteFailed
。
步骤 7:使用特征组
现在,您已经设置了特征组,可以执行以下任何任务:
描述特征组
您可以使用 describe
函数检索有关特征组的信息。
feature_group.describe()
列出特征组。
您可以使用 list_feature_groups
函数列出所有特征组。
sagemaker_client.list_feature_groups()
将记录放入特征组。
您可以使用 ingest
函数加载特征数据。您只需传入一个包含特征数据的数据框,设置工作线程数量,然后选择是否等待它返回即可。以下示例演示如何使用 ingest
函数。
feature_group.ingest( data_frame=feature_data, max_workers=3, wait=True )
对于您拥有的每个特征组,对要加载的特征数据运行 ingest
函数。
从特征组获取记录
您可以使用 get_record
函数按记录标识符检索特定特征的数据。以下示例使用示例标识符来检索记录。
record_identifier_value = str(2990130) featurestore_runtime.get_record(FeatureGroupName=transaction_feature_group_name, RecordIdentifierValueAsString=record_identifier_value)
欺诈检测示例的响应示例:
... 'Record': [{'FeatureName': 'TransactionID', 'ValueAsString': '2990130'}, {'FeatureName': 'isFraud', 'ValueAsString': '0'}, {'FeatureName': 'TransactionDT', 'ValueAsString': '152647'}, {'FeatureName': 'TransactionAmt', 'ValueAsString': '75.0'}, {'FeatureName': 'ProductCD', 'ValueAsString': 'H'}, {'FeatureName': 'card1', 'ValueAsString': '4577'}, ...
生成 Hive DDL 命令
SageMaker Python SDK 的 FeatureStore
类还提供生成 Hive DDL 命令的功能。表的架构根据特征定义生成。列以特征名称命名,数据类型根据特征类型推断。
print(feature_group.as_hive_ddl())
输出示例:
CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.identity-feature-group-27-19-33-00 ( TransactionID INT id_01 FLOAT id_02 FLOAT id_03 FLOAT id_04 FLOAT ...
构建训练数据集
Feature Store 会在您创建特征组时自动生成 Amazon Glue 数据目录,您可以根据需要将其关闭。以下内容介绍了如何使用本主题前面创建的身份和事务特征组中的特征值创建单个训练数据集。此外,以下内容还介绍了如何运行 Amazon Athena 查询,以联接存储在离线存储中的身份和事务特征组数据。
首先,使用 athena_query()
为身份和事务特征组创建 Athena 查询。“table_name”是 Feature Store 自动生成的 Amazon Glue 表。
identity_query = identity_feature_group.athena_query() transaction_query = transaction_feature_group.athena_query() identity_table = identity_query.table_name transaction_table = transaction_query.table_name
编写和执行 Athena 查询
在这些特征组上使用 SQL 编写查询,然后使用 .run()
命令执行查询,并指定 Amazon S3 存储桶位置以便将数据集保存在那里。
# Athena query query_string = 'SELECT * FROM "'+transaction_table+'" LEFT JOIN "'+identity_table+'" ON "'+transaction_table+'".transactionid = "'+identity_table+'".transactionid' # run Athena query. The output is loaded to a Pandas dataframe. dataset = pd.DataFrame() identity_query.run(query_string=query_string, output_location='s3://'+default_s3_bucket_name+'/query_results/') identity_query.wait() dataset = identity_query.as_dataframe()
在这里,您可以使用此数据集训练模型,然后执行推理。
删除特征组
您可以使用 delete
函数删除特征组。
feature_group.delete()
以下代码示例来自欺诈检测示例。
identity_feature_group.delete() transaction_feature_group.delete()
有关更多信息,请参阅删除特征组 API