使用 Neptune Streams - Amazon Neptune
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Neptune Streams

借助 Neptune Streams 功能,您可以生成完整的更改日志条目序列,以即时记录对图形数据所做的每一个更改。有关该功能的概述,请参阅 使用 Neptune 直播实时捕获图表变化

启用 Neptune Streams

您可以随时启用或禁用 Neptune Streams,方法是将neptune_streams数据库集群参数. 将参数设置为 1 会启用流,将其设置为 0 会禁用流。

注意

直至最近,Neptune Streams 一直是一项实验性功能,您可以使用数据库集群以实验室模式启用或禁用该功能neptune_lab_mode参数(请参阅Neptune 实验室模式)。使用实验室模式启用流的功能现在已弃用,未来会禁用。

打开 Streams 后,更改日志流中的更改记录将在创建后保留一周。

禁用 Neptune Streams

您可以随时关闭运行 Neptune Streams。

要关闭流,请更新数据库集群参数组,以便将 neptune_streams 参数的值设置为 0。

重要

关闭 Streams 后,您无法再访问更改日志数据。请确保在关闭 Streams 读取您感兴趣的内容。

调用 Neptune Streams REST API

您可以使用 REST API 访问 Neptune Streams,该 API 将向以下其中一个本地终端节点发送 HTTP GET 请求:

  • 对于 SPARQL 图形数据库:https://Neptune-DNS:8182/sparql/stream

  • 对于 Grimlin 或 OpenYeper 图形数据库:https://Neptune-DNS:8182/propertygraph/stream要么https://Neptune-DNS:8182/pg/stream.

注意

截至引擎版本 1.1.0.0,Gremlin 流终端节点 (https://Neptune-DNS:8182/gremlin/stream) 以及其关联的输出格式(GREMLIN_JSON)。为了向后兼容,它仍受支持,但可能会在以 future 的版本中删除。

仅允许进行 HTTP GET 操作。

Neptune 支持gzip如果 HTTP 请求包含以下内容,则对响应进行压缩:Accept-Encoding指定gzip作为可接受的压缩格式(也就是说,"Accept-Encoding: gzip")。

参数

  • limit— long(可选)。Range (范围):1—100000。默认值:10.

    指定要返回的最大记录数。响应还受 10 MB 大小的限制,该大小限制不能修改,并且优先级高于 limit 参数中指定的记录数。如果达到了 10 MB 限制,响应中将包含一条超出阈值记录。

  • iteratorType— 字符串(可选)。

    该参数可接受以下值:

    • AT_SEQUENCE_NUMBER(默认值)— 指示读取应该从由commitNumopNum参数。

    • AFTER_SEQUENCE_NUMBER— 指示读取应该从紧接在由由commitNumopNum参数。

    • TRIM_HORIZON— 指示读取应该从系统中最后一条未剪裁的记录开始,该记录是更改日志流中时间最久的未过期(尚未删除)记录。当您没有特定的开始事件序列号时,该模式在应用程序启动期间非常有用。

    • LATEST— 指示读取应该从系统中最近的记录开始,该记录是更改日志流中时间最新的未过期(尚未删除)记录。当需要从当前流顶部读取记录以免处理较旧的记录时,例如在灾难恢复或零停机升级期间,这非常有用。请注意,在此模式下,最多只返回一条记录。

  • commitNum— long(当 iteratorType 为时是必需的)AT_SEQUENCE_NUMBER要么AFTER_SEQUENCE_NUMBER.

    从更改日志流中读取的起始记录的提交编号。

    如果您将忽略此参数iteratorTypeTRIM_HORIZON要么LATEST.

  • opNum— long(可选)(默认值为)1)。

    从更改日志流数据中开始读取的指定提交中的操作序列号。

对于更改 SPARQL 图形数据的操作,通常是每个操作仅生成一条更改记录。但是,更改 Gremlin 图形数据的操作可以是每个操作生成多条更改记录,如以下示例所示:

  • INSERT— Grimlin 顶点可以有多个标签,而 Grimlin 元素可以有多个属性。插入元素时,将为每个标签和属性生成单独的更改记录。

  • UPDATE— 更改 Grimlin 元素属性时,将生成两条更改记录:第一条为删除先前值的记录,第二条为插入新值的记录。

  • DELETE— 为每个删除的元素属性生成一条单独的更改记录。例如,当删除具有属性的 Gremlin 边缘时,将为每个属性生成一条更改记录,然后,生成一条删除边缘标签的更改记录。

    删除 Gremlin 顶点时,将首先删除所有入边和出边属性,然后删除边缘标签、顶点属性,最后是顶点标签。上述每个删除操作都会生成一条更改记录。

Neptune Streams API 响应格式

对 Neptune Streams REST API 请求的响应包含以下字段:

  • lastEventId— 流响应中最后一次更改的序列标识符。事件 ID 由两个字段组成:一个commitNum标识更改图表的交易,以及opNum标识该事务中的特定操作。如下例所示。

    "eventId": { "commitNum": 12, "opNum": 1 }
  • lastTrxTimestamp— 请求提交事务的时间(使用 Unix 纪元时间表示,单位为毫秒)。

  • format— 返回的更改记录的序列化格式。可能的值包括:GREMLIN_JSON对于 Gremlin 更改记录,以及NQUADS对于 SPARQL 更改记录。

  • records— 包含在响应中的序列化更改日志流记录的数组。中的每条记录records数组包含以下字段:

    • commitTimestamp— 请求提交事务的时间(使用 Unix 纪元时间表示,单位为毫秒)。

    • eventId— 流更改记录的序列标识符。

    • data— 序列化的 Gremlin、SPARQL 或 OpenCypher 更改记录。下一节更详细地介绍了每个记录的序列化格式,Neptune Streams 中的序列化格式.

    • op— 创建更改的操作。

    • isLastOp— 仅当此操作是交易中的最后一项操作时才出现。当存在时,它被设置为true. 对于确保整个交易被消耗很有用。

  • totalRecords— 响应中的记录总数。

例如,对于包含多个操作的事务,以下响应返回 Grimlin 更改数据:

{ "lastEventId": { "commitNum": 12, "opNum": 1 }, "lastTrxTimestamp": 1560011610678, "format": "GREMLIN_JSON", "records": [ { "commitTimestamp": 1560011610678, "eventId": { "commitNum": 1, "opNum": 1 }, "data": { "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a", "type": "vl", "key": "label", "value": { "value": "vertex", "dataType": "String" } }, "op": "ADD" } ], "totalRecords": 1 }

以下响应返回事务中最后一个操作的 SPARQL 更改数据(EventId(97, 1)在交易号码 97 中)。

{ "lastEventId": { "commitNum": 97, "opNum": 1 }, "lastTrxTimestamp": 1561489355102, "format": "NQUADS", "records": [ { "commitTimestamp": 1561489355102, "eventId": { "commitNum": 97, "opNum": 1 }, "data": { "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n" }, "op": "ADD", "isLastOp", true } ], "totalRecords": 1 }

Neptune Streams API 异常

下表介绍了 Neptune Streams 异常。

错误代码 HTTP 代码 可以重试? 消息

InvalidParameterException

400

无效或 out-of-range 提供的作为输入参数的值。

ExpiredStreamException

400

所有请求的记录均超出了允许的最长使用期限并且已经过期。

ThrottlingException

500

请求速率超出吞吐量上限。

StreamRecordsNotFoundException

404

找不到请求的资源。可能指定了错误的流。

MemoryLimitExceededException

500

由于缺少内存,请求不成功,但可在服务器较不忙时重试。