Amazon Neptune 的 Amazon Lambda 函数示例 - Amazon Neptune
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon Neptune 的 Amazon Lambda 函数示例

以下示例 Amazon Lambda 函数是用 Java、JavaScript 和 Python 编写的,说明了使用 fold().coalesce().unfold() 习惯用法用随机生成的 ID 对单个顶点进行更新插入。

每个函数中的大部分代码是样板代码,负责管理连接,并在出现错误时重试连接和查询。实际的应用程序逻辑和 Gremlin 查询分别在 doQuery()query() 方法中实现。如果您使用这些示例作为自己的 Lambda 函数的基础,则可以专注于修改 doQuery()query()

这些函数配置为重试失败的查询 5 次,两次重试之间等待 1 秒钟。

这些函数要求值必须存在于以下 Lambda 环境变量中:

  • NEPTUNE_ENDPOINT – 您的 Neptune 数据库集群端点。对于 Python 来说,这应该是 neptuneEndpoint

  • NEPTUNE_PORT – Neptune 端口。对于 Python 来说,这应该是 neptunePort

  • USE_IAM –(truefalse)如果您的数据库启用了 Amazon Identity and Access Management (IAM) 数据库身份验证,请将 USE_IAM 环境变量设置为 true。这会导致 Lambda 函数对指向 Neptune 的连接请求进行 Sigv4 签名。对于此类 IAM 数据库身份验证请求,请确保 Lambda 函数的执行角色附加了相应的 IAM policy,允许该函数连接到您的 Neptune 数据库集群(请参阅IAM policy 的类型)。

Amazon Neptune 的 Java Lambda 函数示例

以下是使用 Java Amazon Lambda 函数时要记住的一些事项:

  • Java 驱动程序会维护其自己的连接池,而您不需要该连接池,因此请使用 minConnectionPoolSize(1)maxConnectionPoolSize(1) 配置您的 Cluster 对象。

  • Cluster 对象的构建速度可能很慢,因为它会创建一个或多个序列化器 [默认情况下为 Gyro,如果您已将其配置为其它输出格式(例如 binary),则会创建另一个序列化器]。这些可能需要一段时间才能实例化。

  • 连接池使用第一个请求进行初始化。此时,如果您使用的是 IAM 数据库身份验证,驱动程序会设置 Netty 堆栈、分配字节缓冲区并创建签名密钥。所有这些都可能增加冷启动延迟。

  • Java 驱动程序的连接池监控服务器主机的可用性,并在连接失败时自动尝试重新连接。它会启动后台任务以尝试重新建立连接。使用 reconnectInterval( ) 配置尝试重新连接的间隔。当驱动程序尝试重新连接时,您的 Lambda 函数只需重试查询即可。

    如果重试之间的间隔小于重新连接尝试之间的间隔,则在连接失败时重试会再次失败,因为主机被认为不可用。这不适用于在引发 ConcurrentModificationException 时重试。

  • 使用 Java 8 而不是 Java 11。在 Java 11 中,默认情况下不启用 Netty 优化。

  • 此示例使用 Retry4j 进行重试。

  • 要在 Java Lambda 函数中使用 Sigv4 签名驱动程序,请参阅使用 Java 和 Gremlin 及签名版本 4 签名连接到 Neptune中的依赖项要求。

警告

来自 Retry4j 的 CallExecutor 可能不是线程安全的。考虑让每个线程使用自己的 CallExecutor 实例。

package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.handshakeInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }

如果要在函数中加入重新连接逻辑,请参阅Java 重新连接示例

Amazon Neptune 的 JavaScript Lambda 函数示例

有关此示例的注意事项
  • JavaScript 驱动程序不维护连接池。它总是打开单个连接。

  • 该示例函数使用 gremlin-aws-sigv4 中的 Sigv4 签名实用程序,对指向已启用 IAM 身份验证的数据库的请求进行签名。

  • 它使用开源 async 实用程序模块中的 retry( ) 函数来处理回退和重试尝试。

  • Gremlin 终端步骤会返回 JavaScript promise(请参阅 TinkerPop 文档)。对于 next(),这是一个 {value, done} 元组。

  • 连接错误是在处理程序内部引发的,并根据此处概述的建议使用一些回退和重试逻辑进行处理,但有一个异常。有一种连接问题,驱动程序不会将其视为异常,因此这种回退和重试逻辑无法解决这个问题。

    问题在于,如果在驱动程序发送请求之后但在驱动程序收到响应之前关闭连接,则查询似乎已完成,但返回 null 值。就 lambda 函数客户端而言,该函数似乎成功完成,但响应为空。

    此问题的影响取决于您的应用程序如何处理空响应。有些应用程序可能会将读取请求中的空响应视为错误,但其它应用程序可能会错误地将其视为空结果。

    遇到此连接问题的写入请求也将返回空响应。响应为空的成功调用是表示成功还是失败? 如果调用写入函数的客户端只是将成功调用该函数视为已提交对数据库的写入,而不检查响应的正文,则系统可能会丢失数据。

    此问题源于驱动程序如何处理由底层套接字发出的事件。当底层网络套接字因 ECONNRESET 错误关闭时,驱动程序使用的 WebSocket 将关闭并发出 'ws close' 事件。然而,驱动程序中没有任何内容可通过一种用于引发异常的方式来处理该事件。结果,查询就消失了。

    为了解决此问题,此处的示例 lambda 函数添加了一个 'ws close' 事件处理程序,用于在创建远程连接时会向驱动程序引发异常。但是,这个异常不是沿着 Gremlin 查询的请求-响应路径引发的,因此不能用来触发 lambda 函数本身中的任何回退和重试逻辑。相反,'ws close' 事件处理程序引发的异常会导致未处理的异常,从而导致 lambda 调用失败。这允许调用该函数的客户端处理错误,并在适当时重试 lambda 调用。

    我们建议您在 lambda 函数本身中实现回退和重试逻辑,以保护您的客户端免受间歇性连接问题的影响。但是,上述问题的解决方法也要求客户端实现重试逻辑,以处理由此特定连接问题导致的失败。

Javascript 代码

const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };

Amazon Neptune 的 Python Lambda 函数示例

以下是关于以下 Python Amazon Lambda 示例函数的一些注意事项:

  • 它使用回退模块

  • 它设置 pool_size=1 以防止创建不必要的连接池。

  • 它设置 message_serializer=serializer.GraphSONSerializersV2d0()

import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return (database_url, request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() return DriverRemoteConnection( database_url, 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0(), headers=headers) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)

以下是示例结果,显示了重负载和轻负载的交替时期:

该图显示了 Python Lambda 函数示例的示例结果。