SageMaker examples using SDK for JavaScript (v3) - Amazon SDK for JavaScript
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

The Amazon SDK for JavaScript V3 API Reference Guide describes in detail all the API operations for the Amazon SDK for JavaScript version 3 (V3).

SageMaker examples using SDK for JavaScript (v3)

The following code examples show you how to perform actions and implement common scenarios by using the Amazon SDK for JavaScript (v3) with SageMaker.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios and cross-service examples.

Scenarios are code examples that show you how to accomplish a specific task by calling multiple functions within the same service.

Each example includes a link to GitHub, where you can find instructions on how to set up and run the code in context.

Get started

The following code examples show how to get started using SageMaker.

SDK for JavaScript (v3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository.

import {
  SageMakerClient,
  ListNotebookInstancesCommand,
} from "@aws-sdk/client-sagemaker";

const client = new SageMakerClient({
  region: "us-west-2",
});

export const helloSagemaker = async () => {
  const command = new ListNotebookInstancesCommand({ MaxResults: 5 });
  const response = await client.send(command);
  console.log(
    "Hello Amazon SageMaker! Let's list some of your notebook instances:",
  );
  const instances = response.NotebookInstances || [];
  if (instances.length === 0) {
    console.log(
      "• No notebook instances found. Try creating one in the AWS Management Console or with the CreateNotebookInstanceCommand.",
    );
  } else {
    console.log(
      instances
        .map(
          (i) =>
            `• Instance: ${i.NotebookInstanceName}\n  Arn: ${i.NotebookInstanceArn}\n  Creation Date: ${i.CreationTime.toISOString()}`,
        )
        .join("\n"),
    );
  }
  return response;
};

Actions

The following code example shows how to use CreatePipeline.

SDK for JavaScript (v3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository.

A function that creates a SageMaker pipeline using a locally provided JSON definition.

/**
 * Create the Amazon SageMaker pipeline using a JSON pipeline definition. The definition
 * can also be provided as an Amazon S3 object using PipelineDefinitionS3Location.
 * @param {{roleArn: string, name: string, functionArn: string, sagemakerClient: import('@aws-sdk/client-sagemaker').SageMakerClient}} props
 */
export async function createSagemakerPipeline({
  // Assumes an AWS IAM role has been created for this pipeline.
  roleArn,
  name,
  // Assumes an AWS Lambda function has been created for this pipeline.
  functionArn,
  sagemakerClient,
}) {
  const pipelineDefinition = readFileSync(
    // dirnameFromMetaUrl is a local utility function. You can find its implementation
    // on GitHub.
    `${dirnameFromMetaUrl(
      import.meta.url,
    )}../../../../../workflows/sagemaker_pipelines/resources/GeoSpatialPipeline.json`,
  )
    .toString()
    .replace(/\*FUNCTION_ARN\*/g, functionArn);

  const { PipelineArn } = await sagemakerClient.send(
    new CreatePipelineCommand({
      PipelineName: name,
      PipelineDefinition: pipelineDefinition,
      RoleArn: roleArn,
    }),
  );

  return {
    arn: PipelineArn,
    cleanUp: async () => {
      await sagemakerClient.send(
        new DeletePipelineCommand({ PipelineName: name }),
      );
    },
  };
}
  • For API details, see CreatePipeline in Amazon SDK for JavaScript API Reference.
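The placeholder substitution that the function performs on the pipeline definition can be isolated as a small, pure sketch. The inline definition and ARN below are illustrative stand-ins (the real example reads GeoSpatialPipeline.json from disk); only the `.replace(/\*FUNCTION_ARN\*/g, ...)` step mirrors the code above.

```javascript
// Illustrative only: a tiny inline pipeline definition standing in for
// GeoSpatialPipeline.json, which the real example reads from disk.
const template = JSON.stringify({
  Version: "2020-12-01",
  Steps: [
    {
      Name: "ExampleLambdaStep",
      Type: "Lambda",
      Arguments: { FunctionArn: "*FUNCTION_ARN*" },
    },
  ],
});

// Replace every *FUNCTION_ARN* placeholder with the real Lambda ARN,
// as the example does before calling CreatePipelineCommand.
function resolvePipelineDefinition(definition, functionArn) {
  return definition.replace(/\*FUNCTION_ARN\*/g, functionArn);
}

const resolved = resolvePipelineDefinition(
  template,
  "arn:aws:lambda:us-west-2:111122223333:function:example",
);
console.log(JSON.parse(resolved).Steps[0].Arguments.FunctionArn);
// → arn:aws:lambda:us-west-2:111122223333:function:example
```

Because PipelineDefinition is a plain string parameter, any templating approach works; a global regex replace keeps the example dependency-free.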

The following code example shows how to use DeletePipeline.

SDK for JavaScript (v3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository.

The syntax for deleting a SageMaker pipeline. This code is part of a larger function. Refer to 'Create a pipeline' or the GitHub repository for more context.

await sagemakerClient.send(
  new DeletePipelineCommand({ PipelineName: name }),
);
  • For API details, see DeletePipeline in Amazon SDK for JavaScript API Reference.

The following code example shows how to use DescribePipelineExecution.

SDK for JavaScript (v3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository.

Wait for a SageMaker pipeline execution to succeed, fail, or stop.

/**
 * Poll the executing pipeline until the status is 'SUCCEEDED', 'STOPPED', or 'FAILED'.
 * @param {{ arn: string, sagemakerClient: import('@aws-sdk/client-sagemaker').SageMakerClient}} props
 */
export async function waitForPipelineComplete({ arn, sagemakerClient }) {
  const command = new DescribePipelineExecutionCommand({
    PipelineExecutionArn: arn,
  });

  let complete = false;
  const intervalInSeconds = 15;
  const COMPLETION_STATUSES = [
    PipelineExecutionStatus.FAILED,
    PipelineExecutionStatus.STOPPED,
    PipelineExecutionStatus.SUCCEEDED,
  ];

  do {
    const { PipelineExecutionStatus: status, FailureReason } =
      await sagemakerClient.send(command);

    complete = COMPLETION_STATUSES.includes(status);

    if (!complete) {
      console.log(
        `Pipeline is ${status}. Waiting ${intervalInSeconds} seconds before checking again.`,
      );
      await wait(intervalInSeconds);
    } else if (status === PipelineExecutionStatus.FAILED) {
      throw new Error(`Pipeline failed because: ${FailureReason}`);
    } else if (status === PipelineExecutionStatus.STOPPED) {
      throw new Error(`Pipeline was forcefully stopped.`);
    } else {
      console.log(`Pipeline execution ${status}.`);
    }
  } while (!complete);
}
  • For API details, see DescribePipelineExecution in Amazon SDK for JavaScript API Reference.
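The decision logic inside that polling loop can be sketched as a pure function, which makes it easy to reason about without calling AWS. The status strings below assume the PipelineExecutionStatus values used in the example ("Executing", "Failed", "Stopped", "Succeeded"); the function name is hypothetical.

```javascript
// A minimal sketch of the per-poll decision in waitForPipelineComplete,
// extracted as a pure function. Status strings assume the
// PipelineExecutionStatus enum values from @aws-sdk/client-sagemaker.
const COMPLETION_STATUSES = ["Failed", "Stopped", "Succeeded"];

function interpretStatus(status, failureReason) {
  if (!COMPLETION_STATUSES.includes(status)) {
    return { done: false }; // still running: wait and poll again
  }
  if (status === "Failed") {
    return { done: true, error: `Pipeline failed because: ${failureReason}` };
  }
  if (status === "Stopped") {
    return { done: true, error: "Pipeline was forcefully stopped." };
  }
  return { done: true }; // Succeeded
}

console.log(interpretStatus("Executing").done); // false
console.log(interpretStatus("Succeeded").done); // true
```

Separating "what does this status mean" from "send the request and wait" keeps the retry cadence and the terminal-state handling independently testable.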

The following code example shows how to use StartPipelineExecution.

SDK for JavaScript (v3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository.

Start a SageMaker pipeline execution.

/**
 * Start the execution of the Amazon SageMaker pipeline. Parameters that are
 * passed in are used in the AWS Lambda function.
 * @param {{
 * name: string,
 * sagemakerClient: import('@aws-sdk/client-sagemaker').SageMakerClient,
 * roleArn: string,
 * queueUrl: string,
 * bucketName: string,
 * }} props
 */
export async function startPipelineExecution({
  sagemakerClient,
  name,
  bucketName,
  roleArn,
  queueUrl,
}) {
  /**
   * The Vector Enrichment Job requests CSV data. This configuration points to a CSV
   * file in an Amazon S3 bucket.
   * @type {import("@aws-sdk/client-sagemaker-geospatial").VectorEnrichmentJobInputConfig}
   */
  const inputConfig = {
    DataSourceConfig: {
      S3Data: {
        S3Uri: `s3://${bucketName}/input/sample_data.csv`,
      },
    },
    DocumentType: VectorEnrichmentJobDocumentType.CSV,
  };

  /**
   * The Vector Enrichment Job adds additional data to the source CSV. This configuration points
   * to an Amazon S3 prefix where the output will be stored.
   * @type {import("@aws-sdk/client-sagemaker-geospatial").ExportVectorEnrichmentJobOutputConfig}
   */
  const outputConfig = {
    S3Data: {
      S3Uri: `s3://${bucketName}/output/`,
    },
  };

  /**
   * This job will be a Reverse Geocoding Vector Enrichment Job. Reverse Geocoding requires
   * latitude and longitude values.
   * @type {import("@aws-sdk/client-sagemaker-geospatial").VectorEnrichmentJobConfig}
   */
  const jobConfig = {
    ReverseGeocodingConfig: {
      XAttributeName: "Longitude",
      YAttributeName: "Latitude",
    },
  };

  const { PipelineExecutionArn } = await sagemakerClient.send(
    new StartPipelineExecutionCommand({
      PipelineName: name,
      PipelineExecutionDisplayName: `${name}-example-execution`,
      PipelineParameters: [
        { Name: "parameter_execution_role", Value: roleArn },
        { Name: "parameter_queue_url", Value: queueUrl },
        {
          Name: "parameter_vej_input_config",
          Value: JSON.stringify(inputConfig),
        },
        {
          Name: "parameter_vej_export_config",
          Value: JSON.stringify(outputConfig),
        },
        {
          Name: "parameter_step_1_vej_config",
          Value: JSON.stringify(jobConfig),
        },
      ],
    }),
  );

  return {
    arn: PipelineExecutionArn,
  };
}
  • For API details, see StartPipelineExecution in Amazon SDK for JavaScript API Reference.
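One detail worth calling out: SageMaker pipeline parameters are name/value pairs whose values must be strings, so each nested job configuration is serialized with JSON.stringify before it is passed. A small sketch of that round trip, with a placeholder bucket name:

```javascript
// Sketch of how a nested config object becomes a pipeline parameter.
// The bucket name is a placeholder, not a real resource.
const bucketName = "amzn-s3-demo-bucket";

const inputConfig = {
  DataSourceConfig: {
    S3Data: { S3Uri: `s3://${bucketName}/input/sample_data.csv` },
  },
  DocumentType: "CSV",
};

// PipelineParameters values must be plain strings, so the object
// is serialized here and parsed again inside the pipeline definition.
const parameters = [
  { Name: "parameter_vej_input_config", Value: JSON.stringify(inputConfig) },
];

console.log(typeof parameters[0].Value); // string
```

The pipeline definition (and the Lambda function it invokes) is responsible for parsing these strings back into objects.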

Scenarios

The following code example shows how to:

  • Set up resources for a pipeline.

  • Set up a pipeline that executes a geospatial job.

  • Start a pipeline execution.

  • Monitor the status of the execution.

  • View the output of the pipeline.

  • Clean up resources.

For more information, see Create and run SageMaker pipelines using Amazon SDKs on Community.aws.
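The resource clean-up in this scenario follows a stack pattern: every creation helper returns a cleanUp function that is pushed onto an array, and on exit the array is drained in reverse so dependent resources are deleted before the resources they depend on. A minimal, AWS-free sketch of the pattern (resource names here are illustrative, and the real code awaits each cleanUp with a retry wrapper):

```javascript
// Sketch of the clean-up stack used by the scenario. Creation pushes a
// cleanUp closure; teardown runs them in reverse creation order.
const cleanUpFunctions = [];
const deleted = []; // records deletion order for illustration

function createResource(name) {
  // ...an SDK call would create the resource here...
  cleanUpFunctions.push(() => {
    // ...an SDK call would delete the resource here...
    deleted.push(name);
  });
}

createResource("role");
createResource("function"); // depends on "role"

// Drain the stack: last created, first deleted.
for (let i = cleanUpFunctions.length - 1; i >= 0; i--) {
  cleanUpFunctions[i]();
}

console.log(deleted); // [ 'function', 'role' ]
```

Reverse order matters because, for example, an IAM role cannot be deleted while a policy is still attached or a function still uses it.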

SDK for JavaScript (v3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository.

The following file excerpt contains functions that use the SageMaker client to manage a pipeline.

import { readFileSync } from "fs";
import {
  CreateRoleCommand,
  DeleteRoleCommand,
  CreatePolicyCommand,
  DeletePolicyCommand,
  AttachRolePolicyCommand,
  DetachRolePolicyCommand,
} from "@aws-sdk/client-iam";
import {
  PublishLayerVersionCommand,
  DeleteLayerVersionCommand,
  CreateFunctionCommand,
  Runtime,
  DeleteFunctionCommand,
  CreateEventSourceMappingCommand,
  DeleteEventSourceMappingCommand,
} from "@aws-sdk/client-lambda";
import {
  PutObjectCommand,
  CreateBucketCommand,
  DeleteBucketCommand,
  paginateListObjectsV2,
  DeleteObjectCommand,
  GetObjectCommand,
  ListObjectsV2Command,
} from "@aws-sdk/client-s3";
import {
  CreatePipelineCommand,
  DeletePipelineCommand,
  DescribePipelineExecutionCommand,
  PipelineExecutionStatus,
  StartPipelineExecutionCommand,
} from "@aws-sdk/client-sagemaker";
import { VectorEnrichmentJobDocumentType } from "@aws-sdk/client-sagemaker-geospatial";
import {
  CreateQueueCommand,
  DeleteQueueCommand,
  GetQueueAttributesCommand,
} from "@aws-sdk/client-sqs";
import { dirnameFromMetaUrl } from "@aws-doc-sdk-examples/lib/utils/util-fs.js";
import { retry, wait } from "@aws-doc-sdk-examples/lib/utils/util-timers.js";

/**
 * Create the AWS IAM role that will be assumed by AWS Lambda.
 * @param {{ name: string, iamClient: import('@aws-sdk/client-iam').IAMClient }} props
 */
export async function createLambdaExecutionRole({ name, iamClient }) {
  const { Role } = await iamClient.send(
    new CreateRoleCommand({
      RoleName: name,
      AssumeRolePolicyDocument: JSON.stringify({
        Version: "2012-10-17",
        Statement: [
          {
            Effect: "Allow",
            Action: ["sts:AssumeRole"],
            Principal: { Service: ["lambda.amazonaws.com"] },
          },
        ],
      }),
    }),
  );

  return {
    arn: Role.Arn,
    cleanUp: async () => {
      await iamClient.send(new DeleteRoleCommand({ RoleName: name }));
    },
  };
}

/**
 * Create an AWS IAM policy that will be attached to the AWS IAM role assumed by the AWS Lambda function.
 * The policy grants permission to work with Amazon SQS, Amazon CloudWatch, and Amazon SageMaker.
 * @param {{name: string, iamClient: import('@aws-sdk/client-iam').IAMClient, pipelineExecutionRoleArn: string}} props
 */
export async function createLambdaExecutionPolicy({
  name,
  iamClient,
  pipelineExecutionRoleArn,
}) {
  const policy = {
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Action: [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes",
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
          "sagemaker-geospatial:StartVectorEnrichmentJob",
          "sagemaker-geospatial:GetVectorEnrichmentJob",
          "sagemaker:SendPipelineExecutionStepFailure",
          "sagemaker:SendPipelineExecutionStepSuccess",
          "sagemaker-geospatial:ExportVectorEnrichmentJob",
        ],
        Resource: "*",
      },
      {
        Effect: "Allow",
        // The AWS Lambda function needs permission to pass the pipeline execution role to
        // the StartVectorEnrichmentCommand. This restriction prevents an AWS Lambda function
        // from elevating privileges. For more information, see:
        // https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_passrole.html
        Action: ["iam:PassRole"],
        Resource: `${pipelineExecutionRoleArn}`,
        Condition: {
          StringEquals: {
            "iam:PassedToService": [
              "sagemaker.amazonaws.com",
              "sagemaker-geospatial.amazonaws.com",
            ],
          },
        },
      },
    ],
  };

  const createPolicyCommand = new CreatePolicyCommand({
    PolicyDocument: JSON.stringify(policy),
    PolicyName: name,
  });

  const { Policy } = await iamClient.send(createPolicyCommand);

  return {
    arn: Policy.Arn,
    policy,
    cleanUp: async () => {
      await iamClient.send(new DeletePolicyCommand({ PolicyArn: Policy.Arn }));
    },
  };
}

/**
 * Attach an AWS IAM policy to an AWS IAM role.
 * @param {{roleName: string, policyArn: string, iamClient: import('@aws-sdk/client-iam').IAMClient}} props
 */
export async function attachPolicy({ roleName, policyArn, iamClient }) {
  const attachPolicyCommand = new AttachRolePolicyCommand({
    RoleName: roleName,
    PolicyArn: policyArn,
  });

  await iamClient.send(attachPolicyCommand);

  return {
    cleanUp: async () => {
      await iamClient.send(
        new DetachRolePolicyCommand({
          RoleName: roleName,
          PolicyArn: policyArn,
        }),
      );
    },
  };
}

/**
 * Create an AWS Lambda layer that contains the Amazon SageMaker and Amazon SageMaker Geospatial clients
 * in the runtime. The default runtime supports v3.188.0 of the JavaScript SDK. The Amazon SageMaker
 * Geospatial client wasn't introduced until v3.221.0.
 * @param {{ name: string, lambdaClient: import('@aws-sdk/client-lambda').LambdaClient }} props
 */
export async function createLambdaLayer({ name, lambdaClient }) {
  const layerPath = `${dirnameFromMetaUrl(import.meta.url)}lambda/nodejs.zip`;

  const { LayerVersionArn, Version } = await lambdaClient.send(
    new PublishLayerVersionCommand({
      LayerName: name,
      Content: {
        ZipFile: Uint8Array.from(readFileSync(layerPath)),
      },
    }),
  );

  return {
    versionArn: LayerVersionArn,
    version: Version,
    cleanUp: async () => {
      await lambdaClient.send(
        new DeleteLayerVersionCommand({
          LayerName: name,
          VersionNumber: Version,
        }),
      );
    },
  };
}

/**
 * Deploy the AWS Lambda function that will be used to respond to Amazon SageMaker pipeline
 * execution steps.
 * @param {{roleArn: string, name: string, lambdaClient: import('@aws-sdk/client-lambda').LambdaClient, layerVersionArn: string}} props
 */
export async function createLambdaFunction({
  name,
  roleArn,
  lambdaClient,
  layerVersionArn,
}) {
  const lambdaPath = `${dirnameFromMetaUrl(
    import.meta.url,
  )}lambda/dist/index.mjs.zip`;

  const command = new CreateFunctionCommand({
    Code: {
      ZipFile: Uint8Array.from(readFileSync(lambdaPath)),
    },
    Runtime: Runtime.nodejs18x,
    Handler: "index.handler",
    Layers: [layerVersionArn],
    FunctionName: name,
    Role: roleArn,
  });

  // Function creation fails if the role is not ready. This retries
  // function creation until it succeeds or it times out.
  const { FunctionArn } = await retry(
    { intervalInMs: 1000, maxRetries: 60 },
    () => lambdaClient.send(command),
  );

  return {
    arn: FunctionArn,
    cleanUp: async () => {
      await lambdaClient.send(
        new DeleteFunctionCommand({ FunctionName: name }),
      );
    },
  };
}

/**
 * This uploads some sample coordinate data to an Amazon S3 bucket.
 * The Amazon SageMaker Geospatial vector enrichment job will take the simple Lat/Long
 * coordinates in this file and augment them with more detailed location data.
 * @param {{bucketName: string, s3Client: import('@aws-sdk/client-s3').S3Client}} props
 */
export async function uploadCSVDataToS3({ bucketName, s3Client }) {
  const s3Path = `${dirnameFromMetaUrl(
    import.meta.url,
  )}../../../../../workflows/sagemaker_pipelines/resources/latlongtest.csv`;

  await s3Client.send(
    new PutObjectCommand({
      Bucket: bucketName,
      Key: "input/sample_data.csv",
      Body: readFileSync(s3Path),
    }),
  );
}

/**
 * Create the AWS IAM role that will be assumed by the Amazon SageMaker pipeline.
 * @param {{name: string, iamClient: import('@aws-sdk/client-iam').IAMClient}} props
 */
export async function createSagemakerRole({ name, iamClient }) {
  const command = new CreateRoleCommand({
    RoleName: name,
    AssumeRolePolicyDocument: JSON.stringify({
      Version: "2012-10-17",
      Statement: [
        {
          Effect: "Allow",
          Action: ["sts:AssumeRole"],
          Principal: {
            Service: [
              "sagemaker.amazonaws.com",
              "sagemaker-geospatial.amazonaws.com",
            ],
          },
        },
      ],
    }),
  });

  const { Role } = await iamClient.send(command);

  // Wait for the role to be ready.
  await wait(10);

  return {
    arn: Role.Arn,
    cleanUp: async () => {
      await iamClient.send(new DeleteRoleCommand({ RoleName: name }));
    },
  };
}

/**
 * Create the Amazon SageMaker execution policy. This policy grants permission to
 * invoke the AWS Lambda function, read/write to the Amazon S3 bucket, and send messages to
 * the Amazon SQS queue.
 * @param {{ name: string, sqsQueueArn: string, lambdaArn: string, iamClient: import('@aws-sdk/client-iam').IAMClient, s3BucketName: string}} props
 */
export async function createSagemakerExecutionPolicy({
  sqsQueueArn,
  lambdaArn,
  iamClient,
  name,
  s3BucketName,
}) {
  const policy = {
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Action: ["lambda:InvokeFunction"],
        Resource: lambdaArn,
      },
      {
        Effect: "Allow",
        Action: ["s3:*"],
        Resource: [
          `arn:aws:s3:::${s3BucketName}`,
          `arn:aws:s3:::${s3BucketName}/*`,
        ],
      },
      {
        Effect: "Allow",
        Action: ["sqs:SendMessage"],
        Resource: sqsQueueArn,
      },
    ],
  };

  const createPolicyCommand = new CreatePolicyCommand({
    PolicyDocument: JSON.stringify(policy),
    PolicyName: name,
  });

  const { Policy } = await iamClient.send(createPolicyCommand);

  return {
    arn: Policy.Arn,
    policy,
    cleanUp: async () => {
      await iamClient.send(new DeletePolicyCommand({ PolicyArn: Policy.Arn }));
    },
  };
}

/**
 * Create the Amazon SageMaker pipeline using a JSON pipeline definition. The definition
 * can also be provided as an Amazon S3 object using PipelineDefinitionS3Location.
 * @param {{roleArn: string, name: string, functionArn: string, sagemakerClient: import('@aws-sdk/client-sagemaker').SageMakerClient}} props
 */
export async function createSagemakerPipeline({
  // Assumes an AWS IAM role has been created for this pipeline.
  roleArn,
  name,
  // Assumes an AWS Lambda function has been created for this pipeline.
  functionArn,
  sagemakerClient,
}) {
  const pipelineDefinition = readFileSync(
    // dirnameFromMetaUrl is a local utility function. You can find its implementation
    // on GitHub.
    `${dirnameFromMetaUrl(
      import.meta.url,
    )}../../../../../workflows/sagemaker_pipelines/resources/GeoSpatialPipeline.json`,
  )
    .toString()
    .replace(/\*FUNCTION_ARN\*/g, functionArn);

  const { PipelineArn } = await sagemakerClient.send(
    new CreatePipelineCommand({
      PipelineName: name,
      PipelineDefinition: pipelineDefinition,
      RoleArn: roleArn,
    }),
  );

  return {
    arn: PipelineArn,
    cleanUp: async () => {
      await sagemakerClient.send(
        new DeletePipelineCommand({ PipelineName: name }),
      );
    },
  };
}

/**
 * Create an Amazon SQS queue. The Amazon SageMaker pipeline will send messages
 * to this queue that are then processed by the AWS Lambda function.
 * @param {{name: string, sqsClient: import('@aws-sdk/client-sqs').SQSClient}} props
 */
export async function createSQSQueue({ name, sqsClient }) {
  const { QueueUrl } = await sqsClient.send(
    new CreateQueueCommand({
      QueueName: name,
      Attributes: {
        DelaySeconds: "5",
        ReceiveMessageWaitTimeSeconds: "5",
        VisibilityTimeout: "300",
      },
    }),
  );

  const { Attributes } = await sqsClient.send(
    new GetQueueAttributesCommand({
      QueueUrl,
      AttributeNames: ["QueueArn"],
    }),
  );

  return {
    queueUrl: QueueUrl,
    queueArn: Attributes.QueueArn,
    cleanUp: async () => {
      await sqsClient.send(new DeleteQueueCommand({ QueueUrl }));
    },
  };
}

/**
 * Configure the AWS Lambda function to long poll for messages from the Amazon SQS
 * queue.
 * @param {{lambdaName: string, queueArn: string, lambdaClient: import('@aws-sdk/client-lambda').LambdaClient}} props
 */
export async function configureLambdaSQSEventSource({
  lambdaName,
  queueArn,
  lambdaClient,
}) {
  const { UUID } = await lambdaClient.send(
    new CreateEventSourceMappingCommand({
      EventSourceArn: queueArn,
      FunctionName: lambdaName,
    }),
  );

  return {
    cleanUp: async () => {
      await lambdaClient.send(
        new DeleteEventSourceMappingCommand({
          UUID,
        }),
      );
    },
  };
}

/**
 * Create an Amazon S3 bucket that will store the simple coordinate file as input
 * and the output of the Amazon SageMaker Geospatial vector enrichment job.
 * @param {{s3Client: import('@aws-sdk/client-s3').S3Client, name: string}} props
 */
export async function createS3Bucket({ name, s3Client }) {
  await s3Client.send(new CreateBucketCommand({ Bucket: name }));

  return {
    cleanUp: async () => {
      const paginator = paginateListObjectsV2(
        { client: s3Client },
        { Bucket: name },
      );
      for await (const page of paginator) {
        const objects = page.Contents;
        if (objects) {
          for (const object of objects) {
            await s3Client.send(
              new DeleteObjectCommand({ Bucket: name, Key: object.Key }),
            );
          }
        }
      }
      await s3Client.send(new DeleteBucketCommand({ Bucket: name }));
    },
  };
}

/**
 * Start the execution of the Amazon SageMaker pipeline. Parameters that are
 * passed in are used in the AWS Lambda function.
 * @param {{
 * name: string,
 * sagemakerClient: import('@aws-sdk/client-sagemaker').SageMakerClient,
 * roleArn: string,
 * queueUrl: string,
 * bucketName: string,
 * }} props
 */
export async function startPipelineExecution({
  sagemakerClient,
  name,
  bucketName,
  roleArn,
  queueUrl,
}) {
  /**
   * The Vector Enrichment Job requests CSV data. This configuration points to a CSV
   * file in an Amazon S3 bucket.
   * @type {import("@aws-sdk/client-sagemaker-geospatial").VectorEnrichmentJobInputConfig}
   */
  const inputConfig = {
    DataSourceConfig: {
      S3Data: {
        S3Uri: `s3://${bucketName}/input/sample_data.csv`,
      },
    },
    DocumentType: VectorEnrichmentJobDocumentType.CSV,
  };

  /**
   * The Vector Enrichment Job adds additional data to the source CSV. This configuration points
   * to an Amazon S3 prefix where the output will be stored.
   * @type {import("@aws-sdk/client-sagemaker-geospatial").ExportVectorEnrichmentJobOutputConfig}
   */
  const outputConfig = {
    S3Data: {
      S3Uri: `s3://${bucketName}/output/`,
    },
  };

  /**
   * This job will be a Reverse Geocoding Vector Enrichment Job. Reverse Geocoding requires
   * latitude and longitude values.
   * @type {import("@aws-sdk/client-sagemaker-geospatial").VectorEnrichmentJobConfig}
   */
  const jobConfig = {
    ReverseGeocodingConfig: {
      XAttributeName: "Longitude",
      YAttributeName: "Latitude",
    },
  };

  const { PipelineExecutionArn } = await sagemakerClient.send(
    new StartPipelineExecutionCommand({
      PipelineName: name,
      PipelineExecutionDisplayName: `${name}-example-execution`,
      PipelineParameters: [
        { Name: "parameter_execution_role", Value: roleArn },
        { Name: "parameter_queue_url", Value: queueUrl },
        {
          Name: "parameter_vej_input_config",
          Value: JSON.stringify(inputConfig),
        },
        {
          Name: "parameter_vej_export_config",
          Value: JSON.stringify(outputConfig),
        },
        {
          Name: "parameter_step_1_vej_config",
          Value: JSON.stringify(jobConfig),
        },
      ],
    }),
  );

  return {
    arn: PipelineExecutionArn,
  };
}

/**
 * Poll the executing pipeline until the status is 'SUCCEEDED', 'STOPPED', or 'FAILED'.
 * @param {{ arn: string, sagemakerClient: import('@aws-sdk/client-sagemaker').SageMakerClient}} props
 */
export async function waitForPipelineComplete({ arn, sagemakerClient }) {
  const command = new DescribePipelineExecutionCommand({
    PipelineExecutionArn: arn,
  });

  let complete = false;
  const intervalInSeconds = 15;
  const COMPLETION_STATUSES = [
    PipelineExecutionStatus.FAILED,
    PipelineExecutionStatus.STOPPED,
    PipelineExecutionStatus.SUCCEEDED,
  ];

  do {
    const { PipelineExecutionStatus: status, FailureReason } =
      await sagemakerClient.send(command);

    complete = COMPLETION_STATUSES.includes(status);

    if (!complete) {
      console.log(
        `Pipeline is ${status}. Waiting ${intervalInSeconds} seconds before checking again.`,
      );
      await wait(intervalInSeconds);
    } else if (status === PipelineExecutionStatus.FAILED) {
      throw new Error(`Pipeline failed because: ${FailureReason}`);
    } else if (status === PipelineExecutionStatus.STOPPED) {
      throw new Error(`Pipeline was forcefully stopped.`);
    } else {
      console.log(`Pipeline execution ${status}.`);
    }
  } while (!complete);
}

/**
 * Return the string value of an Amazon S3 object.
 * @param {{ bucket: string, s3Client: import('@aws-sdk/client-s3').S3Client}} props
 */
export async function getObject({ bucket, s3Client }) {
  const prefix = "output/";
  const { Contents } = await s3Client.send(
    new ListObjectsV2Command({ MaxKeys: 1, Bucket: bucket, Prefix: prefix }),
  );

  if (!Contents.length) {
    throw new Error("No objects found in bucket.");
  }

  // Find the CSV file.
  const outputObject = Contents.find((obj) => obj.Key.endsWith(".csv"));
  if (!outputObject) {
    throw new Error(`No CSV file found in bucket with the prefix "${prefix}".`);
  }

  const { Body } = await s3Client.send(
    new GetObjectCommand({
      Bucket: bucket,
      Key: outputObject.Key,
    }),
  );

  return Body.transformToString();
}

This function is an excerpt from a file that uses the preceding library functions to set up a SageMaker pipeline, execute it, and delete all created resources.

import { retry, wait } from "@aws-doc-sdk-examples/lib/utils/util-timers.js";
import {
  attachPolicy,
  configureLambdaSQSEventSource,
  createLambdaExecutionPolicy,
  createLambdaExecutionRole,
  createLambdaFunction,
  createLambdaLayer,
  createS3Bucket,
  createSQSQueue,
  createSagemakerExecutionPolicy,
  createSagemakerPipeline,
  createSagemakerRole,
  getObject,
  startPipelineExecution,
  uploadCSVDataToS3,
  waitForPipelineComplete,
} from "./lib.js";
import { MESSAGES } from "./messages.js";

export class SageMakerPipelinesWkflw {
  names = {
    LAMBDA_EXECUTION_ROLE: "sagemaker-wkflw-lambda-execution-role",
    LAMBDA_EXECUTION_ROLE_POLICY:
      "sagemaker-wkflw-lambda-execution-role-policy",
    LAMBDA_FUNCTION: "sagemaker-wkflw-lambda-function",
    LAMBDA_LAYER: "sagemaker-wkflw-lambda-layer",
    SAGE_MAKER_EXECUTION_ROLE: "sagemaker-wkflw-pipeline-execution-role",
    SAGE_MAKER_EXECUTION_ROLE_POLICY:
      "sagemaker-wkflw-pipeline-execution-role-policy",
    SAGE_MAKER_PIPELINE: "sagemaker-wkflw-pipeline",
    SQS_QUEUE: "sagemaker-wkflw-sqs-queue",
    S3_BUCKET: `sagemaker-wkflw-s3-bucket-${Date.now()}`,
  };

  cleanUpFunctions = [];

  /**
   * @param {import("@aws-doc-sdk-examples/lib/prompter.js").Prompter} prompter
   * @param {import("@aws-doc-sdk-examples/lib/logger.js").Logger} logger
   * @param {{ IAM: import("@aws-sdk/client-iam").IAMClient, Lambda: import("@aws-sdk/client-lambda").LambdaClient, SageMaker: import("@aws-sdk/client-sagemaker").SageMakerClient, S3: import("@aws-sdk/client-s3").S3Client, SQS: import("@aws-sdk/client-sqs").SQSClient }} clients
   */
  constructor(prompter, logger, clients) {
    this.prompter = prompter;
    this.logger = logger;
    this.clients = clients;
  }

  async run() {
    try {
      await this.startWorkflow();
    } catch (err) {
      console.error(err);
      throw err;
    } finally {
      this.logger.logSeparator();
      const doCleanUp = await this.prompter.confirm({
        message: "Clean up resources?",
      });
      if (doCleanUp) {
        await this.cleanUp();
      }
    }
  }

  async cleanUp() {
    // Run all of the clean up functions. If any fail, we log the error and continue.
    // This ensures all clean up functions are run.
    for (let i = this.cleanUpFunctions.length - 1; i >= 0; i--) {
      await retry(
        { intervalInMs: 1000, maxRetries: 60, swallowError: true },
        this.cleanUpFunctions[i],
      );
    }
  }

  async startWorkflow() {
    this.logger.logSeparator(MESSAGES.greetingHeader);
    await this.logger.log(MESSAGES.greeting);
    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.creatingRole.replace(
        "${ROLE_NAME}",
        this.names.LAMBDA_EXECUTION_ROLE,
      ),
    );

    // Create an IAM role that will be assumed by the AWS Lambda function. This function
    // is triggered by Amazon SQS messages and calls SageMaker and SageMaker GeoSpatial actions.
    const { arn: lambdaExecutionRoleArn, cleanUp: lambdaExecutionRoleCleanUp } =
      await createLambdaExecutionRole({
        name: this.names.LAMBDA_EXECUTION_ROLE,
        iamClient: this.clients.IAM,
      });
    // Add a clean up step to a stack for every resource created.
    this.cleanUpFunctions.push(lambdaExecutionRoleCleanUp);

    await this.logger.log(
      MESSAGES.roleCreated.replace(
        "${ROLE_NAME}",
        this.names.LAMBDA_EXECUTION_ROLE,
      ),
    );
    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.creatingRole.replace(
        "${ROLE_NAME}",
        this.names.SAGE_MAKER_EXECUTION_ROLE,
      ),
    );

    // Create an IAM role that will be assumed by the SageMaker pipeline. The pipeline
    // sends messages to an Amazon SQS queue and puts/retrieves Amazon S3 objects.
    const {
      arn: pipelineExecutionRoleArn,
      cleanUp: pipelineExecutionRoleCleanUp,
    } = await createSagemakerRole({
      iamClient: this.clients.IAM,
      name: this.names.SAGE_MAKER_EXECUTION_ROLE,
    });
    this.cleanUpFunctions.push(pipelineExecutionRoleCleanUp);

    await this.logger.log(
      MESSAGES.roleCreated.replace(
        "${ROLE_NAME}",
        this.names.SAGE_MAKER_EXECUTION_ROLE,
      ),
    );
    this.logger.logSeparator();

    // Create an IAM policy that allows the AWS Lambda function to invoke SageMaker APIs.
    const {
      arn: lambdaExecutionPolicyArn,
      policy: lambdaPolicy,
      cleanUp: lambdaExecutionPolicyCleanUp,
    } = await createLambdaExecutionPolicy({
      name: this.names.LAMBDA_EXECUTION_ROLE_POLICY,
      s3BucketName: this.names.S3_BUCKET,
      iamClient: this.clients.IAM,
      pipelineExecutionRoleArn,
    });
    this.cleanUpFunctions.push(lambdaExecutionPolicyCleanUp);

    console.log(JSON.stringify(lambdaPolicy, null, 2), "\n");
    await this.logger.log(
      MESSAGES.attachPolicy
        .replace("${POLICY_NAME}", this.names.LAMBDA_EXECUTION_ROLE_POLICY)
        .replace("${ROLE_NAME}", this.names.LAMBDA_EXECUTION_ROLE),
    );
    await this.prompter.checkContinue();

    // Attach the Lambda execution policy to the execution role.
    const { cleanUp: lambdaExecutionRolePolicyCleanUp } = await attachPolicy({
      roleName: this.names.LAMBDA_EXECUTION_ROLE,
      policyArn: lambdaExecutionPolicyArn,
      iamClient: this.clients.IAM,
    });
    this.cleanUpFunctions.push(lambdaExecutionRolePolicyCleanUp);

    await this.logger.log(MESSAGES.policyAttached);
    this.logger.logSeparator();

    // Create Lambda layer for SageMaker packages.
    const { versionArn: layerVersionArn, cleanUp: lambdaLayerCleanUp } =
      await createLambdaLayer({
        name: this.names.LAMBDA_LAYER,
        lambdaClient: this.clients.Lambda,
      });
    this.cleanUpFunctions.push(lambdaLayerCleanUp);

    await this.logger.log(
      MESSAGES.creatingFunction.replace(
        "${FUNCTION_NAME}",
        this.names.LAMBDA_FUNCTION,
      ),
    );

    // Create the Lambda function with the execution role.
    const { arn: lambdaArn, cleanUp: lambdaCleanUp } =
      await createLambdaFunction({
        roleArn: lambdaExecutionRoleArn,
        lambdaClient: this.clients.Lambda,
        name: this.names.LAMBDA_FUNCTION,
        layerVersionArn,
      });
    this.cleanUpFunctions.push(lambdaCleanUp);

    await this.logger.log(
      MESSAGES.functionCreated.replace(
        "${FUNCTION_NAME}",
        this.names.LAMBDA_FUNCTION,
      ),
    );
    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.creatingSQSQueue.replace("${QUEUE_NAME}", this.names.SQS_QUEUE),
    );

    // Create an SQS queue for the SageMaker pipeline.
    const {
      queueUrl,
      queueArn,
      cleanUp: queueCleanUp,
    } = await createSQSQueue({
      name: this.names.SQS_QUEUE,
      sqsClient: this.clients.SQS,
    });
    this.cleanUpFunctions.push(queueCleanUp);

    await this.logger.log(
      MESSAGES.sqsQueueCreated.replace("${QUEUE_NAME}", this.names.SQS_QUEUE),
    );
    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.configuringLambdaSQSEventSource
        .replace("${LAMBDA_NAME}", this.names.LAMBDA_FUNCTION)
        .replace("${QUEUE_NAME}", this.names.SQS_QUEUE),
    );

    // Configure the SQS queue as an event source for the Lambda.
    const { cleanUp: lambdaSQSEventSourceCleanUp } =
      await configureLambdaSQSEventSource({
        lambdaArn,
        lambdaName: this.names.LAMBDA_FUNCTION,
        queueArn,
        sqsClient: this.clients.SQS,
        lambdaClient: this.clients.Lambda,
      });
    this.cleanUpFunctions.push(lambdaSQSEventSourceCleanUp);

    await this.logger.log(
      MESSAGES.lambdaSQSEventSourceConfigured
        .replace("${LAMBDA_NAME}", this.names.LAMBDA_FUNCTION)
        .replace("${QUEUE_NAME}", this.names.SQS_QUEUE),
    );
    this.logger.logSeparator();

    // Create an IAM policy that allows the SageMaker pipeline to invoke AWS Lambda
    // and send messages to the Amazon SQS queue.
    const {
      arn: pipelineExecutionPolicyArn,
      policy: sagemakerPolicy,
      cleanUp: pipelineExecutionPolicyCleanUp,
    } = await createSagemakerExecutionPolicy({
      sqsQueueArn: queueArn,
      lambdaArn,
      iamClient: this.clients.IAM,
      name: this.names.SAGE_MAKER_EXECUTION_ROLE_POLICY,
      s3BucketName: this.names.S3_BUCKET,
    });
    this.cleanUpFunctions.push(pipelineExecutionPolicyCleanUp);

    console.log(JSON.stringify(sagemakerPolicy, null, 2));
    await this.logger.log(
      MESSAGES.attachPolicy
        .replace("${POLICY_NAME}", this.names.SAGE_MAKER_EXECUTION_ROLE_POLICY)
        .replace("${ROLE_NAME}", this.names.SAGE_MAKER_EXECUTION_ROLE),
    );
    await this.prompter.checkContinue();

    // Attach the SageMaker execution policy to the execution role.
    const { cleanUp: pipelineExecutionRolePolicyCleanUp } = await attachPolicy({
      roleName: this.names.SAGE_MAKER_EXECUTION_ROLE,
      policyArn: pipelineExecutionPolicyArn,
      iamClient: this.clients.IAM,
    });
    this.cleanUpFunctions.push(pipelineExecutionRolePolicyCleanUp);

    // Wait for the role to be ready. If the role is used immediately,
    // the pipeline will fail.
    await wait(5);

    await this.logger.log(MESSAGES.policyAttached);
    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.creatingPipeline.replace(
        "${PIPELINE_NAME}",
        this.names.SAGE_MAKER_PIPELINE,
      ),
    );

    // Create the SageMaker pipeline.
    const { cleanUp: pipelineCleanUp } = await createSagemakerPipeline({
      roleArn: pipelineExecutionRoleArn,
      functionArn: lambdaArn,
      sagemakerClient: this.clients.SageMaker,
      name: this.names.SAGE_MAKER_PIPELINE,
    });
    this.cleanUpFunctions.push(pipelineCleanUp);

    await this.logger.log(
      MESSAGES.pipelineCreated.replace(
        "${PIPELINE_NAME}",
        this.names.SAGE_MAKER_PIPELINE,
      ),
    );
    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.creatingS3Bucket.replace("${BUCKET_NAME}", this.names.S3_BUCKET),
    );

    // Create an S3 bucket for storing inputs and outputs.
    const { cleanUp: s3BucketCleanUp } = await createS3Bucket({
      name: this.names.S3_BUCKET,
      s3Client: this.clients.S3,
    });
    this.cleanUpFunctions.push(s3BucketCleanUp);

    await this.logger.log(
      MESSAGES.s3BucketCreated.replace("${BUCKET_NAME}", this.names.S3_BUCKET),
    );
    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.uploadingInputData.replace(
        "${BUCKET_NAME}",
        this.names.S3_BUCKET,
      ),
    );

    // Upload CSV Lat/Long data to S3.
    await uploadCSVDataToS3({
      bucketName: this.names.S3_BUCKET,
      s3Client: this.clients.S3,
    });

    await this.logger.log(MESSAGES.inputDataUploaded);
    this.logger.logSeparator();

    await this.prompter.checkContinue(MESSAGES.executePipeline);

    // Execute the SageMaker pipeline.
    const { arn: pipelineExecutionArn } = await startPipelineExecution({
      name: this.names.SAGE_MAKER_PIPELINE,
      sagemakerClient: this.clients.SageMaker,
      roleArn: pipelineExecutionRoleArn,
      bucketName: this.names.S3_BUCKET,
      queueUrl,
    });

    // Wait for the pipeline execution to finish.
    await waitForPipelineComplete({
      arn: pipelineExecutionArn,
      sagemakerClient: this.clients.SageMaker,
    });
    this.logger.logSeparator();

    await this.logger.log(MESSAGES.outputDelay);

    // The getObject function will throw an error if the output is not
    // found. The retry function will retry a failed function call once
    // every 10 seconds for 2 minutes.
    const output = await retry({ intervalInMs: 10000, maxRetries: 12 }, () =>
      getObject({
        bucket: this.names.S3_BUCKET,
        s3Client: this.clients.S3,
      }),
    );
    this.logger.logSeparator();
    await this.logger.log(MESSAGES.outputDataRetrieved);
    console.log(output.split("\n").slice(0, 6).join("\n"));
  }
}