Connecting to Amazon Neptune databases using IAM authentication with Python - Amazon Neptune

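This section shows an example program written in Python that illustrates how to use Signature Version 4 with Amazon Neptune. The example is based on the examples in the Signature Version 4 signing process section of the Amazon Web Services General Reference (https://docs.amazonaws.cn/general/latest/gr/sigv4-signed-request-examples.html).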

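To work with this example program, you need the following:

  • Python 3.x installed on your computer, which you can get from the Python site. These programs were tested using Python 3.6.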

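  • The Python requests library, which the example script uses to make web requests. A convenient way to install Python packages is pip, which gets packages from the Python Package Index site. You can then install requests by running pip install requests on the command line. The script also imports botocore, so install it the same way (pip install botocore) if it is not already present.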

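  • An access key (access key ID and secret access key) in environment variables named AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. As a best practice, we recommend that you do not embed credentials in code. For more information, see Best practices for Amazon accounts in the Amazon Account Management Reference Guide.

    The Region of your Neptune DB cluster in an environment variable named SERVICE_REGION.

    If you are using temporary credentials, you must specify AWS_SESSION_TOKEN in addition to AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and SERVICE_REGION.

    Note

    If you are using temporary credentials, they expire after a specified interval, including the session token.

    You must update your session token when you request new credentials. For more information, see Using temporary security credentials to request access to Amazon resources.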
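
Before running the example, it can help to confirm that the environment variables listed above are actually set. The following is a minimal sketch, not part of the example script; the helper name missing_env_vars is hypothetical, and only the variable names come from the prerequisites.

```python
import os

# Variable names come from the prerequisites; the helper itself is hypothetical.
REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "SERVICE_REGION")


def missing_env_vars(environ=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All required variables are set.")
    # The session token is only needed with temporary credentials.
    if not os.environ.get("AWS_SESSION_TOKEN"):
        print("AWS_SESSION_TOKEN is not set (expected for long-term credentials).")
```

Passing a plain dict instead of os.environ makes the check easy to exercise without touching the real environment.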

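The following example shows how to make signed requests to Neptune using Python. The request is either a GET or a POST request. Authentication information is passed using the Authorization request header.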
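
To make the Authorization header less opaque, here is a rough standard-library sketch of two pieces of Signature Version 4: deriving the signing key and assembling the header value. It is an illustration only. The example script delegates all of this to botocore's SigV4Auth, and this sketch deliberately skips building the canonical request and the string to sign, taking the latter as an input. The function names and the credential values are placeholders, not part of the script.

```python
import hashlib
import hmac


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    """HMAC-SHA256 of msg under key, returned as raw bytes."""
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def derive_signing_key(secret_key, date_stamp, region, service="neptune-db"):
    """Chain four HMACs: date -> Region -> service -> 'aws4_request'."""
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


def authorization_header(access_key, secret_key, date_stamp, region,
                         signed_headers, string_to_sign, service="neptune-db"):
    """Sign a precomputed string to sign and format the header value."""
    key = derive_signing_key(secret_key, date_stamp, region, service)
    signature = hmac.new(key, string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()
    scope = "/".join([date_stamp, region, service, "aws4_request"])
    return ("AWS4-HMAC-SHA256 Credential=%s/%s, SignedHeaders=%s, Signature=%s"
            % (access_key, scope, signed_headers, signature))


if __name__ == "__main__":
    # Placeholder credentials and a placeholder string to sign.
    header = authorization_header(
        "AKIDEXAMPLE", "EXAMPLESECRET", "20240101", "us-east-1",
        "host;x-amz-date", "placeholder-string-to-sign")
    print(header)
```

Because the key depends on the date, Region, and service name, a signature computed for one Region or day is not valid for another.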

This example also works as an Amazon Lambda function. For more information, see Setting up Amazon Lambda for Neptune IAM authentication.
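
When the script runs in Lambda, its lambda_handler function reads four keys from the incoming event: host, method, query_type, and query. The sketch below builds such an event. The build_event helper is hypothetical and only mirrors the script's own input checks (GET or POST only, and POST for sparqlupdate); the endpoint name is a placeholder.

```python
import json


def build_event(endpoint, port=8182, method="GET", query_type="status", query=""):
    """Build the dict that the script's lambda_handler reads its inputs from."""
    if method not in ("GET", "POST"):
        raise ValueError('method must be "GET" or "POST"')
    if method == "GET" and query_type == "sparqlupdate":
        raise ValueError("sparqlupdate requires POST")
    return {
        "host": "%s:%d" % (endpoint, port),  # the handler expects host:port
        "method": method,
        "query_type": query_type,
        "query": query,
    }


if __name__ == "__main__":
    event = build_event("your-neptune-endpoint",
                        query_type="gremlin", query="g.V().count()")
    print(json.dumps(event, indent=2))
```

The printed JSON can be pasted into a Lambda test event after replacing the placeholder endpoint.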

To make signed requests to the Gremlin and SPARQL Neptune endpoints
  1. Create a new file named neptunesigv4.py, and open it in a text editor.

  2. Copy the following code and paste it into the neptunesigv4.py file:

    # Amazon Neptune version 4 signing example (version v3)

    # The following script requires python 3.6+
    #    (sudo yum install python36 python36-virtualenv python36-pip)
    # => the reason is that we're using urllib.parse() to manually encode URL
    #    parameters: the problem here is that SIGV4 encoding requires whitespaces
    #    to be encoded as %20 rather than not or using '+', as done by previous/
    #    default versions of the library.

    # See: https://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html
    import sys, datetime, hashlib, hmac
    import requests  # pip3 install requests
    import urllib
    import os
    import json
    from botocore.auth import SigV4Auth
    from botocore.awsrequest import AWSRequest
    from botocore.credentials import ReadOnlyCredentials
    from types import SimpleNamespace
    from argparse import RawTextHelpFormatter
    from argparse import ArgumentParser

    # Configuration. https is required.
    protocol = 'https'

    # The following lines enable debugging at httplib level (requests->urllib3->http.client)
    # You will see the REQUEST, including HEADERS and DATA, and RESPONSE with HEADERS but without DATA.
    #
    # The only thing missing will be the response.body which is not logged.
    #
    # import logging
    # from http.client import HTTPConnection
    # HTTPConnection.debuglevel = 1
    # logging.basicConfig()
    # logging.getLogger().setLevel(logging.DEBUG)
    # requests_log = logging.getLogger("requests.packages.urllib3")
    # requests_log.setLevel(logging.DEBUG)
    # requests_log.propagate = True

    # Read AWS access key from env. variables. Best practice is NOT
    # to embed credentials in code.
    access_key = os.getenv('AWS_ACCESS_KEY_ID', '')
    secret_key = os.getenv('AWS_SECRET_ACCESS_KEY', '')
    region = os.getenv('SERVICE_REGION', '')

    # AWS_SESSION_TOKEN is optional environment variable. Specify a session token only if you are using temporary
    # security credentials.
    session_token = os.getenv('AWS_SESSION_TOKEN', '')

    ### Note same script can be used for AWS Lambda (runtime = python3.6).
    ## Steps to use this python script for AWS Lambda
    # 1. AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN and AWS_REGION variables are already part of Lambda's Execution environment
    #    No need to set them up explicitly.
    # 2. Create Lambda deployment package https://docs.aws.amazon.com/lambda/latest/dg/lambda-python-how-to-create-deployment-package.html
    # 3. Create a Lambda function in the same VPC and assign an IAM role with neptune access

    def lambda_handler(event, context):
        # sample_test_input = {
        #     "host": "END_POINT:8182",
        #     "method": "GET",
        #     "query_type": "gremlin",
        #     "query": "g.V().count()"
        # }

        # Lambda uses AWS_REGION instead of SERVICE_REGION
        global region
        region = os.getenv('AWS_REGION', '')

        host = event['host']
        method = event['method']
        query_type = event['query_type']
        query = event['query']

        return make_signed_request(host, method, query_type, query)

    def validate_input(method, query_type):
        # Supporting GET and POST for now:
        if (method != 'GET' and method != 'POST'):
            print('First parameter must be "GET" or "POST", but is "' + method + '".')
            sys.exit()

        # SPARQL UPDATE requires POST
        if (method == 'GET' and query_type == 'sparqlupdate'):
            print('SPARQL UPDATE is not supported in GET mode. Please choose POST.')
            sys.exit()

    def get_canonical_uri_and_payload(query_type, query, method):
        # Set the stack and payload depending on query_type.
        if (query_type == 'sparql'):
            canonical_uri = '/sparql/'
            payload = {'query': query}

        elif (query_type == 'sparqlupdate'):
            canonical_uri = '/sparql/'
            payload = {'update': query}

        elif (query_type == 'gremlin'):
            canonical_uri = '/gremlin/'
            payload = {'gremlin': query}
            # Gremlin POST requests send the query as a JSON body
            if (method == 'POST'):
                payload = json.dumps(payload)

        elif (query_type == 'openCypher'):
            canonical_uri = '/openCypher/'
            payload = {'query': query}

        elif (query_type == "loader"):
            canonical_uri = "/loader/"
            payload = query

        elif (query_type == "status"):
            canonical_uri = "/status/"
            payload = {}

        elif (query_type == "gremlin/status"):
            canonical_uri = "/gremlin/status/"
            payload = {}

        elif (query_type == "openCypher/status"):
            canonical_uri = "/openCypher/status/"
            payload = {}

        elif (query_type == "sparql/status"):
            canonical_uri = "/sparql/status/"
            payload = {}

        else:
            print(
                'Third parameter should be from ["gremlin", "sparql", "sparqlupdate", "loader", "status"] but is "' + query_type + '".')
            sys.exit()
        ## return output as tuple
        return canonical_uri, payload

    def make_signed_request(host, method, query_type, query):
        service = 'neptune-db'
        endpoint = protocol + '://' + host

        print()
        print('+++++ USER INPUT +++++')
        print('host = ' + host)
        print('method = ' + method)
        print('query_type = ' + query_type)
        print('query = ' + query)

        # validate input
        validate_input(method, query_type)

        # get canonical_uri and payload
        canonical_uri, payload = get_canonical_uri_and_payload(query_type, query, method)

        # assign payload to data or params
        data = payload if method == 'POST' else None
        params = payload if method == 'GET' else None

        # create request URL
        request_url = endpoint + canonical_uri

        # create and sign request
        creds = SimpleNamespace(
            access_key=access_key, secret_key=secret_key, token=session_token, region=region,
        )

        request = AWSRequest(method=method, url=request_url, data=data, params=params)
        SigV4Auth(creds, service, region).add_auth(request)

        r = None

        # ************* SEND THE REQUEST *************
        if (method == 'GET'):
            print('+++++ BEGIN GET REQUEST +++++')
            print('Request URL = ' + request_url)
            r = requests.get(request_url, headers=request.headers, verify=False, params=params)

        elif (method == 'POST'):
            print('\n+++++ BEGIN POST REQUEST +++++')
            print('Request URL = ' + request_url)
            if (query_type == "loader"):
                request.headers['Content-type'] = 'application/json'
            r = requests.post(request_url, headers=request.headers, verify=False, data=data)

        else:
            print('Request method is neither "GET" nor "POST", something is wrong here.')

        if r is not None:
            print()
            print('+++++ RESPONSE +++++')
            print('Response code: %d\n' % r.status_code)
            response = r.text
            r.close()
            print(response)

            return response

    help_msg = '''
        export AWS_ACCESS_KEY_ID=[MY_ACCESS_KEY_ID]
        export AWS_SECRET_ACCESS_KEY=[MY_SECRET_ACCESS_KEY]
        export AWS_SESSION_TOKEN=[MY_AWS_SESSION_TOKEN]
        export SERVICE_REGION=[us-east-1|us-east-2|us-west-2|eu-west-1]

        python version >=3.6 is required.

        Examples: For help
        python3 program_name.py -h

        Examples: Queries
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a GET -q status
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a GET -q sparql/status
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a GET -q sparql -d "SELECT ?s WHERE { ?s ?p ?o }"
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a POST -q sparql -d "SELECT ?s WHERE { ?s ?p ?o }"
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a POST -q sparqlupdate -d "INSERT DATA { <https://s> <https://p> <https://o> }"
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a GET -q gremlin/status
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a GET -q gremlin -d "g.V().count()"
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a POST -q gremlin -d "g.V().count()"
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a GET -q openCypher/status
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a GET -q openCypher -d "MATCH (n1) RETURN n1 LIMIT 1;"
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a POST -q openCypher -d "MATCH (n1) RETURN n1 LIMIT 1;"
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a GET -q loader -d '{"loadId": "68b28dcc-8e15-02b1-133d-9bd0557607e6"}'
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a GET -q loader -d '{}'
        python3 program_name.py -ho your-neptune-endpoint -p 8182 -a POST -q loader -d '{"source": "source", "format" : "csv", "failOnError": "fail_on_error", "iamRoleArn": "iam_role_arn", "region": "region"}'

        Environment variables must be defined as AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and SERVICE_REGION.
        You should also set AWS_SESSION_TOKEN environment variable if you are using temporary credentials (ex. IAM Role or EC2 Instance profile).

        Current Limitations:
        - Query mode "sparqlupdate" requires POST (as per the SPARQL 1.1 protocol)
    '''

    def exit_and_print_help():
        print(help_msg)
        exit()

    def parse_input_and_query_neptune():
        parser = ArgumentParser(description=help_msg, formatter_class=RawTextHelpFormatter)
        group_host = parser.add_mutually_exclusive_group()
        group_host.add_argument("-ho", "--host", type=str)
        group_port = parser.add_mutually_exclusive_group()
        group_port.add_argument("-p", "--port", type=int, help="port ex. 8182, default=8182", default=8182)
        group_action = parser.add_mutually_exclusive_group()
        group_action.add_argument("-a", "--action", type=str, help="http action, default = GET", default="GET")
        group_endpoint = parser.add_mutually_exclusive_group()
        group_endpoint.add_argument("-q", "--query_type", type=str, help="query_type, default = status ", default="status")
        group_data = parser.add_mutually_exclusive_group()
        group_data.add_argument("-d", "--data", type=str, help="data required for the http action", default="")

        args = parser.parse_args()
        print(args)

        # Read command line parameters
        host = args.host
        port = args.port
        method = args.action
        query_type = args.query_type
        query = args.data

        if (access_key == ''):
            print('!!! ERROR: Your AWS_ACCESS_KEY_ID environment variable is undefined.')
            exit_and_print_help()

        if (secret_key == ''):
            print('!!! ERROR: Your AWS_SECRET_ACCESS_KEY environment variable is undefined.')
            exit_and_print_help()

        if (region == ''):
            print('!!! ERROR: Your SERVICE_REGION environment variable is undefined.')
            exit_and_print_help()

        if host is None:
            print('!!! ERROR: Neptune DNS is missing')
            exit_and_print_help()

        host = host + ":" + str(port)
        make_signed_request(host, method, query_type, query)

    if __name__ == "__main__":
        parse_input_and_query_neptune()

  3. In a terminal, navigate to the location of the neptunesigv4.py file.

  4. Enter the following commands, replacing the access key, secret key, and Region with the correct values. Set SERVICE_REGION to a single Region from the list shown.

    export AWS_ACCESS_KEY_ID=MY_ACCESS_KEY_ID
    export AWS_SECRET_ACCESS_KEY=MY_SECRET_ACCESS_KEY
    export SERVICE_REGION=us-east-1 or us-east-2 or us-west-1 or us-west-2 or ca-central-1 or sa-east-1 or eu-north-1 or eu-west-1 or eu-west-2 or eu-west-3 or eu-central-1 or me-south-1 or me-central-1 or il-central-1 or af-south-1 or ap-east-1 or ap-northeast-1 or ap-northeast-2 or ap-southeast-1 or ap-southeast-2 or ap-south-1 or cn-north-1 or cn-northwest-1 or us-gov-east-1 or us-gov-west-1

    If you are using temporary credentials, you must specify AWS_SESSION_TOKEN in addition to AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and SERVICE_REGION.

    export AWS_SESSION_TOKEN=MY_AWS_SESSION_TOKEN
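
    Note

    If you are using temporary credentials, they expire after a specified interval, including the session token.

    You must update your session token when you request new credentials. For more information, see Using temporary security credentials to request access to Amazon resources.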

  5. Enter one of the following commands to send a signed request to the Neptune DB instance. These examples use Python version 3.6.

    Endpoint status

    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a GET -q status

    Gremlin

    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a GET -q gremlin -d "g.V().count()"
    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a POST -q gremlin -d "g.V().count()"

    Gremlin status

    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a GET -q gremlin/status

    SPARQL

    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a GET -q sparql -d "SELECT ?s WHERE { ?s ?p ?o }"

    SPARQL UPDATE

    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a POST -q sparqlupdate -d "INSERT DATA { <https://s> <https://p> <https://o> }"

    SPARQL status

    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a GET -q sparql/status

    openCypher

    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a GET -q openCypher -d "MATCH (n1) RETURN n1 LIMIT 1;"
    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a POST -q openCypher -d "MATCH (n1) RETURN n1 LIMIT 1;"

    openCypher status

    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a GET -q openCypher/status

    Loader

    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a GET -q loader -d '{"loadId": "68b28dcc-8e15-02b1-133d-9bd0557607e6"}'
    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a GET -q loader -d '{}'
    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p 8182 -a POST -q loader -d '{"source": "source", "format" : "csv", "failOnError": "fail_on_error", "iamRoleArn": "iam_role_arn", "region": "region"}'
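  6. The syntax for running the Python script is as follows:

    python3.6 neptunesigv4.py -ho your-neptune-endpoint -p port -a GET|POST -q gremlin|sparql|sparqlupdate|openCypher|loader|status -d "string0data"

    SPARQL UPDATE requires POST. The script also accepts gremlin/status, sparql/status, and openCypher/status as values for -q.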
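
The GET examples above send the query as a URL parameter, and the script's header comment notes that Signature Version 4 expects spaces to be encoded as %20 rather than '+'. The following sketch shows that difference using only urllib.parse. The canonical_query_string helper is hypothetical and is not how the script encodes parameters (it leaves that to botocore and requests); it illustrates the encoding rule only.

```python
from urllib.parse import quote, urlencode


def canonical_query_string(params):
    """Sort parameters by name and percent-encode them (space -> %20)."""
    # quote_via=quote replaces the default quote_plus, which would emit '+'.
    return urlencode(sorted(params.items()), quote_via=quote, safe="-_.~")


if __name__ == "__main__":
    sparql = {"query": "SELECT ?s WHERE { ?s ?p ?o }"}
    print(canonical_query_string(sparql))  # spaces appear as %20
    print(urlencode(sparql))               # default encoding uses '+'
```

A signature computed over one encoding does not match a request sent with the other, which is why the two forms cannot be mixed.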