Using a SageMakerEstimator in a Spark Pipeline

You can use org.apache.spark.ml.Estimator estimators and org.apache.spark.ml.Model models together with SageMakerEstimator estimators and SageMakerModel models in org.apache.spark.ml.Pipeline pipelines, as the following example shows:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.PCA
import org.apache.spark.sql.SparkSession
import com.amazonaws.services.sagemaker.sparksdk.IAMRole
import com.amazonaws.services.sagemaker.sparksdk.algorithms.KMeansSageMakerEstimator
import com.amazonaws.services.sagemaker.sparksdk.transformation.serializers.ProtobufRequestRowSerializer

val spark = SparkSession.builder.getOrCreate

// Load the MNIST data as DataFrames from libsvm files in S3.
val region = "us-east-1"
val trainingData = spark.read.format("libsvm")
  .option("numFeatures", "784")
  .load(s"s3://sagemaker-sample-data-$region/spark/mnist/train/")
val testData = spark.read.format("libsvm")
  .option("numFeatures", "784")
  .load(s"s3://sagemaker-sample-data-$region/spark/mnist/test/")

// Substitute your SageMaker IAM role here.
val roleArn = "arn:aws:iam::account-id:role/rolename"

// Spark PCA stage: project the 784 MNIST pixel features down to 50 components.
val pcaEstimator = new PCA()
  .setInputCol("features")
  .setOutputCol("projectedFeatures")
  .setK(50)

// SageMaker k-means stage, trained and served on the PCA output column.
val kMeansSageMakerEstimator = new KMeansSageMakerEstimator(
  sagemakerRole = IAMRole(roleArn),
  requestRowSerializer =
    new ProtobufRequestRowSerializer(featuresColumnName = "projectedFeatures"),
  trainingSparkDataFormatOptions = Map("featuresColumnName" -> "projectedFeatures"),
  trainingInstanceType = "ml.p2.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.c4.xlarge",
  endpointInitialInstanceCount = 1)
  .setK(10).setFeatureDim(50)

val pipeline = new Pipeline().setStages(Array(pcaEstimator, kMeansSageMakerEstimator))

// Fitting the pipeline runs a SageMaker training job for the k-means stage
// and deploys the resulting model to a hosted endpoint.
val pipelineModel = pipeline.fit(trainingData)

// Transforming the test data invokes the endpoint for inference.
val transformedData = pipelineModel.transform(testData)
transformedData.show()

The trainingSparkDataFormatOptions parameter configures Spark to serialize the "projectedFeatures" column in protobuf format for model training. Spark also serializes the "label" column in protobuf format by default.

Because we want to use the "projectedFeatures" column for inference as well, we pass that column name to the ProtobufRequestRowSerializer.
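As an illustrative sketch (not taken from the example above), you can name both columns explicitly when your DataFrame does not use the default column names; the "labelColumnName" option here is an assumption based on the SageMaker Spark SDK's Spark data format options, while "featuresColumnName" appears in the example above:

import com.amazonaws.services.sagemaker.sparksdk.IAMRole
import com.amazonaws.services.sagemaker.sparksdk.algorithms.KMeansSageMakerEstimator
import com.amazonaws.services.sagemaker.sparksdk.transformation.serializers.ProtobufRequestRowSerializer

// Sketch: serialize a custom features column for both training and inference,
// and name the label column explicitly ("labelColumnName" is an assumed option).
val explicitColumnsEstimator = new KMeansSageMakerEstimator(
  sagemakerRole = IAMRole(roleArn),
  requestRowSerializer =
    new ProtobufRequestRowSerializer(featuresColumnName = "projectedFeatures"),
  trainingSparkDataFormatOptions = Map(
    "featuresColumnName" -> "projectedFeatures",
    "labelColumnName"    -> "label"),
  trainingInstanceType = "ml.p2.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.c4.xlarge",
  endpointInitialInstanceCount = 1)
  .setK(10).setFeatureDim(50)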

The following example shows the resulting transformed DataFrame:

+-----+--------------------+--------------------+-------------------+---------------+
|label|            features|   projectedFeatures|distance_to_cluster|closest_cluster|
+-----+--------------------+--------------------+-------------------+---------------+
|  5.0|(784,[152,153,154...|[880.731433034386...|     1500.470703125|            0.0|
|  0.0|(784,[127,128,129...|[1768.51722024166...|      1142.18359375|            4.0|
|  4.0|(784,[160,161,162...|[704.949236329314...|  1386.246826171875|            9.0|
|  1.0|(784,[158,159,160...|[-42.328192193771...| 1277.0736083984375|            5.0|
|  9.0|(784,[208,209,210...|[374.043902028333...|   1211.00927734375|            3.0|
|  2.0|(784,[155,156,157...|[941.267714528850...|  1496.157958984375|            8.0|
|  1.0|(784,[124,125,126...|[30.2848596410594...| 1327.6766357421875|            5.0|
|  3.0|(784,[151,152,153...|[1270.14374062052...| 1570.7674560546875|            0.0|
|  1.0|(784,[152,153,154...|[-112.10792566485...|     1037.568359375|            5.0|
|  4.0|(784,[134,135,161...|[452.068280676606...| 1165.1236572265625|            3.0|
|  3.0|(784,[123,124,125...|[610.596447285397...|  1325.953369140625|            7.0|
|  5.0|(784,[216,217,218...|[142.959601818422...| 1353.4930419921875|            5.0|
|  3.0|(784,[143,144,145...|[1036.71862533658...| 1460.4315185546875|            7.0|
|  6.0|(784,[72,73,74,99...|[996.740157435754...| 1159.8631591796875|            2.0|
|  1.0|(784,[151,152,153...|[-107.26076167417...|   960.963623046875|            5.0|
|  7.0|(784,[211,212,213...|[619.771820430940...|   1245.13623046875|            6.0|
|  2.0|(784,[151,152,153...|[850.152101817161...|  1304.437744140625|            8.0|
|  8.0|(784,[159,160,161...|[370.041887230547...| 1192.4781494140625|            0.0|
|  6.0|(784,[100,101,102...|[546.674328209335...|    1277.0908203125|            2.0|
|  9.0|(784,[209,210,211...|[-29.259112927426...| 1245.8182373046875|            6.0|
+-----+--------------------+--------------------+-------------------+---------------+
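As a hedged follow-up sketch (not part of the original example): the closest_cluster column can be analyzed with ordinary DataFrame operations, and because fitting the pipeline deploys a hosted endpoint that incurs cost, you should delete the created SageMaker resources when you finish. The cleanup below assumes the fitted k-means stage is the second stage of pipelineModel:

import com.amazonaws.services.sagemaker.sparksdk.{SageMakerModel, SageMakerResourceCleanup}

// See how the test digits distribute across the ten k-means clusters.
transformedData
  .groupBy("label", "closest_cluster")
  .count()
  .orderBy("label", "closest_cluster")
  .show()

// The SageMaker stage of the fitted pipeline holds the endpoint.
val kMeansModel = pipelineModel.stages(1).asInstanceOf[SageMakerModel]

// Delete the endpoint, endpoint configuration, and model that fit() created.
val cleanup = new SageMakerResourceCleanup(kMeansModel.sagemakerClient)
cleanup.deleteResources(kMeansModel.getCreatedResources)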