使用 openCypher 和 Bolt 的 Neptune 最佳实践 - Amazon Neptune
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 openCypher 和 Bolt 的 Neptune 最佳实践

将 openCypher 查询语言和 Bolt 协议与 Neptune 结合使用时,请遵循以下最佳实践。有关在 Neptune 中使用 openCypher 的信息,请参阅使用 openCypher 访问 Neptune 图形

在查询中首选定向边缘而非双向边缘

当 Neptune 执行查询优化时,双向边缘会使创建最佳查询计划变得困难。次优计划要求引擎执行不必要的工作,从而导致性能降低。

因此,请尽可能使用定向边缘而不是双向边缘。例如,使用:

MATCH p=(:airport {code: 'ANC'})-[:route]->(d) RETURN p)

而不是:

MATCH p=(:airport {code: 'ANC'})-[:route]-(d) RETURN p)

大多数数据模型实际上不需要在两个方向上遍历边缘,因此,通过切换到使用定向边缘,查询可以显著提高性能。

如果您的数据模型确实需要遍历双向边缘,请将 MATCH 模式中的第一个节点(左侧)设置为筛选限制最严的节点。

例如,“为我找到往返 ANC 机场的所有 routes”。编写这个查询以从 ANC 机场开始,如下所示:

MATCH p=(src:airport {code: 'ANC'})-[:route]-(d) RETURN p

引擎可以执行最少的工作量来满足查询,因为受限制最严的节点放置为模式中的第一个节点(左侧)。然后,引擎可以优化查询。

这比在模式末尾筛选 ANC 机场要好得多,如下所示:

MATCH p=(d)-[:route]-(src:airport {code: 'ANC'}) RETURN p

当受限制最严的节点没有放在模式中的首位时,引擎必须执行额外的工作,因为它无法优化查询,必须执行额外的查找才能得出结果。

Neptune 不支持在一个事务中进行多个并发查询

尽管 Bolt 驱动程序本身允许在事务中进行并发查询,但 Neptune 不支持在一个事务中并发运行多个查询。相反,Neptune 要求一个事务中的多个查询按顺序运行,并且在启动下一个查询之前完全消耗掉每个查询的结果。

以下示例显示了如何使用 Bolt 在一个事务中按顺序运行多个查询,以便在下一个查询开始之前完全消耗掉每个查询的结果:

final String query = "MATCH (n) RETURN n"; try (Driver driver = getDriver(HOST_BOLT, getDefaultConfig())) { try (Session session = driver.session(readSessionConfig)) { try (Transaction trx = session.beginTransaction()) { final Result res_1 = trx.run(query); Assert.assertEquals(10000, res_1.list().size()); final Result res_2 = trx.run(query); Assert.assertEquals(10000, res_2.list().size()); } } }

在失效转移后创建新连接

在失效转移的情况下,Bolt 驱动程序可以继续连接到旧的写入器实例,而不是新的活动写入器实例,因为 DNS 名称已解析为特定的 IP 地址。

为防止出现这种情况,请在进行任何失效转移后关闭 Driver 对象,然后重新连接该对象。

长寿命应用程序的连接处理

在构建长寿命的应用程序(例如,在容器内或 Amazon EC2 实例上运行的应用程序)时,只需实例化 Driver 对象一次,然后在应用程序的生命周期内重用该对象。Driver 对象是线程安全的,并且将其初始化的开销非常大。

Amazon Lambda 的连接处理

不建议在 Amazon Lambda 函数中使用 Bolt 驱动程序,因为它们具有连接开销和管理要求。请改用 HTTPS 端点

完成后关闭驱动程序对象

在完成对客户端的操作后,务必关闭客户端,以便服务器关闭 Bolt 连接并释放与连接关联的所有资源。如果您使用 driver.close() 关闭驱动程序,则会自动发生这种情况。

如果驱动程序未正确关闭,Neptune 会在 20 分钟后终止所有空闲的 Bolt 连接,或者,如果您使用的是 IAM 身份验证,则会在 10 天后终止所有空闲的 Bolt 连接。

Neptune 支持的并发 Bolt 连接不超过 1000 个。如果您在使用完连接后没有显式关闭连接,并且实时连接的数量达到了 1000 的限制,则任何新的连接尝试都会失败。

使用显式事务模式进行读写

在将事务与 Neptune 和 Bolt 驱动程序结合使用时,最好将读取和写入事务的访问模式显式设置为正确的设置。

只读事务

对于只读事务,如果您在构建会话时没有传入适当的访问模式配置,则使用默认的隔离级别,即突变查询隔离。因此,对于只读事务来说,将访问模式显式设置为 read 非常重要。

自动提交读取事务示例:

SessionConfig sessionConfig = SessionConfig .builder() .withFetchSize(1000) .withDefaultAccessMode(AccessMode.READ) .build(); Session session = driver.session(sessionConfig); try { (Add your application code here) } catch (final Exception e) { throw e; } finally { driver.close() }

读取事务示例:

Driver driver = GraphDatabase.driver(url, auth, config); SessionConfig sessionConfig = SessionConfig .builder() .withDefaultAccessMode(AccessMode.READ) .build(); driver.session(sessionConfig).readTransaction( new TransactionWork<List<String>>() { @Override public List<String> execute(org.neo4j.driver.Transaction tx) { (Add your application code here) } } );

在这两种情况下,都使用 Neptune 只读事务语义实现 SNAPSHOT 隔离

由于只读副本仅接受只读查询,因此提交到只读副本的任何查询都在 SNAPSHOT 隔离语义下运行。

只读事务没有脏读或不可重复读取。

只读事务

对于突变查询,有三种不同的机制可以创建写入事务,每种机制如下所示:

隐式写入事务示例:

Driver driver = GraphDatabase.driver(url, auth, config); SessionConfig sessionConfig = SessionConfig .builder() .withDefaultAccessMode(AccessMode.WRITE) .build(); driver.session(sessionConfig).writeTransaction( new TransactionWork<List<String>>() { @Override public List<String> execute(org.neo4j.driver.Transaction tx) { (Add your application code here) } } );

自动提交写入事务示例:

SessionConfig sessionConfig = SessionConfig .builder() .withFetchSize(1000) .withDefaultAccessMode(AccessMode.Write) .build(); Session session = driver.session(sessionConfig); try { (Add your application code here) } catch (final Exception e) { throw e; } finally { driver.close() }

显式写入事务示例:

Driver driver = GraphDatabase.driver(url, auth, config); SessionConfig sessionConfig = SessionConfig .builder() .withFetchSize(1000) .withDefaultAccessMode(AccessMode.WRITE) .build(); Transaction beginWriteTransaction = driver.session(sessionConfig).beginTransaction(); (Add your application code here) beginWriteTransaction.commit(); driver.close();
写入事务的隔离级别
  • 作为突变查询的一部分进行的读取是在 READ COMMITTED 事务隔离下运行的。

  • 对于作为突变查询一部分进行的读取,没有脏读。

  • 在突变查询中读取时,记录和记录范围会被锁定。

  • 当突变事务已读取索引范围时,可以强力保证在读取结束之前,任何并发事务都不会修改该范围。

突变查询不是线程安全的。

有关冲突,请参阅使用锁定等待超时解决冲突

突变查询失败时不会自动重试。

异常的重试逻辑

对于所有允许重试的异常,通常最好使用指数回退和重试策略,在两次重试之间提供逐渐延长的等待时间,以便更好地处理诸如 ConcurrentModificationException 错误等临时问题。下面显示指数回退和重试模式的示例:

public static void main() { try (Driver driver = getDriver(HOST_BOLT, getDefaultConfig())) { retriableOperation(driver, "CREATE (n {prop:'1'})") .withRetries(5) .withExponentialBackoff(true) .maxWaitTimeInMilliSec(500) .call(); } } protected RetryableWrapper retriableOperation(final Driver driver, final String query){ return new RetryableWrapper<Void>() { @Override public Void submit() { log.info("Performing graph Operation in a retry manner......"); try (Session session = driver.session(writeSessionConfig)) { try (Transaction trx = session.beginTransaction()) { trx.run(query).consume(); trx.commit(); } } return null; } @Override public boolean isRetryable(Exception e) { if (isCME(e)) { log.debug("Retrying on exception.... {}", e); return true; } return false; } private boolean isCME(Exception ex) { return ex.getMessage().contains("Operation failed due to conflicting concurrent operations"); } }; } /** * Wrapper which can retry on certain condition. Client can retry operation using this class. */ @Log4j2 @Getter public abstract class RetryableWrapper<T> { private long retries = 5; private long maxWaitTimeInSec = 1; private boolean exponentialBackoff = true; /** * Override the method with custom implementation, which will be called in retryable block. */ public abstract T submit() throws Exception; /** * Override with custom logic, on which exception to retry with. */ public abstract boolean isRetryable(final Exception e); /** * Define the number of retries. * * @param retries -no of retries. */ public RetryableWrapper<T> withRetries(final long retries) { this.retries = retries; return this; } /** * Max wait time before making the next call. * * @param time - max polling interval. */ public RetryableWrapper<T> maxWaitTimeInMilliSec(final long time) { this.maxWaitTimeInSec = time; return this; } /** * ExponentialBackoff coefficient. */ public RetryableWrapper<T> withExponentialBackoff(final boolean expo) { this.exponentialBackoff = expo; return this; } /** * Call client method which is wrapped in submit method. */ public T call() throws Exception { int count = 0; Exception exceptionForMitigationPurpose = null; do { final long waitTime = exponentialBackoff ? Math.min(getWaitTimeExp(retries), maxWaitTimeInSec) : 0; try { return submit(); } catch (Exception e) { exceptionForMitigationPurpose = e; if (isRetryable(e) && count < retries) { Thread.sleep(waitTime); log.debug("Retrying on exception attempt - {} on exception cause - {}", count, e.getMessage()); } else if (!isRetryable(e)) { log.error(e.getMessage()); throw new RuntimeException(e); } } } while (++count < retries); throw new IOException(String.format( "Retry was unsuccessful.... attempts %d. Hence throwing exception " + "back to the caller...", count), exceptionForMitigationPurpose); } /* * Returns the next wait interval, in milliseconds, using an exponential backoff * algorithm. */ private long getWaitTimeExp(final long retryCount) { if (0 == retryCount) { return 0; } return ((long) Math.pow(2, retryCount) * 100L); } }