使用 Java 在 Amazon DocumentDB 中执行 CRUD 操作 - Amazon DocumentDB
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Java 在 Amazon DocumentDB 中执行 CRUD 操作

本节讨论如何使用 MongoDB Java 驱动程序在 Amazon DocumentDB 中执行 CRUD(创建、读取、更新、删除)操作。

在 DocumentDB 集合中创建和插入文档

向 Amazon DocumentDB 插入文档可以让您向集合添加新数据。根据您的需求和正在处理的数据量,可采用多种方法执行插入。向集合插入单个文档的最基本方法是 insertOne()。要一次插入多个文档,可以使用 insertMany() 方法,该方法允许您在单个操作中添加文档数组。在 DocumentDB 集合中插入许多文档的另一个方法是 bulkWrite()。在本指南中,我们将讨论在 DocumentDB 集合中创建文档的所有方法。

insertOne()

让我们首先研究如何向 Amazon DocumentDB 集合插入单个文档。可使用 insertOne() 方法实现插入单个文档。此方法采用BsonDocument用于插入,并返回一个InsertOneResult可用于获取新插入文档的对象 ID 的对象 ID 的对象。以下示例代码显示了向集合插入一个餐厅文档:

Document article = new Document() .append("restaurantId", "REST-21G145") .append("name", "Future-proofed Intelligent Bronze Hat") .append("cuisine", "International") .append("rating", new Document() .append("average", 1.8) .append("totalReviews", 267)) .append("features", Arrays.asList("Outdoor Seating", "Live Music")); try { InsertOneResult result = collection.insertOne(article); System.out.println("Inserted document with the following id: " + result.getInsertedId()); } catch (MongoWriteException e) { // Handle duplicate key or other write errors System.err.println("Failed to insert document: " + e.getMessage()); throw e; } catch (MongoException e) { // Handle other MongoDB errors System.err.println("MongoDB error: " + e.getMessage()); throw e; }

使用 insertOne() 时,请确保包含适当的错误处理。例如,在上面的代码中,“restaurantId”具有唯一索引,因此再次运行此代码将触发以下 MongoWriteException

Failed to insert document: Write operation error on server docdbCluster.docdb.amazonaws.com:27017. Write error: WriteError{code=11000, message='E11000 duplicate key error collection: Restaurants index: restaurantId_1', details={}}.

insertMany()

用于向集合插入许多文档的主要方法是 insertMany() 和 bulkWrite()

insertMany() 方法是在单个操作中插入多个文档的最简单方法。该方法接受文档列表并将其插入到集合中。当您要插入一批相互独立且无需任何特殊处理或混合操作的新文档时,此方法最为理想。以下代码显示了从文件中读取 JSON 文档并将其插入到集合中的过程。该insertMany()函数返回一个可用于获取所有插入文档 IDs 的InsertManyResultInsertManyResult对象。

// Read JSON file content String content = new String(Files.readAllBytes(Paths.get(jsonFileName))); JSONArray jsonArray = new JSONArray(content); // Convert JSON articles to Documents List < Document > restaurants = new ArrayList < > (); for (int i = 0; i < jsonArray.length(); i++) { JSONObject jsonObject = jsonArray.getJSONObject(i); Document doc = Document.parse(jsonObject.toString()); restaurants.add(doc); } //insert documents in collection InsertManyResult result = collection.insertMany(restaurants); System.out.println("Count of inserted documents: " + result.getInsertedIds().size());

bulkWrite()

bulkWrite() 方法允许在单个批处理中执行多个写入操作(插入、更新、删除)。当您需要在单个批处理中执行不同类型的操作时(例如在更新其他文档的同时插入一些文档),可以使用 bulkWrite()bulkWrite() 支持两种类型的批处理写入,有序和无序:

  • 有序操作 –(默认)Amazon DocumentDB 按顺序处理写入操作,并在遇到首个错误时停止。当操作顺序至关重要时(例如后续操作依赖先前操作),有序操作很有用。但是,有序操作通常比无序操作慢。对于有序操作,您必须解决批处理在遇到首个错误时停止的情况,这可能会导致部分操作未被处理。

  • 无序操作 – 允许 Amazon DocumentDB 将插入操作作为数据库中的单次执行来处理。如果一个文档出现错误,将继续对剩余文档执行操作。这在您要插入大量数据并且可以容忍某些失败时尤为有用,例如,在数据迁移或批量导入期间,某些文档可能会因为键重复而导致失败。对于无序操作,您必须处理部分成功场景,即部分操作成功而其他操作失败。

使用 bulkWrite() 方法时,需要一些基本类。首先,WriteModel 类充当所有写入操作的基类,并且具有特定的实现如 InsertOneModelUpdateOneModelUpdateManyModelDeleteOneModelDeleteManyModel,可用于处理不同类型的操作。

BulkWriteOptions类是配置批量操作行为所必需的,例如设置 ordered/unordered 执行或绕过文档验证。BulkWriteResult 类提供有关执行结果的详细信息,包括已插入、已更新和已删除的文档的数量。

对于错误处理,MongoBulkWriteException 类至关重要,因为其包含有关批量操作期间所有失败的信息,而 BulkWriteError 类则提供有关单个操作失败的具体详细信息。以下代码显示了在执行单个 bulkWrite() 方法调用的过程中插入文档列表以及更新和删除单个文档的示例。该代码还显示了如何使用 BulkWriteOptionsBulkWriteResult,以及如何对 bulkWrite() 操作进行正确的错误处理。

List < WriteModel < Document >> bulkOperations = new ArrayList < > (); // get list of 10 documents representing 10 restaurants List < Document > restaurantsToInsert = getSampleData(); for (Document doc: restaurantsToInsert) { bulkOperations.add(new InsertOneModel < > (doc)); } // Update operation bulkOperations.add(new UpdateOneModel < > ( new Document("restaurantId", "REST-Y2E9H5"), new Document("", new Document("stats.likes", 20)) .append("", new Document("rating.average", 4.5)))); // Delete operation bulkOperations.add(new DeleteOneModel < > (new Document("restaurantId", "REST-D2L431"))); // Perform bulkWrite operation try { BulkWriteOptions options = new BulkWriteOptions() .ordered(false); // Allow unordered inserts BulkWriteResult result = collection.bulkWrite(bulkOperations, options); System.out.println("Inserted: " + result.getInsertedCount()); System.out.println("Updated: " + result.getModifiedCount()); System.out.println("Deleted: " + result.getDeletedCount()); } catch (MongoBulkWriteException e) { System.err.println("Bulk write error occurred: " + e.getMessage()); // Log individual write errors for (BulkWriteError error: e.getWriteErrors()) { System.err.printf("Error at index %d: %s (Code: %d)%n", error.getIndex(), error.getMessage(), error.getCode()); // Log the problematic document Document errorDoc = new Document(error.getDetails()); if (errorDoc != null) { System.err.println("Problematic document: " + errorDoc); } } } catch (Exception e) { System.err.println("Error during bulkWrite: " + e.getMessage()); }

可重试写入

与 MongoDB 不同,Amazon DocumentDB 不支持可重试写入。因此,您必须在其应用程序中实现自定义的重试逻辑,尤其用于处理网络问题或临时服务不可用情况。通常,实现良好的重试策略包括增加重试之间的延迟和限制总重试次数。有关使用错误处理构建重试逻辑的代码示例,请参阅下面的 使用重试逻辑进行错误处理

从 DocumentDB 集合中读取和检索数据

在 Amazon DocumentDB 中查询文档围绕几个关键组件展开,这些组件允许您精确检索和操作数据。该find()方法是 MongoDB Jav APIs a 驱动程序中的基本查询方法。该方法允许复杂数据检索,并提供多种选项用于筛选、排序和投影结果。除了 find() 方法之外,FiltersFindIterable 是另外两个基本组件,为 MongoDB Java 驱动程序中的查询操作提供构建块。

Filters 类是 MongoDB Java 驱动程序中的一个实用程序类,提供流畅的 API 用于构造查询筛选条件。该类提供静态工厂方法,用于创建表示各种查询条件的 Bson 对象的实例。最常用的方法包括用于等值比较的 eq(),用于数值比较的 gt()lt()gte()lte(),用于组合多个条件的 and()or(),用于数组成员资格测试的 in()nin(),以及用于用于模式匹配的 regex()。该类采用类型安全设计,与基于原始文档的查询相比,可提供更好的编译时检查,使其成为在 Java 应用程序中构造 DocumentDB 查询的首选方法。错误处理非常强大,对于无效的筛选条件构造,会抛出明确的异常。

FindIterable 是一个专用接口,旨在处理 find() 方法的结果。该接口提供一组丰富的方法来优化和控制查询执行,提供流畅的 API 以进行方法链接。该接口包含基本的查询修改方法,例如,limit() 用于限制返回的文档数量,skip() 用于分页,sort() 用于对结果排序,projection() 用于选择特定字段,以及 hint() 用于选择索引。FindIterable 中的 batch、skip 和 limit 操作是基本的分页和数据管理工具,可帮助控制如何从数据库中检索和处理文档。

Batching(batchSize)控制单次网络往返中 DocumentDB 向客户端返回的文档数量。设置批处理大小时,DocumentDB 不会一次性返回所有匹配的文档,而是按指定的批处理大小分组返回。

Skip 允许您偏移结果的起始点,本质上是告知 DocumentDB 在开始返回匹配项之前跳过指定数量的文档。例如,skip(20) 将绕过前 20 个匹配文档。这通常用于要检索后续结果页面的分页场景。

Limit 限制可以从查询返回的文档总数。当您指定 limit(n) 时,即使数据库中存在更多匹配项,DocumentDB 也将在返回“n”个文档后停止返回文档。

从 Amazon DocumentDB 检索文档时,FindIterable 支持迭代器和光标模式。使用 FindIterable 作为迭代器的优势在于,允许文档延迟加载,并且仅在应用程序请求时才获取文档。使用迭代器的另一个优势在于,您无需负责维护到集群的连接,因此无需显式关闭连接。

FindIterable 还支持 MongoCursor,允许在处理 Amazon DocumentDB 查询时使用光标模式。MongoCursor 是特定于 MongoDB Java 驱动程序的实现,用于控制数据库操作和资源管理。它实现了AutoCloseable接口,允许通过 try-with-resources块进行明确的资源管理,这对于正确关闭数据库连接和释放服务器资源至关重要。默认情况下,光标会在 10 分钟后超时,并且 DocumentDB 不允许您选择更改此超时行为。使用批处理数据时,请确保在光标超时前检索下一批数据。使用 MongoCursor 时的一个关键考虑因素是,需要显式关闭以防止资源泄漏。

本节介绍了 find()FiltersFindIterable 的几个示例。

以下代码示例显示了如何使用 find() 通过其“restaurantId”字段检索单个文档:

Document filter = new Document("restaurantId", "REST-21G145"); Document result = collection.find(filter).first();

尽管使用 Filters 可以更好地检查编译时错误,但 Java 驱动程序也允许您直接在 find() 方法中指定 Bson 筛选条件。以下示例代码可将 Bson 文档传递给 find()

result = collection.find(new Document("$and", Arrays.asList( new Document("rating.totalReviews", new Document("$gt", 1000)), new Document("priceRange", "$$"))))

下一个示例代码显示了将 Filters 类与 find() 结合使用的几个示例:

FindIterable < Document > results; // Exact match results = collection.find(Filters.eq("name", "Thai Curry Palace")); // Not equal results = collection.find(Filters.ne("cuisine", "Thai")); // find an element in an array results = collection.find(Filters.in("features", Arrays.asList("Private Dining"))); // Greater than results = collection.find(Filters.gt("rating.average", 3.5)); // Between (inclusive) results = collection.find(Filters.and( Filters.gte("rating.totalReviews", 100), Filters.lte("rating.totalReviews", 200))); // AND results = collection.find(Filters.and( Filters.eq("cuisine", "Thai"), Filters.gt("rating.average", 4.5))); // OR results = collection.find(Filters.or( Filters.eq("cuisine", "Thai"), Filters.eq("cuisine", "American"))); // All document where the Field exists results = collection.find(Filters.exists("michelin")); // Regex results = collection.find(Filters.regex("name", Pattern.compile("Curry", Pattern.CASE_INSENSITIVE))); // Find all document where the array contain the list of value regardless of its order results = collection.find(Filters.all("features", Arrays.asList("Private Dining", "Parking"))); // Array size results = collection.find(Filters.size("features", 4));

以下示例显示了如何在 FindIterable 对象上链接 sort()skip()limit()、和 batchSize() 操作。这些操作的提供顺序将影响查询性能。作为最佳实践,这些操作的顺序应为 sort()projection()skip()limit()batchSize()

FindIterable < Document > results = collection.find(Filters.gt("rating.totalReviews", 1000)) // Sorting .sort(Sorts.orderBy( Sorts.descending("address.city"), Sorts.ascending("cuisine"))) // Field selection .projection(Projections.fields( Projections.include("name", "cuisine", "priceRange"), Projections.excludeId())) // Pagination .skip(20) .limit(10) .batchSize(2);

以下示例代码显示了在 FindIterable 上创建迭代器。该代码使用 Java 的 forEach 构造遍历结果集。

collection.find(Filters.eq("cuisine", "American")).forEach(doc -> System.out.println(doc.toJson()));

在最后一个 find() 代码示例中,显示了如何使用 cursor() 进行文档检索。其在 try 块中创建光标,确保在代码退出 try 块时关闭光标。

try (MongoCursor < Document > cursor = collection.find(Filters.eq("cuisine", "American")) .batchSize(25) .cursor()) { while (cursor.hasNext()) { Document doc = cursor.next(); System.out.println(doc.toJson()); } } // Cursor automatically closed

更新 DocumentDB 集合中的现有文档

Amazon DocumentDB 提供了灵活而强大的机制,可修改现有文档,并在文档不存在时插入新文档。MongoDB Java 驱动程序提供了多种更新方法:updateOne() 用于单个文档更新,updateMany() 用于多个文档更新,以及 replaceOne() 用于完整文档替换。除了这三个方法之外,UpdatesUpdateOptionsUpdateResult 是其他基本组件,为 MongoDB Java 驱动程序中的更新操作提供构建块。

MongoDB Java 驱动程序中的 Updates 类是一个实用程序类,提供用于创建更新运算符的静态工厂方法。其充当主要生成器,以类型安全且可读的方式构造更新操作。诸如 set()unset()、和 inc() 之类的基本方法允许直接修改文档。当使用 Updates.combine() 方法(允许以原子方式执行多个更新操作,从而确保数据一致性)组合多个操作时,该类的功能优势尤为突出。

UpdateOptions 是 MongoDB 的 Java 驱动程序中的一个功能强大的配置类,为文档更新操作提供基本的自定义功能。该类的两个重要方面:为更新操作提供更新插入和数组筛选条件支持。通过 upsert(true) 启用的更新插入功能允许在更新操作期间未找到匹配文档时创建新文档。通过 arrayFilters(),更新操作可以精确更新满足特定条件的数组元素。

MongoDB 的 Java 驱动程序中的 UpdateResult 提供反馈机制,详细说明更新操作的结果。该类封装了三个关键指标:与更新条件匹配的文档数量(matchedCount)、实际修改的文档数量(modifiedCount)以及有关任何已更新插入文档的信息(upsertedId)。了解这些指标对于正确处理错误、验证更新操作和维护应用程序中的数据一致性而言至关重要。

更新并替换单个文档

在 DocumentDB 中,可以使用 updateOne() 方法来实现更新单个文档。该方法采用三个参数:通常由 Filters 类提供的用于标识要更新文档的 filter 参数、用于确定要更新字段的 Update 参数以及用于设置不同更新选项的可选 UpdateOptions 参数。使用 updateOne() 方法只会更新第一个符合选择条件的文档。以下示例代码将更新一个文档的单个字段:

collection.updateOne(Filters.eq("restaurantId", "REST-Y2E9H5"), Updates.set("name", "Amazing Japanese sushi"));

要更新一个文档中的多个字段,请结合使用 updateOne()Update.combine(),如以下示例所示。此示例还显示了如何向文档中的数组添加项。

List<Bson> updates = new ArrayList<>(); // Basic field updates updates.add(Updates.set("name", "Shanghai Best")); // Array operations updates.add(Updates.addEachToSet("features", Arrays.asList("Live Music"))); // Counter updates updates.add(Updates.inc("rating.totalReviews", 10)); // Combine all updates Bson combinedUpdates = Updates.combine(updates); // Execute automic update with one call collection.updateOne(Filters.eq("restaurantId","REST-1J83NH"), combinedUpdates);

以下代码示例演示了如何更新数据库中的文档。如果指定的文档不存在,则操作会自动将其作为新文档插入。此代码还显示了如何使用通过 UpdateResult 对象提供的指标。

Bson filter = Filters.eq("restaurantId", "REST-0Y9GL0"); Bson update = Updates.set("cuisine", "Indian"); // Upsert operation UpdateOptions options = new UpdateOptions().upsert(true); UpdateResult result = collection.updateOne(filter, update, options); if (result.getUpsertedId() != null) { System.out.println("Inserted document with _id: " + result.getUpsertedId()); } else { System.out.println("Updated " + result.getModifiedCount() + " document(s)"); }

以下代码示例演示了如何使用 replaceOne() 方法将现有文档完全替换为新文档,而非更新单个字段。replaceOne() 方法将覆盖整个文档,仅保留原始文档的 _id 字段。如果多个文档符合筛选条件,则仅替换首个匹配文档。

Document newDocument = new Document() .append("restaurantId", "REST-0Y9GL0") .append("name", "Bhiryani Adda") .append("cuisine", "Indian") .append("rating", new Document() .append("average", 4.8) .append("totalReviews", 267)) .append("features", Arrays.asList("Outdoor Seating", "Live Music")); UpdateResult result = collection.replaceOne( Filters.eq("restaurantId", "REST-0Y9GL0"), newDocument); System.out.printf("Modified %d document%n", result.getModifiedCount());

更新多个文档

可通过两种方式同时更新集合中的多个文档。您可以使用 updateMany() 方法,或结合使用 UpdateManyModelbulkWrite() 方法。updateMany() 方法使用 filter 参数来选择要更新的文档,使用 Update 参数来标识要更新的字段,使用可选的 UpdateOptions 参数来指定更新选项。

以下示例代码演示了 updateMany() 方法的使用:

Bson filter = Filters.and( Filters.in("features", Arrays.asList("Private Dining")), Filters.eq("cuisine", "Thai")); UpdateResult result1 = collection.updateMany(filter, Updates.set("priceRange", "$$$"));

以下示例代码演示了使用相同更新的 bulkWrite() 方法:

BulkWriteOptions options = new BulkWriteOptions().ordered(false); List < WriteModel < Document >> updates = new ArrayList < > (); Bson filter = Filters.and( Filters.in("features", Arrays.asList("Private Dining")), Filters.eq("cuisine", "Indian")); Bson updateField = Updates.set("priceRange", "$$$"); updates.add(new UpdateManyModel < > (filter, updateField)); BulkWriteResult result = collection.bulkWrite(updates, options); System.out.printf("Modified %d document%n", result.getModifiedCount());

从 DocumentDB 集合中移除文档

MongoDB Java 驱动程序提供 deleteOne() 用于移除单个文档,并且提供 deleteMany() 用于移除符合特定条件的多个文档。与更新类似,删除操作也可以与 bulkWrite() 方法结合使用。deleteOne()deleteMany() 都会返回 DeleteResult 对象,该对象提供有关操作结果的信息,包括已删除文档的数量。以下是使用 deleteMany() 移除多个文档的示例:

Bson filter = Filters.and( Filters.eq("cuisine", "Thai"), Filters.lt("rating.totalReviews", 50)); DeleteResult result = collection.deleteMany(filter); System.out.printf("Deleted %d document%n", result.getDeletedCount());

使用重试逻辑进行错误处理

Amazon DocumentDB 强大的错误处理策略应将错误分为可重试错误(例如网络超时、连接问题)和不可重试错误(例如身份验证失败、无效查询)。对于因可重试的错误而导致的操作失败,可以实现每次重试时间延迟以及最大重试次数。CRUD 操作应该位于捕获 MongoException 及其子类的 try-catch 块中。此外,还应包括错误的监控与日志记录,以实现操作可见性。以下是显示如何实现重试错误处理的示例代码:

int MAX_RETRIES = 3; int INITIAL_DELAY_MS = 1000; int retryCount = 0; while (true) { try { crud_operation(); //perform crud that will throw MongoException or one of its subclass break; } catch (MongoException e) { if (retryCount < MAX_RETRIES) { retryCount++; long delayMs = INITIAL_DELAY_MS * (long) Math.pow(2, retryCount - 1); try { TimeUnit.MILLISECONDS.sleep(delayMs); } catch (InterruptedException t) { Thread.currentThread().interrupt(); throw new RuntimeException("Retry interrupted", t); } continue; } else throw new RuntimeException("Crud operation failed", e); } }