本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
了解 Step Functions 中的活动
使用 Step Functions 活动,可以在状态机中设置任务,其中实际工作由在 Step Functions 之外运行的工作线程 执行。例如,可以在 Amazon Elastic Compute Cloud(Amazon EC2)、Amazon Elastic Container Service(Amazon ECS)甚或是移动设备上运行工作线程程序。
概述
在 Amazon Step Functions 中,可以将正在某处运行的代码(称为活动工作线程)与状态机中的特定任务关联起来。您可以在 Step Functions 控制台中或通过调用 CreateActivity
创建一个活动。这样会为任务状态提供 Amazon 资源名称 (ARN)。使用此 ARN 可以轮询活动工作线程中的工作的任务状态。
注意
活动没有版本控制,并应向后兼容。如果您必须对活动执行无法向后兼容的更改,则应通过 Step Functions 使用唯一名称创建一个新 活动。
活动工作线程可以是在 Amazon EC2 实例上运行的应用程序、Amazon Lambda 函数、移动设备:任何可以进行 HTTP 连接、在任何位置托管的应用程序。当 Step Functions 达到某种活动任务状态,工作流程就等待活动工作线程轮询任务。活动工作线程通过使用 GetActivityTask
并发送相关活动的 ARN 来轮询 Step Functions,GetActivityTask
返回响应,其中包括 input
(任务的 JSON 字符串输入)和 taskToken
(任务的唯一标识符)。活动工作线程完成其工作后,可以使用 SendTaskSuccess
或 SendTaskFailure
提供成功或失败报告。这两个调用使用 GetActivityTask
提供的 taskToken
将结果与该任务关联起来。
与活动任务相关的 API
Step Functions 提供用于创建和列出活动、请求任务和根据工作线程结果管理状态机流程的 API。
下面是与活动相关的 Step Functions API:
注意
在某些实现中,通过 GetActivityTask
轮询活动任务可能会导致延迟。请参阅 避免轮询活动任务时发生延迟。
等待完成活动任务
通过在任务定义中设置 TimeoutSeconds
配置状态等待时长 要使任务保持为活动和等待状态,请在 TimeoutSeconds
中配置的时间内,定期使用 SendTaskHeartbeat
从活动工作线程发送检测信号。通过配置较长的超时时间和积极发送检测信号,Step Functions 中的活动最长可以等待一年时间以完成执行。
例如,如果需要工作流程等待长时间进程的结果,请执行以下操作:
-
使用控制台或者使用
CreateActivity
创建活动。记下活动 ARN。 -
在状态机定义中的活动任务状态中引用该 ARN 并设置
TimeoutSeconds
。 -
使用
GetActivityTask
并引用该活动 ARN 实现用于轮询工作的活动工作线程。 -
在状态机任务定义的
HeartbeatSeconds
所设置的时间内,定期使用SendTaskHeartbeat
,以防止任务超时。 -
启动状态机执行。
-
启动活动工作进程。
执行在相应活动任务状态时暂停,等待活动工作线程轮询任务。一旦 taskToken
提供给活动工作线程,工作流程将等待 SendTaskSuccess
或 SendTaskFailure
提供状态。如果执行在 TimeoutSeconds
中配置的时间之前未收到上述任一状态或 SendTaskHeartbeat
调用,则执行将失败,执行历史记录将包含 ExecutionTimedOut
事件。
示例:Ruby 中的活动工作线程
以下示例活动工作线程代码实现使用者/生产者模式,并为轮询器和活动工作线程提供可配置的线程数量。轮询器线程不断地长时间轮询 Step Functions 中的活动任务。当检索到活动任务时,则将该任务传递穿过一个有界的阻塞队列,供活动线程领取任务。
-
有关更多信息,请参阅 Amazon SDK for Ruby API 参考。
-
要下载此代码及相关资源,请参阅 GitHub 上的 step-functions-ruby-activity-worker
存储库。
以下代码是此示例 Ruby 活动工作线程的主入口点。
require_relative '
../lib/step_functions/activity
' credentials = Aws::SharedCredentials.new region = 'us-west-2
' activity_arn = 'ACTIVITY_ARN
' activity = StepFunctions::Activity.new( credentials: credentials, region: region, activity_arn: activity_arn, workers_count: 1, pollers_count: 1, heartbeat_delay: 30 ) # Start method block contains your custom implementation to process the input activity.start do |input| { result: :SUCCESS, echo: input['value'] } end
必须指定活动 ARN 和区域。该代码包括您可以设置的默认值,例如线程数和检测信号延迟。
项目 | 描述 |
---|---|
|
下面示例活动工作线程代码的相对路径。 |
|
活动的 Amazon 区域。 |
|
活动工作线程的线程数。对于大多数实现来说,10 到 20 个线程就足够了。活动的处理时间越长,可能需要的线程就越多。您可以使用以下方式进行估算:将每秒处理活动数乘以第 99 个百分点的活动处理延迟 (以秒为单位)。 |
|
轮询器的线程数。对于大多数实现来说,10 到 20 个线程就足够了。 |
|
检测信号间的延迟 (以秒为单位)。 |
input |
活动的实现逻辑。 |
后续步骤
要更详细地了解如何创建使用活动工作线程的状态机,请参阅: