Using Neptune Streams

With the Neptune Streams feature, you can generate a complete sequence of change-log entries that record every change made to your graph data as it happens. For an overview of this feature, see Capturing graph changes in real time using Neptune streams.

Enabling Neptune Streams

You can enable or disable Neptune Streams at any time by setting the neptune_streams DB cluster parameter. Setting the parameter to 1 enables Streams, and setting it to 0 disables Streams.

Note

After changing the neptune_streams DB cluster parameter, you must reboot all DB instances in the cluster for the change to take effect.

You can set the neptune_streams_expiry_days DB cluster parameter to control how many days (from 1 to 90) stream records remain on the server before being deleted. The default is 7.
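As a rough illustration of the two parameters above, the following Python sketch builds the parameter list and passes it to the boto3 Neptune client. The function names and the group name are hypothetical. The choice of "pending-reboot" as the apply method is an assumption, based on the note that a reboot is required before the change takes effect.

```python
def streams_parameters(enabled, expiry_days=7):
    """Build the Parameters list for a DB cluster parameter group update."""
    if not 1 <= expiry_days <= 90:
        raise ValueError("neptune_streams_expiry_days must be between 1 and 90")
    return [
        {
            "ParameterName": "neptune_streams",
            "ParameterValue": "1" if enabled else "0",
            # Assumption: applied at the next reboot of the DB instances.
            "ApplyMethod": "pending-reboot",
        },
        {
            "ParameterName": "neptune_streams_expiry_days",
            "ParameterValue": str(expiry_days),
            "ApplyMethod": "pending-reboot",
        },
    ]


def apply_streams_parameters(group_name, enabled, expiry_days=7):
    """Send the update to Neptune. Requires boto3 and AWS credentials."""
    import boto3  # imported lazily so streams_parameters works without boto3

    client = boto3.client("neptune")
    return client.modify_db_cluster_parameter_group(
        DBClusterParameterGroupName=group_name,
        Parameters=streams_parameters(enabled, expiry_days),
    )
```

For example, apply_streams_parameters("my-cluster-params", True, 14) would enable Streams with a 14-day retention, where "my-cluster-params" is a placeholder for your own custom DB cluster parameter group.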

Neptune Streams was initially introduced as an experimental feature that you enabled or disabled in Lab Mode using the neptune_lab_mode DB cluster parameter (see Neptune Lab Mode). Using Lab Mode to enable Streams is now deprecated and will be disabled in the future.

Disabling Neptune Streams

You can turn Neptune Streams off at any time while it is running.

To turn Streams off, update the DB cluster parameter group so that the value of the neptune_streams parameter is 0, and then reboot all DB instances in the cluster so that the change takes effect.

Important

As soon as Streams is turned off, you can't access the change-log data any more. Be sure to read what you are interested in before turning Streams off.

Calling the Neptune Streams REST API

You access Neptune Streams through a REST API, by sending an HTTP GET request to one of the following local endpoints:

  • For a SPARQL graph DB:   https://Neptune-DNS:8182/sparql/stream.

  • For a Gremlin or openCypher graph DB:   https://Neptune-DNS:8182/propertygraph/stream or https://Neptune-DNS:8182/pg/stream.

Note

As of engine release 1.1.0.0, the Gremlin stream endpoint (https://Neptune-DNS:8182/gremlin/stream) is being deprecated, along with its associated output format (GREMLIN_JSON). It is still supported for backward compatibility but may be removed in future releases.

Only an HTTP GET operation is allowed.

Neptune supports gzip compression of the response, provided that the HTTP request includes an Accept-Encoding header that specifies gzip as an accepted compression format (that is, "Accept-Encoding: gzip").
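The following Python sketch shows one way to request a gzip-compressed response and decode it, using only the standard library. The function names are hypothetical. It assumes that the response body is JSON, that the server reports compression in a Content-Encoding header, and that IAM authentication is not enabled on the cluster (otherwise the request would also need to be signed).

```python
import gzip
import json
import urllib.request


def decode_stream_body(body, content_encoding):
    """Decode a response body, decompressing it first if the server used gzip."""
    if content_encoding and "gzip" in content_encoding.lower():
        body = gzip.decompress(body)
    return json.loads(body.decode("utf-8"))


def fetch_stream(url, timeout=30):
    """Send an HTTP GET that accepts gzip and return the parsed JSON response."""
    request = urllib.request.Request(
        url,
        method="GET",
        headers={"Accept-Encoding": "gzip"},
    )
    # urllib does not decompress automatically, so the body is decoded by hand.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return decode_stream_body(
            response.read(), response.headers.get("Content-Encoding")
        )
```

A call such as fetch_stream("https://Neptune-DNS:8182/propertygraph/stream?iteratorType=TRIM_HORIZON") would then return the response as a Python dictionary, with Neptune-DNS replaced by your cluster endpoint.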

Parameters
  • limit   –   long, optional. Range: 1–100,000. Default: 10.

    Specifies the maximum number of records to return. There is also a fixed size limit of 10 MB on the response, which can't be modified and which takes precedence over the number of records specified in the limit parameter. If the 10 MB limit is reached, the response still includes the record that crossed the threshold.

  • iteratorType   –   String, optional.

    This parameter can take one of the following values:

    • AT_SEQUENCE_NUMBER (default)   –   Indicates that reading should start from the event sequence number specified jointly by the commitNum and opNum parameters.

    • AFTER_SEQUENCE_NUMBER   –   Indicates that reading should start right after the event sequence number specified jointly by the commitNum and opNum parameters.

    • TRIM_HORIZON   –   Indicates that reading should start at the last untrimmed record in the system, which is the oldest unexpired (not yet deleted) record in the change-log stream. This mode is useful during application startup, when you don't have a specific starting event sequence number.

    • LATEST   –   Indicates that reading should start at the most recent record in the system, which is the latest unexpired (not yet deleted) record in the change-log stream. This is useful when you need to read from the current top of the stream without processing older records, such as during disaster recovery or a zero-downtime upgrade. In this mode, at most one record is returned.

  • commitNum   –   long, required when iteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER.

    The commit number of the starting record to read from the change-log stream.

    This parameter is ignored when iteratorType is TRIM_HORIZON or LATEST.

  • opNum   –   long, optional (the default is 1).

    The operation sequence number within the specified commit to start reading from in the change-log stream data.
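The parameter rules above can be sketched as a small query-string builder. This is a minimal illustration, not part of the Neptune API: the function name and the Python argument names are hypothetical, and omitting commitNum and opNum for TRIM_HORIZON and LATEST is a choice made here because commitNum is ignored in those modes.

```python
from urllib.parse import urlencode

ITERATOR_TYPES = {
    "AT_SEQUENCE_NUMBER",
    "AFTER_SEQUENCE_NUMBER",
    "TRIM_HORIZON",
    "LATEST",
}


def stream_query(limit=10, iterator_type="AT_SEQUENCE_NUMBER",
                 commit_num=None, op_num=1):
    """Validate the stream parameters and return them as a URL query string."""
    if not 1 <= limit <= 100_000:
        raise ValueError("limit must be between 1 and 100,000")
    if iterator_type not in ITERATOR_TYPES:
        raise ValueError(f"unknown iteratorType: {iterator_type}")

    params = {"limit": limit, "iteratorType": iterator_type}
    if iterator_type in ("AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER"):
        # commitNum is required for the two sequence-number modes.
        if commit_num is None:
            raise ValueError(f"commitNum is required for {iterator_type}")
        params["commitNum"] = commit_num
        params["opNum"] = op_num
    return urlencode(params)
```

The returned string can be appended after a "?" to one of the stream endpoints, for example https://Neptune-DNS:8182/propertygraph/stream.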

Operations that change SPARQL graph data generally only generate a single change record per operation. However, operations that change Gremlin graph data can generate multiple change records per operation, as in the following examples:

  • INSERT   –   A Gremlin vertex can have multiple labels, and a Gremlin element can have multiple properties. A separate change record is generated for each label and property when an element is inserted.

  • UPDATE   –   When a Gremlin element property is changed, two change records are generated: the first for removing the previous value, and the second for inserting the new value.

  • DELETE   –   A separate change record is generated for each element property that is deleted. For example, when a Gremlin edge with properties is deleted, one change record is generated for each of the properties, and after that, one is generated for deletion of the edge label.

    When a Gremlin vertex is deleted, all the incoming and outgoing edge properties are deleted first, then the edge labels, then the vertex properties, and finally the vertex labels. Each of these deletions generates a change record.
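The record counts described above reduce to simple arithmetic, sketched below. The function names are hypothetical, and the sketch assumes that every property holds a single value and that an edge has exactly one label.

```python
def insert_record_count(labels, properties):
    """Records for inserting an element: one per label plus one per property."""
    return len(labels) + len(properties)


def update_record_count(changed_properties):
    """Records for updating properties: a removal and an insertion for each."""
    return 2 * len(changed_properties)


def edge_delete_record_count(properties):
    """Records for deleting an edge: one per property, then one for its label."""
    return len(properties) + 1
```

Under these assumptions, inserting a vertex with two labels and two properties gives insert_record_count(["person", "employee"], {"name": "a", "age": 1}), which evaluates to 4.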

Neptune Streams API Response Format

A response to a Neptune Streams REST API request has the following fields:

  • lastEventId   –   Sequence identifier of the last change in the stream response. An event ID is composed of two fields: a commitNum that identifies a transaction that changed the graph, and an opNum that identifies a specific operation within that transaction. This is shown in the following example.

    "eventId": { "commitNum": 12, "opNum": 1 }

  • lastTrxTimestamp   –   The time at which the commit for the transaction was requested, in milliseconds from the Unix epoch.

  • format   –   Serialization format for the change records being returned. The possible values are PG_JSON for Gremlin or openCypher change records, and NQUADS for SPARQL change records.

  • records   –   An array of serialized change-log stream records included in the response. Each record in the records array contains these fields:

    • commitTimestamp   –   The time at which the commit for the transaction was requested, in milliseconds from the Unix epoch.

    • eventId   –   The sequence identifier of the stream change record.

    • data   –   The serialized Gremlin, SPARQL, or openCypher change record. The serialization format of each record is described in more detail in the next section, Serialization Formats in Neptune Streams.

    • op   –   The operation that created the change.

    • isLastOp   –   Only present if this operation is the last one in its transaction. When present, it is set to true. This field is useful for ensuring that an entire transaction is consumed.

  • totalRecords   –   The total number of records in the response.
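As an illustration of how isLastOp can be used, the following sketch splits the records array into complete transactions and a trailing partial one. The function name is hypothetical, and the sketch assumes that records arrive ordered by commitNum and then opNum, and that the first record in the list starts a transaction.

```python
def split_transactions(records):
    """Return (complete, partial).

    complete is a list of transactions, each a list of records that ends
    with a record carrying isLastOp. partial holds any trailing records
    whose transaction has not finished within this response.
    """
    complete = []
    current = []
    for record in records:
        current.append(record)
        if record.get("isLastOp"):
            complete.append(current)
            current = []
    return complete, current
```

A consumer could process only the complete transactions, then resume its next read from the first record of the partial one so that no transaction is applied halfway.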

For example, the following response returns Gremlin change data for a transaction that contains more than one operation:

{
  "lastEventId": {
    "commitNum": 12,
    "opNum": 1
  },
  "lastTrxTimestamp": 1560011610678,
  "format": "PG_JSON",
  "records": [
    {
      "commitTimestamp": 1560011610678,
      "eventId": {
        "commitNum": 12,
        "opNum": 1
      },
      "data": {
        "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a",
        "type": "vl",
        "key": "label",
        "value": {
          "value": "vertex",
          "dataType": "String"
        }
      },
      "op": "ADD"
    }
  ],
  "totalRecords": 1
}

The following response returns SPARQL change data for the last operation in a transaction (the operation identified by EventId(97, 1) in transaction number 97).

{
  "lastEventId": {
    "commitNum": 97,
    "opNum": 1
  },
  "lastTrxTimestamp": 1561489355102,
  "format": "NQUADS",
  "records": [
    {
      "commitTimestamp": 1561489355102,
      "eventId": {
        "commitNum": 97,
        "opNum": 1
      },
      "data": {
        "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n"
      },
      "op": "ADD",
      "isLastOp": true
    }
  ],
  "totalRecords": 1
}
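One possible way to continue reading after a response like these is to feed lastEventId back into the next request with the AFTER_SEQUENCE_NUMBER iterator type. The sketch below shows that step, along with converting lastTrxTimestamp from epoch milliseconds to a UTC datetime. The function names are hypothetical, and the response dictionary is an abbreviated copy of the SPARQL example.

```python
from datetime import datetime, timezone


def next_poll_params(response):
    """Build query parameters that resume right after the last returned event."""
    last = response["lastEventId"]
    return {
        "iteratorType": "AFTER_SEQUENCE_NUMBER",
        "commitNum": last["commitNum"],
        "opNum": last["opNum"],
    }


def last_commit_time(response):
    """Convert lastTrxTimestamp (milliseconds since the Unix epoch) to UTC."""
    return datetime.fromtimestamp(
        response["lastTrxTimestamp"] / 1000, tz=timezone.utc
    )


# Abbreviated from the SPARQL example response.
response = {
    "lastEventId": {"commitNum": 97, "opNum": 1},
    "lastTrxTimestamp": 1561489355102,
}
params = next_poll_params(response)
commit_time = last_commit_time(response)
```

Storing the parameters from next_poll_params as a checkpoint lets a consumer restart without rereading records it has already handled.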

Neptune Streams API Exceptions

The following table describes Neptune Streams exceptions.

Error Code | HTTP Code | OK to Retry? | Message
--- | --- | --- | ---
InvalidParameterException | 400 | No | An invalid or out-of-range value was supplied as an input parameter.
ExpiredStreamException | 400 | No | All of the requested records exceed the maximum age allowed and have expired.
ThrottlingException | 500 | Yes | Rate of requests exceeds the maximum throughput.
StreamRecordsNotFoundException | 404 | No | The requested resource could not be found. The stream may not be specified correctly.
MemoryLimitExceededException | 500 | Yes | The request processing did not succeed due to lack of memory, but can be retried when the server is less busy.
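The retry guidance in the table can be sketched as a wrapper that retries only the two retryable error codes, with exponential backoff. Everything here other than the error code names is hypothetical: StreamError is an illustrative exception that the calling code would raise after reading the error code from a failed response, and the attempt count and delays are arbitrary choices.

```python
import time

# Error codes that the table marks as OK to retry.
RETRYABLE_CODES = {"ThrottlingException", "MemoryLimitExceededException"}


class StreamError(Exception):
    """Hypothetical exception carrying a Neptune Streams error code."""

    def __init__(self, code, message=""):
        super().__init__(f"{code}: {message}")
        self.code = code


def call_with_retry(operation, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call operation(), retrying retryable errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except StreamError as error:
            # Give up on non-retryable codes and on the final attempt.
            if error.code not in RETRYABLE_CODES or attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
```

The sleep argument is injectable so that the backoff can be tested without waiting. In normal use it can be left at its default.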