Amazon DynamoDB
开发人员指南 (API Version 2012-08-10)
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。点 击 Getting Started with Amazon AWS to see specific differences applicable to the China (Beijing) Region.

演练:DynamoDB 流 低级 API

本节是一个展示操作中的 DynamoDB 流 的 Java 程序演练。有关源代码,请参阅完成程序:低级别 DynamoDB 流 API

此程序执行以下操作:

  1. 创建一个启用了流的 DynamoDB 表。

  2. 描述此表的流设置。

  3. 修改表中的数据。

  4. 描述流中的分区。

  5. 从分区中读取流记录。

  6. 清除。

注意

此编码无法处理所有异常,因此在流量较高的条件下不能可靠地运行。建议利用 Kinesis Client Library (KCL) 通过 Kinesis 适配器使用 DynamoDB 中的流记录,如使用 DynamoDB 流 Kinesis 适配器处理流记录中所述。

以下各节将描述这些步骤,本演练结尾将显示完整的应用程序。

步骤 1:创建启用了流的表

第一步是在 DynamoDB 中创建一个表,如以下代码段所示。该表启用了一个流,该流将捕获已修改的每个项目的 NEW_AND_OLD_IMAGES

Copy
ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition() .withAttributeName("Id") .withAttributeType("N")); ArrayList<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement() .withAttributeName("Id") .withKeyType(KeyType.HASH)); //Partition key StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES); CreateTableRequest createTableRequest = new CreateTableRequest() .withTableName(tableName) .withKeySchema(keySchema) .withAttributeDefinitions(attributeDefinitions) .withProvisionedThroughput(new ProvisionedThroughput() .withReadCapacityUnits(1L) .withWriteCapacityUnits(1L)) .withStreamSpecification(streamSpecification);

步骤 2:描述表的流设置

DescribeTable API 可让您查看表的当前流设置。以下代码段将帮助您确认流已启用并且流将捕获正确数据。

Copy
DescribeTableResult describeTableResult = dynamoDBClient.describeTable(tableName); String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); StreamSpecification myStreamSpec = describeTableResult.getTable().getStreamSpecification(); System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn); System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());

步骤 3:修改表中的数据

下一步是对表中的数据进行一些更改。以下代码段将一个新项目添加到表,更新该项目中的属性,然后删除该项目。

Copy
// Add a new item int numChanges = 0; System.out.println("Making some changes to table data"); Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); item.put("Id", new AttributeValue().withN("101")); item.put("Message", new AttributeValue().withS("New item!")); dynamoDBClient.putItem(tableName, item); numChanges++; // Update the item Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("Id", new AttributeValue().withN("101")); Map<String, AttributeValueUpdate> attributeUpdates = new HashMap<String, AttributeValueUpdate>(); attributeUpdates.put("Message", new AttributeValueUpdate() .withAction(AttributeAction.PUT) .withValue(new AttributeValue().withS("This item has changed"))); dynamoDBClient.updateItem(tableName, key, attributeUpdates); numChanges++; // Delete the item dynamoDBClient.deleteItem(tableName, key); numChanges++;

步骤 4:描述流中的分区

表中的数据修改将导致流记录被写入该表的流。

步骤 2:描述表的流设置中,我们已确定当前流 ARN 并已将其分配给变量 myStreamArn。我们可以将它与 DescribeStream 操作结合使用以获取流中的分区。

由于我们并未更改 DynamoDB 表中的过多数据,因此列表中只有一个分区。以下代码段演示如何获取此信息。

Copy
DescribeStreamResult describeStreamResult = streamsClient.describeStream(new DescribeStreamRequest() .withStreamArn(myStreamArn)); String streamArn = describeStreamResult.getStreamDescription().getStreamArn(); List<Shard> shards = describeStreamResult.getStreamDescription().getShards();

步骤 5:读取流记录

我们将为列表中的每个分区获取一个分区迭代器,然后使用该迭代器获取并打印流记录。

以下代码段使用一个循环来处理分区列表,即使只有一个分区也是如此。

Copy
for (Shard shard : shards) { String shardId = shard.getShardId(); System.out.println( "Processing " + shardId + " from stream "+ streamArn); // Get an iterator for the current shard GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest() .withStreamArn(myStreamArn) .withShardId(shardId) .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); GetShardIteratorResult getShardIteratorResult = streamsClient.getShardIterator(getShardIteratorRequest); String nextItr = getShardIteratorResult.getShardIterator(); while (nextItr != null && numChanges > 0) { // Use the iterator to read the data records from the shard GetRecordsResult getRecordsResult = streamsClient.getRecords(new GetRecordsRequest(). withShardIterator(nextItr)); List<Record> records = getRecordsResult.getRecords(); System.out.println("Getting records..."); for (Record record : records) { System.out.println(record); numChanges--; } nextItr = getRecordsResult.getNextShardIterator(); } }

步骤 6:清除

演示已完成,我们可以将表删除。请注意,与此表关联的流可继续用于读取,即使表已删除。流将在 24 小时后自动被删除。

Copy
dynamoDBClient.deleteTable(tableName);