教程:增量同步 - AWS AppSync
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

教程:增量同步

AWS AppSync 中的客户端应用程序通过将 GraphQL 响应本地缓存到移动/Web 应用程序中的磁盘来存储数据。版本化的数据源和 Sync 操作使客户能够使用单个解析器执行同步过程。这使客户端能够将其本地缓存与来自一个基本查询的结果(可能包含大量记录)混合,然后仅接收自上次查询以来更改的数据(增量更新)。通过允许客户端使用初始请求和另一个请求中的增量更新来执行缓存的基本组合,您可以将计算工作从客户端应用程序移至后端。这对于经常在联机状态和脱机状态之间切换的客户端应用程序来说效率更高。

为了实现增量同步,Sync 查询会对版本化数据源使用 Sync 操作。当 AWS AppSync 更改将更改版本化数据源中的项目时,该更改的记录也将存储在增量 表中。您可以选择对其他版本化数据源使用不同的增量 表(例如,每种类型一个,每个域区域一个),或者对 API 使用单个增量 表。AWS AppSync 建议不要对多个 API 使用单个增量 表,以避免主键冲突。

此外,增量同步客户端还可以接收订阅作为参数,然后客户端协调订阅在脱机和联机转换之间重新连接和写入。增量同步通过自动恢复订阅(包括指数回退和通过各种网络错误情况的抖动重试)并将事件存储在队列中来执行此操作。然后,在合并队列中的任何事件之前运行适当的增量或基本查询,最后正常处理订阅。

客户端配置选项(包括 Amplify 数据存储)的文档可在Amplify 框架网站上查阅。本文档概述了如何设置版本化的 DynamoDB 数据源和 Sync 操作,以便与增量同步客户端一起使用,实现最佳的数据访问。

一键设置

要在 AWS AppSync 中使用所有配置的解析程序和必要的 AWS 资源自动设置 GraphQL 终端节点,请使用以下 AWS CloudFormation 模板:

此堆栈在您的账户中创建以下资源:

  • 2 个 DynamoDB 表(基本和增量)

  • 1 个 AWS AppSync API(带 API 密钥)

  • 1 个 IAM 角色(带适用于 DynamoDB 表的策略)

两个表用于将您的同步查询分区到另一个表中,此表在客户端处于脱机状态时充当错过事件的日志。为了使查询在增量表上保持高效,将使用 Amazon DynamoDB TTL 来根据需要自动整理事件。可以根据您对数据源的需求配置 TTL 时间(您可能希望这是 1 小时,1 天等)。

架构

为了演示增量同步,示例应用程序将创建由 DynamoDB 中的基本增量 表支持的 Posts 架构。AWS AppSync 会自动将更改写入这两个表。同步查询将根据情况从基本增量 表中拉取记录,并定义单个订阅,以说明客户端如何在其重新连接逻辑中利用它。

input CreatePostInput { author: String! title: String! content: String! url: String ups: Int downs: Int _version: Int } interface Connection { nextToken: String startedAt: AWSTimestamp! } type Mutation { createPost(input: CreatePostInput!): Post updatePost(input: UpdatePostInput!): Post deletePost(input: DeletePostInput!): Post } type Post { id: ID! author: String! title: String! content: String! url: AWSURL ups: Int downs: Int _version: Int _deleted: Boolean _lastChangedAt: AWSTimestamp! } type PostConnection implements Connection { items: [Post!]! nextToken: String startedAt: AWSTimestamp! } type Query { getPost(id: ID!): Post syncPosts(limit: Int, nextToken: String, lastSync: AWSTimestamp): PostConnection! } type Subscription { onCreatePost: Post @aws_subscribe(mutations: ["createPost"]) onUpdatePost: Post @aws_subscribe(mutations: ["updatePost"]) onDeletePost: Post @aws_subscribe(mutations: ["deletePost"]) } input DeletePostInput { id: ID! _version: Int! } input UpdatePostInput { id: ID! author: String title: String content: String url: String ups: Int downs: Int _version: Int! } schema { query: Query mutation: Mutation subscription: Subscription }

GraphQL 架构是标准的,但在继续之前需注意以下几点。首先,所有更改都将先自动写入基本 表,然后写入增量 表。基本 表是状态的可信中央来源,而 增量 表是您的日志。如果您不传入 lastSync: AWSTimestamp,当客户端的脱机时间长于您在增量 表中配置的 TTL 时间时,syncPosts 查询将针对基本 表运行,并将缓存以及定期运行结合作为边缘情形的全局捕获过程。如果您传入 lastSync: AWSTimestamp,则此 syncPosts 查询将针对增量 表运行并由客户端用于检索自它们上次脱机以后更改的事件。Amplify 客户端会自动传递该 lastSync: AWSTimestamp 值,并相应地保存到磁盘。

Post 上的 _deleted 字段用于 DELETE 操作。当客户端处于脱机状态并且从基本 表中删除记录时,此属性将通知客户端执行同步以移出其本地缓存中的项目。如果客户端脱机较长时间并且在客户端可以使用增量同步查询检索此值之前已删除该项目,则基本查询中的全局捕获事件(可在客户端中配置)将运行,并且会从缓存中删除该项目。此字段标记为可选,因为它仅在运行包含已删除项目的同步查询时返回值。

更改

对于所有更改,AWS AppSync 在基本 表中执行标准的 Create/Update/Delete 操作,并自动在增量 表中记录更改。您可以通过修改数据源上的 DeltaSyncTableTTL 值来减少或延长保留记录的时间。对于拥有高速数据的组织,短时间保留记录可能是有意义的。另外,如果您的客户端长时间处于脱机状态,则最好是将记录保留较长时间。

同步查询

基本查询 是一个没有指定 lastSync 值的 DynamoDB 同步操作。对于许多组织而言,这是有效的,因为基本查询仅在启动时运行,之后将定期运行。

增量查询 是指定 lastSync 值的 DynamoDB 同步操作。每当客户端从脱机状态恢复联机状态时,就会执行增量查询(只要基本查询周期时间未触发运行)。客户端会自动跟踪上次成功运行查询以同步数据的时间。

运行增量查询时,查询的解析程序使用 ds_pkds_sk,仅查询自客户端上次执行同步以来发生更改的记录。客户端将存储相应的 GraphQL 响应。

有关执行同步查询的详细信息,请参阅 同步操作文档

示例

让我们首先调用一个 createPost 更改来创建一个项目:

mutation create { createPost(input: {author: "Nadia", title: "My First Post", content: "Hello World"}) { id author title content _version _lastChangedAt _deleted } }

此更改的返回值如下所示:

{ "data": { "createPost": { "id": "81d36bbb-1579-4efe-92b8-2e3f679f628b", "author": "Nadia", "title": "My First Post", "content": "Hello World", "_version": 1, "_lastChangedAt": 1574469356331, "_deleted": null } } }

如果您检查基本 表的内容,将看到一条如下所示的记录:

{ "_lastChangedAt": { "N": "1574469356331" }, "_version": { "N": "1" }, "author": { "S": "Nadia" }, "content": { "S": "Hello World" }, "id": { "S": "81d36bbb-1579-4efe-92b8-2e3f679f628b" }, "title": { "S": "My First Post" } }

如果您检查增量 表的内容,将看到一条如下所示的记录:

{ "_lastChangedAt": { "N": "1574469356331" }, "_ttl": { "N": "1574472956" }, "_version": { "N": "1" }, "author": { "S": "Nadia" }, "content": { "S": "Hello World" }, "ds_pk": { "S": "AppSync-delta-sync-post:2019-11-23" }, "ds_sk": { "S": "00:35:56.331:81d36bbb-1579-4efe-92b8-2e3f679f628b:1" }, "id": { "S": "81d36bbb-1579-4efe-92b8-2e3f679f628b" }, "title": { "S": "My First Post" } }

现在我们可以模拟一个基本 查询,客户端将运行该查询来组合其本地数据存储,并使用如下所示的 syncPosts 查询:

query baseQuery { syncPosts(limit: 100, lastSync: null, nextToken: null) { items { id author title content _version _lastChangedAt } startedAt nextToken } }

基本 查询的返回值如下所示:

{ "data": { "syncPosts": { "items": [ { "id": "81d36bbb-1579-4efe-92b8-2e3f679f628b", "author": "Nadia", "title": "My First Post", "content": "Hello World", "_version": 1, "_lastChangedAt": 1574469356331 } ], "startedAt": 1574469602238, "nextToken": null } } }

我们稍后会保存 startedAt 值来模拟增量 查询,但首先我们需要对表进行更改。让我们使用 updatePost 更改来修改我们现有的文章:

mutation updatePost { updatePost(input: {id: "81d36bbb-1579-4efe-92b8-2e3f679f628b", _version: 1, title: "Actually this is my Second Post"}) { id author title content _version _lastChangedAt _deleted } }

此更改的返回值如下所示:

{ "data": { "updatePost": { "id": "81d36bbb-1579-4efe-92b8-2e3f679f628b", "author": "Nadia", "title": "Actually this is my Second Post", "content": "Hello World", "_version": 2, "_lastChangedAt": 1574469851417, "_deleted": null } } }

如果您现在检查基本 表的内容,则应该看到更新后的项目:

{ "_lastChangedAt": { "N": "1574469851417" }, "_version": { "N": "2" }, "author": { "S": "Nadia" }, "content": { "S": "Hello World" }, "id": { "S": "81d36bbb-1579-4efe-92b8-2e3f679f628b" }, "title": { "S": "Actually this is my Second Post" } }

如果您现在检查增量 表的内容,则应看到两条记录:

  1. 创建项目时的记录

  2. 项目更新时间的记录。

新项目将如下所示:

{ "_lastChangedAt": { "N": "1574469851417" }, "_ttl": { "N": "1574473451" }, "_version": { "N": "2" }, "author": { "S": "Nadia" }, "content": { "S": "Hello World" }, "ds_pk": { "S": "AppSync-delta-sync-post:2019-11-23" }, "ds_sk": { "S": "00:44:11.417:81d36bbb-1579-4efe-92b8-2e3f679f628b:2" }, "id": { "S": "81d36bbb-1579-4efe-92b8-2e3f679f628b" }, "title": { "S": "Actually this is my Second Post" } }

现在,我们可以模拟增量 查询来检索客户端脱机时发生的修改。我们将使用从基本 查询返回的 startedAt 值来发出请求:

query delta { syncPosts(limit: 100, lastSync: 1574469602238, nextToken: null) { items { id author title content _version } startedAt nextToken } }

增量 查询的返回值如下所示:

{ "data": { "syncPosts": { "items": [ { "id": "81d36bbb-1579-4efe-92b8-2e3f679f628b", "author": "Nadia", "title": "Actually this is my Second Post", "content": "Hello World", "_version": 2 } ], "startedAt": 1574470400808, "nextToken": null } } }