将 OpenSearch Ingestion 管道与 Amazon DynamoDB 结合使用
您可以使用 DynamoDB
无论有没有完整初始快照,都可以处理 DynamoDB 数据。
-
借助完整快照:DynamoDB 使用时间点故障恢复(PITR)创建备份,并将其上传至 Amazon S3。OpenSearch Ingestion 随后将快照编入一个或多个 OpenSearch 索引。为保持一致性,该管道将所有 DynamoDB 变更与 OpenSearch 进行同步。此选项要求同时启用 PITR 和 DynamoDB Streams。
-
没有快照:OpenSearch Ingestion 只会流式传输新的 DynamoDB 事件。如果您已有快照或者需要在无历史数据的情况下进行实时流式传输,则选择此选项。此选项要求仅启用 DynamoDB Streams。
有关的更多信息,请参阅《Amazon DynamoDB 开发人员指南》中的 DynamoDB zero-ETL integration with Amazon OpenSearch Service。
先决条件
要设置管道,您必须有一个已启用 DynamoDB Streams 的 DynamoDB 表。您的流应使用 NEW_IMAGE 流视图类型。但是,如果这种流视图类型适合您的用例,OpenSearch Ingestion 管道也可以使用 NEW_AND_OLD_IMAGES 流式传输事件。
如果使用快照,则还必须启用表中的时间点恢复。有关更多信息,请参阅《Amazon DynamoDB 开发人员指南》中的创建表、启用时间点恢复和启用流。
步骤 1:配置管道角色
设置 DynamoDB 表后,设置要在管道配置中使用的管道角色,并在该角色中添加以下 DynamoDB 权限:
您也可以使用 Amazon KMS 客户托管密钥加密导出数据文件。要解密导出的对象,请在管道的导出配置中指定 s3_sse_kms_key_id 作为密钥 ID,格式如下:arn:aws:kms:。以下策略包括使用客户自主管理型密钥时的必需权限:region:account-id:key/my-key-id
{ "Sid": "allowUseOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:GenerateDataKey", "kms:Decrypt" ], "Resource":arn:aws:kms:}region:account-id:key/my-key-id
步骤 2:创建管道
然后,您可以配置如下所示的 OpenSearch Ingestion 管道,指定 DynamoDB 作为源。此示例管道使用 PITR 快照从 table-a 中摄取数据,然后从 DynamoDB Streams 摄取事件。LATEST 的起始位置指示管道应从 DynamoDB Streams 读取最新数据。
version: "2" cdc-pipeline: source: dynamodb: tables: - table_arn: "arn:aws:dynamodb:region:account-id:table/table-a" export: s3_bucket: "my-bucket" s3_prefix: "export/" stream: start_position: "LATEST" aws: region: "us-east-1" sink: - opensearch: hosts: ["https://search-mydomain.region.es.amazonaws.com"] index: "${getMetadata(\"table-name\")}" index_type: custom normalize_index: true document_id: "${getMetadata(\"primary_key\")}" action: "${getMetadata(\"opensearch_action\")}" document_version: "${getMetadata(\"document_version\")}" document_version_type: "external"
您可以使用预先配置的 DynamoDB 蓝图,以创建此管道。有关更多信息,请参阅 使用蓝图。
数据一致性
OpenSearch Ingestion 支持端到端确认以确保数据持久性。管道读取快照或流时,它会动态创建分区以进行并行处理。管道在摄取 OpenSearch 域或集合中的所有记录后收到确认时,会将分区标记为已完成。
如果要摄取到 OpenSearch 无服务器搜索集合中,您可以在管道中生成文档 ID。如果要摄取到 OpenSearch 无服务器时间序列集合中,请注意管道不会生成文档 ID。
OpenSearch Ingestion 管道还会将传入事件操作映射到相应的批量索引操作,以帮助摄取文档。这可保持数据一致性,以使 DynamoDB 中的每个数据更改都与 OpenSearch 中的相应文档更改保持一致。
映射数据类型
OpenSearch Service 将每个传入文档中的数据类型动态映射到 DynamoDB 中相应的数据类型。下表显示 OpenSearch Service 如何自动映射各种数据类型。
| 数据类型 | OpenSearch | DynamoDB |
|---|---|---|
| 数字 |
OpenSearch 自动映射数值数据。如果该数字是整数,则 OpenSearch 会将其映射为长整型值。如果该数字是小数,则 OpenSearch 会将其映射为浮点值。 OpenSearch 会根据第一个发送的文档动态映射各种属性。如果您在 DynamoDB 中为同一属性混合了多种数据类型(例如整数和小数),则映射可能会失败。 例如,如果您的第一个文档具有整数属性,而后面的文档具有与小数相同的属性,则 OpenSearch 将无法摄取第二个文档。在这些情况下,应提供一个显式的映射模板,如下所示:
如果需要双精度,请使用字符串类型字段映射。OpenSearch 中没有支持 38 位精度的等效数字类型。 |
DynamoDB 支持数字。 |
| 数字集 | OpenSearch 自动将数字集映射到长整型值或浮点值数组。与标量数字一样,这取决于摄取的第一个数字是整数还是小数。您可以像映射标量字符串一样提供数字集的映射。 |
DynamoDB 支持表示数字集的类型。 |
| 字符串 |
OpenSearch 自动将字符串值映射为文本。在某些情况下(例如枚举值),您可以映射到关键字类型。 以下示例显示如何将名为
|
DynamoDB 支持字符串。 |
| 字符串集 |
OpenSearch 自动将字符串集映射到字符串数组。您可以像映射标量字符串一样提供字符串集的映射。 |
DynamoDB 支持表示字符串集的类型。 |
| 二元 |
OpenSearch 自动将二进制数据映射为文本。您可以提供一个映射,在 OpenSearch 中将其作为二进制字段写入。 以下示例显示如何将名为
|
DynamoDB 支持二进制类型属性。 |
| 二进制集 |
OpenSearch 自动将二进制集映射为文本形式的二进制数据数组。您可以像映射标量二进制一样提供数字集的映射。 |
DynamoDB 支持表示二进制值集的类型。 |
| 布尔值 |
OpenSearch 将 DynamoDB 布尔类型映射为 OpenSearch 布尔类型。 |
DynamoDB 支持布尔类型属性。 |
| Null |
OpenSearch 可以摄取具有 DynamoDB 空类型的文档。它将该值作为空值保存在文档中。此类型没有映射,并且此字段未编制索引或不可搜索。 如果对空类型使用相同的属性名称,然后更改为其他类型(例如字符串),则 OpenSearch 会为第一个非空值创建动态映射。后续值仍然可以是 DynamoDB 空值。 |
DynamoDB 支持空类型属性。 |
| Map |
OpenSearch 将 DynamoDB 映射属性映射到嵌套字段。嵌套字段内也适用相同的映射。 以下示例将嵌套字段中的字符串映射到 OpenSearch 中的关键字类型:
|
DynamoDB 支持映射类型属性。 |
| 列表 |
OpenSearch 根据 DynamoDB 列表中的内容为其提供不同的结果。 当列表包含所有相同类型的标量类型(例如,所有字符串的列表)时,OpenSearch 会摄取该列表作为该类型的数组。这适用于字符串、数字、布尔值和空类型。其中每种类型的限制与该类型标量的限制相同。 您还可以使用与映射相同的映射,为映射列表提供映射。 您无法提供混合类型的列表。 |
DynamoDB 支持列表类型属性。 |
| 设置 |
OpenSearch 根据 DynamoDB 集合中的内容为其提供不同的结果。 当集合包含所有相同类型的标量类型(例如,所有字符串的集合)时,OpenSearch 会摄取该集合作为该类型的数组。这适用于字符串、数字、布尔值和空类型。其中每种类型的限制与该类型标量的限制相同。 您还可以使用与映射相同的映射,为映射集合提供映射。 您无法提供混合类型的集合。 |
DynamoDB 支持表示集合的类型。 |
我们建议在 OpenSearch Ingestion 管道中配置死信队列(DLQ)。如果已配置该队列,OpenSearch Service 会将所有因动态映射失败而无法摄取的失败文档发送到该队列。
如果自动映射失败,则可以在管道配置中使用 template_type 和 template_content 来定义显式映射规则。或者,您可以在启动管道之前直接在搜索域或集合中创建映射模板。
限制
为 DynamoDB 设置 OpenSearch Ingestion 管道时,请考虑以下限制:
-
OpenSearch Ingestion 与 DynamoDB 的集成目前不支持跨区域摄取。您的 DynamoDB 表和 OpenSearch Ingestion 管道必须位于同一 Amazon Web Services 区域。
-
您的 DynamoDB 表和 OpenSearch Ingestion 管道必须位于同一 Amazon Web Services 账户。
-
OpenSearch Ingestion 管道仅支持将一个 DynamoDB 表作为其来源。
-
DynamoDB Streams 仅在日志中存储最多 24 小时的数据。如果从大型表的初始快照中摄取数据需要 24 小时或更长时间,则会丢失一些初始数据。为了缓解这种数据丢失,请估计表的大小并配置适当的 OpenSearch Ingestion 管道计算单元。
适用于 DynamoDB 的建议 CloudWatch 警报
建议使用以下 CloudWatch 指标来监控摄取管道的性能。这些指标可能有助您确定处理的导出数据量、处理的流事件量、处理导出和流事件时的错误数以及写入目标的文档数量。您可以设置 CloudWatch 警报,从而在其中任何指标在指定期限内超出指定的值时执行某个操作。
| 指标 | 描述 |
|---|---|
dynamodb-pipeline.BlockingBuffer.bufferUsage.value |
指示正在使用的缓冲区数量。 |
dynamodb-pipeline.dynamodb.activeExportS3ObjectConsumers.value
|
显示正在主动处理 Amazon S3 导出对象的 OCU 总数。 |
dynamodb-pipeline.dynamodb.bytesProcessed.count
|
处理的 DynamoDB 源字节数。 |
dynamodb-pipeline.dynamodb.changeEventsProcessed.count
|
处理的 DynamoDB 流更改事件数量。 |
dynamodb-pipeline.dynamodb.changeEventsProcessingErrors.count
|
处理的 DynamoDB 更改事件中的错误数。 |
dynamodb-pipeline.dynamodb.exportJobFailure.count
|
Number of export job submission attempts that have failed. |
dynamodb-pipeline.dynamodb.exportJobSuccess.count
|
Number of export jobs that have been submitted successfully. |
dynamodb-pipeline.dynamodb.exportRecordsProcessed.count
|
处理的导出记录总数。 |
dynamodb-pipeline.dynamodb.exportRecordsTotal.count
|
从 DynamoDB 导出的记录总数,这对于跟踪数据导出量至关重要。 |
dynamodb-pipeline.dynamodb.exportS3ObjectsProcessed.count
|
Total number of export data files that have been processed successfully from Amazon S3. |
dynamodb-pipeline.opensearch.bulkBadRequestErrors.count
|
Count of errors during bulk requests due to malformed request. |
dynamodb-pipeline.opensearch.bulkRequestLatency.avg
|
Average latency for bulk write requests made to OpenSearch. |
dynamodb-pipeline.opensearch.bulkRequestNotFoundErrors.count
|
Number of bulk requests that failed because the target data could not be found. |
dynamodb-pipeline.opensearch.bulkRequestNumberOfRetries.count
|
Number of retries by OpenSearch Ingestion pipelines to write OpenSearch cluster. |
dynamodb-pipeline.opensearch.bulkRequestSizeBytes.sum
|
Total size in bytes of all bulk requests made to OpenSearch. |
dynamodb-pipeline.opensearch.documentErrors.count
|
Number of errors when sending documents to OpenSearch. The documents causing the errors witll be sent to DLQ. |
dynamodb-pipeline.opensearch.documentsSuccess.count
|
Number of documents successfully written to an OpenSearch cluster or collection. |
dynamodb-pipeline.opensearch.documentsSuccessFirstAttempt.count
|
Number of documents successfully indexed in OpenSearch on the first attempt. |
|
|
Count of errors due to version conflicts in documents during processing. |
|
|
Average latency of OpenSearch Ingestion pipeline to process the data by reading from the source to writint to the destination. |
dynamodb-pipeline.opensearch.PipelineLatency.max
|
Maximum latency of OpenSearch Ingestion pipeline to process the data by reading from the source to writing the destination. |
dynamodb-pipeline.opensearch.recordsIn.count
|
Count of records successfully ingested into OpenSearch. This metric is essential for tracking the volume of data being processed and stored. |
dynamodb-pipeline.opensearch.s3.dlqS3RecordsFailed.count
|
Number of records that failed to write to DLQ. |
dynamodb-pipeline.opensearch.s3.dlqS3RecordsSuccess.count
|
Number of records that are written to DLQ. |
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.count
|
Count of latency measurements for requests to the Amazon S3 dead-letter queue. |
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.sum
|
Total latency for all requests to the Amazon S3 dead-letter queue |
dynamodb-pipeline.opensearch.s3.dlqS3RequestSizeBytes.sum
|
Total size in bytes of all requests made to the Amazon S3 dead-letter queue. |
dynamodb-pipeline.recordsProcessed.count
|
Total number of records processed in the pipeline, a key metric for overal throughput. |
dynamodb.changeEventsProcessed.count
|
No records are being gathered from DynamoDB streams. This could be due to no activitiy on the table, an export being in progress, or an issue accessing the DynamoDB streams. |
|
|
The attempt to trigger an export to S3 failed. |
|
|
Count of bulk request errors in OpenSearch due to invalid input, crucial for monitoring data quality and operational issues. |
opensearch.EndToEndLatency.avg
|
The end to end latnecy is higher than desired for reading from DynamoDB streams. This could be due to an underscaled OpenSearch cluster or a maximum pipeline OCU capacity that is too low for the WCU throughput on the DynamoDB table. This end to end latency will be high after an export and should decrease over time as it catches up to the latest DynamoDB streams. |