将 OpenSearch Ingestion 管道与机器学习离线批量推理结合使用 - Amazon OpenSearch Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

将 OpenSearch Ingestion 管道与机器学习离线批量推理结合使用

Amazon OpenSearch Ingestion(OSI)管道支持机器学习(ML)离线批量推理处理,可以低成本高效地丰富大量数据。如果具有可异步处理的大型数据集,即可使用离线批量推理。离线批量推理适用于 Amazon Bedrock 和 SageMaker 模型。此功能在所有支持带有 OpenSearch Service 2.17+ 域的 OpenSearch Ingestion 的 Amazon Web Services 区域 均可用。

注意

离线批量推理处理利用 OpenSearch 一项名为 ML Commons 的功能。ML Commons 通过传输和 REST API 调用提供机器学习算法。这些调用为每个机器学习请求选择正确的节点和资源,并监控机器学习任务,以确保正常运行。通过这种方式,ML Commons 使您可以利用现有的开源机器学习算法,减少开发新的机器学习功能所需的工作量。有关 ML Commons 的更多信息,请参阅 OpenSearch.org 文档中的机器学习

工作方式

通过向管道添加机器学习推理处理器,可在 OpenSearch Ingestion 上创建离线批量推理管道。该处理器使您的管道能够连接到 SageMaker 等人工智能服务,以运行批量推理作业。您可以通过在目标域上运行的人工智能连接器(支持 batch_predict)配置处理器,使其连接到所需的人工智能服务。

OpenSearch Ingestion 使用带 ML Commons 的 ml_inference 处理器,创建离线批量推理作业。ML Commons 随后使用 batch_predict API,该 API 通过部署在 Amazon Bedrock、Amazon SageMaker、Cohere 和OpenAI 外部模型服务器上的模型,以离线异步模式对大型数据集进行推理。下图显示 OpenSearch Ingestion 管道,该管道协调多个组件以执行端到端的流程:

批量 AI 推理处理的三管道架构。

管道组件的工作原理如下:

管道 1(数据准备和转换)*:

  • 来源:从 OpenSearch Ingestion 支持的外部来源扫描数据。

  • 数据处理器:原始数据经过处理并转换为正确格式,以便在集成的人工智能服务上进行批量推理。

  • S3(接收器):处理后的数据暂存于 Amazon S3 存储桶中,可随时作为输入数据用于在集成的 AI 服务上运行批量推理任务。

管道 2(触发 ML batch_inference):

  • 来源:通过自动化 S3 事件检测,识别由管道 1 输出创建的新文件。

  • Ml_inference 处理器:通过异步批处理作业生成 ML 推理的处理器。该处理器通过在目标域上运行的已配置的 AI 连接器连接到 AI 服务。

  • 任务 ID:每个批处理作业在 ml-commons 中都关联一个任务 ID,用于跟踪和管理。

  • OpenSearch ML Commons:ML Commons 托管实时神经搜索模型,管理远程 AI 服务器的连接器,并提供批量推理和作业管理的 API。

  • 人工智能服务:OpenSearch ML Commons 与 Amazon Bedrock 和 Amazon SageMaker 等人工智能服务进行交互,对数据执行批量推理,从而生成预测或见解。结果将异步保存到单独的 S3 文件中。

管道 3(批量摄取):

  • S3(来源):批处理作业的结果存储在 S3 中,这是此管道的来源。

  • 数据转换处理器:在摄取批量推理输出之前,对其进行进一步的处理和转换。这确保在 OpenSearch 索引中正确映射数据。

  • OpenSearch 索引(接收器):在 OpenSearch 中对处理后的结果进行索引,用于存储、搜索和进一步分析。

注意

*管道 1 所述流程为可选流程。如果您愿意,可跳过该过程,直接将准备好的数据上传到 S3 接收器,即可创建批处理作业。

关于 ml_inference 处理器

OpenSearch Ingestion 通过在 S3 Scan 来源与机器学习推理处理器之间建立专用集成,实现批量处理功能。S3 Scan 采用纯元数据模式运行,无需读取实际文件内容即可高效收集 S3 文件信息。ml_inference 处理器使用 S3 文件 URL 与 ML Commons 协同进行批量处理。此设计通过最大限度地减少扫描阶段不必要的数据传输,优化批量推理工作流程。您可以使用参数定义 ml_inference 处理器。示例如下:

processor: - ml_inference: # The endpoint URL of your OpenSearch domain host: "https://Amazontest-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com" # Type of inference operation: # - batch_predict: for batch processing # - predict: for real-time inference action_type: "batch_predict" # Remote ML model service provider (Amazon Bedrock or SageMaker) service_name: "bedrock" # Unique identifier for the ML model model_id: "AmazonTestModelID123456789abcde" # S3 path where batch inference results will be stored output_path: "s3://amzn-s3-demo-bucket/" # Supports ISO_8601 notation strings like PT20.345S or PT15M # These settings control how long to keep your inputs in the processor for retry on throttling errors retry_time_window: "PT9M" # Amazon configuration settings aws: # Amazon Web Services 区域 where the Lambda function is deployed region: "us-west-2" # IAM role ARN for Lambda function execution sts_role_arn: "arn:aws::iam::account_id:role/Admin" # Dead-letter queue settings for storing errors dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml # Conditional expression that determines when to trigger the processor # In this case, only process when bucket matches "amzn-s3-demo-bucket" ml_when: /bucket == "amzn-s3-demo-bucket"

使用 ml_inference 处理器提高摄取性能

OpenSearch Ingestion ml_inference 处理器显著提升基于机器学习搜索的数据摄取性能。对于需要机器学习模型生成数据的使用案例,包括语义搜索、多模态搜索、文档增强和查询理解,该处理器是理想之选。在语义搜索中,处理器能够将大规模、高维向量的创建和摄取速度提升一个数量级。

与实时模型调用相比,处理器的离线批量推断功能具有显著优势。实时处理需要依赖具有容量限制的在线模型服务器,而批量推理则能根据需求动态扩展计算资源,并以并行方式处理数据。例如,当 OpenSearch Ingestion 管道收到十亿个源数据请求时,会为机器学习批量推理输入创建 100 个 S3 文件。ml_inference 处理器随后使用 100 个 ml.m4.xlarge Amazon Elastic Compute Cloud(Amazon EC2)实例启动 SageMaker 批处理作业,在 14 小时内完成十亿次请求的向量化处理,这在实时模式下几乎不可能完成。

配置 ml_inference 处理器以接收语义搜索的数据请求

以下步骤将引导您完成设置和配置 OpenSearch Ingestion ml_inference 处理器的过程,以使用文本嵌入模型提取十亿个语义搜索的数据请求。

步骤 1:在 OpenSearch 中创建连接器并注册模型

在以下步骤中,使用 ML Commons batch_inference_sagemaker_connector_blueprint,在 Amazon SageMaker 中创建连接器和模型。如果您更喜欢使用 OpenSearch Amazon CloudFormation 集成模板,请参阅本节后续内容 (替代流程)步骤 1:使用 Amazon CloudFormation 集成模板创建连接器和模型

在 OpenSearch 中创建连接器并注册模型
  1. 在 SageMaker 中创建 Deep Java 库(DJL)机器学习模型,以实现批量转换。要查看其他 DJL 模型,请参阅 GitHub 上的 semantic_search_with_CFN_template_for_Sagemaker

    POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel { "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker", "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "PrimaryContainer": { "Environment": { "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" }, "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full" } }
  2. 创建连接器,将 batch_predict 作为 actions 字段中的新 action 类型:

    POST /_plugins/_ml/connectors/_create { "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2", "version": "1", "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2", "protocol": "aws_sigv4", "credential": { "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole" }, "parameters": { "region": "us-east-1", "service_name": "sagemaker", "DataProcessing": { "InputFilter": "$.text", "JoinSource": "Input", "OutputFilter": "$" }, "MaxConcurrentTransforms": 100, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformInput": { "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/msmarcotests/" } }, "SplitType": "Line" }, "TransformJobName": "djl-batch-transform-1-billion", "TransformOutput": { "AssembleWith": "Line", "Accept": "application/json", "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/" }, "TransformResources": { "InstanceCount": 100, "InstanceType": "ml.m4.xlarge" }, "BatchStrategy": "SingleRecord" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations", "request_body": "${parameters.input}", "pre_process_function": "connector.pre_process.default.embedding", "post_process_function": "connector.post_process.default.embedding" }, { "action_type": "batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob", "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}""" }, { "action_type": "batch_predict_status", "method": "GET", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" }, { "action_type": "cancel_batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" } ] }
  3. 使用返回的连接器 ID 注册 SageMaker 模型:

    POST /_plugins/_ml/models/_register { "name": "SageMaker model for batch", "function_name": "remote", "description": "test model", "connector_id": "example123456789-abcde" }
  4. 使用 batch_predict 操作类型调用模型:

    POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict { "parameters": { "TransformJobName": "SM-offline-batch-transform" } }

    响应包含批处理作业的任务 ID:

    { "task_id": "exampleIDabdcefd_1234567", "status": "CREATED" }
  5. 使用任务 ID 调用 Get Task API,以检查批处理作业状态:

    GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567

    响应包含任务状态:

    { "model_id": "nyWbv5EB_tT1A82ZCu-e", "task_type": "BATCH_PREDICTION", "function_name": "REMOTE", "state": "RUNNING", "input_type": "REMOTE", "worker_node": [ "WDZnIMcbTrGtnR4Lq9jPDw" ], "create_time": 1725496527958, "last_update_time": 1725496527958, "is_async": false, "remote_job": { "TransformResources": { "InstanceCount": 1, "InstanceType": "ml.c5.xlarge" }, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformOutput": { "Accept": "application/json", "AssembleWith": "Line", "KmsKeyId": "", "S3OutputPath": "s3://offlinebatch/output" }, "CreationTime": 1725496531.935, "TransformInput": { "CompressionType": "None", "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json" } }, "SplitType": "Line" }, "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15", "TransformJobStatus": "InProgress", "BatchStrategy": "SingleRecord", "TransformJobName": "SM-offline-batch-transform15", "DataProcessing": { "InputFilter": "$.content", "JoinSource": "Input", "OutputFilter": "$" } } }

(替代流程)步骤 1:使用 Amazon CloudFormation 集成模板创建连接器和模型

如果您愿意,可使用 Amazon CloudFormation 自动创建所有所需的 Amazon SageMaker 连接器和模型,以实现机器学习推理。此方法通过使用 Amazon OpenSearch Service 控制台中提供的预配置模板简化设置。有关更多信息,请参阅 使用 Amazon CloudFormation 设置用于语义搜索的远程推理

部署创建所有所需 SageMaker 连接器和模型的 Amazon CloudFormation 堆栈
  1. 打开 Amazon OpenSearch Service 控制台。

  2. 在导航窗格中,选择集成

  3. 在“搜索”字段中,输入 SageMaker,然后选择通过 Amazon SageMaker 与文本嵌入模型集成

  4. 选择配置域,然后选择配置 VPC 域配置公共域

  5. 在模板字段中输入信息。对于启用离线批量推理,选择 true,以配置用于离线批量处理的资源。

  6. 选择创建以创建 Amazon CloudFormation 堆栈。

  7. 创建堆栈后,在 Amazon CloudFormation 控制台中打开输出选项卡,查找 connector_idmodel_id。在后续配置管道时,您将需要这些值。

步骤 2:为机器学习离线批量推理创建 OpenSearch Ingestion 管道

使用以下示例,为机器学习离线批量推理创建 OpenSearch Ingestion 管道。有关为 OpenSearch Ingestion 创建管道的更多信息,请参阅 创建 Amazon OpenSearch Ingestion 管道

开始前的准备工作

在以下示例中,您可以为 sts_role_arn 参数指定 IAM 角色 ARN。使用以下步骤,验证此角色是否已映射到可访问 OpenSearch 中 ml-commons 的后端角色。

  1. 导航到 OpenSearch Service 域的 OpenSearch 控制面板插件。您可以在 OpenSearch Service 控制台的域控制面板中找到控制面板端点。

  2. 从主菜单中选择安全角色,然后选择 ml_full_access 角色。

  3. 选择映射的用户管理映射

  4. 后端角色下,输入需要权限才能调用域的 Lambda 角色的 ARN。例如:arn:aws:iam::111,122,223,333:role/lambda-role

  5. 选择映射并确认在映射的用户下显示的用户或角色。

为机器学习离线批量推理创建 OpenSearch Ingestion 管道的示例

version: '2' extension: osis_configuration_metadata: builder_type: visual sagemaker-batch-job-pipeline: source: s3: acknowledgments: true delete_s3_objects_on_read: false scan: buckets: - bucket: name: name data_selection: metadata_only filter: include_prefix: - sagemaker/sagemaker_djl_batch_input exclude_suffix: - .manifest - bucket: name: name data_selection: data_only filter: include_prefix: - sagemaker/output/ scheduling: interval: PT6M aws: region: name default_bucket_owner: account_ID codec: ndjson: include_empty_objects: false compression: none workers: '1' processor: - ml_inference: host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com" aws_sigv4: true action_type: "batch_predict" service_name: "sagemaker" model_id: "model_ID" output_path: "s3://AWStest-offlinebatch/sagemaker/output" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::account_ID:role/Admin" ml_when: /bucket == "AWStest-offlinebatch" dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml - copy_values: entries: - from_key: /text to_key: chapter - from_key: /SageMakerOutput to_key: chapter_embedding - delete_entries: with_keys: - text - SageMakerOutput sink: - opensearch: hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"] aws: serverless: false region: us-west-2 routes: - ml-ingest-route index_type: custom index: test-nlp-index routes: - ml-ingest-route: /chapter != null and /title != null

步骤 3:准备数据以供摄取

要为机器学习离线批量推理处理准备数据,您可以使用自有工具或流程自行准备数据,也可以使用 OpenSearch Data Prepper。确认数据已按正确格式组织,具体可通过两种方式实现:使用管道从数据来源获取数据,或创建机器学习数据集。

以下示例使用 MS MARCO 数据集,该数据集包含一组用于自然语言处理任务的真实用户查询。该数据集采用 JSONL 格式,其中每行代表向机器学习嵌入模型发送的一个请求:

{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": {"world war 2"}} {"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": {"law"}} {"_id": "597651", "text": "what is amber", "metadata": {"nothing"}} {"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": {"self immune"}} ...

要使用 MS MARCO 数据集进行测试,请设想这样的场景:构建 10 亿个输入请求,这些请求分布在 100 个文件中,每个文件包含 1000 万个请求。这些文件存储在 Amazon S3 中,前缀为 s3://offlinebatch/sagemaker/sagemaker_djl_batch_input/。OpenSearch Ingestion 管道将同时扫描这 100 个文件,并启动配备 100 个工作节点的 SageMaker 批处理作业进行并行处理,从而实现高效的向量化处理,并将十亿份文档摄取到 OpenSearch 中。

在生产环境中,您可以使用 OpenSearch Ingestion 管道生成 S3 文件,用于批量推理输入。该管道支持多种数据来源,并按计划运行,持续将源数据转换为 S3 文件。随后,人工智能服务器会通过计划的离线批处理作业自动处理这些文件,从而确保持续的数据处理和摄取。

步骤 4:监控批量推理作业

您可以使用 SageMaker 控制台或 Amazon CLI 监控批量推理作业。您还可以使用 Get Task API 监控批量作业:

GET /_plugins/_ml/tasks/_search { "query": { "bool": { "filter": [ { "term": { "state": "RUNNING" } } ] } }, "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"] }

该 API 会返回活跃批处理作业任务的列表:

{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.0, "hits": [ { "_index": ".plugins-ml-task", "_id": "nyWbv5EB_tT1A82ZCu-e", "_score": 0.0, "_source": { "model_id": "nyWbv5EB_tT1A82ZCu-e", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496527958, "last_update_time": 1725496527958 } }, { "_index": ".plugins-ml-task", "_id": "miKbv5EB_tT1A82ZCu-f", "_score": 0.0, "_source": { "model_id": "miKbv5EB_tT1A82ZCu-f", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496528123, "last_update_time": 1725496528123 } }, { "_index": ".plugins-ml-task", "_id": "kiLbv5EB_tT1A82ZCu-g", "_score": 0.0, "_source": { "model_id": "kiLbv5EB_tT1A82ZCu-g", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496529456, "last_update_time": 1725496529456 } } ] } }

监控批量推理作业并确认其完成后,您可以运行多种类型的 AI 搜索,包括语义搜索、混合搜索、对话式搜索(使用 RAG)、神经稀疏搜索和多模态搜索。有关 OpenSearch Service 所支持 AI 搜索的更多信息,请参阅 AI 搜索

要搜索原始向量,请使用 knn 查询类型,提供 vector 数组作为输入,并指定返回的结果数量 k

GET /my-raw-vector-index/_search { "query": { "knn": { "my_vector": { "vector": [0.1, 0.2, 0.3], "k": 2 } } } }

要运行 AI 驱动的搜索,请使用 neural 查询类型。指定 query_text 输入、OpenSearch Ingestion 管道中配置的嵌入模型的 model_id 以及返回的结果数量 k。要将嵌入字段排除在搜索结果之外,请在 _source.excludes 参数中指定嵌入字段的名称:

GET /my-ai-search-index/_search { "_source": { "excludes": [ "output_embedding" ] }, "query": { "neural": { "output_embedding": { "query_text": "What is AI search?", "model_id": "mBGzipQB2gmRjlv_dOoB", "k": 2 } } } }