执行多个查询(Amazon Athena、Amazon SNS) - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

执行多个查询(Amazon Athena、Amazon SNS)

此示例项目演示了如何依次运行 Athena 查询,然后并parallel 运行,处理错误,然后根据查询成功还是失败发送 Amazon SNS 通知。部署此示例项目将创建Amazon Step Functions状态机、Amazon Athena 查询和 Amazon SNS 主题。

在这个项目中,Step Functions 使用状态机同步运行 Athena 查询。返回查询结果后,进入并行状态,并行执行两个 Athena 查询。然后,它等待任务成功或失败,并发送一个 Amazon SNS 主题,其中包含有关任务成功还是失败的消息。

创建状态机并预置资源

  1. https://console.aws.amazon.com/athena/ 打开 Amazon Athena 控制台。

  2. 在左侧导航窗格中,选择 Instorces (工作流)。

  3. 在 “执行多个查询” 图块中,选择 “开始”。

  4. 在 “开始” 对话框中,选择 “部署示例项目”,然后选择 “继续”。

  5. 您将被重定向到 Step Functions 控制台的 “查看” 工作流程页面。查看为示例项目自动生成的亚马逊州语言定义。

    将显示状态机工作流定义可视工作流

    
                        运行多个查询工作流程。
  6. 选择 Next(下一步)

    将显示 “部署并运行” 页面,其中列出了将要创建的资源。此示例项目创建了以下资源:

    • Amazon Athena

    • Lambda 函数

    • 一个 Amazon S3 存储桶

    • 一个 Amazon SNS 主题

    • 一个Amazon Glue数据库

  7. 选择 “部署并运行”

    注意

    创建这些资源和相关的 IAM 权限可能需要长达 10 分钟时间。在显示 “部署并运行” 页面时,您可以打开 Stack ID 链接以查看正在配置哪些资源。

启动新的执行

  1. New execution 页面上,输入执行名称 (可选),然后选择 Start Execution (开始执行)

  2. (可选)要识别您的执行情况,可以在 “名称” 框中为其指定名称。默认情况下,Step Functions 会自动生成唯一的执行名称。

    注意

    Step Functions 允许您为包含非 ASCII 字符的状态机、执行、活动和标签创建名称。这些非 ASCII 名称不适用于亚马逊 CloudWatch。为确保您可以跟踪 CloudWatch 指标,请选择仅使用 ASCII 字符的名称。

  3. 或者,您可以转到 Step Functions 仪表板上新创建的状态机,然后选择 “新建执行”。

  4. 执行完成后,您可以在 Visual workflow (可视工作流) 上选择状态,并浏览 Step details (步骤详细信息) 下的 Input (输入)Output (输出)

示例状态机代码

此示例项目中的状态机通过将参数直接传递给 Amazon Athena 和 Amazon SNS 来与 Amazon SNS 集成。

浏览此示例状态机,了解 Step Functions 如何通过连接到Resource字段中的亚马逊资源名称 (ARN) 并传递Parameters给服务 API 来控制 Amazon Athena 和 Amazon SNS。

有关 Amazon Step Functions 如何控制其他 Amazon 服务的更多信息,请参阅将 Amazon Step Functions 与其他服务一起使用

{ "Comment": "An example of using Athena to execute queries in sequence and parallel, with error handling and notifications.", "StartAt": "Generate Example Data", "States": { "Generate Example Data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<ATHENA_FUNCTION_NAME>" }, "Next": "Load Data to Database" }, "Load Data to Database": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "<ATHENA_QUERYSTRING>", "WorkGroup": "<ATHENA_WORKGROUP>" }, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Send query results" } ], "Next": "Map" }, "Map": { "Type": "Parallel", "ResultSelector": { "Query1Result.$": "$[0].ResultSet.Rows", "Query2Result.$": "$[1].ResultSet.Rows" }, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Send query results" } ], "Branches": [ { "StartAt": "Start Athena query 1", "States": { "Start Athena query 1": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "<ATHENA_QUERYSTRING>", "WorkGroup": "<ATHENA_WORKGROUP>" }, "Next": "Get Athena query 1 results" }, "Get Athena query 1 results": { "Type": "Task", "Resource": "arn:aws:states:::athena:getQueryResults", "Parameters": { "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId" }, "End": true } } }, { "StartAt": "Start Athena query 2", "States": { "Start Athena query 2": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "<ATHENA_QUERYSTRING>", "WorkGroup": "<ATHENA_WORKGROUP>" }, "Next": "Get Athena query 2 results" }, "Get Athena query 2 results": { "Type": "Task", "Resource": "arn:aws:states:::athena:getQueryResults", "Parameters": { "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId" }, "End": true } } } ], "Next": "Send query results" }, "Send query results": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "Message.$": "$", "TopicArn": "<SNS_TOPIC_ARN>" }, "End": true } } }

IAM 示例

示例项目生成的此示例Amazon Identity and Access Management (IAM) 策略包括执行状态机和相关资源所需的最低权限。我们建议您在 IAM 策略中仅包含必要的权限。

AthenaStartQueryExecution

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "athena:startQueryExecution", "athena:stopQueryExecution", "athena:getQueryExecution", "athena:getDataCatalog" ], "Resource": [ "arn:aws:athena:us-east-2:123456789012:workgroup/stepfunctions-athena-sample-project-workgroup-ztuvu9yuix", "arn:aws:athena:us-east-2:123456789012:datacatalog/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:CreateBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::*" ] }, { "Effect": "Allow", "Action": [ "glue:CreateDatabase", "glue:GetDatabase", "glue:GetDatabases", "glue:UpdateDatabase", "glue:DeleteDatabase", "glue:CreateTable", "glue:UpdateTable", "glue:GetTable", "glue:GetTables", "glue:DeleteTable", "glue:BatchDeleteTable", "glue:BatchCreatePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions", "glue:BatchGetPartition", "glue:DeletePartition", "glue:BatchDeletePartition" ], "Resource": [ "arn:aws:glue:us-east-2:123456789012:catalog", "arn:aws:glue:us-east-2:123456789012:database/*", "arn:aws:glue:us-east-2:123456789012:table/*", "arn:aws:glue:us-east-2:123456789012:userDefinedFunction/*" ] }, { "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": [ "*" ] } ] }
AthenaGetQueryResults

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "athena:getQueryResults" ], "Resource": [ "arn:aws:us-east-2:123456789012:workgroup/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::*" ] } ] }
snsPublish

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sns:Publish" ], "Resource": [ "arn:aws:sns:us-east-2:123456789012:StepFunctionsSample-AthenaMultipleQueriese1ec229b-5cbe-4754-a8a8-078474bac878-SNSTopic-9AID0HEJT7TH" ] } ] }
LambdaInvokeFunction

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-2:123456789012:function:StepFunctionsSample-Athen-LambdaForStringGeneratio-GQFQjN7mE9gl:*" ] }, { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-2:123456789012:function:StepFunctionsSample-Athen-LambdaForStringGeneratio-GQFQjN7mE9gl" ] } ] }

有关在将Step Functions与其他Amazon服务一起使用时如何配置 IAM 的信息,请参阅集成服务的 IAM 政策