本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
执行多个查询(Amazon Athena、Amazon SNS)
本示例项目演示了如何连续和并行运行 Athena 查询,处理错误,然后根据查询成功或失败发送 Amazon SNS 通知。
在本项目中,Step Functions 使用状态机同步运行 Athena 查询。返回查询结果后,进入并行状态,并行执行两个 Athena 查询。然后它等待作业成功或失败,并发送一个 Amazon SNS 主题,其中包含有关作业是成功还是失败的消息。
第 1 步:创建状态机并预置资源
-
打开 Step Functions 控制台
,然后选择创建状态机。 -
在搜索框中键入
Execute multiple queries
,然后从返回的搜索结果中选择执行多个查询。 -
选择下一步以继续。
-
Step Functions 列出了您选择的示例项目中 Amazon Web Services 使用的。它还显示了示例项目的工作流图。将此项目部署到您的, Amazon Web Services 账户 或者将其用作构建您自己的项目的起点。根据您想继续的方式,选择运行演示或构建依据。
该示例项目部署了以下资源:
-
Amazon Athena 查询
-
一个 Amazon SNS 主题
-
一个 Amazon Step Functions 状态机
-
相关 Amazon Identity and Access Management (IAM) 角色
下图显示了执行多个查询示例项目的工作流图:
-
-
选择使用模板继续进行选择。
-
请执行以下操作之一:
-
如果您选择构建依据,Step Functions 将为您选择的示例项目创建工作流原型。Step Functions 不会部署工作流定义中列出的资源。
在 Workflow Studio 的设计模式下,从状态浏览器中拖放状态,继续构建工作流原型。或者切换到代码模式,该模式提供了一个类似于 VS Code 的集成代码编辑器,用于在 Step Functions 控制台中更新状态机的 Amazon States Language(ASL)定义。有关使用 Workflow Studio 构建状态机的更多信息,请参阅使用 Workflow Studio。
重要
请记住,在运行工作流之前,为示例项目中使用的资源更新占位符 Amazon 资源名称 (ARN)。
-
如果您选择了 “运行演示”,Step Functions 将创建一个只读示例项目,该项目使用 Amazon CloudFormation 模板将该模板中列出的 Amazon 资源部署到您的 Amazon Web Services 账户。
提示
要查看示例项目的状态机定义,请选择代码。
准备就绪后,选择部署并运行以部署示例项目并创建资源。
创建这些资源和相关 IAM 权限可能需要长达 10 分钟的时间。在部署资源时,您可以打开 CloudFormation 堆栈 ID 链接以查看正在配置哪些资源。
创建示例项目中的所有资源后,您可以在状态机页面上看到新的示例项目。
重要
CloudFormation 模板中使用的每项服务都可能收取标准费用。
-
第 2 步:运行状态机
-
在状态机页面上,选择您的示例项目。
-
在示例项目页面上,选择启动执行。
-
在启动执行对话框中,执行以下操作:
-
(可选)要识别您的执行,您可以在名称框中为其指定一个名称。默认情况下,Step Functions 会自动生成一个唯一的执行名称。
注意
Step Functions 允许您为状态机、执行、活动和包含非 ASCII 字符的标签创建名称。这些非 ASCII 名称不适用于亚马逊。 CloudWatch为确保您可以跟踪 CloudWatch 指标,请选择仅使用 ASCII 字符的名称。
-
(可选)在输入框中,以 JSON 格式输入输入值以便运行工作流。
如果您选择运行演示,则无需提供任何执行输入。
-
选择启动执行。
-
Step Functions 控制台会将您引导到一个以您的执行 ID 为标题的页面。该页面被称为执行详细信息页面。在此页面上,您可以随着执行的进展或者在执行完成后查看执行结果。
要查看执行结果,请在图表视图上选择各个状态,然后在步骤详细信息窗格中选择各个选项卡,分别查看每个状态的详细信息,包括输入、输出和定义。有关可在执行详细信息页面上查看的执行信息的详细信息,请参阅“执行详细信息”页面 – 界面概述。
-
示例状态机代码
此示例项目中的状态机通过将参数直接传递给这些资源来与 Amazon Athena 和 Amazon SNS 集成。
浏览此示例状态机,了解 Step Functions 如何通过连接到 Resource
字段中的 Amazon 资源名称 (ARN),以及向服务 API 传递 Parameters
来控制 Amazon Athena 和 Amazon SNS。
有关 Amazon Step Functions 如何控制其他 Amazon 服务的更多信息,请参阅与其他服务 Amazon Step Functions 一起使用。
{
"Comment": "An example of using Athena to execute queries in sequence and parallel, with error handling and notifications.",
"StartAt": "Generate Example Data",
"States": {
"Generate Example Data": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<ATHENA_FUNCTION_NAME>"
},
"Next": "Load Data to Database"
},
"Load Data to Database": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Parameters": {
"QueryString": "<ATHENA_QUERYSTRING>",
"WorkGroup": "<ATHENA_WORKGROUP>"
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Send query results"
}
],
"Next": "Map"
},
"Map": {
"Type": "Parallel",
"ResultSelector": {
"Query1Result.$": "$[0].ResultSet.Rows",
"Query2Result.$": "$[1].ResultSet.Rows"
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Send query results"
}
],
"Branches": [
{
"StartAt": "Start Athena query 1",
"States": {
"Start Athena query 1": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Parameters": {
"QueryString": "<ATHENA_QUERYSTRING>",
"WorkGroup": "<ATHENA_WORKGROUP>"
},
"Next": "Get Athena query 1 results"
},
"Get Athena query 1 results": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:getQueryResults",
"Parameters": {
"QueryExecutionId.$": "$.QueryExecution.QueryExecutionId"
},
"End": true
}
}
},
{
"StartAt": "Start Athena query 2",
"States": {
"Start Athena query 2": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Parameters": {
"QueryString": "<ATHENA_QUERYSTRING>",
"WorkGroup": "<ATHENA_WORKGROUP>"
},
"Next": "Get Athena query 2 results"
},
"Get Athena query 2 results": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:getQueryResults",
"Parameters": {
"QueryExecutionId.$": "$.QueryExecution.QueryExecutionId"
},
"End": true
}
}
}
],
"Next": "Send query results"
},
"Send query results": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"Message.$": "$",
"TopicArn": "<SNS_TOPIC_ARN>"
},
"End": true
}
}
}
IAM 示例
示例项目生成的此示例 Amazon Identity and Access Management (IAM) 策略包括执行状态机和相关资源所需的最低权限。我们建议在您的 IAM 策略中仅包含这些必需的权限。
AthenaStartQueryExecution
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:startQueryExecution",
"athena:stopQueryExecution",
"athena:getQueryExecution",
"athena:getDataCatalog"
],
"Resource": [
"arn:aws:athena:us-east-2:123456789012:workgroup/stepfunctions-athena-sample-project-workgroup-ztuvu9yuix",
"arn:aws:athena:us-east-2:123456789012:datacatalog/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:GetDatabase",
"glue:GetDatabases",
"glue:UpdateDatabase",
"glue:DeleteDatabase",
"glue:CreateTable",
"glue:UpdateTable",
"glue:GetTable",
"glue:GetTables",
"glue:DeleteTable",
"glue:BatchDeleteTable",
"glue:BatchCreatePartition",
"glue:CreatePartition",
"glue:UpdatePartition",
"glue:GetPartition",
"glue:GetPartitions",
"glue:BatchGetPartition",
"glue:DeletePartition",
"glue:BatchDeletePartition"
],
"Resource": [
"arn:aws:glue:us-east-2:123456789012:catalog",
"arn:aws:glue:us-east-2:123456789012:database/*",
"arn:aws:glue:us-east-2:123456789012:table/*",
"arn:aws:glue:us-east-2:123456789012:userDefinedFunction/*"
]
},
{
"Effect": "Allow",
"Action": [
"lakeformation:GetDataAccess"
],
"Resource": [
"*"
]
}
]
}
AthenaGetQueryResults
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:getQueryResults"
],
"Resource": [
"arn:aws:us-east-2:123456789012:workgroup/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::*"
]
}
]
}
SNSPublish
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": [
"arn:aws:sns:us-east-2:123456789012:StepFunctionsSample-AthenaMultipleQueriese1ec229b-5cbe-4754-a8a8-078474bac878-SNSTopic-9AID0HEJT7TH"
]
}
]
}
LambdaInvokeFunction
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-2:123456789012:function:StepFunctionsSample-Athen-LambdaForStringGeneratio-GQFQjN7mE9gl:*"
]
},
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-2:123456789012:function:StepFunctionsSample-Athen-LambdaForStringGeneratio-GQFQjN7mE9gl"
]
}
]
}
有关在将 Step Functions 与其他 Amazon 服务一起使用时如何配置 IAM 的信息,请参阅集成服务的 IAM 策略。