本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Neptune Streams
借助 Neptune Streams 特征,您可以生成完整的更改日志条目序列,以即时记录对图形数据所做的每一个更改。有关该功能的概述,请参阅 使用 Neptune Streams 实时捕获图形更改。
主题
启用 Neptune Streams
您可以通过设置 neptune_streams 数据库集群参数随时启用或禁用 Neptune Streams。将参数设置为 1
会启用流,将其设置为 0
会禁用流。
注意
更改 neptune_streams
数据库集群参数后,您必须重启集群中的所有数据库实例才能使更改生效。
您可以设置 neptune_streams_expiry_days 数据库集群参数,以控制流记录在删除之前保留在服务器上的天数(从 1 到 90)。默认值为 7。
Neptune Streams 最初是作为一项实验性特征引入的,您可以在实验室模式下使用数据库集群 neptune_lab_mode
参数启用或禁用该特征(请参阅Neptune 实验室模式)。使用实验室模式启用流的功能现在已弃用,未来会禁用。
禁用 Neptune Streams
您随时可以关闭正在运行的 Neptune Streams。
要关闭流,请更新数据库集群参数组,以便将 neptune_streams
参数的值设置为 0。
重要
关闭 Streams 后,您无法再访问更改日志数据。请确保在关闭 Streams 前读取您感兴趣的内容。
调用 Neptune Streams REST API
您可以使用 REST API 访问 Neptune Streams,该 API 将向以下其中一个本地端点发送 HTTP GET 请求:
对于 SPARQL 图形数据库:
https://
。Neptune-DNS
:8182/sparql/stream对于 Gremlin 或 openCypher 图形数据库:
https://
或Neptune-DNS
:8182/propertygraph/streamhttps://
。Neptune-DNS
:8182/pg/stream
注意
从引擎版本 1.1.0.0 开始,Gremlin 流端点 (https://
) 及其关联的输出格式 (Neptune-DNS
:8182/gremlin/streamGREMLIN_JSON
) 已被弃用。为了向后兼容,它仍然受支持,但可能会在将来版本中移除。
仅允许进行 HTTP GET
操作。
如果 HTTP 请求包含 Accept-Encoding
标头并将 gzip
指定为可接受的压缩格式(即 "Accept-Encoding: gzip"
),则 Neptune 支持对响应进行 gzip
压缩。
参数
-
limit
– long,可选。范围:1–100000。默认值:10。指定要返回的最大记录数。响应还受 10 MB 大小的限制,该大小限制不能修改,并且优先级高于
limit
参数中指定的记录数。如果达到了 10 MB 限制,响应中将包含一条超出阈值记录。 -
iteratorType
– 字符串,可选。该参数可接受以下值:
AT_SEQUENCE_NUMBER
(默认值)– 指示读取应该从由commitNum
和opNum
参数共同指定的事件序列号开始。AFTER_SEQUENCE_NUMBER
– 指示读取应该从紧接在由commitNum
和opNum
参数共同指定的事件序列号之后的事件序列号开始。TRIM_HORIZON
– 指示读取应该从系统中最后一条未剪裁的记录开始,该记录是更改日志流中时间最久的未过期(尚未删除)记录。当您没有特定的开始事件序列号时,该模式在应用程序启动期间非常有用。LATEST
– 指示读取应该从系统中最近的记录开始,该记录是更改日志流中最新的未过期(尚未删除)记录。当需要从当前的流顶部读取记录以免处理较旧的记录时,例如在灾难恢复或零停机时间升级期间,这很有用。请注意,在此模式下,最多只返回一条记录。
-
commitNum
– long,当 iteratorType 为AT_SEQUENCE_NUMBER
或AFTER_SEQUENCE_NUMBER
时是必需的。从更改日志流中读取的起始记录的提交编号。
如果
iteratorType
为TRIM_HORIZON
或LATEST
,则忽略此参数。 -
opNum
– long,可选(默认值为1
)。从更改日志流数据中开始读取的指定提交中的操作序列号。
对于更改 SPARQL 图形数据的操作,通常是每个操作仅生成一条更改记录。但是,更改 Gremlin 图形数据的操作可以是每个操作生成多条更改记录,如以下示例所示:
INSERT
– Gremlin 顶点可以有多个标签,而 Gremlin 元素可以有多个属性。插入元素时,将为每个标签和属性生成单独的更改记录。UPDATE
– 更改 Gremlin 元素属性时,将生成两条更改记录:第一条为删除先前值的记录,第二条为插入新值的记录。-
DELETE
– 为每个删除的元素属性生成一条单独的更改记录。例如,当删除具有属性的 Gremlin 边缘时,将为每个属性生成一条更改记录,然后,生成一条删除边缘标签的更改记录。删除 Gremlin 顶点时,将首先删除所有入边和出边属性,然后删除边缘标签、顶点属性,最后是顶点标签。上述每个删除操作都会生成一条更改记录。
Neptune Streams API 响应格式
Neptune Streams REST API 请求的响应包含以下字段:
-
lastEventId
– 流响应中最后一次更改的序列标识符。事件 ID 由两个字段组成:commitNum
标识更改图形的事务,opNum
标识该事务中的特定操作。如以下示例所示。"eventId": { "commitNum": 12, "opNum": 1 }
lastTrxTimestamp
– 请求提交事务的时间(使用 Unix 纪元时间表示,单位为毫秒)。format
– 返回的更改记录的序列化格式。对于 Gremlin 或 openCypher 更改记录,可能的值为PG_JSON
;对于 SPARQL 更改记录,可能的值为NQUADS
。-
records
– 包含在响应中的序列化更改日志流记录的数组。records
数组中的每条记录都包含以下字段:commitTimestamp
– 请求提交事务的时间(使用 Unix 纪元时间表示,单位为毫秒)。eventId
– 流更改记录的序列标识符。data
— 序列化的 Gremlin、SPARQL 或变更记录。 OpenCypher 下一部分Neptune Streams 中的序列化格式中更详细地介绍了每个记录的序列化格式。op
– 造成更改的操作。isLastOp
– 仅当此操作是其事务中的最后一个操作时才存在。如果存在,则它设置为true
。有助于确保整个事务都被消耗掉。
totalRecords
– 响应中的记录总数。
例如,对于包含多个操作的事务,以下响应返回 Gremlin 更改数据:
{ "lastEventId": { "commitNum": 12, "opNum": 1 }, "lastTrxTimestamp": 1560011610678, "format": "PG_JSON", "records": [ { "commitTimestamp": 1560011610678, "eventId": { "commitNum": 1, "opNum": 1 }, "data": { "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a", "type": "vl", "key": "label", "value": { "value": "vertex", "dataType": "String" } }, "op": "ADD" } ], "totalRecords": 1 }
以下响应返回事务中最后一个操作的 SPARQL 更改数据(该操作在事务编号 97 中由 EventId(97, 1)
标识)。
{ "lastEventId": { "commitNum": 97, "opNum": 1 }, "lastTrxTimestamp": 1561489355102, "format": "NQUADS", "records": [ { "commitTimestamp": 1561489355102, "eventId": { "commitNum": 97, "opNum": 1 }, "data": { "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n" }, "op": "ADD", "isLastOp": true } ], "totalRecords": 1 }
Neptune Streams API 异常
下表描述了 Neptune Streams 异常。
错误代码 | HTTP 代码 | 可以重试? | 消息 |
---|---|---|---|
|
400 |
否 |
输入参数中提供了无效的或 out-of-range 值。 |
|
400 |
否 |
所有请求的记录均超出了允许的最长使用期限并且已经过期。 |
|
500 |
是 |
请求速率超出吞吐量上限。 |
|
404 |
否 |
找不到请求的资源。可能指定了错误的流。 |
|
500 |
是 |
由于缺少内存,请求不成功,但可在服务器较不忙时重试。 |