Use the SageMakerEstimator in a Spark Pipeline
You can use org.apache.spark.ml.Estimator estimators and
org.apache.spark.ml.Model models, and
SageMakerEstimator estimators and SageMakerModel
models in org.apache.spark.ml.Pipeline pipelines, as shown in the
following example:
import org.apache.spark.ml.Pipeline import org.apache.spark.ml.feature.PCA import org.apache.spark.sql.SparkSession import com.amazonaws.services.sagemaker.sparksdk.IAMRole import com.amazonaws.services.sagemaker.sparksdk.algorithms import com.amazonaws.services.sagemaker.sparksdk.algorithms.KMeansSageMakerEstimator val spark = SparkSession.builder.getOrCreate // load mnist data as a dataframe from libsvm val region = "us-east-1" val trainingData = spark.read.format("libsvm") .option("numFeatures", "784") .load(s"s3://sagemaker-sample-data-$region/spark/mnist/train/") val testData = spark.read.format("libsvm") .option("numFeatures", "784") .load(s"s3://sagemaker-sample-data-$region/spark/mnist/test/") // substitute your SageMaker IAM role here val roleArn = "arn:aws:iam::account-id:role/rolename" val pcaEstimator = new PCA() .setInputCol("features") .setOutputCol("projectedFeatures") .setK(50) val kMeansSageMakerEstimator = new KMeansSageMakerEstimator( sagemakerRole = IAMRole(integTestingRole), requestRowSerializer = new ProtobufRequestRowSerializer(featuresColumnName = "projectedFeatures"), trainingSparkDataFormatOptions = Map("featuresColumnName" -> "projectedFeatures"), trainingInstanceType = "ml.p2.xlarge", trainingInstanceCount = 1, endpointInstanceType = "ml.c4.xlarge", endpointInitialInstanceCount = 1) .setK(10).setFeatureDim(50) val pipeline = new Pipeline().setStages(Array(pcaEstimator, kMeansSageMakerEstimator)) // train val pipelineModel = pipeline.fit(trainingData) val transformedData = pipelineModel.transform(testData) transformedData.show()
The parameter trainingSparkDataFormatOptions configures Spark to
serialize to protobuf the "projectedFeatures" column for model training.
Additionally, Spark serializes to protobuf the "label" column by default.
Because we want to make inferences using the "projectedFeatures" column, we pass
the column name into the ProtobufRequestRowSerializer.
The following example shows a
transformed
DataFrame:
+-----+--------------------+--------------------+-------------------+---------------+ |label| features| projectedFeatures|distance_to_cluster|closest_cluster| +-----+--------------------+--------------------+-------------------+---------------+ | 5.0|(784,[152,153,154...|[880.731433034386...| 1500.470703125| 0.0| | 0.0|(784,[127,128,129...|[1768.51722024166...| 1142.18359375| 4.0| | 4.0|(784,[160,161,162...|[704.949236329314...| 1386.246826171875| 9.0| | 1.0|(784,[158,159,160...|[-42.328192193771...| 1277.0736083984375| 5.0| | 9.0|(784,[208,209,210...|[374.043902028333...| 1211.00927734375| 3.0| | 2.0|(784,[155,156,157...|[941.267714528850...| 1496.157958984375| 8.0| | 1.0|(784,[124,125,126...|[30.2848596410594...| 1327.6766357421875| 5.0| | 3.0|(784,[151,152,153...|[1270.14374062052...| 1570.7674560546875| 0.0| | 1.0|(784,[152,153,154...|[-112.10792566485...| 1037.568359375| 5.0| | 4.0|(784,[134,135,161...|[452.068280676606...| 1165.1236572265625| 3.0| | 3.0|(784,[123,124,125...|[610.596447285397...| 1325.953369140625| 7.0| | 5.0|(784,[216,217,218...|[142.959601818422...| 1353.4930419921875| 5.0| | 3.0|(784,[143,144,145...|[1036.71862533658...| 1460.4315185546875| 7.0| | 6.0|(784,[72,73,74,99...|[996.740157435754...| 1159.8631591796875| 2.0| | 1.0|(784,[151,152,153...|[-107.26076167417...| 960.963623046875| 5.0| | 7.0|(784,[211,212,213...|[619.771820430940...| 1245.13623046875| 6.0| | 2.0|(784,[151,152,153...|[850.152101817161...| 1304.437744140625| 8.0| | 8.0|(784,[159,160,161...|[370.041887230547...| 1192.4781494140625| 0.0| | 6.0|(784,[100,101,102...|[546.674328209335...| 1277.0908203125| 2.0| | 9.0|(784,[209,210,211...|[-29.259112927426...| 1245.8182373046875| 6.0| +-----+--------------------+--------------------+-------------------+---------------+