选择性检查点示例(快速工作流) - AWS Step Functions
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

选择性检查点示例(快速工作流)

此示例项目演示了如何通过运行执行选择性检查点的模拟电子商务工作流来组合标准和快速工作流。部署此示例项目将创建标准工作流状态机、嵌套的快速工作流状态机、AWS Lambda 函数、Amazon Simple Queue Service (Amazon SQS) 队列和 Amazon Simple Notification Service (Amazon SNS) 主题。

有关快速工作流、嵌套工作流和 Step Functions 服务集成的更多信息,请参阅以下内容:

创建状态机并预置资源

  1. 打开 Step Functions 控制台,然后选择 Create a state machine (创建状态机)

  2. 选择 Run a sample project (运行示例项目),然后选择 Selective checkpointing example (选择性检查点示例)

    此时将显示标准工作流状态机 Code (代码)Visual Workflow (可视工作流)

    
                    选择性检查点工作流。

    部署示例项目后,您可以查看所嵌套快速工作流的状态机 Code (代码)Visual Workflow (可视工作流)

  3. 选择 Next (下一步)

    此时将显示 Deploy resources (部署资源) 页面,其中列出了将创建的资源。对于本示例项目,资源包括:

    • Step Functions 状态机

    • Amazon SQS 队列

    • Lambda 函数

  4. 选择 Deploy Resources (部署资源)

    注意

    创建这些资源和相关 IAM 权限可能需要长达 10 分钟的时间。当显示 Deploy resources (部署资源) 页面时,您可打开 Stack ID (堆栈 ID) 链接以查看正在预置的资源。

部署示例项目的资源后,执行以下操作。

启动新的执行

  1. 打开 Step Functions 控制台

  2. State machines (状态机) 页面上,选择示例项目创建的状态机,然后选择 Start execution (启动执行)

  3. New execution 页面上,输入执行名称 (可选),然后选择 Start Execution (开始执行)

  4. (可选)为帮助您标识执行,您可以在 Enter an execution name (输入执行名称) 框中为执行指定一个 ID。如果未输入 ID,Step Functions 将自动生成一个唯一 ID。

    注意

    Step Functions 允许您创建包含非 ASCII 字符的状态机、执行和活动名称。这些非 ASCII 名称不适用于 Amazon CloudWatch。为确保您可以跟踪 CloudWatch 指标,请选择一个只使用 ASCII 字符的名称。

  5. (可选)转到 Step Functions Dashboard (控制面板) 上新创建的状态机,然后选择 Start execution (启动执行)

  6. 执行完成后,您可以打开 Step Functions 控制台

  7. 转到您的 CloudWatch Logs 日志组并检查日志。日志组的名称将类似于 example-ExpressLogGroup-wJalrXUtnFEMI

父级(标准工作流)的示例状态机代码

此示例项目中的状态机与 Amazon SQS、Amazon SNS 和 Step Functions 快速工作流集成。

浏览此示例状态机,了解 Step Functions 如何处理来自 Amazon SQS 和 Amazon SNS 的输入,然后使用嵌套的快速工作流状态机更新后端系统。

有关 AWS Step Functions 如何控制其他 AWS 服务的更多信息,请参阅 服务与 AWS Step Functions 集成

{ "Comment": "An example of combining standard and express workflows to run a mock e-commerce workflow that does selective checkpointing.", "StartAt": "Approve Order Request", "States": { "Approve Order Request": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sqs:sendMessage.waitForTaskToken", "Parameters": { "QueueUrl": "<SQS_QUEUE_URL>", "MessageBody": { "MessageTitle": "Order Request received. Pausing workflow to wait for manual approval. ", "TaskToken.$": "$$.Task.Token" } }, "Next": "Notify Order Success", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Notify Order Failure" } ] }, "Notify Order Success": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sns:publish", "Parameters": { "Message": "Order has been approved. Resuming workflow.", "TopicArn": "<SNS_ARN>" }, "Next": "Process Payment" }, "Notify Order Failure": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sns:publish", "Parameters": { "Message": "Order not approved. Order failed.", "TopicArn": "<SNS_ARN>" }, "End": true }, "Process Payment": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sqs:sendMessage.waitForTaskToken", "Parameters": { "QueueUrl": "<SQS_QUEUE_URL>", "MessageBody": { "MessageTitle": "Payment sent to third-party for processing. Pausing workflow to wait for response.", "TaskToken.$": "$$.Task.Token" } }, "Next": "Notify Payment Success", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Notify Payment Failure" } ] }, "Notify Payment Success": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sns:publish", "Parameters": { "Message": "Payment processing succeeded. Resuming workflow.", "TopicArn": "<SNS_ARN>" }, "Next": "Workflow to Update Backend Systems" }, "Notify Payment Failure": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sns:publish", "Parameters": { "Message": "Payment processing failed.", "TopicArn": "<SNS_ARN>" }, "End": true }, "Workflow to Update Backend Systems": { "Comment": "Starting an execution of an Express workflow to handle backend updates. Express workflows are fast and cost-effective for steps where checkpointing isn't required.", "Type": "Task", "Resource": "arn:<PARTITION>:states:::states:startExecution.sync", "Parameters": { "StateMachineArn": "<UPDATE_DATABASE_EXPRESS_STATE_MACHINE_ARN>", "Input": { "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" } }, "Next": "Ship the Package" }, "Ship the Package": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::sns:publish", "Parameters": { "Message": "Order and payment received, database is updated and the package is ready to ship.", "TopicArn": "<SNS_ARN>" }, "End": true } } }

父状态机的示例 IAM 角色

示例项目生成的这些示例 AWS Identity and Access Management (IAM) 策略包括执行状态机和相关资源所需的最小权限。我们建议在您的 IAM 策略中仅包含这些必需的权限。

Amazon SNS 策略:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "sns:Publish" ], "Resource": "arn:aws:sns:us-east-1:123456789012:Checkpoint-SNSTopic-wJalrXUtnFEMI", "Effect": "Allow" } ] }

Amazon SQS 策略:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "sqs:SendMessage" ], "Resource": "arn:aws:sqs:us-east-1:123456789012:Checkpoint-SQSQueue-je7MtGbClwBF", "Effect": "Allow" } ] }

状态执行策略:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "states:StartExecution", "states:DescribeExecution", "states:StopExecution" ], "Resource": "*", "Effect": "Allow" }, { "Action": [ "events:PutTargets", "events:PutRule", "events:DescribeRule" ], "Resource": "arn:aws:events:us-east-1:123456789012:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule", "Effect": "Allow" } ] }

嵌套状态机(快速工作流)的示例状态机代码

此示例项目中的状态机在父状态机调用时更新后端信息。

浏览此示例状态机,了解 Step Functions 如何更新模拟电子商务后端系统的不同组件。

有关 AWS Step Functions 如何控制其他 AWS 服务的更多信息,请参阅 服务与 AWS Step Functions 集成


                选择性检查点工作流。
{ "StartAt": "Update Order History", "States": { "Update Order History": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "Checkpoint-UpdateDatabaseLambdaFunction-wJalrXUtnFEMI", "Payload": { "Message": "Update order history." } }, "Next": "Update Data Warehouse" }, "Update Data Warehouse": { "Type" : "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "Checkpoint-UpdateDatabaseLambdaFunction-wJalrXUtnFEMI", "Payload": { "Message": "Update data warehouse." } }, "Next": "Update Customer Profile" }, "Update Customer Profile": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "Checkpoint-UpdateDatabaseLambdaFunction-wJalrXUtnFEMI", "Payload": { "Message": "Update customer profile." } }, "Next": "Update Inventory" }, "Update Inventory": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "Checkpoint-UpdateDatabaseLambdaFunction-wJalrXUtnFEMI", "Payload": { "Message": "Update inventory." } }, "End": true } } }

子状态的示例 IAM 角色

示例项目生成的此示例 AWS Identity and Access Management (IAM) 策略包括执行状态机和相关资源所需的最小权限。我们建议在您的 IAM 策略中仅包含这些必需的权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws-cn:lambda:us-east-1:123456789012:function:Example-UpdateDatabaseLambdaFunction-wJalrXUtnFEMI" ], "Effect": "Allow" } ] }

以下策略确保 CloudWatch Logs 有足够的权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

有关在将 Step Functions 与其他 AWS 服务一起使用时如何配置 IAM 的信息,请参阅 集成服务的 IAM 策略