使用 OpenPher 和 Bolt 的 Neptune 最佳实践 - Amazon Neptune
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 OpenPher 和 Bolt 的 Neptune 最佳实践

当您将 Openency Pher 查询语言和 Bolt 协议与 Neptune 一起使用时,请遵循以下最佳实践。有关在 Neptune 中使用 opency Pher 的信息,请参阅使用 OpenPher 访问 Neptune 图.

Neptune 不支持在一笔交易中多个并发查询

尽管 Bolt 驱动程序本身允许在事务中进行并发查询,但 Neptune 不支持同时运行的事务中的多个查询。相反,Neptune 要求在一个事务中按顺序运行多个查询,并且在启动下一个查询之前完全使用每个查询的结果。

下面的示例展示了如何使用 Bolt 在事务中按顺序运行多个查询,以便在下一个查询开始之前完全消耗每个查询的结果:

final String query = "MATCH (n) RETURN n"; try (Driver driver = getDriver(HOST_BOLT, getDefaultConfig())) { try (Session session = driver.session(readSessionConfig)) { try (Transaction trx = session.beginTransaction()) { final Result res_1 = trx.run(query); Assert.assertEquals(10000, res_1.list().size()); final Result res_2 = trx.run(query); Assert.assertEquals(10000, res_2.list().size()); } } }

在故障转移后创建新的连接

在故障转移的情况下,Bolt 驱动程序可以继续连接到旧的写入器实例而不是新的活动实例,因为 DNS 名称解析为特定的 IP 地址。

为防止这种情况,请关闭然后重新连接Driver任何故障转移后的对象。

长期应用程序的连接处理

构建长期使用寿命的应用程序时,例如在容器内或 Amazon EC2 实例上运行的应用程序时,请实例化Driver对象一次,然后在应用程序的生命周期内重复使用该对象。Driver 对象是线程安全的,并且将其初始化的开销非常大。

用于的连接处理Amazon Lambda

不建议在内部使用 Bolt 驱动程序Amazon Lambda函数,因为它们的连接开销和管理要求。使用HTTPS 终端节点相反。

完成对驱动程序的对象后,关闭

在完成对客户端的操作后,务必将其关闭,以便服务器关闭 Bolt 连接并释放与连接关联的所有资源。如果您使用driver.close().

如果驱动程序未正确关闭,Neptune 将在 20 分钟后终止所有空闲的 Bolt 连接,如果您使用 IAM 身份验证,则在 10 天后终止。

Neptune 支持不超过 1000 个并发 Bolt 连接。如果您在使用完连接后未明确将其关闭,并且活动连接数达到 1000 的限制,则任何新的连接尝试均将失败。

使用明确的交易模式进行阅读和写作

在 Neptune 和 Bolt 驱动程序中使用事务时,最好将读取和写入事务的访问模式明确设置为正确的设置。

只读事务

对于只读事务,如果在构建会话时没有传递适当的访问模式配置,则使用默认隔离级别,即变异查询隔离。因此,只读事务将访问模式设置为非常重要read显式。

自动提交读取交易示例:

SessionConfig sessionConfig = SessionConfig .builder() .withFetchSize(1000) .withDefaultAccessMode(AccessMode.READ) .build(); Session session = driver.session(sessionConfig); try { (Add your application code here) } catch (final Exception e) { throw e; } finally { driver.close() }

阅读交易示例:

Driver driver = GraphDatabase.driver(url, auth, config); SessionConfig sessionConfig = SessionConfig .builder() .withDefaultAccessMode(AccessMode.READ) .build(); driver.session(sessionConfig).readTransaction( new TransactionWork<List<String>>() { @Override public List<String> execute(org.neo4j.driver.Transaction tx) { (Add your application code here) } } );

在两种情况下,SNAPSHOT隔离是用来实现的Neptune 只读事务语义.

由于只读副本仅接受只读查询,因此提交至只读副本的任何查询均按照SNAPSHOT隔离语义。

只读事务没有脏读或不可重复的读取。

只读事务

对于变异查询,创建写入事务有三种不同的机制,每种机制如下所示:

隐式写交易示例:

Driver driver = GraphDatabase.driver(url, auth, config); SessionConfig sessionConfig = SessionConfig .builder() .withDefaultAccessMode(AccessMode.WRITE) .build(); driver.session(sessionConfig).writeTransaction( new TransactionWork<List<String>>() { @Override public List<String> execute(org.neo4j.driver.Transaction tx) { (Add your application code here) } } );

自动提交写入事务示例:

SessionConfig sessionConfig = SessionConfig .builder() .withFetchSize(1000) .withDefaultAccessMode(AccessMode.Write) .build(); Session session = driver.session(sessionConfig); try { (Add your application code here) } catch (final Exception e) { throw e; } finally { driver.close() }

显式写入事务示例:

Driver driver = GraphDatabase.driver(url, auth, config); SessionConfig sessionConfig = SessionConfig .builder() .withFetchSize(1000) .withDefaultAccessMode(AccessMode.WRITE) .build(); Transaction beginWriteTransaction = driver.session(sessionConfig).beginTransaction(); (Add your application code here) beginWriteTransaction.commit(); driver.close();

写入事务的隔离级别

  • 作为突变查询的一部分所做的读取将在下面运行READ COMMITTED事务隔离。

  • 作为突变查询的一部分所做的读取没有脏读。

  • 在变异查询中读取时,记录和记录范围将被锁定。

  • 当更改事务已读取索引范围时,可以强力保证在该读取结束之前,任何并发事务都不会修改该范围。

突变查询不是线程安全的。

有关冲突,请参阅使用锁定等待超时解决冲突.

在发生故障的情况下,不会自动重试变异查询。

重试例外的逻辑

对于允许重试的所有例外情况,通常最好使用指数退避和重试策略它提供了逐渐延长重试之间的等待时间,以便更好地处理诸如ConcurrentModificationException错误消息。下面显示了指数退避和重试模式的示例:

public static void main() { try (Driver driver = getDriver(HOST_BOLT, getDefaultConfig())) { retriableOperation(driver, "CREATE (n {prop:'1'})") .withRetries(5) .withExponentialBackoff(true) .maxWaitTimeInMilliSec(500) .call(); } } protected RetryableWrapper retriableOperation(final Driver driver, final String query){ return new RetryableWrapper<Void>() { @Override public Void submit() { log.info("Performing graph Operation in a retry manner......"); try (Session session = driver.session(writeSessionConfig)) { try (Transaction trx = session.beginTransaction()) { trx.run(query).consume(); trx.commit(); } } return null; } @Override public boolean isRetryable(Exception e) { if (isCME(e)) { log.debug("Retrying on exception.... {}", e); return true; } return false; } private boolean isCME(Exception ex) { return ex.getMessage().contains("Operation failed due to conflicting concurrent operations"); } }; } /** * Wrapper which can retry on certain condition. Client can retry operation using this class. */ @Log4j2 @Getter public abstract class RetryableWrapper<T> { private long retries = 5; private long maxWaitTimeInSec = 1; private boolean exponentialBackoff = true; /** * Override the method with custom implementation, which will be called in retryable block. */ public abstract T submit() throws Exception; /** * Override with custom logic, on which exception to retry with. */ public abstract boolean isRetryable(final Exception e); /** * Define the number of retries. * * @param retries -no of retries. */ public RetryableWrapper<T> withRetries(final long retries) { this.retries = retries; return this; } /** * Max wait time before making the next call. * * @param time - max polling interval. */ public RetryableWrapper<T> maxWaitTimeInMilliSec(final long time) { this.maxWaitTimeInSec = time; return this; } /** * ExponentialBackoff coefficient. */ public RetryableWrapper<T> withExponentialBackoff(final boolean expo) { this.exponentialBackoff = expo; return this; } /** * Call client method which is wrapped in submit method. */ public T call() throws Exception { int count = 0; Exception exceptionForMitigationPurpose = null; do { final long waitTime = exponentialBackoff ? Math.min(getWaitTimeExp(retries), maxWaitTimeInSec) : 0; try { return submit(); } catch (Exception e) { exceptionForMitigationPurpose = e; if (isRetryable(e) && count < retries) { Thread.sleep(waitTime); log.debug("Retrying on exception attempt - {} on exception cause - {}", count, e.getMessage()); } else if (!isRetryable(e)) { log.error(e.getMessage()); throw new RuntimeException(e); } } } while (++count < retries); throw new IOException(String.format( "Retry was unsuccessful.... attempts %d. Hence throwing exception " + "back to the caller...", count), exceptionForMitigationPurpose); } /* * Returns the next wait interval, in milliseconds, using an exponential backoff * algorithm. */ private long getWaitTimeExp(final long retryCount) { if (0 == retryCount) { return 0; } return ((long) Math.pow(2, retryCount) * 100L); } }