使用 Amazon SageMaker Feature Store Spark 批量摄取
Amazon SageMaker Feature Store Spark 是一个 Spark 连接器,可将 Spark 库连接到 Feature Store。Feature Store Spark 简化了从 Spark DataFrame
到特征组的数据摄取流程。Feature Store 支持使用现有 ETL 管道在 Amazon EMR、GIS、Amazon Glue 作业、Amazon SageMaker Processing 作业或 SageMaker 笔记本上使用 Spark 进行批量数据摄取。
我们为 Python 和 Scala 开发人员提供了安装和实施批量数据摄取的方法。Python 开发人员可以按照 Amazon SageMaker Feature Store Spark GitHub 存储库sagemaker-feature-store-pyspark
Python 库进行本地开发、在 Amazon EMR 上安装以及用于 Jupyter 笔记本。Scala 开发人员可以使用 Amazon SageMaker Feature Store Spark SDK Maven Central 存储库
可以通过以下方式使用 Spark 连接器摄取数据,具体取决于是启用了在线存储、离线存储,还是两者均已启用。
-
默认摄取 - 如果启用了在线存储,Spark 连接器将首先使用 PutRecord API 将您的数据框摄取到在线存储中。在线存储仅保留事件时间最长的记录。如果启用了离线存储,Feature Store 将在 15 分钟内将您的数据框提取到离线存储中。有关在线和离线存储工作原理的更多信息,请参阅 Feature Store 概念。
您可以通过不在
.ingest_data(...)
方法中指定target_stores
来完成此操作。 -
离线存储直接摄取 - 如果启用了离线存储,Spark 连接器会将您的数据框直接批量摄取到离线存储中。将数据框直接摄取到离线存储并不会更新在线存储。
您可以通过在
.ingest_data(...)
方法中设置target_stores=["OfflineStore"]
来完成此操作。 -
仅在线存储 - 如果启用了在线存储,Spark 连接器会使用 PutRecord API 将您的数据框摄取到在线存储中。将数据框直接摄取到在线存储并不会更新离线存储。
您可以通过在
.ingest_data(...)
方法中设置target_stores=["OnlineStore"]
来完成此操作。
有关使用不同摄取方法的信息,请参阅示例实施。
Feature Store Spark 安装
Scala 用户
Scala 用户可在 Amazon SageMaker Feature Store Spark SDK Maven Central 存储库
要求
-
Spark >= 3.0.0 且 <= 3.3.0
-
iceberg-spark-runtime
>= 0.14.0 -
Scala >= 2.12.x
-
Amazon EMR >= 6.1.0(仅当使用 Amazon EMR 时)
在 POM.xml 中声明依赖项
Feature Store Spark 连接器在 iceberg-spark-runtime
库中有一个依赖项。因此,如果要将数据摄取到使用 Iceberg 表格式自动创建的特征组中,则必须将相应版本的 iceberg-spark-runtime
库添加到该依赖项中。例如,如果使用的是 Spark 3.1,则必须在项目的 POM.xml
中声明以下内容:
<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>
Python 用户
开源 Amazon SageMaker Feature Store Spark GitHub 存储库
要求
-
Spark >= 3.0.0 且 <= 3.3.0
-
Amazon EMR >= 6.1.0(仅当使用 Amazon EMR 时)
-
内核 =
conda_python3
我们建议将 $SPARK_HOME
设置为安装了 Spark 的目录。安装期间,Feature Store 会将所需的 JAR 上传到 SPARK_HOME
,这样就会自动加载依赖项。需要使用 Spark 启动 JVM,才能使这个 PySpark 库正常运行。
本地安装
要查找有关安装的更多信息,请通过将 --verbose
附加到以下安装命令来启用详细模式。
pip3 install sagemaker-feature-store-pyspark-
3.1
--no-binary :all:
在 Amazon EMR 上安装
使用发行版 6.1.0 或更高版本创建一个 Amazon EMR 集群。启用 SSH 可帮助您解决任何问题。
可以执行以下操作之一来安装库:
-
在 Amazon EMR 中创建自定义步骤。
-
使用 SSH 连接到您的集群并从那里安装库。
注意
以下信息使用 Spark 版本 3.1,但您可以指定满足要求的任何版本。
export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-
3.1
--no-binary :all: --verbose
注意
如果要将从属 JAR 自动安装到 SPARK_HOME,请勿使用引导步骤。
在 SageMaker 笔记本实例上安装
使用以下命令安装与 Spark 连接器兼容的 PySpark 版本:
!pip3 install pyspark==
3.1.1
!pip3 install sagemaker-feature-store-pyspark-3.1
--no-binary :all:
如果您要对离线存储执行批量摄取,则依赖项不在笔记本实例环境中。
from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()
在带有 GIS 的笔记本上安装
重要
必须使用 Amazon Glue 版本 2.0 或更高版本。
可以使用以下信息来帮助在 Amazon Glue 交互式会话 (GIS) 中安装 PySpark 连接器。
Amazon SageMaker Feature Store Spark 需要在会话初始化期间将特定 Spark 连接器 JAR 上传到您的 Amazon S3 存储桶。有关将所需 JAR 上传到 S3 存储桶的更多信息,请参阅检索 Feature Store Spark 的 JAR。
上传 JAR 后,必须使用以下命令为 GIS 会话提供 JAR。
%extra_jars s3:/
<YOUR_BUCKET>
/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar
要在 Amazon Glue 运行时系统中安装 Feature Store Spark,请使用 GIS 笔记本中的 %additional_python_modules
魔术命令。Amazon Glue 运行 pip
到您在 %additional_python_modules
下指定的模块。
%additional_python_modules sagemaker-feature-store-pyspark-
3.1
启动 Amazon Glue 会话之前,必须使用前面的两个魔术命令。
在 Amazon Glue 作业中安装
重要
必须使用 Amazon Glue 版本 2.0 或更高版本。
要在 Amazon Glue 作业中安装 Spark 连接器,请在创建 Amazon Glue 作业时使用 --extra-jars
参数提供必要的 JAR,并使用 --additional-python-modules
将 Spark 连接器作为作业参数安装,如以下示例所示。有关将所需 JAR 上传到 S3 存储桶的更多信息,请参阅检索 Feature Store Spark 的 JAR。
glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/
<YOUR_BUCKET>
/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )
在 Amazon SageMaker Processing 作业中安装
要在 Amazon SageMaker Processing 作业中使用 Feature Store Spark,请自带映像。有关自带映像的更多信息,请参阅自带 SageMaker 映像。向 Dockerfile 添加安装步骤。将 Docker 映像推送到 Amazon ECR 存储库后,您可以使用 PySpark 处理器创建处理作业。有关使用 PySpark 处理器创建处理作业的更多信息,请参阅使用 Apache Spark 进行数据处理。
以下是向 Dockerfile 添加安装步骤的示例。
FROM
<ACCOUNT_ID>
.dkr.ecr.<AWS_REGION>
.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
检索 Feature Store Spark 的 JAR
要检索 Feature Store Spark 依赖项 JAR,必须在任何具有网络访问权限的 Python 环境中使用 pip
从 Python Package Index (PyPI) 存储库中安装 Spark 连接器。SageMaker Jupyter Notebook 就是一个具有网络访问权限的 Python 环境的示例。
以下命令用于安装 Spark 连接器。
!pip install sagemaker-feature-store-pyspark-
3.1
安装 Feature Store Spark 后,您可以检索 JAR 位置并将 JAR 上传到 Amazon S3。
feature-store-pyspark-dependency-jars
命令提供了 Feature Store Spark 添加的必要依赖项 JAR 的位置。您可以使用该命令来检索 JAR,并将其上传到 Amazon S3。
jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "
<YOUR_BUCKET>
","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")