教程:DynamoDB 批处理解析器 - Amazon AppSync
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

教程:DynamoDB 批处理解析器

Amazon AppSync 支持对单个区域中的一个或多个表执行 Amazon DynamoDB 批处理操作。支持的操作为 BatchGetItemBatchPutItemBatchDeleteItem。通过在 Amazon AppSync 中使用这些功能,您可以执行如下任务:

  • 在单个查询中传递键列表,并从表中返回结果

  • 在单个查询中从一个或多个表中读取记录

  • 将记录批量写入到一个或多个表

  • 在可能存在关系的多个表中有条件地写入或删除记录

Amazon AppSync 中的批处理操作与非批处理操作有两个主要区别:

  • 数据源角色必须具有解析器访问的所有表的权限。

  • 解析器的表规范是请求对象的一部分。

单个表批处理

首先,我们创建一个新的 GraphQL API。在 Amazon AppSync 控制台中,选择创建 APIGraphQL API从头开始设计。将您的 API 命名为 BatchTutorial API,选择下一步,在指定 GraphQL 资源步骤中选择稍后创建 GraphQL 资源,然后单击下一步。检查您的详细信息并创建 API。转到架构页面并粘贴以下架构,请注意,对于查询,我们将传入一个 ID 列表:

type Post { id: ID! title: String } input PostInput { id: ID! title: String } type Query { batchGet(ids: [ID]): [Post] } type Mutation { batchAdd(posts: [PostInput]): [Post] batchDelete(ids: [ID]): [Post] }

保存您的架构,并选择页面顶部的创建资源。选择使用现有的类型,并选择 Post 类型。将您的表命名为 Posts。确保主键设置为 id,取消选择自动生成 GraphQL(您将提供自己的代码),然后选择创建。首先,Amazon AppSync 创建一个新的 DynamoDB 表以及一个使用相应角色连接到该表的数据源。不过,您仍然需要为该角色添加一些权限。转到数据源页面,并选择新的数据源。在选择现有角色下面,您会注意到已为该表自动创建了一个角色。记下该角色(应类似于 appsync-ds-ddb-aaabbbcccddd-Posts),然后转到 IAM 控制台 (https://console.aws.amazon.com/iam/)。在 IAM 控制台中,选择角色,然后从该表中选择您的角色。在您的角色中,在权限策略下面,单击策略旁边的“+”(名称应与角色名称相似)。在显示该策略时,选择折叠菜单顶部的编辑。您需要为您的策略添加批处理权限,具体来说是 dynamodb:BatchGetItemdynamodb:BatchWriteItem。代码片段如下所示:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "dynamodb:DeleteItem", "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query", "dynamodb:Scan", "dynamodb:UpdateItem", "dynamodb:BatchWriteItem", "dynamodb:BatchGetItem" ], "Resource": [ "arn:aws:dynamodb:…", "arn:aws:dynamodb:…" ] } ] }

选择下一步,然后选择保存更改。您的策略现在应该允许进行批处理。

返回到 Amazon AppSync 控制台,转到架构页面并选择 Mutation.batchAdd 字段旁边的附加。将 Posts 表作为数据源以创建解析器。在代码编辑器中,将处理程序替换为下面的代码片段。该代码片段自动获取 GraphQL input PostInput 类型中的每个项目,并构建 BatchPutItem 操作所需的映射:

import { util } from "@aws-appsync/utils"; export function request(ctx) { return { operation: "BatchPutItem", tables: { Posts: ctx.args.posts.map((post) => util.dynamodb.toMapValues(post)), }, }; } export function response(ctx) { if (ctx.error) { util.error(ctx.error.message, ctx.error.type); } return ctx.result.data.Posts; }

导航到 Amazon AppSync 控制台的查询页面,并运行以下 batchAdd 变更:

mutation add { batchAdd(posts:[{ id: 1 title: "Running in the Park"},{ id: 2 title: "Playing fetch" }]){ id title } }

您应该会看到在屏幕上输出的结果;可以在 DynamoDB 控制台中扫描写入到 Posts 表的值以验证这一点。

接下来,重复附加解析器的过程,但对于 Query.batchGet 字段,将 Posts 表作为数据源。将处理程序替换为以下代码。这会自动接受 GraphQL ids:[] 类型的每个项目并构建 BatchGetItem 操作所需的一个映射:

import { util } from "@aws-appsync/utils"; export function request(ctx) { return { operation: "BatchGetItem", tables: { Posts: { keys: ctx.args.ids.map((id) => util.dynamodb.toMapValues({ id })), consistentRead: true, }, }, }; } export function response(ctx) { if (ctx.error) { util.error(ctx.error.message, ctx.error.type); } return ctx.result.data.Posts; }

现在,返回到 Amazon AppSync 控制台的查询页面,并运行以下 batchGet 查询:

query get { batchGet(ids:[1,2,3]){ id title } }

这应返回您早前添加的两个 id 值的结果。请注意,对于值为 3id,将返回 null 值。这是因为您的 Posts 表中还没有具有该值的记录。另请注意,Amazon AppSync 使用与传递给查询的键相同的顺序返回结果,这是 Amazon AppSync 代表您执行的一项额外功能。因此,如果切换到 batchGet(ids:[1,3,2]),您会看到顺序发生了变化。您还将了解哪个 id 返回了 null 值。

最后,将另一个解析器附加到 Mutation.batchDelete 字段,并将 Posts 表作为数据源。将处理程序替换为以下代码。这会自动接受 GraphQL ids:[] 类型的每个项目并构建 BatchGetItem 操作所需的一个映射:

import { util } from "@aws-appsync/utils"; export function request(ctx) { return { operation: "BatchDeleteItem", tables: { Posts: ctx.args.ids.map((id) => util.dynamodb.toMapValues({ id })), }, }; } export function response(ctx) { if (ctx.error) { util.error(ctx.error.message, ctx.error.type); } return ctx.result.data.Posts; }

现在,返回到 Amazon AppSync 控制台的查询页面,并运行以下 batchDelete 变更:

mutation delete { batchDelete(ids:[1,2]){ id } }

现在应删除 id12 的记录。如果您更早重新运行 batchGet() 查询,则应返回 null

多个表批处理

Amazon AppSync 还允许您在多个表中执行批处理操作。我们来构建更复杂的应用程序。想象一下,我们构建一个宠物健康应用程序,其中传感器报告宠物的位置和体温。传感器由电池供电,并每隔几分钟尝试连接到网络。在传感器建立连接时,它将读数发送到我们的 Amazon AppSync API。然后,触发器分析数据,这样,就可以向宠物主人显示控制面板。我们重点关注如何表示传感器与后端数据存储之间的交互。

在 Amazon AppSync 控制台中,选择创建 APIGraphQL API从头开始设计。将您的 API 命名为 MultiBatchTutorial API,选择下一步,在指定 GraphQL 资源步骤中选择稍后创建 GraphQL 资源,然后单击下一步。检查您的详细信息并创建 API。转到架构页面,并粘贴和保存以下架构:

type Mutation { # Register a batch of readings recordReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult # Delete a batch of readings deleteReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult } type Query { # Retrieve all possible readings recorded by a sensor at a specific time getReadings(sensorId: ID!, timestamp: String!): [SensorReading] } type RecordResult { temperatureReadings: [TemperatureReading] locationReadings: [LocationReading] } interface SensorReading { sensorId: ID! timestamp: String! } # Sensor reading representing the sensor temperature (in Fahrenheit) type TemperatureReading implements SensorReading { sensorId: ID! timestamp: String! value: Float } # Sensor reading representing the sensor location (lat,long) type LocationReading implements SensorReading { sensorId: ID! timestamp: String! lat: Float long: Float } input TemperatureReadingInput { sensorId: ID! timestamp: String value: Float } input LocationReadingInput { sensorId: ID! timestamp: String lat: Float long: Float }

我们需要创建两个 DynamoDB 表:

  • locationReadings 存储传感器位置读数。

  • temperatureReadings 存储传感器温度读数。

这两个表具有相同的主键结构:将 sensorId (String) 作为分区键,并将 timestamp (String) 作为排序键。

选择页面顶部的创建资源。选择使用现有的类型,并选择 locationReadings 类型。将您的表命名为 locationReadings。确保将主键设置为 sensorId,并将排序键设置为 timestamp。取消选择自动生成 GraphQL(您将提供自己的代码),然后选择创建。为 temperatureReadings 重复该过程,并将 temperatureReadings 作为类型和表名称。使用与上面相同的键。

您的新表将包含自动生成的角色。您仍然需要为这些角色添加一些权限。转到数据源页面并选择 locationReadings。在选择现有角色下面,您可以看到该角色。记下该角色(应类似于 appsync-ds-ddb-aaabbbcccddd-locationReadings),然后转到 IAM 控制台 (https://console.aws.amazon.com/iam/)。在 IAM 控制台中,选择角色,然后从该表中选择您的角色。在您的角色中,在权限策略下面,单击策略旁边的“+”(名称应与角色名称相似)。在显示该策略时,选择折叠菜单顶部的编辑。您需要为该策略添加权限。代码片段如下所示:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "dynamodb:DeleteItem", "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query", "dynamodb:Scan", "dynamodb:UpdateItem", "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Resource": [ "arn:aws:dynamodb:region:account:table/locationReadings", "arn:aws:dynamodb:region:account:table/locationReadings/*", "arn:aws:dynamodb:region:account:table/temperatureReadings", "arn:aws:dynamodb:region:account:table/temperatureReadings/*" ] } ] }

选择下一步,然后选择保存更改。使用上面相同的策略片段,为 temperatureReadings 数据源重复该过程。

BatchPutItem - 记录传感器读数

我们的传感器需要能够在连接到 Internet 后立即发送其读数。GraphQL 字段 Mutation.recordReadings 是传感器将用来执行上述操作的 API。我们需要在该字段中添加一个解析器。

在 Amazon AppSync 控制台的架构页面中,选择 Mutation.recordReadings 字段旁边的附加。在下一个屏幕上,将 locationReadings 表作为数据源以创建解析器。

在创建解析器后,在编辑器中将处理程序替换为以下代码。我们可以通过 BatchPutItem 操作指定多个表:

import { util } from '@aws-appsync/utils' export function request(ctx) { const { locReadings, tempReadings } = ctx.args const locationReadings = locReadings.map((loc) => util.dynamodb.toMapValues(loc)) const temperatureReadings = tempReadings.map((tmp) => util.dynamodb.toMapValues(tmp)) return { operation: 'BatchPutItem', tables: { locationReadings, temperatureReadings, }, } } export function response(ctx) { if (ctx.error) { util.appendError(ctx.error.message, ctx.error.type) } return ctx.result.data }

使用批处理操作,可能会从调用中同时返回错误和结果。在这种情况下,我们可以自主执行一些额外的错误处理。

注意

使用 utils.appendError()util.error() 类似,主要区别在于,它不会中断请求或响应处理程序评估。相反,它指示字段存在错误,但允许评估处理程序,从而将数据返回到调用方。在您的应用程序需要返回部分结果时,我们建议您使用 utils.appendError()

保存解析器,并导航到 Amazon AppSync 控制台中的查询页面。我们现在可以发送一些传感器读数。

执行以下变更:

mutation sendReadings { recordReadings( tempReadings: [ {sensorId: 1, value: 85.5, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, value: 85.7, timestamp: "2018-02-01T17:21:06.000+08:00"}, {sensorId: 1, value: 85.8, timestamp: "2018-02-01T17:21:07.000+08:00"}, {sensorId: 1, value: 84.2, timestamp: "2018-02-01T17:21:08.000+08:00"}, {sensorId: 1, value: 81.5, timestamp: "2018-02-01T17:21:09.000+08:00"} ] locReadings: [ {sensorId: 1, lat: 47.615063, long: -122.333551, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, lat: 47.615163, long: -122.333552, timestamp: "2018-02-01T17:21:06.000+08:00"}, {sensorId: 1, lat: 47.615263, long: -122.333553, timestamp: "2018-02-01T17:21:07.000+08:00"}, {sensorId: 1, lat: 47.615363, long: -122.333554, timestamp: "2018-02-01T17:21:08.000+08:00"}, {sensorId: 1, lat: 47.615463, long: -122.333555, timestamp: "2018-02-01T17:21:09.000+08:00"} ]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

我们在一个变更中发送了 10 个传感器读数,这些读数拆分到两个表中。使用 DynamoDB 控制台验证是否在 locationReadingstemperatureReadings 表中显示数据。

BatchDeleteItem - 删除传感器读数

同样,我们还需要能够批量删除传感器读数。我们使用 Mutation.deleteReadings GraphQL 字段来实现此目的。在 Amazon AppSync 控制台的架构页面中,选择 Mutation.deleteReadings 字段旁边的附加。在下一个屏幕上,将 locationReadings 表作为数据源以创建解析器。

在创建解析器后,在代码编辑器中将处理程序替换为下面的代码片段。在该解析器中,我们使用帮助程序函数映射器,该映射器从提供的输入中提取 sensorIdtimestamp

import { util } from '@aws-appsync/utils' export function request(ctx) { const { locReadings, tempReadings } = ctx.args const mapper = ({ sensorId, timestamp }) => util.dynamodb.toMapValues({ sensorId, timestamp }) return { operation: 'BatchDeleteItem', tables: { locationReadings: locReadings.map(mapper), temperatureReadings: tempReadings.map(mapper), }, } } export function response(ctx) { if (ctx.error) { util.appendError(ctx.error.message, ctx.error.type) } return ctx.result.data }

保存解析器,并导航到 Amazon AppSync 控制台中的查询页面。现在,让我们删除几个传感器读数。

执行以下变更:

mutation deleteReadings { # Let's delete the first two readings we recorded deleteReadings( tempReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}] locReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }
注意

DeleteItem 操作相反,响应中不会返回完全删除的项目。只返回传递的键。要了解更多信息,请参阅 DynamoDB 的 JavaScript 解析器函数参考中的 BatchDeleteItem

通过 DynamoDB 控制台验证是否从 locationReadingstemperatureReadings 表中删除了这两个读数。

BatchGetItem - 检索读数

我们的应用程序的另一个常见操作是,检索传感器在特定时间点的读数。我们将解析器附加到架构上的 Query.getReadings GraphQL 字段。在 Amazon AppSync 控制台的架构页面中,选择 Query.getReadings 字段旁边的附加。在下一个屏幕上,将 locationReadings 表作为数据源以创建解析器。

让我们使用以下代码:

import { util } from '@aws-appsync/utils' export function request(ctx) { const keys = [util.dynamodb.toMapValues(ctx.args)] const consistentRead = true return { operation: 'BatchGetItem', tables: { locationReadings: { keys, consistentRead }, temperatureReadings: { keys, consistentRead }, }, } } export function response(ctx) { if (ctx.error) { util.appendError(ctx.error.message, ctx.error.type) } const { locationReadings: locs, temperatureReadings: temps } = ctx.result.data return [ ...locs.map((l) => ({ ...l, __typename: 'LocationReading' })), ...temps.map((t) => ({ ...t, __typename: 'TemperatureReading' })), ] }

保存解析器,并导航到 Amazon AppSync 控制台中的查询页面。现在,让我们检索传感器读数。

执行以下查询:

query getReadingsForSensorAndTime { # Let's retrieve the very first two readings getReadings(sensorId: 1, timestamp: "2018-02-01T17:21:06.000+08:00") { sensorId timestamp ...on TemperatureReading { value } ...on LocationReading { lat long } } }

我们已成功说明了如何通过 Amazon AppSync 使用 DynamoDB 批处理操作。

错误处理

在 Amazon AppSync 中,数据源操作有时可能会返回部分结果。部分结果是一个术语,我们用它来表示操作的输出中包含某些数据和一个错误。由于错误处理本质上是应用程序特定的,因此,Amazon AppSync 允许您在响应处理程序中处理错误。上下文中的解析器调用错误(如果有)为 ctx.error。调用错误始终包含一条消息和一个类型,可作为属性 ctx.error.messagectx.error.type 进行访问。在响应处理程序中,您可以使用三种方法处理部分结果:

  1. 仅返回数据以忽略调用错误。

  2. 停止处理程序评估以引发错误(使用 util.error(...)),这不会返回任何数据。

  3. 附加一个错误(使用 util.appendError(...))并且也返回数据。

让我们通过 DynamoDB 批处理操作分别说明上述三点。

DynamoDB 批处理操作

借助 DynamoDB 批处理操作,批处理可能会部分完成。也就是说,某些请求的项目或键未得到处理。如果 Amazon AppSync 无法完成批处理,则会在上下文中设置未处理的项目和调用错误。

我们将使用本教程前一部分中 Query.getReadings 操作的 BatchGetItem 字段配置来实施错误处理。这一次,我们假定在执行 Query.getReadings 字段时,temperatureReadings DynamoDB 表耗尽了预置的吞吐量。在 Amazon AppSync 第二次尝试处理批次中的剩余元素时,DynamoDB 引发了 ProvisionedThroughputExceededException

以下 JSON 表示在 DynamoDB 批处理调用之后但在调用响应处理程序之前的序列化上下文:

{ "arguments": { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" }, "source": null, "result": { "data": { "temperatureReadings": [ null ], "locationReadings": [ { "lat": 47.615063, "long": -122.333551, "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ] }, "unprocessedKeys": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] } }, "error": { "type": "DynamoDB:ProvisionedThroughputExceededException", "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" }, "outErrors": [] }

关于上下文需要注意的几点:

  • Amazon AppSync 在上下文中的 ctx.error 处设置了调用错误,并且错误类型设置为 DynamoDB:ProvisionedThroughputExceededException

  • 即使存在错误,也会在 ctx.result.data 中为每个表映射结果。

  • ctx.result.data.unprocessedKeys 中提供了未处理的键。此处,由于表吞吐量不足,Amazon AppSync 无法检索具有 (sensorId:1, timestamp:2018-02-01T17:21:05.000+08:00) 键的项目。

注意

对于 BatchPutItem,它是 ctx.result.data.unprocessedItems。对于 BatchDeleteItem,它是 ctx.result.data.unprocessedKeys

我们通过三种不同方式处理此错误。

1. 承受调用错误

返回数据而不处理调用错误:这会有效地承受此错误,同时使给定 GraphQL 字段的结果始终成功。

我们编写的代码是熟悉的,并且仅关注结果数据。

响应处理程序

export function response(ctx) { return ctx.result.data }

GraphQL 响应

{ "data": { "getReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "lat": 47.615063, "long": -122.333551 }, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] } }

将不向错误响应中添加任何错误,因为只对数据执行了操作。

2. 引发错误以中止执行响应处理程序

从客户端角度看,在应将部分失败视为完全失败时,您可以中止执行响应处理程序以防止返回数据。util.error(...) 实用程序方法实现完全此行为。

响应处理程序代码

export function response(ctx) { if (ctx.error) { util.error(ctx.error.message, ctx.error.type, null, ctx.result.data.unprocessedKeys); } return ctx.result.data; }

GraphQL 响应

{ "data": { "getReadings": null }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }

即使可能已从 DynamoDB 批处理操作返回了一些结果,我们也选择引发错误,这样,getReadings GraphQL 字段为 Null,并且此错误已添加到 GraphQL 响应的错误数据块中。

3. 追加错误以返回数据和错误

在某些情况下,为了提供更好的用户体验,应用程序可以返回部分结果并向其客户端通知未处理的项目。客户端可以决定是实施重试,还是将错误翻译出来并返回给最终用户。util.appendError(...) 是一种实现该行为的实用程序方法,它让应用程序设计者在上下文中附加错误,而不干扰响应处理程序评估。在评估响应处理程序后,Amazon AppSync 将任何上下文错误附加到 GraphQL 响应的错误块以处理它们。

响应处理程序代码

export function response(ctx) { if (ctx.error) { util.appendError(ctx.error.message, ctx.error.type, null, ctx.result.data.unprocessedKeys); } return ctx.result.data; }

我们在 GraphQL 响应的错误块中转发了调用错误和 unprocessedKeys 元素。getReadings 字段也从 locationReadings 表中返回部分数据,如下面的响应中所示。

GraphQL 响应

{ "data": { "getReadings": [ null, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }