教程:DynamoDB 批处理解析程序 - AWS AppSync
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

教程:DynamoDB 批处理解析程序

AWS AppSync 支持跨单个区域中的一个或多个表使用 Amazon DynamoDB 批处理操作。支持的操作为 BatchGetItemBatchPutItemBatchDeleteItem。通过在 AWS AppSync 中使用这些功能,您可以执行如下任务:

  • 在单个查询中传递键列表,并从表中返回结果

  • 在单个查询中从一个或多个表读取记录

  • 将记录批量写入一个或多个表

  • 有条件写入或删除多个可能有关系的表中的记录

在 AWS AppSync 中将批处理操作用于 DynamoDB 是一项高级技术,需要在一定程度上额外思考和了解您的后端操作和表结构。此外,AWS AppSync 中的批处理操作与非批量操作之间有两个主要区别:

  • 数据源角色必须对解析程序将访问的所有表具有权限。

  • 解析程序的表规范是映射模板的一部分。

权限

与其他解析程序类似,您需要在 AWS AppSync 中创建一个数据源,并创建角色或使用现有角色。由于批处理操作需要针对 DynamoDB 表的不同权限,因此您需要向所配置的角色授予权限以执行读取或写入操作:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Effect": "Allow", "Resource": [ "arn:aws:dynamodb:region:account:table/TABLENAME", "arn:aws:dynamodb:region:account:table/TABLENAME/*" ] } ] }

注意:角色与 AWS AppSync 中的数据源相关联,针对数据源调用字段上的解析程序。配置为针对 DynamoDB 进行提取的数据源仅指定了一个表,以确保配置简单。因此,当在单个解析程序中针对多个表执行批处理操作(这是一项更高级的任务)时,您必须向该数据源的角色授予对将与解析程序进行交互的任何表的访问权限。这项操作将在上面 IAM 策略中的 Resource (资源) 字段中执行。对表进行配置以便对它们进行批处理调用是在解析程序模板中实现的,下面将介绍相关内容。

数据源

为简便起见,我们将为本教程中使用的所有解析程序使用同一个数据源。在 Data sources (数据源) 选项卡中,创建一个新的 DynamoDB 数据源,并将其命名为 BatchTutorial。表名称可以是任何内容,因为表名称被指定为批处理操作的请求映射模板的一部分。我们将提供表名称 empty

对于本教程,具有以下内联策略的任何角色都有效:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Effect": "Allow", "Resource": [ "arn:aws:dynamodb:region:account:table/Posts", "arn:aws:dynamodb:region:account:table/Posts/*", "arn:aws:dynamodb:region:account:table/locationReadings", "arn:aws:dynamodb:region:account:table/locationReadings/*", "arn:aws:dynamodb:region:account:table/temperatureReadings", "arn:aws:dynamodb:region:account:table/temperatureReadings/*" ] } ] }

单个表批处理

在此示例中,假设您有一个名为 Posts 的表,您要通过批处理操作在其中添加和删除项目。使用以下架构,注意对于此查询,我们传入一个 ID 列表:

type Post { id: ID! title: String } input PostInput { id: ID! title: String } type Query { batchGet(ids: [ID]): [Post] } type Mutation { batchAdd(posts: [PostInput]): [Post] batchDelete(ids: [ID]): [Post] } schema { query: Query mutation: Mutation }

使用以下请求映射模板将解析程序附加到 batchAdd() 字段。这会自动接受 GraphQL input PostInput 类型的每个项目并构建 BatchPutItem 操作所需的一个映射:

#set($postsdata = []) #foreach($item in ${ctx.args.posts}) $util.qr($postsdata.add($util.dynamodb.toMapValues($item))) #end { "version" : "2018-05-29", "operation" : "BatchPutItem", "tables" : { "Posts": $utils.toJson($postsdata) } }

在这种情况下,响应映射模板是一个简单的传递,但表名称作为 ..data.Posts 追加到上下文对象,如下所示:

$util.toJson($ctx.result.data.Posts)

现在,导航到 AWS AppSync 控制台的 Queries (查询) 页面并运行以下 batchAdd 更改:

mutation add { batchAdd(posts:[{ id: 1 title: "Running in the Park"},{ id: 2 title: "Playing fetch" }]){ id title } }

您应该可以看到结果显示在屏幕上,并且可以通过 DynamoDB 控制台独立验证这两个值写入 Posts 表中。

接下来,使用以下请求映射模板将解析程序附加到 batchGet() 字段。这会自动接受 GraphQL ids:[] 类型的每个项目并构建 BatchGetItem 操作所需的一个映射:

#set($ids = []) #foreach($id in ${ctx.args.ids}) #set($map = {}) $util.qr($map.put("id", $util.dynamodb.toString($id))) $util.qr($ids.add($map)) #end { "version" : "2018-05-29", "operation" : "BatchGetItem", "tables" : { "Posts": { "keys": $util.toJson($ids), "consistentRead": true } } }

响应映射模板还是一个简单的传递,且再一次,表名称作为 ..data.Posts 追加到上下文对象:

$util.toJson($ctx.result.data.Posts)

现在,回到 AWS AppSync 控制台的 Queries (查询) 页面并运行以下 batchGet 查询

query get { batchGet(ids:[1,2,3]){ id title } }

这应返回您早前添加的两个 id 值的结果。请注意,对于值为 nullid,将返回 3 值。这是因为您的 Posts 表中尚没有具有该值的记录。另请注意,AWS AppSync 按照与传入到查询中的键的相同顺序返回结果,这是 AWS AppSync 代表您执行的一项额外功能。因此,如果您切换到 batchGet(ids:[1,3,2),将看到订单已更改。您还将了解哪个 id 返回了 null 值。

最后,使用以下请求映射模板将解析程序附加到 batchDelete() 字段。这会自动接受 GraphQL ids:[] 类型的每个项目并构建 BatchGetItem 操作所需的一个映射:

#set($ids = []) #foreach($id in ${ctx.args.ids}) #set($map = {}) $util.qr($map.put("id", $util.dynamodb.toString($id))) $util.qr($ids.add($map)) #end { "version" : "2018-05-29", "operation" : "BatchDeleteItem", "tables" : { "Posts": $util.toJson($ids) } }

响应映射模板还是一个简单的传递,且再一次,表名称作为 ..data.Posts 追加到上下文对象:

$util.toJson($ctx.result.data.Posts)

现在,回到 AWS AppSync 控制台的 Queries (查询) 页面并运行以下 batchDelete 更改:

mutation delete { batchDelete(ids:[1,2]){ id } }

现在应删除 id12 的记录。如果您更早重新运行 batchGet() 查询,则应返回 null

多个表批处理

AWS AppSync 还可让您跨多个表执行批处理操作。我们来构建更复杂的应用程序。想象一下,我们正在构建一个 Pet Health 应用程序,其中的传感器报告宠物位置和身体温度。传感器由电池供电,并每隔几分钟尝试连接到网络。当传感器建立连接时,它会向 AWS AppSync API 发送其读数。然后,触发器分析数据,这样,就可以向宠物主人显示控制面板。我们重点关注如何表示传感器与后端数据存储之间的交互。

作为前提条件,我们先创建两个 DynamoDB 表:locationReadings 将存储传感器位置读数,而 temperatureReadings 将存储传感器温度读数。这两个表正好共享相同的主键结构:sensorId (String) 是分区键,而 timestamp (String) 是排序键。

我们使用以下 GraphQL 架构:

type Mutation { # Register a batch of readings recordReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult # Delete a batch of readings deleteReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult } type Query { # Retrieve all possible readings recorded by a sensor at a specific time getReadings(sensorId: ID!, timestamp: String!): [SensorReading] } type RecordResult { temperatureReadings: [TemperatureReading] locationReadings: [LocationReading] } interface SensorReading { sensorId: ID! timestamp: String! } # Sensor reading representing the sensor temperature (in Fahrenheit) type TemperatureReading implements SensorReading { sensorId: ID! timestamp: String! value: Float } # Sensor reading representing the sensor location (lat,long) type LocationReading implements SensorReading { sensorId: ID! timestamp: String! lat: Float long: Float } input TemperatureReadingInput { sensorId: ID! timestamp: String value: Float } input LocationReadingInput { sensorId: ID! timestamp: String lat: Float long: Float }

BatchPutItem – 记录传感器读数

我们的传感器需要能够在连接到 Internet 后立即发送其读数。GraphQL 字段 Mutation.recordReadings 是传感器将用来执行上述操作的 API。现在附加一个解析程序以实际使用 API。

选择 Mutation.recordReadings 字段旁边的 Attach (附加)。在下一个屏幕上,选择本教程开始时创建的同一个 BatchTutorial 数据源。

我们添加以下请求映射模板。

请求映射模板

## Convert tempReadings arguments to DynamoDB objects #set($tempReadings = []) #foreach($reading in ${ctx.args.tempReadings}) $util.qr($tempReadings.add($util.dynamodb.toMapValues($reading))) #end ## Convert locReadings arguments to DynamoDB objects #set($locReadings = []) #foreach($reading in ${ctx.args.locReadings}) $util.qr($locReadings.add($util.dynamodb.toMapValues($reading))) #end { "version" : "2018-05-29", "operation" : "BatchPutItem", "tables" : { "locationReadings": $utils.toJson($locReadings), "temperatureReadings": $utils.toJson($tempReadings) } }

如您所见,BatchPutItem 操作使我们能够指定多个表。

我们使用以下响应映射模板。

响应映射模板

## If there was an error with the invocation ## there might have been partial results #if($ctx.error) ## Append a GraphQL error for that field in the GraphQL response $utils.appendError($ctx.error.message, $ctx.error.message) #end ## Also returns data for the field in the GraphQL response $utils.toJson($ctx.result.data)

使用批处理操作,可能会从调用中同时返回错误和结果。在这种情况下,我们可以自主执行一些额外的错误处理。

注意$utils.appendError() 的用法与 $util.error() 相似,主要区别是前者不中断对映射模板的评估。相反,它指示字段发生了错误,但允许评估模板,因此会将数据返回给调用方。我们建议,当您的应用程序需要返回部分结果时使用 $utils.appendError()

保存解析程序并导航到 AWS AppSync 控制台的 Queries (查询) 页面。现在发送一些传感器读数!

执行以下更改:

mutation sendReadings { recordReadings( tempReadings: [ {sensorId: 1, value: 85.5, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, value: 85.7, timestamp: "2018-02-01T17:21:06.000+08:00"}, {sensorId: 1, value: 85.8, timestamp: "2018-02-01T17:21:07.000+08:00"}, {sensorId: 1, value: 84.2, timestamp: "2018-02-01T17:21:08.000+08:00"}, {sensorId: 1, value: 81.5, timestamp: "2018-02-01T17:21:09.000+08:00"} ] locReadings: [ {sensorId: 1, lat: 47.615063, long: -122.333551, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, lat: 47.615163, long: -122.333552, timestamp: "2018-02-01T17:21:06.000+08:00"} {sensorId: 1, lat: 47.615263, long: -122.333553, timestamp: "2018-02-01T17:21:07.000+08:00"} {sensorId: 1, lat: 47.615363, long: -122.333554, timestamp: "2018-02-01T17:21:08.000+08:00"} {sensorId: 1, lat: 47.615463, long: -122.333555, timestamp: "2018-02-01T17:21:09.000+08:00"} ]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

我们在一个更改中发送了 10 个传感器读数,并在两个表之间拆分读数。使用 DynamoDB 控制台以验证数据出现在 locationReadingstemperatureReadings 这两个表中。

BatchDeleteItem – 删除传感器读数

同样,我们也需要删除批量传感器读数。我们使用 Mutation.deleteReadings GraphQL 字段来实现此目的。选择 Mutation.recordReadings 字段旁边的 Attach (附加)。在下一个屏幕上,选择本教程开始时创建的同一个 BatchTutorial 数据源。

我们使用以下请求映射模板。

请求映射模板

## Convert tempReadings arguments to DynamoDB primary keys #set($tempReadings = []) #foreach($reading in ${ctx.args.tempReadings}) #set($pkey = {}) $util.qr($pkey.put("sensorId", $reading.sensorId)) $util.qr($pkey.put("timestamp", $reading.timestamp)) $util.qr($tempReadings.add($util.dynamodb.toMapValues($pkey))) #end ## Convert locReadings arguments to DynamoDB primary keys #set($locReadings = []) #foreach($reading in ${ctx.args.locReadings}) #set($pkey = {}) $util.qr($pkey.put("sensorId", $reading.sensorId)) $util.qr($pkey.put("timestamp", $reading.timestamp)) $util.qr($locReadings.add($util.dynamodb.toMapValues($pkey))) #end { "version" : "2018-05-29", "operation" : "BatchDeleteItem", "tables" : { "locationReadings": $utils.toJson($locReadings), "temperatureReadings": $utils.toJson($tempReadings) } }

该响应映射模板与我们用于 Mutation.recordReadings 的模板相同。

响应映射模板

## If there was an error with the invocation ## there might have been partial results #if($ctx.error) ## Append a GraphQL error for that field in the GraphQL response $utils.appendError($ctx.error.message, $ctx.error.message) #end ## Also return data for the field in the GraphQL response $utils.toJson($ctx.result.data)

保存解析程序并导航到 AWS AppSync 控制台的 Queries (查询) 页面。现在,我们删除几个传感器读数!

执行以下更改:

mutation deleteReadings { # Let's delete the first two readings we recorded deleteReadings( tempReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}] locReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

通过 DynamoDB 控制台,验证这两个读数已从 locationReadingstemperatureReadings 表中删除。

BatchGetItem – 检索读数

Pet Health 应用程序的另一个常见操作是检索特定时间点的传感器读数。我们将解析程序附加到架构上的 Query.getReadings GraphQL 字段。选择 Attach (附加),然后,在下一个屏幕上选择本教程开始时创建的同一个 BatchTutorial 数据源。

我们添加以下请求映射模板。

请求映射模板

## Build a single DynamoDB primary key, ## as both locationReadings and tempReadings tables ## share the same primary key structure #set($pkey = {}) $util.qr($pkey.put("sensorId", $ctx.args.sensorId)) $util.qr($pkey.put("timestamp", $ctx.args.timestamp)) { "version" : "2018-05-29", "operation" : "BatchGetItem", "tables" : { "locationReadings": { "keys": [$util.dynamodb.toMapValuesJson($pkey)], "consistentRead": true }, "temperatureReadings": { "keys": [$util.dynamodb.toMapValuesJson($pkey)], "consistentRead": true } } }

请注意,我们现在使用 BatchGetItem 操作。

我们的响应映射模板会有点不同,因为我们选择返回 SensorReading 列表。我们将调用结果映射到所需的形状。

响应映射模板

## Merge locationReadings and temperatureReadings ## into a single list ## __typename needed as schema uses an interface #set($sensorReadings = []) #foreach($locReading in $ctx.result.data.locationReadings) $util.qr($locReading.put("__typename", "LocationReading")) $util.qr($sensorReadings.add($locReading)) #end #foreach($tempReading in $ctx.result.data.temperatureReadings) $util.qr($tempReading.put("__typename", "TemperatureReading")) $util.qr($sensorReadings.add($tempReading)) #end $util.toJson($sensorReadings)

保存解析程序并导航到 AWS AppSync 控制台的 Queries (查询) 页面。现在,我们来检索传感器读数!

执行以下查询:

query getReadingsForSensorAndTime { # Let's retrieve the very first two readings getReadings(sensorId: 1, timestamp: "2018-02-01T17:21:06.000+08:00") { sensorId timestamp ...on TemperatureReading { value } ...on LocationReading { lat long } } }

我们已成功地演示了如何通过 AWS AppSync 来使用 DynamoDB 批处理操作。

错误处理

在 AWS AppSync 中,数据源操作有时可能返回部分结果。部分结果是一个术语,我们用它来表示操作的输出中包含某些数据和一个错误。由于错误处理本质上是特定于应用程序的,因此 AWS AppSync 让您有机会处理响应映射模板中的错误。上下文中的解析程序调用错误(如果有)为 $ctx.error。调用错误始终包含一条消息和一个类型,可作为属性 $ctx.error.message$ctx.error.type 进行访问。在响应映射模板调用期间,您可以通过三种方式处理部分结果:

  1. 通过仅返回数据来承受调用错误

  2. 通过停止响应映射模板评估(这不会返回任何数据)来引发错误(使用 $util.error(...))。

  3. 追加错误(使用 $util.appendError(...))并且也返回数据

我们来演示 DynamoDB 批处理操作的上述三个要点中的每一个要点!

DynamoDB 批处理操作

借助 DynamoDB 批处理操作,批处理可能会部分完成。也就是说,某些请求的项目或键未得到处理。如果 AWS AppSync 无法完成批处理,上下文将设置未处理的项目和一个调用错误。

我们将使用本教程前一部分中 Query.getReadings 操作的 BatchGetItem 字段配置来实施错误处理。这一次,我们假定在执行 Query.getReadings 字段时,temperatureReadings DynamoDB 表耗尽了预配置的吞吐量。DynamoDB 在 AWS AppSync 第二次尝试时引发了 ProvisionedThroughputExceededException,以成批处理剩余的元素。

以下 JSON 表示在 DynamoDB 批处理调用之后但在评估响应映射模板之前的序列化上下文。

{ "arguments": { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" }, "source": null, "result": { "data": { "temperatureReadings": [ null ], "locationReadings": [ { "lat": 47.615063, "long": -122.333551, "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ] }, "unprocessedKeys": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] } }, "error": { "type": "DynamoDB:ProvisionedThroughputExceededException", "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" }, "outErrors": [] }

关于上下文需要注意的几点:

  • AWS AppSync 已在上下文中的 $ctx.error 处设置了调用错误,错误类型已设置为 DynamoDB:ProvisionedThroughputExceededException

  • 结果按每个表映射到 $ctx.result.data 下,即使存在错误

  • 留下未处理的键在 $ctx.result.data.unprocessedKeys 中提供。此处,由于表吞吐量不足,AWS AppSync 无法使用密钥 (sensorId:1, timestamp:2018-02-01T17:21:05.000+08:00) 来检索项目。

注意:对于 BatchPutItem,它是 $ctx.result.data.unprocessedItems。对于 BatchDeleteItem,它是 $ctx.result.data.unprocessedKeys

我们通过三种不同方式处理此错误。

1.承受调用错误

返回数据而不处理调用错误:这会有效地承受此错误,同时使给定 GraphQL 字段的结果始终成功。

我们编写的响应映射模板是熟悉的,仅侧重于结果数据。

响应映射模板:

$util.toJson($ctx.result.data)

GraphQL 响应:

{ "data": { "getReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "lat": 47.615063, "long": -122.333551 }, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] } }

将不向错误响应中添加任何错误,因为只对数据执行了操作。

2.引发错误以中止模板执行

当从客户端的角度应将部分失败视为完全失败时,您可以中止模板执行以防止返回数据。$util.error(...) 实用程序方法实现完全此行为。

响应映射模板:

## there was an error let's mark the entire field ## as failed and do not return any data back in the response #if ($ctx.error) $util.error($ctx.error.message, $ctx.error.type, null, $ctx.result.data.unprocessedKeys) #end $util.toJson($ctx.result.data)

GraphQL 响应:

{ "data": { "getReadings": null }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }

即使可能已从 DynamoDB 批处理操作返回了一些结果,我们也选择引发错误,这样,getReadings GraphQL 字段为 Null,并且此错误已添加到 GraphQL 响应的错误数据块中。

3.追加错误以返回数据和错误

在某些情况下,为了提供更好的用户体验,应用程序可以返回部分结果并向其客户端通知未处理的项目。客户端可以决定是实施重试,还是将错误翻译出来并返回给最终用户。$util.appendError(...) 是一个可以启用此行为的实用程序方法,具体方式为:让应用程序设计人员在上下文中追加错误,而不干扰对模板的评估。在评估模板后,AWS AppSync 通过将任何上下文错误追加到 GraphQL 响应的错误数据块中来处理这些错误。

响应映射模板:

#if ($ctx.error) ## pass the unprocessed keys back to the caller via the `errorInfo` field $util.appendError($ctx.error.message, $ctx.error.type, null, $ctx.result.data.unprocessedKeys) #end $util.toJson($ctx.result.data)

我们转发了 GraphQL 响应的错误数据块内的调用错误和 unprocessedKeys 元素。getReadings 字段也从 locationReadings 表中返回部分数据,如下面的响应中所示。

GraphQL 响应:

{ "data": { "getReadings": [ null, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }