使用带机器学习离线批量推理的 OpenSearch 摄取管道 - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用带机器学习离线批量推理的 OpenSearch 摄取管道

Amazon OpenSearch Ingestion (OSI) 管道支持机器学习 (ML) 离线批量推理处理,可以低成本高效地丰富大量数据。只要有可以异步处理的大型数据集,就可以使用离线批量推理。离线批量推理适用于 Amazon Bedrock 和模型。 SageMaker 此功能适用于所有支持 S OpenSearch ervice 2. Amazon Web Services 区域 17+ 域名的 OpenSearch Ingestion。

注意

离线批量推理处理利用了 OpenSearch 名为 ML Commons 的功能。机器学习共享资源通过传输和 REST API 调用提供机器学习算法。这些调用为每个 ML 请求选择正确的节点和资源,并监控 ML 任务以确保正常运行时间。通过这种方式,机器学习共享资源允许您利用现有的开源机器学习算法,减少开发新的机器学习功能所需的工作量。有关 ML Commons 的更多信息,请参阅 OpenSearch .org 文档中的机器学习

工作原理

通过向管道添加机器学习推理处理器,您可以在 In OpenSearch gestion 上创建离线批量推理管道。此处理器使您的管道能够连接到 AI 服务 SageMaker ,例如运行批量推理作业。您可以将处理器配置为通过目标域上运行的 AI 连接器(支持 batch_predic t)连接到所需的 AI 服务。

OpenSearch Ingestion 使用带有 ML Commons 的ml_inference处理器来创建离线批量推理作业。然后,机器学习共享资源使用 batch_predic t API,该API使用部署在亚马逊 Bedrock、Amazon、 SageMaker Cohere和OpenAI的外部模型服务器上的模型,以离线异步模式对大型数据集进行推理。下图显示了一个 OpenSearch Ingestion 管道,该管道协调多个组件以端到端地执行此过程:

批量 AI 推理处理的三管道架构。

管道组件的工作原理如下:

管道 1(数据准备和转换)*:

  • 来源:数据是从 OpenSearch Ingestion 支持的外部来源扫描的。

  • 数据处理器:处理原始数据并将其转换为正确的格式,以便在集成的 AI 服务上进行批量推理。

  • S3(Sink):处理过的数据暂存在 Amazon S3 存储桶中,准备用作在集成 AI 服务上运行批量推理任务的输入。

管道 2(触发 ML batch_inference):

  • 来源:自动检测由管道 1 的输出创建的新文件的 S3 事件。

  • ml_inference 处理器:通过异步批处理作业生成 ML 推断的处理器。它通过在目标域上运行的已配置的 AI 连接器连接到 AI 服务。

  • 任务 ID:每个批处理作业都与 ml-commons 中的任务 ID 相关联,用于跟踪和管理。

  • OpenSearch ML Commons:ML Commons 托管实时神经搜索模型,管理与远程 AI 服务器的连接器, APIs 并为批量推理和作业管理提供服务。

  • 人工智能服务: OpenSearch 机器学习共享资源与 Amazon Bedrock 和 Amazon 等人工智能服务 SageMaker进行交互,对数据进行批量推断,从而生成预测或见解。结果将异步保存到单独的 S3 文件中。

管道 3(批量摄取):

  • S3(来源):批处理作业的结果存储在 S3 中,这是该管道的来源。

  • 数据转换处理器:在摄取之前,对批量推理输出进行进一步的处理和转换。这样可以确保数据在 OpenSearch 索引中正确映射。

  • OpenSearch index(Sink):将处理后的结果编入索引, OpenSearch 以便存储、搜索和进一步分析。

注意

*管道 1 描述的过程是可选的。如果您愿意,可以跳过该过程,只需将准备好的数据上传到 S3 接收器中即可创建批处理作业。

关于 ml_inference 处理器

OpenSearch Ingestion 使用 S3 扫描源和 ML 推理处理器之间的专门集成进行批处理。S3 扫描在仅限元数据模式下运行,无需读取实际文件内容即可高效收集 S3 文件信息。ml_inference处理器使用 S3 文件与 ML Commons 协调 URLs 以进行批处理。这种设计通过最大限度地减少扫描阶段不必要的数据传输来优化批量推理工作流程。您可以使用参数定义ml_inference处理器。示例如下:

processor: - ml_inference: # The endpoint URL of your OpenSearch domain host: "https://Amazon test-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com" # Type of inference operation: # - batch_predict: for batch processing # - predict: for real-time inference action_type: "batch_predict" # Remote ML model service provider (Amazon Bedrock or SageMaker) service_name: "bedrock" # Unique identifier for the ML model model_id: "Amazon TestModelID123456789abcde" # S3 path where batch inference results will be stored output_path: "s3://amzn-s3-demo-bucket/" # Supports ISO_8601 notation strings like PT20.345S or PT15M # These settings control how long to keep your inputs in the processor for retry on throttling errors retry_time_window: "PT9M" # Amazon configuration settings aws: # Amazon Web Services 区域 where the Lambda function is deployed region: "us-west-2" # IAM role ARN for Lambda function execution sts_role_arn: "arn:aws::iam::account_id:role/Admin" # Dead-letter queue settings for storing errors dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml # Conditional expression that determines when to trigger the processor # In this case, only process when bucket matches "amzn-s3-demo-bucket" ml_when: /bucket == "amzn-s3-demo-bucket"

使用 ml_inference 处理器提高摄取性能

OpenSearch Ingestion ml_inference 处理器显著增强了支持 ML 的搜索的数据摄取性能。该处理器非常适合需要机器学习模型生成数据的用例,包括语义搜索、多模态搜索、文档丰富和查询理解。在语义搜索中,处理器可以将大体积、高维向量的创建和摄取速度提高一个数量级。

与实时模型调用相比,处理器的离线批量推断功能具有明显的优势。虽然实时处理需要容量有限的实时模型服务器,但批量推理可以根据需要动态扩展计算资源,并行处理数据。例如,当 OpenSearch 摄取管道收到十亿个源数据请求时,它会为机器学习批量推理输入创建 100 个 S3 文件。然后,ml_inference处理器使用 100 个 ml.m4.xlarge Amazon Elastic Compute Cloud (Amazon EC2) 实例启动 SageMaker 批处理作业,在 14 小时内完成 10 亿个请求的矢量化——在实时模式下几乎不可能完成这项任务。

将 ml_inference 处理器配置为提取语义搜索的数据请求

以下过程将引导您完成设置和配置 OpenSearch Ingestion ml_inference 处理器的过程,以使用文本嵌入模型提取十亿个用于语义搜索的数据请求。

步骤 1:创建连接器并在中注册模型 OpenSearch

在以下步骤中,使用 ML Commons batch_inference_sagemaker_connector_blueprint 在亚马逊中创建连接器和模型。 SageMaker如果您更喜欢使用 OpenSearch Amazon CloudFormation 集成模板,请参阅本节(替代程序)步骤 1:使用 Amazon CloudFormation 集成模板创建连接器和模型后面的内容。

要在中创建连接器并注册模型 OpenSearch
  1. 在中创建深度 Java 库 (DJL) 机器学习模型以 SageMaker 进行批量转换。要查看其他 DJL 模型,请参阅 semantic_search_with_cfn_ tem plate_for_sageMaker,网址为: GitHub

    POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel { "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker", "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "PrimaryContainer": { "Environment": { "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" }, "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full" } }
  2. actions字段中使用batch_predict以下新action类型创建连接器:

    POST /_plugins/_ml/connectors/_create { "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2", "version": "1", "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2", "protocol": "aws_sigv4", "credential": { "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole" }, "parameters": { "region": "us-east-1", "service_name": "sagemaker", "DataProcessing": { "InputFilter": "$.text", "JoinSource": "Input", "OutputFilter": "$" }, "MaxConcurrentTransforms": 100, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformInput": { "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/msmarcotests/" } }, "SplitType": "Line" }, "TransformJobName": "djl-batch-transform-1-billion", "TransformOutput": { "AssembleWith": "Line", "Accept": "application/json", "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/" }, "TransformResources": { "InstanceCount": 100, "InstanceType": "ml.m4.xlarge" }, "BatchStrategy": "SingleRecord" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations", "request_body": "${parameters.input}", "pre_process_function": "connector.pre_process.default.embedding", "post_process_function": "connector.post_process.default.embedding" }, { "action_type": "batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob", "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}""" }, { "action_type": "batch_predict_status", "method": "GET", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" }, { "action_type": "cancel_batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" } ] }
  3. 使用返回的连接器 ID 注册 SageMaker 模型:

    POST /_plugins/_ml/models/_register { "name": "SageMaker model for batch", "function_name": "remote", "description": "test model", "connector_id": "example123456789-abcde" }
  4. 使用以下batch_predict操作类型调用模型:

    POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict { "parameters": { "TransformJobName": "SM-offline-batch-transform" } }

    响应包含批处理作业的任务 ID:

    { "task_id": "exampleIDabdcefd_1234567", "status": "CREATED" }
  5. 使用任务 ID 调用 Get Task API 来检查批处理作业状态:

    GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567

    响应包含任务状态:

    { "model_id": "nyWbv5EB_tT1A82ZCu-e", "task_type": "BATCH_PREDICTION", "function_name": "REMOTE", "state": "RUNNING", "input_type": "REMOTE", "worker_node": [ "WDZnIMcbTrGtnR4Lq9jPDw" ], "create_time": 1725496527958, "last_update_time": 1725496527958, "is_async": false, "remote_job": { "TransformResources": { "InstanceCount": 1, "InstanceType": "ml.c5.xlarge" }, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformOutput": { "Accept": "application/json", "AssembleWith": "Line", "KmsKeyId": "", "S3OutputPath": "s3://offlinebatch/output" }, "CreationTime": 1725496531.935, "TransformInput": { "CompressionType": "None", "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json" } }, "SplitType": "Line" }, "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15", "TransformJobStatus": "InProgress", "BatchStrategy": "SingleRecord", "TransformJobName": "SM-offline-batch-transform15", "DataProcessing": { "InputFilter": "$.content", "JoinSource": "Input", "OutputFilter": "$" } } }

(替代程序)步骤 1:使用 Amazon CloudFormation 集成模板创建连接器和模型

如果您愿意,可以使用 Amazon CloudFormation 自动创建所有必需的 Amazon SageMaker 连接器和模型以进行机器学习推理。此方法通过使用 Amazon S OpenSearch ervice 控制台中提供的预配置模板来简化设置。有关更多信息,请参阅 用于 Amazon CloudFormation 为语义搜索设置远程推理

部署可创建所有必需 SageMaker 连接器和模型的 Amazon CloudFormation 堆栈
  1. 打开亚马逊 OpenSearch 服务控制台。

  2. 在导航窗格中,选择集成

  3. 在 “搜索” 字段中输入SageMaker,然后选择 “通过 Amazon 与文本嵌入模型集成” SageMaker。

  4. 选择配置域,然后选择配置 VPC 域配置公共域

  5. 在模板字段中输入信息。在 “启用离线批量推理” 中,选择 true 以配置用于离线批处理的资源。

  6. 选择 “创建” 以创建 Amazon CloudFormation 堆栈。

  7. 创建堆栈后,在 Amazon CloudFormation 控制台中打开输出选项卡找到 c onnector_id 和 model_id稍后在配置管道时,您将需要这些值。

步骤 2:为机器学习离线批量推理创建 OpenSearch 摄取管道

使用以下示例为机器学习离线批量推 OpenSearch 断创建采集管道。有关为 OpenSearch Ingestion 创建管道的更多信息,请参阅。创建 Amazon OpenSearch Ingestion 管道

开始之前

在以下示例中,您可以为参数指定 IAM 角色 ARN。sts_role_arn使用以下步骤验证此角色是否已映射到有权访问 ml-commons 的后端角色。 OpenSearch

  1. 导航到您的 OpenSearch 服务域的 OpenSearch 仪表板插件。您可以在 OpenSearch 服务控制台的域控制面板上找到仪表板终端节点。

  2. 从主菜单中选择安全角色,然后选择 ml_full_access 角色。

  3. 选择映射的用户管理映射

  4. 后端角色下,输入需要权限才能调用您的域名的 Lambda 角色的 ARN。这是一个例子:arn: aws: iam::: role/ 111122223333 lambda-role

  5. 选择映射并确认在映射的用户下显示的用户或角色。

为机器学习离线批量推理创建 OpenSearch 摄取管道的示例

version: '2' extension: osis_configuration_metadata: builder_type: visual sagemaker-batch-job-pipeline: source: s3: acknowledgments: true delete_s3_objects_on_read: false scan: buckets: - bucket: name: name data_selection: metadata_only filter: include_prefix: - sagemaker/sagemaker_djl_batch_input exclude_suffix: - .manifest - bucket: name: name data_selection: data_only filter: include_prefix: - sagemaker/output/ scheduling: interval: PT6M aws: region: name default_bucket_owner: account_ID codec: ndjson: include_empty_objects: false compression: none workers: '1' processor: - ml_inference: host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com" aws_sigv4: true action_type: "batch_predict" service_name: "sagemaker" model_id: "model_ID" output_path: "s3://AWStest-offlinebatch/sagemaker/output" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::account_ID:role/Admin" ml_when: /bucket == "AWStest-offlinebatch" dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml - copy_values: entries: - from_key: /text to_key: chapter - from_key: /SageMakerOutput to_key: chapter_embedding - delete_entries: with_keys: - text - SageMakerOutput sink: - opensearch: hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"] aws: serverless: false region: us-west-2 routes: - ml-ingest-route index_type: custom index: test-nlp-index routes: - ml-ingest-route: /chapter != null and /title != null

第 3 步:准备好数据以供摄取

要为机器学习离线批量推理处理准备数据,请使用自己的工具或流程自行准备数据,也可以使用 OpenSearch Data Prepper。通过使用管道来使用数据源中的数据,或者通过创建机器学习数据集,验证数据是否按正确的格式组织。

以下示例使用 MS MARCO 数据集,其中包括一组用于自然语言处理任务的真实用户查询。数据集采用 JSONL 格式结构,其中每行表示发送到 ML 嵌入模型的请求:

{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": {"world war 2"}} {"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": {"law"}} {"_id": "597651", "text": "what is amber", "metadata": {"nothing"}} {"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": {"self immune"}} ...

要使用 MS MARCO 数据集进行测试,请想象一下这样的场景:您构建 10 亿个输入请求,分布在 100 个文件中,每个文件包含 1000 万个请求。这些文件将存储在亚马逊 S3 中,前缀为 s3://offlinebatch/sagemaker/sagemaker_djl_batch_input/。 OpenSearch Ingestion 管道将同时扫描这 100 个文件,并启动一个包含 100 个工作人员的 SageMaker 批处理作业进行并行处理,从而实现高效的矢量化处理,并将这十亿个文档摄入其中。 OpenSearch

在生产环境中,您可以使用 OpenSearch 摄取管道为批量推理输入生成 S3 文件。该管道支持各种数据源,并按计划运行,将源数据持续转换为 S3 文件。然后,AI 服务器会通过计划的离线批处理作业自动处理这些文件,从而确保持续的数据处理和摄取。

步骤 4:监控批量推理作业

您可以使用 SageMaker 控制台或监控批量推理作业。 Amazon CLI您还可以使用 Get Task API 来监控批处理作业:

GET /_plugins/_ml/tasks/_search { "query": { "bool": { "filter": [ { "term": { "state": "RUNNING" } } ] } }, "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"] }

API 会返回活跃的批处理作业任务列表:

{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.0, "hits": [ { "_index": ".plugins-ml-task", "_id": "nyWbv5EB_tT1A82ZCu-e", "_score": 0.0, "_source": { "model_id": "nyWbv5EB_tT1A82ZCu-e", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496527958, "last_update_time": 1725496527958 } }, { "_index": ".plugins-ml-task", "_id": "miKbv5EB_tT1A82ZCu-f", "_score": 0.0, "_source": { "model_id": "miKbv5EB_tT1A82ZCu-f", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496528123, "last_update_time": 1725496528123 } }, { "_index": ".plugins-ml-task", "_id": "kiLbv5EB_tT1A82ZCu-g", "_score": 0.0, "_source": { "model_id": "kiLbv5EB_tT1A82ZCu-g", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496529456, "last_update_time": 1725496529456 } } ] } }

监控批量推理作业并确认其已完成后,您可以运行各种类型的 AI 搜索,包括语义搜索、混合搜索、对话式(使用 RAG)、神经稀疏搜索和多模态搜索。有关 OpenSearch 服务支持的 AI 搜索的更多信息,请参阅 AI 搜索

要搜索原始向量,请使用knn查询类型,提供vector数组作为输入,并指定返回结果的k数量:

GET /my-raw-vector-index/_search { "query": { "knn": { "my_vector": { "vector": [0.1, 0.2, 0.3], "k": 2 } } } }

要运行 AI 驱动的搜索,请使用neural查询类型。指定query_text输入、您在 model_id OpenSearch Ingestion 管道中配置的嵌入模型以及返回的结果k数。要从搜索结果中排除嵌入,请在_source.excludes参数中指定嵌入字段的名称:

GET /my-ai-search-index/_search { "_source": { "excludes": [ "output_embedding" ] }, "query": { "neural": { "output_embedding": { "query_text": "What is AI search?", "model_id": "mBGzipQB2gmRjlv_dOoB", "k": 2 } } } }