使用 Bolt 协议向 Neptune 进行 OpenPher 查询 - Amazon Neptune
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Bolt 协议向 Neptune 进行 OpenPher 查询

螺栓是一种面向语句的客户端/服务器协议,最初由 Neo4J 开发,并根据知识共享 3.0 进行许可署名-相同方式共享license. 它是客户驱动的,这意味着客户始终启动消息交换。

要使用 Neo4J 的 Bolt 驱动程序连接到 Neptune,只需使用boltURI 方案。如果您有单个 Neptune 实例正在运行,请使用 read_write 终端节点。如果正在运行多个实例,则建议使用两个驱动程序,一个用于写入器,另一个用于所有只读副本。如果您只有默认的两个终端节点,一个 read_write 和 read_only 驱动程序就足够了,但是如果您还有自定义终端节点,请考虑为每个端点创建一个驱动程序实例。

Neptune 允许多达 1000 个并发 Bolt 连接。

Neptune 支持以下 Neo4J Bolt 消息规范版本:4.0.0.

有关使用 Bolt 驱动程序的各种语言中 OpenPher 查询的示例,请参阅 NEO4J司机和语言指南文档中)。

使用 Bolt 与 Java 连接到 Neptune

你可以从 Maven 下载你想使用的任何版本的驱动程序MVN 存储库,或者可以将这个依赖项添加到你的项目中:

<dependency> <groupId>org.neo4j.driver</groupId> <artifactId>neo4j-java-driver</artifactId> <version>4.3.3</version> </dependency>

然后,要使用这些 Bolt 驱动程序之一在 Java 中连接到 Neptune,请使用如下代码为集群中的主/写入器实例创建驱动程序实例:

import org.neo4j.driver.Driver; import org.neo4j.driver.GraphDatabase; final Driver driver = GraphDatabase.driver("bolt://(your cluster endpoint URL):8182", AuthTokens.none(), Config.builder().withEncryption() .withTrustStrategy(TrustStrategy.trustSystemCertificates()) .build());

如果你有一个或多个读取器副本,你也可以使用这样的代码为它们创建驱动程序实例:

final Driver read_only_driver = // (without connection timeout) GraphDatabase.driver("bolt://(your cluster endpoint URL):8182", Config.builder().withEncryption() .withTrustStrategy(TrustStrategy.trustSystemCertificates()) .build());

或者,有超时时间:

final Driver read_only_timeout_driver = // (with connection timeout) GraphDatabase.driver("bolt://(your cluster endpoint URL):8182", Config.builder().withConnectionTimeout(30, TimeUnit.SECONDS) .withEncryption() .withTrustStrategy(TrustStrategy.trustSystemCertificates()) .build());

如果您有自定义终端节点,那么为每个端点创建一个驱动程序实例也可能是值得的。

使用 Bolt 的 Python OpenPher 查询示例

以下是如何使用 Bolt 在 Python 中进行 OpenPher 查询:

python -m pip install neo4j
from neo4j import GraphDatabase uri = "bolt://(your cluster endpoint URL):8182" driver = GraphDatabase.driver(uri, auth=("username", "password"), encrypted=True)

请注意,auth参数将被忽略。

使用 Bolt 的 .NET OpenPher 查询示例

以下是如何使用 Bolt 在 .NET 中进行 OpenPher 查询:

Install-Package Neo4j.Driver-4.3.0
using Neo4j.Driver; namespace hello { // This example creates a node and reads a node in a Neptune // Cluster where IAM Authentication is not enabled. public class HelloWorldExample : IDisposable { private bool _disposed = false; private readonly IDriver _driver; private static string url = "bolt://(your cluster endpoint URL):8182"; private static string createNodeQuery = "CREATE (a:Greeting) SET a.message = 'HelloWorldExample'"; private static string readNodeQuery = "MATCH(n:Greeting) RETURN n.message"; ~HelloWorldExample() => Dispose(false); public HelloWorldExample(string uri) { _driver = GraphDatabase.Driver(uri, AuthTokens.None, o => o.WithEncryptionLevel(EncryptionLevel.Encrypted)); } public void createNode() { // Open a session using (var session = _driver.Session()) { // Run the query in a write transaction var greeting = session.WriteTransaction(tx => { var result = tx.Run(createNodeQuery); // Consume the result return result.Consume(); }); // The output will look like this: // ResultSummary{Query=`CREATE (a:Greeting) SET a.message = 'HelloWorldExample"..... Console.WriteLine(greeting); } } public void retrieveNode() { // Open a session using (var session = _driver.Session()) { // Run the query in a read transaction var greeting = session.ReadTransaction(tx => { var result = tx.Run(readNodeQuery); // Consume the result. Read the single node // created in a previous step. return result.Single()[0].As<string>();; }); // The output will look like this: // HelloWorldExample Console.WriteLine(greeting); } } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { if (_disposed) return; if (disposing) { _driver?.Dispose(); } _disposed = true; } public static void Main() { using (var apiCaller = new HelloWorldExample(url)) { apiCaller.createNode(); apiCaller.retrieveNode(); } } } }

使用 Bolt 和 IAM 身份验证的 Java OpenPher 查询示例

以下是如何使用 Bolt 与 IAM 身份验证一起在 Java 中进行 OpenPher 查询:

public class Example { /** * Neptune regions: * us-east-1, us-east-2, us-west-1, us-west-2, ca-central-1, * sa-east-1, eu-north-1, eu-west-1, eu-west-2, eu-west-3, * eu-central-1, me-south-1, af-south-1, ap-east-1, ap-northeast-1, * ap-northeast-2, ap-southeast-1, ap-southeast-2, ap-south-1, cn-north-1, * cn-northwest-1, us-gov-east-1, us-gov-west-1 */ private static final String SERVICE_REGION = "us-east-1"; // Access key private static final String ACCESS_KEY = "(access key id)"; // Secret key private static final String SECRET_KEY = "(secret access key)"; // AWS_SESSION_TOKEN is an optional environment variable. // Specify a session token only if you are using temporary security credentials private static final String AWS_SESSION_TOKEN = "(session token)"; private static final Gson GSON = new Gson(); // Query to create the node private static final String CREATE_NODE_QUERY = "CREATE (n:Node {message : 'HelloWorld'})"; // Query to read the node private static final String READ_NODE_QUERY = "MATCH (n:Node) RETURN n"; // URL of the Neptune cluster private static final String URL = "bolt://(your cluster endpoint URL):8182"; public static void main(String[] args) { // Create the driver final Driver driver = GraphDatabase.driver(URL, AuthTokens.basic("username", getSignedHeader()), getDefaultConfig()); // Consume and print the output of the created node System.out.println(driver.session().run(CREATE_NODE_QUERY).consume()); // Consume and print the output after reading the node created in the previous step. final Record rec = driver.session().run(READ_NODE_QUERY).list().get(0); // If your node name changes in the return statement, change the variable accordingly. System.out.println(rec.get("n").asNode().toString()); // Close the driver and it will close all the necessary resources driver.close(); } private static Config getDefaultConfig() { return Config.builder() .withConnectionTimeout(30, TimeUnit.SECONDS) .withMaxConnectionPoolSize(1000) .withDriverMetrics() .withLeakedSessionsLogging() .withEncryption() .withTrustStrategy(TrustStrategy.trustSystemCertificates()) .build(); } private static String getSignedHeader() { // If you are using permanent credentials, use the BasicAWSCredentials access key and secret key final BasicAWSCredentials permanentCreds = new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY); final AWSCredentialsProvider creds = new AWSStaticCredentialsProvider(permanentCreds); // Or, if you are using temporary credentials, use the BasicSessionCredentials to // pass the access key, secret key, and session token, like this: // final BasicSessionCredentials temporaryCredentials = new BasicSessionCredentials(ACCESS_KEY, SECRET_KEY, AWS_SESSION_TOKEN); // final AWSCredentialsProvider tempCreds = new AWSStaticCredentialsProvider(temporaryCredentials); String signedHeader = ""; final Request<Void> request = new DefaultRequest<Void>("neptune-db"); // Request to neptune request.setHttpMethod(HttpMethodName.GET); request.setEndpoint(URI.create("https://(your cluster endpoint URL)")); final AWS4Signer signer = new AWS4Signer(); signer.setRegionName(SERVICE_REGION); signer.setServiceName(request.getServiceName()); signer.sign(request, creds.getCredentials()); signedHeader = getAuthInfoJson(request); return signedHeader; } private static String getAuthInfoJson(final Request<Void> request) { final Map<String, Object> obj = new HashMap<>(); obj.put("Authorization", request.getHeaders().get("Authorization")); obj.put("HttpMethod", request.getHttpMethod()); obj.put("X-Amz-Date", request.getHeaders().get("X-Amz-Date")); obj.put("Host", request.getEndpoint().getHost()); // If temporary credentials are used, include the security token in // the request, like this: // obj.put("X-Amz-Security-Token", request.getHeaders().get("X-Amz-Security-Token")); final String json = GSON.toJson(obj); return json; } }

使用 Bolt 和 IAM 身份验证的 Python OpenPher 查询示例

以下是如何使用 Bolt 与 IAM 身份验证一起在 Python 中进行 OpenPher 查询:

import json from neo4j import GraphDatabase from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import Credentials # Access key ACCESS_KEY = "(your access key)" # Secret key SECRET_KEY = "(your secret key)" # AWS_SESSION_TOKEN is an optional environment variable. # Specify a session token only if you are using temporary security credentials. AWS_SESSION_TOKEN = "(session token)" # Neptune regions: # us-east-1, us-east-2, us-west-1, us-west-2, ca-central-1, # sa-east-1, eu-north-1, eu-west-1, eu-west-2, eu-west-3, # eu-central-1, me-south-1, af-south-1, ap-east-1, ap-northeast-1, # ap-northeast-2, ap-southeast-1, ap-southeast-2, ap-south-1, cn-north-1, # cn-northwest-1, us-gov-east-1, us-gov-west-1 # For example: SERVICE_REGION = "us-east-1" # Query to create the node CREATE_NODE_QUERY = "CREATE (n:Node {message : 'HelloWorld'})" # Query to read the node READ_NODE_QUERY = "MATCH (n:Node) RETURN n" # URI for the cluster uri = "bolt://(your cluster endpoint URL):8182" # Create the credentials credentials = Credentials(ACCESS_KEY, SECRET_KEY) # Or, if you are using temporary credentials, pass the session token as follows: # credentials = Credentials(ACCESS_KEY, SECRET_KEY, token=AWS_SESSION_TOKEN) sigv4 = SigV4Auth(credentials, 'neptune-db', SERVICE_REGION) request = AWSRequest(method='GET', url="http(s)://(your cluster endpoint URL)/opencypher") request.headers.add_header('host', '(your cluster endpoint URL)') # Sign the request sigv4.add_auth(request) auth_obj = { "Authorization" : request.headers['Authorization'], "HttpMethod" : "GET", "X-Amz-Date" : request.headers['X-Amz-Date'], # include X-Amz-Security-Token if you are using temporary credentials, like this: #"X-Amz-Security-Token": request.headers['X-Amz-Security-Token'], "Host" : request.headers['host'] } driver = GraphDatabase.driver(uri, auth=("USERNAME", json.dumps(auth_obj)), encrypted=True) # Create the node result = driver.session().run(CREATE_NODE_QUERY) # Read the node created in the previous object result = driver.session().run(READ_NODE_QUERY) # The output will look like this: <Record n=<Node properties={'message': 'HelloWorld'}>> print(result.single()) # Close the driver driver.close()

使用 IAM 身份验证和 Bolt 的 Node.js 示例

var AWS = require('aws-sdk'); const neo4j = require('neo4j-driver') const aws = require('aws-sdk') const region = 'us-east-1'; const neptune_endpoint = "(your cluster endpoint URL)"; const neptune_port = 8182; async function init() { const endpoint = new aws.Endpoint(neptune_endpoint); endpoint.port = neptune_port; var request = new aws.HttpRequest(endpoint, region); request.method = 'GET' request.headers['host'] = neptune_endpoint; // MUST BE LOWERCASE request.path = "/opencypher" // OPTION: Use CredentialProviderChain instead of EnvironmentCredentials // var provider = new aws.CredentialProviderChain(); // const creds = await provider.resolvePromise(); var creds = new AWS.EnvironmentCredentials('AWS'); var signer = new aws.Signers.V4(request, 'neptune-db'); signer.addAuthorization(creds, new Date()); var auth_info = { "Authorization": request.headers['Authorization'], "HttpMethod": request.method, "X-Amz-Date": request.headers['X-Amz-Date'], "Host": request.headers['host'] "X-Amz-Security-Token": request.headers['x-amz-security-token'] } const auth_string = JSON.stringify(auth_info); return neo4j.driver("bolt://" + neptune_endpoint + ":" + neptune_port, neo4j.auth.basic('USERNAME', auth_string), { encrypted: 'ENCRYPTION_ON', trust: 'TRUST_SYSTEM_CA_SIGNED_CERTIFICATES' } ) } function request(driver) { const session = driver.session() session.run('MATCH (n) RETURN count(*) AS count') .then(result => { console.log(result) result.records.forEach(record => { console.log(record.get('count')) }) }) .catch(error => { console.log(error) }) .then(() => session.close()) } async function shutdown(driver) { // on application exit: await driver.close() } init().then(driver => { request(driver); return driver; }).then(driver => { shutdown(driver); }).catch (err => { console.log(err); });

Neptune 中的螺栓连接行为

以下是有关 Neptune Bolt 连接的一些事项:

  • 由于 Bolt 连接是在 TCP 层创建的,因此不能使用应用程序负载均衡器在他们面前,就像使用 HTTP 端点一样。

  • Neptune 用于 Bolt 连接的端口是8182.

  • 根据传递给它的 Bolt 序言,Neptune 服务器选择合适的最高版本(1、2、3 或 4.0)。

  • 客户端可以在任何时间点打开的与 Neptune 服务器的最大连接数为 1,000 个。

  • 如果客户端未在查询后关闭连接,则可用于执行下一查询。

  • 但是,如果连接空闲 20 分钟,服务器将自动关闭连接。

  • 如果未启用 IAM 身份验证,则可以使用AuthTokens.none()而不是提供虚拟用户名和密码。例如,在 Java 中:

    GraphDatabase.driver("bolt://(your cluster endpoint URL):8182", AuthTokens.none(), Config.builder().withEncryption().withTrustStrategy(TrustStrategy.trustSystemCertificates()).build());
  • 启用 IAM 身份验证后,如果 Bolt 连接由于其他原因尚未关闭,则 Bolt 连接将在建立超过 10 天后的几分钟内断开连接。

  • 如果客户端发送查询以通过连接执行而没有消耗先前查询的结果,则会丢弃新查询。要放弃之前的结果,客户端必须通过连接发送重置消息。

  • 一次只能在给定连接上创建一个交易。

  • 如果在事务过程中发生异常,Neptune 服务器将回退该事务并关闭连接。在这种情况下,驱动程序为下一个查询创建一个新连接。

  • 请注意,会话不是线程安全的。多个 parallel 操作必须使用多个单独的会话。