教程:Lambda 解析器 - Amazon AppSync
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

教程:Lambda 解析器

您可以将 Amazon Lambda 与 Amazon AppSync 一起使用以解析任何 GraphQL 字段。例如,GraphQL 查询可能会向 Amazon Relational Database Service (Amazon RDS) 实例发送调用,而 GraphQL 变更可能会写入到 Amazon Kinesis 流。在本节中,我们说明了如何编写 Lambda 函数,以根据 GraphQL 字段操作调用执行业务逻辑。

创建 Lambda 函数

以下示例显示了一个使用 Node.js(运行时环境:Node.js 18.x)编写的 Lambda 函数,该函数对博客文章应用程序包含的博客文章执行各种操作。请注意,代码应保存在具有 .mis 扩展名的文件中。

export const handler = async (event) => { console.log('Received event {}', JSON.stringify(event, 3)) const posts = { 1: { id: '1', title: 'First book', author: 'Author1', url: 'https://amazon.com/', content: 'SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1', ups: '100', downs: '10', }, 2: { id: '2', title: 'Second book', author: 'Author2', url: 'https://amazon.com', content: 'SAMPLE TEXT AUTHOR 2 SAMPLE TEXT AUTHOR 2 SAMPLE TEXT', ups: '100', downs: '10', }, 3: { id: '3', title: 'Third book', author: 'Author3', url: null, content: null, ups: null, downs: null }, 4: { id: '4', title: 'Fourth book', author: 'Author4', url: 'https://www.amazon.com/', content: 'SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4', ups: '1000', downs: '0', }, 5: { id: '5', title: 'Fifth book', author: 'Author5', url: 'https://www.amazon.com/', content: 'SAMPLE TEXT AUTHOR 5 SAMPLE TEXT AUTHOR 5 SAMPLE TEXT AUTHOR 5 SAMPLE TEXT AUTHOR 5 SAMPLE TEXT', ups: '50', downs: '0', }, } const relatedPosts = { 1: [posts['4']], 2: [posts['3'], posts['5']], 3: [posts['2'], posts['1']], 4: [posts['2'], posts['1']], 5: [], } console.log('Got an Invoke Request.') let result switch (event.field) { case 'getPost': return posts[event.arguments.id] case 'allPosts': return Object.values(posts) case 'addPost': // return the arguments back return event.arguments case 'addPostErrorWithData': result = posts[event.arguments.id] // attached additional error information to the post result.errorMessage = 'Error with the mutation, data has changed' result.errorType = 'MUTATION_ERROR' return result case 'relatedPosts': return relatedPosts[event.source.id] default: throw new Error('Unknown field, unable to resolve ' + event.field) } }

该 Lambda 函数按 ID 检索文章,添加文章,检索文章列表以及获取给定文章的相关文章。

注意

Lambda 函数对 event.field 执行 switch 语句以确定当前解析的字段。

使用 Amazon 管理控制台创建该 Lambda 函数。

为 Lambda 配置数据源

在创建 Lambda 函数后,在 Amazon AppSync 控制台中导航到您的 GraphQL API,然后选择数据源选项卡。

选择创建数据源,输入友好的数据源名称(例如 Lambda),然后为数据源类型选择 Amazon Lambda 函数。对于区域,选择与您的函数相同的区域。对于函数 ARN,选择您的 Lambda 函数的 Amazon 资源名称 (ARN)。

在选择您的 Lambda 函数后,您可以创建新的 Amazon Identity and Access Management (IAM) 角色(Amazon AppSync 为其分配相应的权限),或选择具有以下内联策略的现有角色:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": "arn:aws:lambda:REGION:ACCOUNTNUMBER:function/LAMBDA_FUNCTION" } ] }

您还必须为 IAM 角色建立与 Amazon AppSync 信任关系,如下所示:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "appsync.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

创建 GraphQL 架构

数据源现已连接到您的 Lambda 函数,请创建 GraphQL 架构。

从 Amazon AppSync 控制台的架构编辑器中,确保您的架构与以下架构匹配:

schema { query: Query mutation: Mutation } type Query { getPost(id:ID!): Post allPosts: [Post] } type Mutation { addPost(id: ID!, author: String!, title: String, content: String, url: String): Post! } type Post { id: ID! author: String! title: String content: String url: String ups: Int downs: Int relatedPosts: [Post] }

配置解析器

您现已注册了 Lambda 数据源和有效的 GraphQL 架构,您可以使用解析器将 GraphQL 字段连接到 Lambda 数据源。

您将创建一个解析器,它使用 Amazon AppSync JavaScript (APPSYNC_JS) 运行时环境并与您的 Lambda 函数交互。要了解使用 JavaScript 编写 Amazon AppSync 解析器和函数的更多信息,请参阅解析器和函数的 JavaScript 运行时功能

有关 Lambda 映射模板的更多信息,请参阅 Lambda 的 JavaScript 解析器函数参考

在该步骤中,您将一个解析器附加到以下字段的 Lambda 函数:getPost(id:ID!): PostallPosts: [Post]addPost(id: ID!, author: String!, title: String, content: String, url: String): Post!Post.relatedPosts: [Post]。从 Amazon AppSync 控制台的架构编辑器中,在解析器窗格中选择 getPost(id:ID!): Post 字段旁边的附加。选择您的 Lambda 数据源。接下来,提供以下代码:

import { util } from '@aws-appsync/utils'; export function request(ctx) { const {source, args} = ctx return { operation: 'Invoke', payload: { field: ctx.info.fieldName, arguments: args, source }, }; } export function response(ctx) { return ctx.result; }

在调用 Lambda 函数时,该解析器代码将字段名称、参数列表以及有关源对象的上下文传递给该函数。选择保存

您已成功附加了您的首个解析器。对于其余字段,重复该操作。

测试您的 GraphQL API

现在您的 Lambda 函数已与 GraphQL 解析器连接,您可以使用控制台或客户端应用程序运行一些变更和查询。

在 Amazon AppSync 控制台左侧,选择查询,然后粘贴以下代码:

addPost 变更

mutation AddPost { addPost( id: 6 author: "Author6" title: "Sixth book" url: "https://www.amazon.com/" content: "This is the book is a tutorial for using GraphQL with Amazon AppSync." ) { id author title content url ups downs } }

getPost 查询

query GetPost { getPost(id: "2") { id author title content url ups downs } }

allPosts 查询

query AllPosts { allPosts { id author title content url ups downs relatedPosts { id title } } }

返回错误

解析任何给定的字段可能会导致错误。在使用 Amazon AppSync 时,您可以从以下源中引发错误:

  • 解析器响应处理程序

  • Lambda 函数

从解析器响应处理程序中

要故意引发错误,您可以使用 util.error 实用程序方法。它将 errorMessageerrorType 和可选的 data 值作为参数。出现错误后,data 对于将额外的数据返回客户端很有用。在 GraphQL 最终响应中,data 对象将添加到 errors

以下示例说明了如何在 Post.relatedPosts: [Post] 解析器响应处理程序中使用该方法。

// the Post.relatedPosts response handler export function response(ctx) { util.error("Failed to fetch relatedPosts", "LambdaFailure", ctx.result) return ctx.result; }

这将生成与以下内容类似的 GraphQL 响应:

{ "data": { "allPosts": [ { "id": "2", "title": "Second book", "relatedPosts": null }, ... ] }, "errors": [ { "path": [ "allPosts", 0, "relatedPosts" ], "errorType": "LambdaFailure", "locations": [ { "line": 5, "column": 5 } ], "message": "Failed to fetch relatedPosts", "data": [ { "id": "2", "title": "Second book" }, { "id": "1", "title": "First book" } ] } ] }

错误导致 allPosts[0].relatedPostsnull,而 errorMessageerrorTypedata 体现在 data.errors[0] 对象中。

从 Lambda 函数

Amazon AppSync 还可以识别 Lambda 函数引发的错误。Lambda 编程模型允许您引发处理的 错误。如果 Lambda 函数引发错误,则 Amazon AppSync 无法解析当前字段。仅在响应中设置从 Lambda 返回的错误消息。目前,您无法通过从 Lambda 函数中引发错误,将任何无关数据传回到客户端。

注意

如果您的 Lambda 函数引发未处理的 错误,Amazon AppSync 将使用 Lambda 设置的错误消息。

以下 Lambda 函数会引发错误:

export const handler = async (event) => { console.log('Received event {}', JSON.stringify(event, 3)) throw new Error('I always fail.') }

在您的响应处理程序中收到错误。您可以使用 util.appendError 将错误附加到 GraphQL 的响应,以将其发回到响应。为此,请将您的 Amazon AppSync 函数响应处理程序更改为:

// the lambdaInvoke response handler export function response(ctx) { const { error, result } = ctx; if (error) { util.appendError(error.message, error.type, result); } return result; }

这将返回与以下内容类似的 GraphQL 响应:

{ "data": { "allPosts": null }, "errors": [ { "path": [ "allPosts" ], "data": null, "errorType": "Lambda:Unhandled", "errorInfo": null, "locations": [ { "line": 2, "column": 3, "sourceName": null } ], "message": "I fail. always" } ] }

高级使用案例:批处理

该示例中的 Lambda 函数具有一个 relatedPosts 字段,它返回给定文章的相关文章列表。在示例查询中,从 Lambda 函数中调用 allPosts 字段将返回 5 篇文章。由于我们指定还希望为每个返回的文章解析 relatedPosts,因此,将 relatedPosts 字段操作调用 5 次。

query { allPosts { // 1 Lambda invocation - yields 5 Posts id author title content url ups downs relatedPosts { // 5 Lambda invocations - each yields 5 posts id title } } }

虽然这在该特定示例中听起来可能并不严重,但这种累积的过度获取可能会迅速降低应用程序的性能。

如果您要针对同一查询中返回的相关 Posts 再次提取 relatedPosts,调用数量将显著增加。

query { allPosts { // 1 Lambda invocation - yields 5 Posts id author title content url ups downs relatedPosts { // 5 Lambda invocations - each yield 5 posts = 5 x 5 Posts id title relatedPosts { // 5 x 5 Lambda invocations - each yield 5 posts = 25 x 5 Posts id title author } } } }

在这个相对简单的查询中,Amazon AppSync 将调用 Lambda 函数 1 + 5 + 25 = 31 次。

这是相当常见的挑战,常被称为 N+1 问题(在本例中 N = 5),会导致延迟增加,以及应用程序费用提升。

我们解决此问题的方式是批处理类似的字段解析器请求。在该示例中,Lambda 函数可能会解析给定批次的文章的相关文章列表,而不是让 Lambda 函数解析单个给定文章的相关文章列表。

为了说明这一点,让我们更新 relatedPosts 的解析器以进行批处理。

import { util } from '@aws-appsync/utils'; export function request(ctx) { const {source, args} = ctx return { operation: ctx.info.fieldName === 'relatedPosts' ? 'BatchInvoke' : 'Invoke', payload: { field: ctx.info.fieldName, arguments: args, source }, }; } export function response(ctx) { const { error, result } = ctx; if (error) { util.appendError(error.message, error.type, result); } return result; }

现在,在解析的 fieldNamerelatedPosts 时,代码将操作从 Invoke 更改为 BatchInvoke。现在,在配置批处理部分中为函数启用批处理。将最大批处理大小设置为 5。选择保存

通过进行该更改,在解析 relatedPosts 时,Lambda 函数收到以下内容以作为输入:

[ { "field": "relatedPosts", "source": { "id": 1 } }, { "field": "relatedPosts", "source": { "id": 2 } }, ... ]

如果在请求中指定了 BatchInvoke,Lambda 函数将收到请求列表并返回结果列表。

具体来说,结果列表必须与请求负载条目的大小和顺序匹配,以使 Amazon AppSync 可以相应地匹配结果。

在该批处理示例中,Lambda 函数返回一批结果,如下所示:

[ [{"id":"2","title":"Second book"}, {"id":"3","title":"Third book"}], // relatedPosts for id=1 [{"id":"3","title":"Third book"}] // relatedPosts for id=2 ]

您可以更新 Lambda 代码以处理 relatedPosts 批处理:

export const handler = async (event) => { console.log('Received event {}', JSON.stringify(event, 3)) //throw new Error('I fail. always') const posts = { 1: { id: '1', title: 'First book', author: 'Author1', url: 'https://amazon.com/', content: 'SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1', ups: '100', downs: '10', }, 2: { id: '2', title: 'Second book', author: 'Author2', url: 'https://amazon.com', content: 'SAMPLE TEXT AUTHOR 2 SAMPLE TEXT AUTHOR 2 SAMPLE TEXT', ups: '100', downs: '10', }, 3: { id: '3', title: 'Third book', author: 'Author3', url: null, content: null, ups: null, downs: null }, 4: { id: '4', title: 'Fourth book', author: 'Author4', url: 'https://www.amazon.com/', content: 'SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4', ups: '1000', downs: '0', }, 5: { id: '5', title: 'Fifth book', author: 'Author5', url: 'https://www.amazon.com/', content: 'SAMPLE TEXT AUTHOR 5 SAMPLE TEXT AUTHOR 5 SAMPLE TEXT AUTHOR 5 SAMPLE TEXT AUTHOR 5 SAMPLE TEXT', ups: '50', downs: '0', }, } const relatedPosts = { 1: [posts['4']], 2: [posts['3'], posts['5']], 3: [posts['2'], posts['1']], 4: [posts['2'], posts['1']], 5: [], } if (!event.field && event.length){ console.log(`Got a BatchInvoke Request. The payload has ${event.length} items to resolve.`); return event.map(e => relatedPosts[e.source.id]) } console.log('Got an Invoke Request.') let result switch (event.field) { case 'getPost': return posts[event.arguments.id] case 'allPosts': return Object.values(posts) case 'addPost': // return the arguments back return event.arguments case 'addPostErrorWithData': result = posts[event.arguments.id] // attached additional error information to the post result.errorMessage = 'Error with the mutation, data has changed' result.errorType = 'MUTATION_ERROR' return result case 'relatedPosts': return relatedPosts[event.source.id] default: throw new Error('Unknown field, unable to resolve ' + event.field) } }

返回单个错误

以前的示例表明,可以从 Lambda 函数中返回单个错误,或者从响应处理程序中引发错误。对于批处理调用,从 Lambda 函数中引发错误会将整个批次标记为失败。对于发生不可恢复错误的特定场景(例如,到数据存储的连接失败),这可能是可以接受的。不过,如果批次中的某些项目成功,而其他项目失败,则可能会同时返回错误和有效的数据。由于 Amazon AppSync 要求批处理响应列出与批次的原始大小匹配的元素,因此,您必须定义一个可以区分有效数据和错误的数据结构。

例如,如果 Lambda 函数预计返回一批相关文章,您可以选择返回 Response 对象列表,其中每个对象具有可选的 dataerrorMessageerrorType 字段。如果出现 errorMessage 字段,则表示出现错误。

以下代码说明了如何更新 Lambda 函数:

export const handler = async (event) => { console.log('Received event {}', JSON.stringify(event, 3)) // throw new Error('I fail. always') const posts = { 1: { id: '1', title: 'First book', author: 'Author1', url: 'https://amazon.com/', content: 'SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1 SAMPLE TEXT AUTHOR 1', ups: '100', downs: '10', }, 2: { id: '2', title: 'Second book', author: 'Author2', url: 'https://amazon.com', content: 'SAMPLE TEXT AUTHOR 2 SAMPLE TEXT AUTHOR 2 SAMPLE TEXT', ups: '100', downs: '10', }, 3: { id: '3', title: 'Third book', author: 'Author3', url: null, content: null, ups: null, downs: null }, 4: { id: '4', title: 'Fourth book', author: 'Author4', url: 'https://www.amazon.com/', content: 'SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4 SAMPLE TEXT AUTHOR 4', ups: '1000', downs: '0', }, 5: { id: '5', title: 'Fifth book', author: 'Author5', url: 'https://www.amazon.com/', content: 'SAMPLE TEXT AUTHOR 5 SAMPLE TEXT AUTHOR 5 SAMPLE TEXT AUTHOR 5 SAMPLE TEXT AUTHOR 5 SAMPLE TEXT', ups: '50', downs: '0', }, } const relatedPosts = { 1: [posts['4']], 2: [posts['3'], posts['5']], 3: [posts['2'], posts['1']], 4: [posts['2'], posts['1']], 5: [], } if (!event.field && event.length){ console.log(`Got a BatchInvoke Request. The payload has ${event.length} items to resolve.`); return event.map(e => { // return an error for post 2 if (e.source.id === '2') { return { 'data': null, 'errorMessage': 'Error Happened', 'errorType': 'ERROR' } } return {data: relatedPosts[e.source.id]} }) } console.log('Got an Invoke Request.') let result switch (event.field) { case 'getPost': return posts[event.arguments.id] case 'allPosts': return Object.values(posts) case 'addPost': // return the arguments back return event.arguments case 'addPostErrorWithData': result = posts[event.arguments.id] // attached additional error information to the post result.errorMessage = 'Error with the mutation, data has changed' result.errorType = 'MUTATION_ERROR' return result case 'relatedPosts': return relatedPosts[event.source.id] default: throw new Error('Unknown field, unable to resolve ' + event.field) } }

更新 relatedPosts 解析器代码:

import { util } from '@aws-appsync/utils'; export function request(ctx) { const {source, args} = ctx return { operation: ctx.info.fieldName === 'relatedPosts' ? 'BatchInvoke' : 'Invoke', payload: { field: ctx.info.fieldName, arguments: args, source }, }; } export function response(ctx) { const { error, result } = ctx; if (error) { util.appendError(error.message, error.type, result); } else if (result.errorMessage) { util.appendError(result.errorMessage, result.errorType, result.data) } else if (ctx.info.fieldName === 'relatedPosts') { return result.data } else { return result } }

响应处理程序现在检查 Lambda 函数在 Invoke 操作中返回的错误,检查 BatchInvoke 操作为各个项目返回的错误,最后检查 fieldName。对于 relatedPosts,该函数返回 result.data。对于所有其他字段,该函数仅返回 result。例如,请参阅下面的查询:

query AllPosts { allPosts { id title content url ups downs relatedPosts { id } author } }

该查询返回类似下面的 GraphQL 响应:

{ "data": { "allPosts": [ { "id": "1", "relatedPosts": [ { "id": "4" } ] }, { "id": "2", "relatedPosts": null }, { "id": "3", "relatedPosts": [ { "id": "2" }, { "id": "1" } ] }, { "id": "4", "relatedPosts": [ { "id": "2" }, { "id": "1" } ] }, { "id": "5", "relatedPosts": [] } ] }, "errors": [ { "path": [ "allPosts", 1, "relatedPosts" ], "data": null, "errorType": "ERROR", "errorInfo": null, "locations": [ { "line": 4, "column": 5, "sourceName": null } ], "message": "Error Happened" } ] }

配置最大批处理大小

要配置解析器上的最大批处理大小,请在 Amazon Command Line Interface (Amazon CLI) 中使用以下命令:

$ aws appsync create-resolver --api-id <api-id> --type-name Query --field-name relatedPosts \ --code "<code-goes-here>" \ --runtime name=APPSYNC_JS,runtimeVersion=1.0.0 \ --data-source-name "<lambda-datasource>" \ --max-batch-size X
注意

在提供请求映射模板时,您必须使用 BatchInvoke 操作才能使用批处理。