本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用带机器学习离线批量推理的 OpenSearch 摄取管道
Amazon OpenSearch Ingestion (OSI) 管道支持机器学习 (ML) 离线批量推理处理,可以低成本高效地丰富大量数据。只要有可以异步处理的大型数据集,就可以使用离线批量推理。离线批量推理适用于 Amazon Bedrock 和模型。 SageMaker 此功能适用于所有支持 S OpenSearch ervice 2. Amazon Web Services 区域 17+ 域名的 OpenSearch Ingestion。
注意
要进行实时推理处理,请使用适用于第三方平台的 Amazon OpenSearch 服务 ML 连接器。
离线批量推理处理利用了 OpenSearch 名为 ML Commons 的功能。机器学习共享资源通过传输和 REST API 调用提供机器学习算法。这些调用为每个 ML 请求选择正确的节点和资源,并监控 ML 任务以确保正常运行时间。通过这种方式,机器学习共享资源允许您利用现有的开源机器学习算法,减少开发新的机器学习功能所需的工作量。有关 ML Commons 的更多信息,请参阅 OpenSearch .org 文档中的机器学习
工作原理
通过向管道添加机器学习推理处理器,您可以在 In OpenSearch gestion 上创建离线批量推理
OpenSearch Ingestion 使用带有 ML Commons 的ml_inference处理器来创建离线批量推理作业。然后,机器学习共享资源使用 batch_predic
管道组件的工作原理如下:
管道 1(数据准备和转换)*:
-
来源:数据是从 OpenSearch Ingestion 支持的外部来源扫描的。
-
数据处理器:处理原始数据并将其转换为正确的格式,以便在集成的 AI 服务上进行批量推理。
-
S3(Sink):处理过的数据暂存在 Amazon S3 存储桶中,准备用作在集成 AI 服务上运行批量推理任务的输入。
管道 2(触发 ML batch_inference):
-
来源:自动检测由管道 1 的输出创建的新文件的 S3 事件。
-
ml_inference 处理器:通过异步批处理作业生成 ML 推断的处理器。它通过在目标域上运行的已配置的 AI 连接器连接到 AI 服务。
-
任务 ID:每个批处理作业都与 ml-commons 中的任务 ID 相关联,用于跟踪和管理。
-
OpenSearch ML Commons:ML Commons 托管实时神经搜索模型,管理与远程 AI 服务器的连接器, APIs 并为批量推理和作业管理提供服务。
-
人工智能服务: OpenSearch 机器学习共享资源与 Amazon Bedrock 和 Amazon 等人工智能服务 SageMaker进行交互,对数据进行批量推断,从而生成预测或见解。结果将异步保存到单独的 S3 文件中。
管道 3(批量摄取):
-
S3(来源):批处理作业的结果存储在 S3 中,这是该管道的来源。
-
数据转换处理器:在摄取之前,对批量推理输出进行进一步的处理和转换。这样可以确保数据在 OpenSearch 索引中正确映射。
-
OpenSearch index(Sink):将处理后的结果编入索引, OpenSearch 以便存储、搜索和进一步分析。
注意
*管道 1 描述的过程是可选的。如果您愿意,可以跳过该过程,只需将准备好的数据上传到 S3 接收器中即可创建批处理作业。
关于 ml_inference 处理器
OpenSearch Ingestion 使用 S3 扫描源和 ML 推理处理器之间的专门集成进行批处理。S3 扫描在仅限元数据模式下运行,无需读取实际文件内容即可高效收集 S3 文件信息。ml_inference处理器使用 S3 文件与 ML Commons 协调 URLs 以进行批处理。这种设计通过最大限度地减少扫描阶段不必要的数据传输来优化批量推理工作流程。您可以使用参数定义ml_inference处理器。示例如下:
processor: - ml_inference: # The endpoint URL of your OpenSearch domain host: "https://Amazon test-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com" # Type of inference operation: # - batch_predict: for batch processing # - predict: for real-time inference action_type: "batch_predict" # Remote ML model service provider (Amazon Bedrock or SageMaker) service_name: "bedrock" # Unique identifier for the ML model model_id: "Amazon TestModelID123456789abcde" # S3 path where batch inference results will be stored output_path: "s3://amzn-s3-demo-bucket/" # Supports ISO_8601 notation strings like PT20.345S or PT15M # These settings control how long to keep your inputs in the processor for retry on throttling errors retry_time_window: "PT9M" # Amazon configuration settings aws: # Amazon Web Services 区域 where the Lambda function is deployed region: "us-west-2" # IAM role ARN for Lambda function execution sts_role_arn: "arn:aws::iam::account_id:role/Admin" # Dead-letter queue settings for storing errors dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml# Conditional expression that determines when to trigger the processor # In this case, only process when bucket matches "amzn-s3-demo-bucket" ml_when: /bucket == "amzn-s3-demo-bucket"
使用 ml_inference 处理器提高摄取性能
OpenSearch Ingestion ml_inference 处理器显著增强了支持 ML 的搜索的数据摄取性能。该处理器非常适合需要机器学习模型生成数据的用例,包括语义搜索、多模态搜索、文档丰富和查询理解。在语义搜索中,处理器可以将大体积、高维向量的创建和摄取速度提高一个数量级。
与实时模型调用相比,处理器的离线批量推断功能具有明显的优势。虽然实时处理需要容量有限的实时模型服务器,但批量推理可以根据需要动态扩展计算资源,并行处理数据。例如,当 OpenSearch 摄取管道收到十亿个源数据请求时,它会为机器学习批量推理输入创建 100 个 S3 文件。然后,ml_inference处理器使用 100 个 ml.m4.xlarge Amazon Elastic Compute Cloud (Amazon EC2) 实例启动 SageMaker 批处理作业,在 14 小时内完成 10 亿个请求的矢量化——在实时模式下几乎不可能完成这项任务。
将 ml_inference 处理器配置为提取语义搜索的数据请求
以下过程将引导您完成设置和配置 OpenSearch Ingestion ml_inference 处理器的过程,以使用文本嵌入模型提取十亿个用于语义搜索的数据请求。
主题
步骤 1:创建连接器并在中注册模型 OpenSearch
在以下步骤中,使用 ML Commons batch_inference_sagemaker_connector_blueprint
要在中创建连接器并注册模型 OpenSearch
-
在中创建深度 Java 库 (DJL) 机器学习模型以 SageMaker 进行批量转换。要查看其他 DJL 模型,请参阅 semantic_search_with_cfn_ tem
plate_for_sageMaker,网址为: GitHub POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel { "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker", "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "PrimaryContainer": { "Environment": { "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" }, "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full" } } -
在
actions字段中使用batch_predict以下新action类型创建连接器:POST /_plugins/_ml/connectors/_create { "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2", "version": "1", "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2", "protocol": "aws_sigv4", "credential": { "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole" }, "parameters": { "region": "us-east-1", "service_name": "sagemaker", "DataProcessing": { "InputFilter": "$.text", "JoinSource": "Input", "OutputFilter": "$" }, "MaxConcurrentTransforms": 100, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformInput": { "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/msmarcotests/" } }, "SplitType": "Line" }, "TransformJobName": "djl-batch-transform-1-billion", "TransformOutput": { "AssembleWith": "Line", "Accept": "application/json", "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/" }, "TransformResources": { "InstanceCount": 100, "InstanceType": "ml.m4.xlarge" }, "BatchStrategy": "SingleRecord" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations", "request_body": "${parameters.input}", "pre_process_function": "connector.pre_process.default.embedding", "post_process_function": "connector.post_process.default.embedding" }, { "action_type": "batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob", "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}""" }, { "action_type": "batch_predict_status", "method": "GET", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" }, { "action_type": "cancel_batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" } ] } -
使用返回的连接器 ID 注册 SageMaker 模型:
POST /_plugins/_ml/models/_register { "name": "SageMaker model for batch", "function_name": "remote", "description": "test model", "connector_id": "example123456789-abcde" } -
使用以下
batch_predict操作类型调用模型:POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict { "parameters": { "TransformJobName": "SM-offline-batch-transform" } }响应包含批处理作业的任务 ID:
{ "task_id": "exampleIDabdcefd_1234567", "status": "CREATED" } -
使用任务 ID 调用 Get Task API 来检查批处理作业状态:
GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567响应包含任务状态:
{ "model_id": "nyWbv5EB_tT1A82ZCu-e", "task_type": "BATCH_PREDICTION", "function_name": "REMOTE", "state": "RUNNING", "input_type": "REMOTE", "worker_node": [ "WDZnIMcbTrGtnR4Lq9jPDw" ], "create_time": 1725496527958, "last_update_time": 1725496527958, "is_async": false, "remote_job": { "TransformResources": { "InstanceCount": 1, "InstanceType": "ml.c5.xlarge" }, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformOutput": { "Accept": "application/json", "AssembleWith": "Line", "KmsKeyId": "", "S3OutputPath": "s3://offlinebatch/output" }, "CreationTime": 1725496531.935, "TransformInput": { "CompressionType": "None", "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json" } }, "SplitType": "Line" }, "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15", "TransformJobStatus": "InProgress", "BatchStrategy": "SingleRecord", "TransformJobName": "SM-offline-batch-transform15", "DataProcessing": { "InputFilter": "$.content", "JoinSource": "Input", "OutputFilter": "$" } } }
(替代程序)步骤 1:使用 Amazon CloudFormation 集成模板创建连接器和模型
如果您愿意,可以使用 Amazon CloudFormation 自动创建所有必需的 Amazon SageMaker 连接器和模型以进行机器学习推理。此方法通过使用 Amazon S OpenSearch ervice 控制台中提供的预配置模板来简化设置。有关更多信息,请参阅 用于 Amazon CloudFormation 为语义搜索设置远程推理。
部署可创建所有必需 SageMaker 连接器和模型的 Amazon CloudFormation 堆栈
-
打开亚马逊 OpenSearch 服务控制台。
-
在导航窗格中,选择集成。
-
在 “搜索” 字段中输入
SageMaker,然后选择 “通过 Amazon 与文本嵌入模型集成” SageMaker。 -
选择配置域,然后选择配置 VPC 域或配置公共域。
-
在模板字段中输入信息。在 “启用离线批量推理” 中,选择 true 以配置用于离线批处理的资源。
-
选择 “创建” 以创建 Amazon CloudFormation 堆栈。
-
创建堆栈后,在 Amazon CloudFormation 控制台中打开输出选项卡找到 c onnector_id 和 model_id。稍后在配置管道时,您将需要这些值。
步骤 2:为机器学习离线批量推理创建 OpenSearch 摄取管道
使用以下示例为机器学习离线批量推 OpenSearch 断创建采集管道。有关为 OpenSearch Ingestion 创建管道的更多信息,请参阅。创建 Amazon OpenSearch Ingestion 管道
开始之前
在以下示例中,您可以为参数指定 IAM 角色 ARN。sts_role_arn使用以下步骤验证此角色是否已映射到有权访问 ml-commons 的后端角色。 OpenSearch
-
导航到您的 OpenSearch 服务域的 OpenSearch 仪表板插件。您可以在 OpenSearch 服务控制台的域控制面板上找到仪表板终端节点。
-
从主菜单中选择安全、角色,然后选择 ml_full_access 角色。
-
选择映射的用户、管理映射。
-
在后端角色下,输入需要权限才能调用您的域名的 Lambda 角色的 ARN。这是一个例子:arn: aws: iam::: role/
111122223333lambda-role -
选择映射并确认在映射的用户下显示的用户或角色。
为机器学习离线批量推理创建 OpenSearch 摄取管道的示例
version: '2' extension: osis_configuration_metadata: builder_type: visual sagemaker-batch-job-pipeline: source: s3: acknowledgments: true delete_s3_objects_on_read: false scan: buckets: - bucket: name:namedata_selection: metadata_only filter: include_prefix: - sagemaker/sagemaker_djl_batch_input exclude_suffix: - .manifest - bucket: name:namedata_selection: data_only filter: include_prefix: - sagemaker/output/ scheduling: interval: PT6M aws: region:namedefault_bucket_owner:account_IDcodec: ndjson: include_empty_objects: false compression: none workers: '1' processor: - ml_inference: host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com" aws_sigv4: true action_type: "batch_predict" service_name: "sagemaker" model_id: "model_ID" output_path: "s3://AWStest-offlinebatch/sagemaker/output" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::account_ID:role/Admin" ml_when: /bucket == "AWStest-offlinebatch" dlq: s3: region:us-west-2bucket:batch-inference-dlqkey_path_prefix:bedrock-dlqsts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml- copy_values: entries: - from_key: /text to_key: chapter - from_key: /SageMakerOutput to_key: chapter_embedding - delete_entries: with_keys: - text - SageMakerOutput sink: - opensearch: hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"] aws: serverless: false region: us-west-2 routes: - ml-ingest-route index_type: custom index: test-nlp-index routes: - ml-ingest-route: /chapter != null and /title != null
第 3 步:准备好数据以供摄取
要为机器学习离线批量推理处理准备数据,请使用自己的工具或流程自行准备数据,也可以使用 OpenSearch Data Prepper
以下示例使用 MS MARCO
{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": {"world war 2"}} {"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": {"law"}} {"_id": "597651", "text": "what is amber", "metadata": {"nothing"}} {"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": {"self immune"}} ...
要使用 MS MARCO 数据集进行测试,请想象一下这样的场景:您构建 10 亿个输入请求,分布在 100 个文件中,每个文件包含 1000 万个请求。这些文件将存储在亚马逊 S3 中,前缀为 s3://offlinebatch/sagemaker/sagemaker_djl_batch_input/。 OpenSearch Ingestion 管道将同时扫描这 100 个文件,并启动一个包含 100 个工作人员的 SageMaker 批处理作业进行并行处理,从而实现高效的矢量化处理,并将这十亿个文档摄入其中。 OpenSearch
在生产环境中,您可以使用 OpenSearch 摄取管道为批量推理输入生成 S3 文件。该管道支持各种数据源
步骤 4:监控批量推理作业
您可以使用 SageMaker 控制台或监控批量推理作业。 Amazon CLI您还可以使用 Get Task API 来监控批处理作业:
GET /_plugins/_ml/tasks/_search { "query": { "bool": { "filter": [ { "term": { "state": "RUNNING" } } ] } }, "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"] }
API 会返回活跃的批处理作业任务列表:
{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.0, "hits": [ { "_index": ".plugins-ml-task", "_id": "nyWbv5EB_tT1A82ZCu-e", "_score": 0.0, "_source": { "model_id": "nyWbv5EB_tT1A82ZCu-e", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496527958, "last_update_time": 1725496527958 } }, { "_index": ".plugins-ml-task", "_id": "miKbv5EB_tT1A82ZCu-f", "_score": 0.0, "_source": { "model_id": "miKbv5EB_tT1A82ZCu-f", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496528123, "last_update_time": 1725496528123 } }, { "_index": ".plugins-ml-task", "_id": "kiLbv5EB_tT1A82ZCu-g", "_score": 0.0, "_source": { "model_id": "kiLbv5EB_tT1A82ZCu-g", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496529456, "last_update_time": 1725496529456 } } ] } }
第 5 步:运行搜索
监控批量推理作业并确认其已完成后,您可以运行各种类型的 AI 搜索,包括语义搜索、混合搜索、对话式(使用 RAG)、神经稀疏搜索和多模态搜索。有关 OpenSearch 服务支持的 AI 搜索的更多信息,请参阅 AI 搜索
要搜索原始向量,请使用knn查询类型,提供vector数组作为输入,并指定返回结果的k数量:
GET /my-raw-vector-index/_search { "query": { "knn": { "my_vector": { "vector": [0.1, 0.2, 0.3], "k": 2 } } } }
要运行 AI 驱动的搜索,请使用neural查询类型。指定query_text输入、您在 model_id OpenSearch Ingestion 管道中配置的嵌入模型以及返回的结果k数。要从搜索结果中排除嵌入,请在_source.excludes参数中指定嵌入字段的名称:
GET /my-ai-search-index/_search { "_source": { "excludes": [ "output_embedding" ] }, "query": { "neural": { "output_embedding": { "query_text": "What is AI search?", "model_id": "mBGzipQB2gmRjlv_dOoB", "k": 2 } } } }