使用 Neptune Streams - Amazon Neptune
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Neptune Streams

借助 Neptune Streams 特征,您可以生成完整的更改日志条目序列,以即时记录对图形数据所做的每一个更改。有关该功能的概述,请参阅 使用 Neptune Streams 实时捕获图形更改

启用 Neptune Streams

您可以通过设置 neptune_streams 数据库集群参数随时启用或禁用 Neptune Streams。将参数设置为 1 会启用流,将其设置为 0 会禁用流。

注意

更改 neptune_streams 数据库集群参数后,您必须重启集群中的所有数据库实例才能使更改生效。

您可以设置 neptune_streams_expiry_days 数据库集群参数,以控制流记录在删除之前保留在服务器上的天数(从 1 到 90)。默认值为 7。

Neptune Streams 最初是作为一项实验性特征引入的,您可以在实验室模式下使用数据库集群 neptune_lab_mode 参数启用或禁用该特征(请参阅Neptune 实验室模式)。使用实验室模式启用流的功能现在已弃用,未来会禁用。

禁用 Neptune Streams

您随时可以关闭正在运行的 Neptune Streams。

要关闭流,请更新数据库集群参数组,以便将 neptune_streams 参数的值设置为 0。

重要

关闭 Streams 后,您无法再访问更改日志数据。请确保在关闭 Streams 读取您感兴趣的内容。

调用 Neptune Streams REST API

您可以使用 REST API 访问 Neptune Streams,该 API 将向以下其中一个本地端点发送 HTTP GET 请求:

  • 对于 SPARQL 图形数据库:https://Neptune-DNS:8182/sparql/stream

  • 对于 Gremlin 或 openCypher 图形数据库:https://Neptune-DNS:8182/propertygraph/streamhttps://Neptune-DNS:8182/pg/stream

注意

引擎版本 1.1.0.0 开始,Gremlin 流端点 (https://Neptune-DNS:8182/gremlin/stream) 及其关联的输出格式 (GREMLIN_JSON) 已被弃用。为了向后兼容,它仍然受支持,但可能会在将来版本中移除。

仅允许进行 HTTP GET 操作。

如果 HTTP 请求包含 Accept-Encoding 标头并将 gzip 指定为可接受的压缩格式(即 "Accept-Encoding: gzip"),则 Neptune 支持对响应进行 gzip 压缩。

参数
  • limit – long,可选。范围:1–100000。默认值:10。

    指定要返回的最大记录数。响应还受 10 MB 大小的限制,该大小限制不能修改,并且优先级高于 limit 参数中指定的记录数。如果达到了 10 MB 限制,响应中将包含一条超出阈值记录。

  • iteratorType – 字符串,可选。

    该参数可接受以下值:

    • AT_SEQUENCE_NUMBER(默认值)– 指示读取应该从由 commitNumopNum 参数共同指定的事件序列号开始。

    • AFTER_SEQUENCE_NUMBER – 指示读取应该从紧接在由 commitNumopNum 参数共同指定的事件序列号之后的事件序列号开始。

    • TRIM_HORIZON – 指示读取应该从系统中最后一条未剪裁的记录开始,该记录是更改日志流中时间最久的未过期(尚未删除)记录。当您没有特定的开始事件序列号时,该模式在应用程序启动期间非常有用。

    • LATEST – 指示读取应该从系统中最近的记录开始,该记录是更改日志流中最新的未过期(尚未删除)记录。当需要从当前的流顶部读取记录以免处理较旧的记录时,例如在灾难恢复或零停机时间升级期间,这很有用。请注意,在此模式下,最多只返回一条记录。

  • commitNum – long,当 iteratorType 为 AT_SEQUENCE_NUMBERAFTER_SEQUENCE_NUMBER 时是必需的。

    从更改日志流中读取的起始记录的提交编号。

    如果 iteratorTypeTRIM_HORIZONLATEST,则忽略此参数。

  • opNum – long,可选(默认值为 1)。

    从更改日志流数据中开始读取的指定提交中的操作序列号。

对于更改 SPARQL 图形数据的操作,通常是每个操作仅生成一条更改记录。但是,更改 Gremlin 图形数据的操作可以是每个操作生成多条更改记录,如以下示例所示:

  • INSERT – Gremlin 顶点可以有多个标签,而 Gremlin 元素可以有多个属性。插入元素时,将为每个标签和属性生成单独的更改记录。

  • UPDATE – 更改 Gremlin 元素属性时,将生成两条更改记录:第一条为删除先前值的记录,第二条为插入新值的记录。

  • DELETE – 为每个删除的元素属性生成一条单独的更改记录。例如,当删除具有属性的 Gremlin 边缘时,将为每个属性生成一条更改记录,然后,生成一条删除边缘标签的更改记录。

    删除 Gremlin 顶点时,将首先删除所有入边和出边属性,然后删除边缘标签、顶点属性,最后是顶点标签。上述每个删除操作都会生成一条更改记录。

Neptune Streams API 响应格式

Neptune Streams REST API 请求的响应包含以下字段:

  • lastEventId – 流响应中最后一次更改的序列标识符。事件 ID 由两个字段组成:commitNum 标识更改图形的事务,opNum 标识该事务中的特定操作。如以下示例所示。

    "eventId": { "commitNum": 12, "opNum": 1 }
  • lastTrxTimestamp – 请求提交事务的时间(使用 Unix 纪元时间表示,单位为毫秒)。

  • format – 返回的更改记录的序列化格式。对于 Gremlin 或 openCypher 更改记录,可能的值为 PG_JSON;对于 SPARQL 更改记录,可能的值为 NQUADS

  • records – 包含在响应中的序列化更改日志流记录的数组。records 数组中的每条记录都包含以下字段:

    • commitTimestamp – 请求提交事务的时间(使用 Unix 纪元时间表示,单位为毫秒)。

    • eventId – 流更改记录的序列标识符。

    • data— 序列化的 Gremlin、SPARQL 或变更记录。 OpenCypher 下一部分Neptune Streams 中的序列化格式中更详细地介绍了每个记录的序列化格式。

    • op – 造成更改的操作。

    • isLastOp – 仅当此操作是其事务中的最后一个操作时才存在。如果存在,则它设置为 true。有助于确保整个事务都被消耗掉。

  • totalRecords – 响应中的记录总数。

例如,对于包含多个操作的事务,以下响应返回 Gremlin 更改数据:

{ "lastEventId": { "commitNum": 12, "opNum": 1 }, "lastTrxTimestamp": 1560011610678, "format": "PG_JSON", "records": [ { "commitTimestamp": 1560011610678, "eventId": { "commitNum": 1, "opNum": 1 }, "data": { "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a", "type": "vl", "key": "label", "value": { "value": "vertex", "dataType": "String" } }, "op": "ADD" } ], "totalRecords": 1 }

以下响应返回事务中最后一个操作的 SPARQL 更改数据(该操作在事务编号 97 中由 EventId(97, 1) 标识)。

{ "lastEventId": { "commitNum": 97, "opNum": 1 }, "lastTrxTimestamp": 1561489355102, "format": "NQUADS", "records": [ { "commitTimestamp": 1561489355102, "eventId": { "commitNum": 97, "opNum": 1 }, "data": { "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n" }, "op": "ADD", "isLastOp": true } ], "totalRecords": 1 }

Neptune Streams API 异常

下表描述了 Neptune Streams 异常。

错误代码 HTTP 代码 可以重试? 消息

InvalidParameterException

400

输入参数中提供了无效的或 out-of-range 值。

ExpiredStreamException

400

所有请求的记录均超出了允许的最长使用期限并且已经过期。

ThrottlingException

500

请求速率超出吞吐量上限。

StreamRecordsNotFoundException

404

找不到请求的资源。可能指定了错误的流。

MemoryLimitExceededException

500

由于缺少内存,请求不成功,但可在服务器较不忙时重试。