教程:DynamoDB 批处理解析器 - Amazon AppSync
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

教程:DynamoDB 批处理解析器

注意

我们现在主要支持 APPSYNC_JS 运行时环境及其文档。请考虑使用 APPSYNC_JS 运行时环境和此处的指南。

Amazon AppSync 支持对单个区域中的一个或多个表执行 Amazon DynamoDB 批处理操作。支持的操作为 BatchGetItemBatchPutItemBatchDeleteItem。通过在 Amazon AppSync 中使用这些功能,您可以执行如下任务:

  • 在单个查询中传递键列表,并从表中返回结果

  • 在单个查询中从一个或多个表读取记录

  • 将记录批量写入一个或多个表

  • 有条件写入或删除多个可能有关系的表中的记录

在 Amazon AppSync 中将批处理操作用于 DynamoDB 是一项高级技术,需要对后端操作和表结构有所了解。此外,Amazon AppSync 中的批处理操作与非批处理操作有两个主要区别:

  • 数据源角色必须对解析器将访问的所有表具有权限。

  • 解析器的表规范是映射模板的一部分。

权限

与其他解析器一样,您需要在 Amazon AppSync 中创建数据源,并创建一个角色或使用现有的角色。由于批处理操作需要具有 DynamoDB 表的不同权限,因此,您需要为配置的角色授予读取或写入操作权限:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Effect": "Allow", "Resource": [ "arn:aws:dynamodb:region:account:table/TABLENAME", "arn:aws:dynamodb:region:account:table/TABLENAME/*" ] } ] }

注意:角色与 Amazon AppSync 中的数据源相关联,并对数据源调用字段上的解析器。配置为针对 DynamoDB 获取的数据源仅指定一个表,以使配置保持简单。因此,当在单个解析器中针对多个表执行批处理操作(这是一项更高级的任务)时,您必须向该数据源的角色授予对将与解析器进行交互的任何表的访问权限。这项操作将在上面 IAM 策略中的 Resource (资源) 字段中执行。对表进行配置以便对它们进行批处理调用是在解析器模板中实现的,下面将介绍相关内容。

数据源

为简便起见,我们将为本教程中使用的所有解析器使用同一个数据源。在数据源选项卡上,创建一个新的 DynamoDB 数据源,并将其命名为 BatchTutorial。表名称可以是任何内容,因为表名称被指定为批处理操作的请求映射模板的一部分。我们将提供表名称 empty

对于本教程,具有以下内联策略的任何角色都有效:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Effect": "Allow", "Resource": [ "arn:aws:dynamodb:region:account:table/Posts", "arn:aws:dynamodb:region:account:table/Posts/*", "arn:aws:dynamodb:region:account:table/locationReadings", "arn:aws:dynamodb:region:account:table/locationReadings/*", "arn:aws:dynamodb:region:account:table/temperatureReadings", "arn:aws:dynamodb:region:account:table/temperatureReadings/*" ] } ] }

单个表批处理

在此示例中,假设您有一个名为 Posts 的表,您要通过批处理操作在其中添加和删除项目。使用以下架构,注意对于此查询,我们传入一个 ID 列表:

type Post { id: ID! title: String } input PostInput { id: ID! title: String } type Query { batchGet(ids: [ID]): [Post] } type Mutation { batchAdd(posts: [PostInput]): [Post] batchDelete(ids: [ID]): [Post] } schema { query: Query mutation: Mutation }

使用以下请求映射模板将一个解析器附加到 batchAdd() 字段。这会自动接受 GraphQL input PostInput 类型的每个项目并构建 BatchPutItem 操作所需的一个映射:

#set($postsdata = []) #foreach($item in ${ctx.args.posts}) $util.qr($postsdata.add($util.dynamodb.toMapValues($item))) #end { "version" : "2018-05-29", "operation" : "BatchPutItem", "tables" : { "Posts": $utils.toJson($postsdata) } }

在这种情况下,响应映射模板是一个简单的传递,但表名称作为 ..data.Posts 追加到上下文对象,如下所示:

$util.toJson($ctx.result.data.Posts)

现在,导航到 Amazon AppSync 控制台的查询页面,并运行以下 batchAdd 变更:

mutation add { batchAdd(posts:[{ id: 1 title: "Running in the Park"},{ id: 2 title: "Playing fetch" }]){ id title } }

您应该会看到输出到屏幕的结果,并且可以通过 DynamoDB 控制台单独验证这两个值是否写入到 Posts 表。

接下来,使用以下请求映射模板将一个解析器附加到 batchGet() 字段。这会自动接受 GraphQL ids:[] 类型的每个项目并构建 BatchGetItem 操作所需的一个映射:

#set($ids = []) #foreach($id in ${ctx.args.ids}) #set($map = {}) $util.qr($map.put("id", $util.dynamodb.toString($id))) $util.qr($ids.add($map)) #end { "version" : "2018-05-29", "operation" : "BatchGetItem", "tables" : { "Posts": { "keys": $util.toJson($ids), "consistentRead": true, "projection" : { "expression" : "#id, title", "expressionNames" : { "#id" : "id"} } } } }

响应映射模板还是一个简单的传递,且再一次,表名称作为 ..data.Posts 追加到上下文对象:

$util.toJson($ctx.result.data.Posts)

现在,返回到 Amazon AppSync 控制台的查询页面,并运行以下 batchGet 查询

query get { batchGet(ids:[1,2,3]){ id title } }

这应返回您早前添加的两个 id 值的结果。请注意,对于值为 nullid,将返回 3 值。这是因为您的 Posts 表中还没有具有该值的记录。另请注意,Amazon AppSync 使用与传递给查询的键相同的顺序返回结果,这是 Amazon AppSync 代表您执行的一项额外功能。因此,如果您切换到 batchGet(ids:[1,3,2),将看到订单已更改。您还将了解哪个 id 返回了 null 值。

最后,使用以下请求映射模板将一个解析器附加到 batchDelete() 字段。这会自动接受 GraphQL ids:[] 类型的每个项目并构建 BatchGetItem 操作所需的一个映射:

#set($ids = []) #foreach($id in ${ctx.args.ids}) #set($map = {}) $util.qr($map.put("id", $util.dynamodb.toString($id))) $util.qr($ids.add($map)) #end { "version" : "2018-05-29", "operation" : "BatchDeleteItem", "tables" : { "Posts": $util.toJson($ids) } }

响应映射模板还是一个简单的传递,且再一次,表名称作为 ..data.Posts 追加到上下文对象:

$util.toJson($ctx.result.data.Posts)

现在,返回到 Amazon AppSync 控制台的查询页面,并运行以下 batchDelete 变更:

mutation delete { batchDelete(ids:[1,2]){ id } }

现在应删除 id12 的记录。如果您更早重新运行 batchGet() 查询,则应返回 null

多个表批处理

Amazon AppSync 还允许您在多个表中执行批处理操作。我们来构建更复杂的应用程序。想象一下,我们正在构建一个 Pet Health 应用程序,其中的传感器报告宠物位置和身体温度。传感器由电池供电,并每隔几分钟尝试连接到网络。在传感器建立连接时,它将读数发送到我们的 Amazon AppSync API。然后,触发器分析数据,这样,就可以向宠物主人显示控制面板。我们重点关注如何表示传感器与后端数据存储之间的交互。

作为先决条件,让我们先创建两个 DynamoDB 表;locationReadings 存储传感器位置读数,temperatureReadings 存储传感器温度读数。这两个表正好共享相同的主键结构:sensorId (String) 是分区键,而 timestamp (String) 是排序键。

我们使用以下 GraphQL 架构:

type Mutation { # Register a batch of readings recordReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult # Delete a batch of readings deleteReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult } type Query { # Retrieve all possible readings recorded by a sensor at a specific time getReadings(sensorId: ID!, timestamp: String!): [SensorReading] } type RecordResult { temperatureReadings: [TemperatureReading] locationReadings: [LocationReading] } interface SensorReading { sensorId: ID! timestamp: String! } # Sensor reading representing the sensor temperature (in Fahrenheit) type TemperatureReading implements SensorReading { sensorId: ID! timestamp: String! value: Float } # Sensor reading representing the sensor location (lat,long) type LocationReading implements SensorReading { sensorId: ID! timestamp: String! lat: Float long: Float } input TemperatureReadingInput { sensorId: ID! timestamp: String value: Float } input LocationReadingInput { sensorId: ID! timestamp: String lat: Float long: Float }

BatchPutItem - 记录传感器读数

我们的传感器需要能够在连接到 Internet 后立即发送其读数。GraphQL 字段 Mutation.recordReadings 是传感器将用来执行上述操作的 API。现在附加一个解析器以实际使用 API。

选择 Mutation.recordReadings 字段旁边的附加。在下一个屏幕上,选择本教程开始时创建的同一个 BatchTutorial 数据源。

我们添加以下请求映射模板。

请求映射模板

## Convert tempReadings arguments to DynamoDB objects #set($tempReadings = []) #foreach($reading in ${ctx.args.tempReadings}) $util.qr($tempReadings.add($util.dynamodb.toMapValues($reading))) #end ## Convert locReadings arguments to DynamoDB objects #set($locReadings = []) #foreach($reading in ${ctx.args.locReadings}) $util.qr($locReadings.add($util.dynamodb.toMapValues($reading))) #end { "version" : "2018-05-29", "operation" : "BatchPutItem", "tables" : { "locationReadings": $utils.toJson($locReadings), "temperatureReadings": $utils.toJson($tempReadings) } }

如您所见,BatchPutItem 操作使我们能够指定多个表。

我们使用以下响应映射模板。

响应映射模板

## If there was an error with the invocation ## there might have been partial results #if($ctx.error) ## Append a GraphQL error for that field in the GraphQL response $utils.appendError($ctx.error.message, $ctx.error.message) #end ## Also returns data for the field in the GraphQL response $utils.toJson($ctx.result.data)

使用批处理操作,可能会从调用中同时返回错误和结果。在这种情况下,我们可以自主执行一些额外的错误处理。

注意$utils.appendError() 的用法与 $util.error() 相似,主要区别是前者不中断对映射模板的评估。相反,它指示字段发生了错误,但允许评估模板,因此会将数据返回给调用方。我们建议,当您的应用程序需要返回部分结果时使用 $utils.appendError()

保存解析器,并导航到 Amazon AppSync 控制台的查询页面。现在发送一些传感器读数!

执行以下变更:

mutation sendReadings { recordReadings( tempReadings: [ {sensorId: 1, value: 85.5, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, value: 85.7, timestamp: "2018-02-01T17:21:06.000+08:00"}, {sensorId: 1, value: 85.8, timestamp: "2018-02-01T17:21:07.000+08:00"}, {sensorId: 1, value: 84.2, timestamp: "2018-02-01T17:21:08.000+08:00"}, {sensorId: 1, value: 81.5, timestamp: "2018-02-01T17:21:09.000+08:00"} ] locReadings: [ {sensorId: 1, lat: 47.615063, long: -122.333551, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, lat: 47.615163, long: -122.333552, timestamp: "2018-02-01T17:21:06.000+08:00"} {sensorId: 1, lat: 47.615263, long: -122.333553, timestamp: "2018-02-01T17:21:07.000+08:00"} {sensorId: 1, lat: 47.615363, long: -122.333554, timestamp: "2018-02-01T17:21:08.000+08:00"} {sensorId: 1, lat: 47.615463, long: -122.333555, timestamp: "2018-02-01T17:21:09.000+08:00"} ]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

我们在一个变更中发送了 10 个传感器读数,并在两个表之间拆分读数。使用 DynamoDB 控制台验证是否在 locationReadingstemperatureReadings 表中显示数据。

BatchDeleteItem - 删除传感器读数

同样,我们也需要删除批量传感器读数。我们使用 Mutation.deleteReadings GraphQL 字段来实现此目的。选择 Mutation.recordReadings 字段旁边的附加。在下一个屏幕上,选择本教程开始时创建的同一个 BatchTutorial 数据源。

我们使用以下请求映射模板。

请求映射模板

## Convert tempReadings arguments to DynamoDB primary keys #set($tempReadings = []) #foreach($reading in ${ctx.args.tempReadings}) #set($pkey = {}) $util.qr($pkey.put("sensorId", $reading.sensorId)) $util.qr($pkey.put("timestamp", $reading.timestamp)) $util.qr($tempReadings.add($util.dynamodb.toMapValues($pkey))) #end ## Convert locReadings arguments to DynamoDB primary keys #set($locReadings = []) #foreach($reading in ${ctx.args.locReadings}) #set($pkey = {}) $util.qr($pkey.put("sensorId", $reading.sensorId)) $util.qr($pkey.put("timestamp", $reading.timestamp)) $util.qr($locReadings.add($util.dynamodb.toMapValues($pkey))) #end { "version" : "2018-05-29", "operation" : "BatchDeleteItem", "tables" : { "locationReadings": $utils.toJson($locReadings), "temperatureReadings": $utils.toJson($tempReadings) } }

该响应映射模板与我们用于 Mutation.recordReadings 的模板相同。

响应映射模板

## If there was an error with the invocation ## there might have been partial results #if($ctx.error) ## Append a GraphQL error for that field in the GraphQL response $utils.appendError($ctx.error.message, $ctx.error.message) #end ## Also return data for the field in the GraphQL response $utils.toJson($ctx.result.data)

保存解析器,并导航到 Amazon AppSync 控制台的查询页面。现在,让我们删除几个传感器读数!

执行以下变更:

mutation deleteReadings { # Let's delete the first two readings we recorded deleteReadings( tempReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}] locReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

通过 DynamoDB 控制台验证是否从 locationReadingstemperatureReadings 表中删除了这两个读数。

BatchGetItem - 检索读数

Pet Health 应用程序的另一个常见操作是检索特定时间点的传感器读数。我们将解析器附加到架构上的 Query.getReadings GraphQL 字段。选择 Attach (附加),然后,在下一个屏幕上选择本教程开始时创建的同一个 BatchTutorial 数据源。

我们添加以下请求映射模板。

请求映射模板

## Build a single DynamoDB primary key, ## as both locationReadings and tempReadings tables ## share the same primary key structure #set($pkey = {}) $util.qr($pkey.put("sensorId", $ctx.args.sensorId)) $util.qr($pkey.put("timestamp", $ctx.args.timestamp)) { "version" : "2018-05-29", "operation" : "BatchGetItem", "tables" : { "locationReadings": { "keys": [$util.dynamodb.toMapValuesJson($pkey)], "consistentRead": true }, "temperatureReadings": { "keys": [$util.dynamodb.toMapValuesJson($pkey)], "consistentRead": true } } }

请注意,我们现在使用 BatchGetItem 操作。

我们的响应映射模板会有点不同,因为我们选择返回 SensorReading 列表。我们将调用结果映射到所需的形状。

响应映射模板

## Merge locationReadings and temperatureReadings ## into a single list ## __typename needed as schema uses an interface #set($sensorReadings = []) #foreach($locReading in $ctx.result.data.locationReadings) $util.qr($locReading.put("__typename", "LocationReading")) $util.qr($sensorReadings.add($locReading)) #end #foreach($tempReading in $ctx.result.data.temperatureReadings) $util.qr($tempReading.put("__typename", "TemperatureReading")) $util.qr($sensorReadings.add($tempReading)) #end $util.toJson($sensorReadings)

保存解析器,并导航到 Amazon AppSync 控制台的查询页面。现在,我们来检索传感器读数!

执行以下查询:

query getReadingsForSensorAndTime { # Let's retrieve the very first two readings getReadings(sensorId: 1, timestamp: "2018-02-01T17:21:06.000+08:00") { sensorId timestamp ...on TemperatureReading { value } ...on LocationReading { lat long } } }

我们已成功说明了如何通过 Amazon AppSync 使用 DynamoDB 批处理操作。

错误处理

在 Amazon AppSync 中,数据源操作有时可能会返回部分结果。部分结果是一个术语,我们用它来表示操作的输出中包含某些数据和一个错误。由于错误处理本质上是应用程序特定的,因此,Amazon AppSync 允许您在响应映射模板中处理错误。上下文中的解析器调用错误(如果有)为 $ctx.error。调用错误始终包含一条消息和一个类型,可作为属性 $ctx.error.message$ctx.error.type 进行访问。在响应映射模板调用期间,您可以通过三种方式处理部分结果:

  1. 仅返回数据以忽略调用错误

  2. 通过停止响应映射模板评估(这不会返回任何数据)来引发错误(使用 $util.error(...))。

  3. 附加一个错误(使用 $util.appendError(...))并且也返回数据

让我们通过 DynamoDB 批处理操作分别说明上述三点!

DynamoDB 批处理操作

借助 DynamoDB 批处理操作,批处理可能会部分完成。也就是说,某些请求的项目或键未得到处理。如果 Amazon AppSync 无法完成批处理,则会在上下文中设置未处理的项目和调用错误。

我们将使用本教程前一部分中 Query.getReadings 操作的 BatchGetItem 字段配置来实施错误处理。这一次,我们假定在执行 Query.getReadings 字段时,temperatureReadings DynamoDB 表耗尽了预置的吞吐量。在 Amazon AppSync 第二次尝试处理批次中的剩余元素时,DynamoDB 引发了 ProvisionedThroughputExceededException

以下 JSON 表示在 DynamoDB 批处理调用之后但在评估响应映射模板之前的序列化上下文。

{ "arguments": { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" }, "source": null, "result": { "data": { "temperatureReadings": [ null ], "locationReadings": [ { "lat": 47.615063, "long": -122.333551, "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ] }, "unprocessedKeys": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] } }, "error": { "type": "DynamoDB:ProvisionedThroughputExceededException", "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" }, "outErrors": [] }

关于上下文需要注意的几点:

  • Amazon AppSync 在上下文中的 $ctx.error 处设置了调用错误,并且错误类型设置为 DynamoDB:ProvisionedThroughputExceededException

  • 即使存在错误,也会在 $ctx.result.data 中为每个表映射结果

  • $ctx.result.data.unprocessedKeys 中提供了未处理的键。此处,由于表吞吐量不足,Amazon AppSync 无法检索具有 (sensorId:1, timestamp:2018-02-01T17:21:05.000+08:00) 键的项目。

注意:对于 BatchPutItem,它是 $ctx.result.data.unprocessedItems。对于 BatchDeleteItem,它是 $ctx.result.data.unprocessedKeys

我们通过三种不同方式处理此错误。

1. 承受调用错误

返回数据而不处理调用错误:这会有效地承受此错误,同时使给定 GraphQL 字段的结果始终成功。

我们编写的响应映射模板是熟悉的,仅侧重于结果数据。

响应映射模板:

$util.toJson($ctx.result.data)

GraphQL 响应:

{ "data": { "getReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "lat": 47.615063, "long": -122.333551 }, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] } }

将不向错误响应中添加任何错误,因为只对数据执行了操作。

2. 引发错误以中止模板执行

从客户端角度看,在应将部分失败视为完全失败时,您可以中止执行模板以防止返回数据。$util.error(...) 实用程序方法实现完全此行为。

响应映射模板:

## there was an error let's mark the entire field ## as failed and do not return any data back in the response #if ($ctx.error) $util.error($ctx.error.message, $ctx.error.type, null, $ctx.result.data.unprocessedKeys) #end $util.toJson($ctx.result.data)

GraphQL 响应:

{ "data": { "getReadings": null }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }

即使可能已从 DynamoDB 批处理操作返回了一些结果,我们也选择引发错误,这样,getReadings GraphQL 字段为 Null,并且此错误已添加到 GraphQL 响应的错误数据块中。

3. 追加错误以返回数据和错误

在某些情况下,为了提供更好的用户体验,应用程序可以返回部分结果并向其客户端通知未处理的项目。客户端可以决定是实施重试,还是将错误翻译出来并返回给最终用户。$util.appendError(...) 是一个可以启用此行为的实用程序方法,具体方式为:让应用程序设计人员在上下文中追加错误,而不干扰对模板的评估。在评估模板后,Amazon AppSync 将任何上下文错误附加到 GraphQL 响应的错误块以处理它们。

响应映射模板:

#if ($ctx.error) ## pass the unprocessed keys back to the caller via the `errorInfo` field $util.appendError($ctx.error.message, $ctx.error.type, null, $ctx.result.data.unprocessedKeys) #end $util.toJson($ctx.result.data)

我们在 GraphQL 响应的错误块中转发了调用错误和 unprocessedKeys 元素。getReadings 字段也从 locationReadings 表中返回部分数据,如下面的响应中所示。

GraphQL 响应:

{ "data": { "getReadings": [ null, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }