Amazon LambdaAmazon Neptune 函数示例 - Amazon Neptune
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon LambdaAmazon Neptune 函数示例

以下示例Amazon Lambda函数,用 Java,JavaScript 和 Python 编写,说明使用fold().coalesce().unfold()习惯用语。

每个函数中的大部分代码都是样板代码,负责管理连接并在出现错误时重试连接和查询。真正的应用程序逻辑和 Gemlin 查询在doQuery()query()方法。如果您使用这些示例作为自己的 Lambda 函数的基础,则可以专注于修改doQuery()query().

这些函数配置为重试失败的查询 5 次,在重试之间等待 1 秒钟。

这些函数要求值存在于以下 Lambda 环境变量中:

  • NEPTUNE_ENDPOINT— 您的 Neptune 数据库集群终端节点。

  • NEPTUNE_PORT — Neptune 端口。

  • USE_IAM — (true或者false)如果您的数据库具有Amazon Identity and Access Management(IAM) 数据库身份验证,请将USE_IAM环境变量true. 这将导致 Lambda 函数签名到 Neptune 的连接请求。对于此类 IAM DB 身份验证请求,请确保 Lambda 函数的执行角色附加了适当的 IAM 策略,该策略允许函数连接到您的 Neptune 数据库集群(请参阅自定义 IAM 策略)。

Amazon Neptune 的 Java Lambda 函数示例

以下是关于 Java 的一些要注意的事项Amazon Lambda函数:

  • Java 驱动程序维护其自己的连接池,您不需要这些连接池,因此请配置Cluster对象使用minConnectionPoolSize(1)maxConnectionPoolSize(1).

  • 这些区域有:Cluster对象的构建速度可能很慢,因为它会创建一个或多个序列化器(默认情况下,Gyro 还有另一个序列化器,如果您已将其配置为其他输出格式,例如binary)。这些可能需要一段时间来实例化。

  • 连接池将使用第一个请求进行初始化。此时,驱动程序将设置Netty堆栈,分配字节缓冲区,并创建签名密钥(如果您使用 IAM DB 身份验证)。所有这些都会增加冷启动延迟。

  • Java 驱动程序的连接池监视服务器主机的可用性,并在连接失败时自动尝试重新连接。它启动后台任务以尝试重新建立连接。使用reconnectInterval( )配置重新连接尝试之间的间隔。当驱动程序尝试重新连接时,Lambda 函数可以简单地重试查询。

    如果重试之间的间隔小于重新连接尝试之间的间隔,则由于主机被视为不可用,故障连接上的重试会再次失败。这不适用于ConcurrentModificationException.

  • 使用 Java 8 而不是 Java 11。Netty优化在 Java 11 中默认情况下不会启用。

  • 此示例使用Ry4j以进行重试。

  • 使用Sigv4签名驱动程序,请参阅使用 Java 和 Gremlin 及签名版本 4 签名连接到 Neptune.

package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { builder = builder.channelizer(SigV4WebSocketChannelizer.class); } return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder() .retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if (message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }

如果要在函数中包含重新连接逻辑,请参阅Java 重新连接示例.

Amazon Neptune 的 JavaScript Lambda 函数示例

有关此示例的说明

  • JavaScript 驱动程序不会维护连接池。它始终打开单个连接。

  • 示例函数使用 Sigv4 签名实用程序来自灰色林-aws-sigv4用于对启用 IAM 身份验证的数据库的请求进行签名。

  • 它使用重试 ()函数来自开源异步实用程序模块来处理退出和重试尝试。

  • 格雷姆林终端步骤返回一个 JavaScriptpromise(请参阅TinkerPop 文档)。适用于next(),这是一个{value, done}元组。

  • 连接错误在处理程序中引发,并根据此处概述的建议使用一些退出和重试逻辑进行处理,但有一个例外。有一种连接问题,驱动程序不会作为异常处理,因此无法通过此退出和重试逻辑来解决这种问题。

    问题是,如果在驱动程序发送请求后但在驱动程序收到响应之前关闭连接,则查询似乎已完成,但返回空值。就 lambda 函数客户端而言,该函数似乎已成功完成,但响应为空。

    此问题的影响取决于您的应用程序如何处理空响应。某些应用程序可能会将来自读取请求的空响应视为错误,但其他应用程序可能会错误地将其视为空结果。

    遇到此连接问题的写入请求也将返回空响应。成功调用带空响应的信号是成功还是失败? 如果调用写函数的客户端只是将函数的成功调用视为表示已提交对数据库的写入操作,而不是检查响应的正文,则系统可能会显示丢失数据。

    此问题源于驱动程序如何处理底层套接字发出的事件。当底层网络套接字用ECONNRESET错误时,驱动程序使用的 WebSocket 将关闭并发出'ws close'event. 但是,驱动程序中没有任何内容可用于引发异常的方式来处理该事件。因此,查询只是消失。

    要解决该问题,此处的示例 lambda 函数在此处添加了'ws close'事件处理程序,它会在创建远程连接时向驱动程序抛出异常。但是,这个异常不会沿着 Gemlin 查询的请求-响应路径引发,因此不能用于触发 lambda 函数本身内的任何退出和重试逻辑。相反,由'ws close'事件处理程序会导致未处理的异常,导致 lambda 调用失败。这允许调用函数的客户端处理错误,并在适当的情况下重试 lambda 调用。

    我们建议您在 lambda 函数本身中实现回退和重试逻辑,以保护您的客户端免受间歇性连接问题的影响。但是,上述问题的解决方法要求客户端也实现重试逻辑,以处理由此特定连接问题导致的故障。

JavaScript 代码

const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };

Amazon Neptune 的 Python Lambda 函数示例

以下是关于以下 Python 的一些事项Amazon Lambda示例函数:

  • 它使用退避模块.

  • 它设置pool_size=1以防止创建不必要的连接池。

  • 它设置message_serializer=serializer.GraphSONSerializersV2d0().

import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from tornado.websocket import WebSocketClosedError from tornado import httpclient from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [WebSocketClosedError, OSError] retriable_errors = [GremlinServerError] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return httpclient.HTTPRequest(database_url, headers=request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) print('error: [{}] {}'.format(type(e), err_msg)) print('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) print('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): return doQuery(event) def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): print('Creating remote connection') return DriverRemoteConnection( connection_string(), 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0()) def connection_string(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return database_url conn = create_remote_connection() g = create_graph_traversal_source(conn)

以下是示例结果:

显示 Python Lambda 函数示例中的示例结果的图。