执行多个查询(Amazon Athena、Amazon SNS) - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

执行多个查询(Amazon Athena、Amazon SNS)

本示例项目演示了如何连续和并行运行 Athena 查询,处理错误,然后根据查询成功或失败发送 Amazon SNS 通知。

在本项目中,Step Functions 使用状态机同步运行 Athena 查询。返回查询结果后,进入并行状态,并行执行两个 Athena 查询。然后它等待作业成功或失败,并发送一个 Amazon SNS 主题,其中包含有关作业是成功还是失败的消息。

第 1 步:创建状态机并预置资源

  1. 打开 Step Functions 控制台,然后选择创建状态机

  2. 在搜索框中键入 Execute multiple queries,然后从返回的搜索结果中选择执行多个查询

  3. 选择下一步以继续。

  4. Step Functions 列出了您选择的示例项目中 Amazon Web Services 使用的。它还显示了示例项目的工作流图。将此项目部署到您的, Amazon Web Services 账户 或者将其用作构建您自己的项目的起点。根据您想继续的方式,选择运行演示构建依据

    该示例项目部署了以下资源:

    • Amazon Athena 查询

    • 一个 Amazon SNS 主题

    • 一个 Amazon Step Functions 状态机

    • 相关 Amazon Identity and Access Management (IAM) 角色

    下图显示了执行多个查询示例项目的工作流图:

    
                        执行多个查询示例项目的工作流图。
  5. 选择使用模板继续进行选择。

  6. 请执行以下操作之一:

    • 如果您选择构建依据,Step Functions 将为您选择的示例项目创建工作流原型。Step Functions 不会部署工作流定义中列出的资源。

      在 Workflow Studio 的设计模式下,从状态浏览器中拖放状态,继续构建工作流原型。或者切换到代码模式,该模式提供了一个类似于 VS Code 的集成代码编辑器,用于在 Step Functions 控制台中更新状态机的 Amazon States Language (ASL) 定义。有关使用 Workflow Studio 构建状态机的更多信息,请参阅使用 Workflow Studio

      重要

      请记住,在运行工作流之前,为示例项目中使用的资源更新占位符 Amazon 资源名称 (ARN)。

    • 如果您选择了 “运行演示”,Step Functions 将创建一个只读示例项目,该项目使用 Amazon CloudFormation 模板将该模板中列出的 Amazon 资源部署到您的 Amazon Web Services 账户。

      提示

      要查看示例项目的状态机定义,请选择代码

      准备就绪后,选择部署并运行以部署示例项目并创建资源。

      创建这些资源和相关 IAM 权限可能需要长达 10 分钟的时间。在部署资源时,您可以打开 CloudFormation 堆栈 ID 链接以查看正在配置哪些资源。

      创建示例项目中的所有资源后,您可以在状态机页面上看到新的示例项目。

      重要

      CloudFormation 模板中使用的每项服务都可能收取标准费用。

第 2 步:运行状态机

  1. 状态机页面上,选择您的示例项目。

  2. 在示例项目页面上,选择启动执行

  3. 启动执行对话框中,执行以下操作:

    1. (可选)要识别您的执行,您可以在名称框中为其指定一个名称。默认情况下,Step Functions 会自动生成一个唯一的执行名称。

      注意

      Step Functions 允许您为状态机、执行、活动、速率控制和包含非 ASCII 字符的标签创建名称。这些非 ASCII 名称不适用于亚马逊。 CloudWatch为确保您可以跟踪 CloudWatch 指标,请选择仅使用 ASCII 字符的名称。

    2. (可选)在输入框中,以 JSON 格式输入输入值以便运行工作流。

      如果您选择运行演示,则无需提供任何执行输入。

    3. 选择启动执行

    4. Step Functions 控制台会将您引导到一个以您的执行 ID 为标题的页面。该页面被称为执行详细信息页面。在此页面上,您可以随着执行的进展或者在执行完成后查看执行结果。

      要查看执行结果,请在图表视图上选择各个状态,然后在步骤详细信息窗格中选择各个选项卡,分别查看每个状态的详细信息,包括输入、输出和定义。有关可在执行详细信息页面上查看的执行信息的详细信息,请参阅“执行详细信息”页面 – 界面概述

示例状态机代码

此示例项目中的状态机通过将参数直接传递给这些资源来与 Amazon Athena 和 Amazon SNS 集成。

浏览此示例状态机,了解 Step Functions 如何通过连接到 Resource 字段中的 Amazon 资源名称 (ARN),以及向服务 API 传递 Parameters 来控制 Amazon Athena 和 Amazon SNS。

有关 Amazon Step Functions 如何控制其他 Amazon 服务的更多信息,请参阅与其他服务 Amazon Step Functions 一起使用

{ "Comment": "An example of using Athena to execute queries in sequence and parallel, with error handling and notifications.", "StartAt": "Generate Example Data", "States": { "Generate Example Data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<ATHENA_FUNCTION_NAME>" }, "Next": "Load Data to Database" }, "Load Data to Database": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "<ATHENA_QUERYSTRING>", "WorkGroup": "<ATHENA_WORKGROUP>" }, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Send query results" } ], "Next": "Map" }, "Map": { "Type": "Parallel", "ResultSelector": { "Query1Result.$": "$[0].ResultSet.Rows", "Query2Result.$": "$[1].ResultSet.Rows" }, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Send query results" } ], "Branches": [ { "StartAt": "Start Athena query 1", "States": { "Start Athena query 1": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "<ATHENA_QUERYSTRING>", "WorkGroup": "<ATHENA_WORKGROUP>" }, "Next": "Get Athena query 1 results" }, "Get Athena query 1 results": { "Type": "Task", "Resource": "arn:aws:states:::athena:getQueryResults", "Parameters": { "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId" }, "End": true } } }, { "StartAt": "Start Athena query 2", "States": { "Start Athena query 2": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "<ATHENA_QUERYSTRING>", "WorkGroup": "<ATHENA_WORKGROUP>" }, "Next": "Get Athena query 2 results" }, "Get Athena query 2 results": { "Type": "Task", "Resource": "arn:aws:states:::athena:getQueryResults", "Parameters": { "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId" }, "End": true } } } ], "Next": "Send query results" }, "Send query results": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "Message.$": "$", "TopicArn": "<SNS_TOPIC_ARN>" }, "End": true } } }

IAM 示例

示例项目生成的此示例 Amazon Identity and Access Management (IAM) 策略包括执行状态机和相关资源所需的最低权限。我们建议在您的 IAM 策略中仅包含这些必需的权限。

AthenaStartQueryExecution

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "athena:startQueryExecution", "athena:stopQueryExecution", "athena:getQueryExecution", "athena:getDataCatalog" ], "Resource": [ "arn:aws:athena:us-east-2:123456789012:workgroup/stepfunctions-athena-sample-project-workgroup-ztuvu9yuix", "arn:aws:athena:us-east-2:123456789012:datacatalog/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:CreateBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::*" ] }, { "Effect": "Allow", "Action": [ "glue:CreateDatabase", "glue:GetDatabase", "glue:GetDatabases", "glue:UpdateDatabase", "glue:DeleteDatabase", "glue:CreateTable", "glue:UpdateTable", "glue:GetTable", "glue:GetTables", "glue:DeleteTable", "glue:BatchDeleteTable", "glue:BatchCreatePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions", "glue:BatchGetPartition", "glue:DeletePartition", "glue:BatchDeletePartition" ], "Resource": [ "arn:aws:glue:us-east-2:123456789012:catalog", "arn:aws:glue:us-east-2:123456789012:database/*", "arn:aws:glue:us-east-2:123456789012:table/*", "arn:aws:glue:us-east-2:123456789012:userDefinedFunction/*" ] }, { "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": [ "*" ] } ] }
AthenaGetQueryResults

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "athena:getQueryResults" ], "Resource": [ "arn:aws:us-east-2:123456789012:workgroup/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::*" ] } ] }
SNSPublish

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sns:Publish" ], "Resource": [ "arn:aws:sns:us-east-2:123456789012:StepFunctionsSample-AthenaMultipleQueriese1ec229b-5cbe-4754-a8a8-078474bac878-SNSTopic-9AID0HEJT7TH" ] } ] }
LambdaInvokeFunction

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-2:123456789012:function:StepFunctionsSample-Athen-LambdaForStringGeneratio-GQFQjN7mE9gl:*" ] }, { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-2:123456789012:function:StepFunctionsSample-Athen-LambdaForStringGeneratio-GQFQjN7mE9gl" ] } ] }

有关在将 Step Functions 与其他 Amazon 服务一起使用时如何配置 IAM 的信息,请参阅集成服务的 IAM 策略