本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
执行多个查询(Amazon Athena、Amazon SNS)
此示例项目演示了如何依次运行 Athena 查询,然后并parallel 运行,处理错误,然后根据查询成功还是失败发送 Amazon SNS 通知。部署此示例项目将创建Amazon Step Functions状态机、Amazon Athena 查询和 Amazon SNS 主题。
在这个项目中,Step Functions 使用状态机同步运行 Athena 查询。返回查询结果后,进入并行状态,并行执行两个 Athena 查询。然后,它等待任务成功或失败,并发送一个 Amazon SNS 主题,其中包含有关任务成功还是失败的消息。
创建状态机并预置资源
从 https://console.aws.amazon.com/athena/
打开 Amazon Athena 控制台。 -
在左侧导航窗格中,选择 Instorces (工作流)。
-
在 “执行多个查询” 图块中,选择 “开始”。
-
在 “开始” 对话框中,选择 “部署示例项目”,然后选择 “继续”。
-
您将被重定向到 Step Functions 控制台的 “查看” 工作流程页面。查看为示例项目自动生成的亚马逊州语言定义。
将显示状态机工作流定义和可视工作流。
-
选择 Next(下一步)。
将显示 “部署并运行” 页面,其中列出了将要创建的资源。此示例项目创建了以下资源:
-
Amazon Athena
-
Lambda 函数
-
一个 Amazon S3 存储桶
-
一个 Amazon SNS 主题
-
一个Amazon Glue数据库
-
-
选择 “部署并运行”。
注意 创建这些资源和相关的 IAM 权限可能需要长达 10 分钟时间。在显示 “部署并运行” 页面时,您可以打开 Stack ID 链接以查看正在配置哪些资源。
启动新的执行
-
在 New execution 页面上,输入执行名称 (可选),然后选择 Start Execution (开始执行)。
(可选)要识别您的执行情况,可以在 “名称” 框中为其指定名称。默认情况下,Step Functions 会自动生成唯一的执行名称。
注意 Step Functions 允许您为包含非 ASCII 字符的状态机、执行、活动和标签创建名称。这些非 ASCII 名称不适用于亚马逊 CloudWatch。为确保您可以跟踪 CloudWatch 指标,请选择仅使用 ASCII 字符的名称。
-
或者,您可以转到 Step Functions 仪表板上新创建的状态机,然后选择 “新建执行”。
-
执行完成后,您可以在 Visual workflow (可视工作流) 上选择状态,并浏览 Step details (步骤详细信息) 下的 Input (输入) 和 Output (输出)。
示例状态机代码
此示例项目中的状态机通过将参数直接传递给 Amazon Athena 和 Amazon SNS 来与 Amazon SNS 集成。
浏览此示例状态机,了解 Step Functions 如何通过连接到Resource
字段中的亚马逊资源名称 (ARN) 并传递Parameters
给服务 API 来控制 Amazon Athena 和 Amazon SNS。
有关 Amazon Step Functions 如何控制其他 Amazon 服务的更多信息,请参阅将 Amazon Step Functions 与其他服务一起使用。
{
"Comment": "An example of using Athena to execute queries in sequence and parallel, with error handling and notifications.",
"StartAt": "Generate Example Data",
"States": {
"Generate Example Data": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<ATHENA_FUNCTION_NAME>"
},
"Next": "Load Data to Database"
},
"Load Data to Database": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Parameters": {
"QueryString": "<ATHENA_QUERYSTRING>",
"WorkGroup": "<ATHENA_WORKGROUP>"
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Send query results"
}
],
"Next": "Map"
},
"Map": {
"Type": "Parallel",
"ResultSelector": {
"Query1Result.$": "$[0].ResultSet.Rows",
"Query2Result.$": "$[1].ResultSet.Rows"
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Send query results"
}
],
"Branches": [
{
"StartAt": "Start Athena query 1",
"States": {
"Start Athena query 1": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Parameters": {
"QueryString": "<ATHENA_QUERYSTRING>",
"WorkGroup": "<ATHENA_WORKGROUP>"
},
"Next": "Get Athena query 1 results"
},
"Get Athena query 1 results": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:getQueryResults",
"Parameters": {
"QueryExecutionId.$": "$.QueryExecution.QueryExecutionId"
},
"End": true
}
}
},
{
"StartAt": "Start Athena query 2",
"States": {
"Start Athena query 2": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Parameters": {
"QueryString": "<ATHENA_QUERYSTRING>",
"WorkGroup": "<ATHENA_WORKGROUP>"
},
"Next": "Get Athena query 2 results"
},
"Get Athena query 2 results": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:getQueryResults",
"Parameters": {
"QueryExecutionId.$": "$.QueryExecution.QueryExecutionId"
},
"End": true
}
}
}
],
"Next": "Send query results"
},
"Send query results": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"Message.$": "$",
"TopicArn": "<SNS_TOPIC_ARN>"
},
"End": true
}
}
}
IAM 示例
示例项目生成的此示例Amazon Identity and Access Management (IAM) 策略包括执行状态机和相关资源所需的最低权限。我们建议您在 IAM 策略中仅包含必要的权限。
AthenaStartQueryExecution
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:startQueryExecution",
"athena:stopQueryExecution",
"athena:getQueryExecution",
"athena:getDataCatalog"
],
"Resource": [
"arn:aws:athena:us-east-2:123456789012:workgroup/stepfunctions-athena-sample-project-workgroup-ztuvu9yuix",
"arn:aws:athena:us-east-2:123456789012:datacatalog/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:GetDatabase",
"glue:GetDatabases",
"glue:UpdateDatabase",
"glue:DeleteDatabase",
"glue:CreateTable",
"glue:UpdateTable",
"glue:GetTable",
"glue:GetTables",
"glue:DeleteTable",
"glue:BatchDeleteTable",
"glue:BatchCreatePartition",
"glue:CreatePartition",
"glue:UpdatePartition",
"glue:GetPartition",
"glue:GetPartitions",
"glue:BatchGetPartition",
"glue:DeletePartition",
"glue:BatchDeletePartition"
],
"Resource": [
"arn:aws:glue:us-east-2:123456789012:catalog",
"arn:aws:glue:us-east-2:123456789012:database/*",
"arn:aws:glue:us-east-2:123456789012:table/*",
"arn:aws:glue:us-east-2:123456789012:userDefinedFunction/*"
]
},
{
"Effect": "Allow",
"Action": [
"lakeformation:GetDataAccess"
],
"Resource": [
"*"
]
}
]
}
AthenaGetQueryResults
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:getQueryResults"
],
"Resource": [
"arn:aws:us-east-2:123456789012:workgroup/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::*"
]
}
]
}
snsPublish
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": [
"arn:aws:sns:us-east-2:123456789012:StepFunctionsSample-AthenaMultipleQueriese1ec229b-5cbe-4754-a8a8-078474bac878-SNSTopic-9AID0HEJT7TH"
]
}
]
}
LambdaInvokeFunction
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-2:123456789012:function:StepFunctionsSample-Athen-LambdaForStringGeneratio-GQFQjN7mE9gl:*"
]
},
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-2:123456789012:function:StepFunctionsSample-Athen-LambdaForStringGeneratio-GQFQjN7mE9gl"
]
}
]
}
有关在将Step Functions与其他Amazon服务一起使用时如何配置 IAM 的信息,请参阅集成服务的 IAM 政策。