将数据摄取到 Amazon OpenSearch 无服务器集合中 - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据摄取到 Amazon OpenSearch 无服务器集合中

这些章节详细介绍了支持将数据摄取到 Amazon OpenSearch 无服务器集合中的摄取管线。它们还介绍了一些您可用于与 OpenSearch API 操作进行交互的客户端。您的客户端应该与 OpenSearch 2.x 兼容,以便与 OpenSearch 无服务器集成。

所需的最低权限

为将数据摄取到 OpenSearch 无服务器集合中,必须在数据访问策略中为写入数据的主体分配以下最低权限:

[ { "Rules":[ { "ResourceType":"index", "Resource":[ "index/target-collection/logs" ], "Permission":[ "aoss:CreateIndex", "aoss:WriteDocument", "aoss:UpdateIndex" ] } ], "Principal":[ "arn:aws:iam::123456789012:user/my-user" ] } ]

如果您计划写入到其他索引,则权限可以更广。例如,您可以允许针对所有索引 (index/target-collection/*) 或索引子集 (index/target-collection/logs*) 的权限,而不是指定单个目标索引。

有关所有可用 OpenSearch API 操作及其相关权限的参考,请参阅Amazon OpenSearch 无服务器中受支持的操作和插件

OpenSearch Ingestion

您可以使用 Amazon OpenSearch Ingestion,而不是使用第三方客户端将数据直接发送到 OpenSearch 无服务器集合。您可以配置数据生成器以将数据发送到 OpenSearch Ingestion,后者自动将数据传输到您指定的集合。您还可以配置 OpenSearch Ingestion 以在传输之前转换数据。有关更多信息,请参阅 Amazon OpenSearch Ingestion

OpenSearch Ingestion 管道需要权限才能写入配置为其接收器的 OpenSearch 无服务器集合。这些权限包括能够描述集合以及向其发送 HTTP 请求。有关使用 OpenSearch Ingestion 向集合添加数据的说明,请参阅向 Amazon OpenSearch Ingestion 管道授予访问集合的权限

要开始使用 OpenSearch Ingestion,请参阅 教程:使用 Amazon OpenSearch Ingestion 将数据摄取到集合

Fluent Bit

您可以使用适用于 Fluent Bit 映像的 AmazonOpenSearch 输出插件将数据摄取到 OpenSearch 无服务器集合中。

注意

您必须有 2.30.0 或更高版本的适用于 Fluent Bit 映像的 Amazon 才能与 OpenSearch 无服务器集成。

示例配置

配置文件的此示例输出部分显示了如何使用 OpenSearch 无服务器集合作为目标。添加 AWS_Service_Name 参数(即 aoss)十分重要。Host 是集合端点。

[OUTPUT] Name opensearch Match * Host collection-endpoint.us-west-2.aoss.amazonaws.com Port 443 Index my_index Trace_Error On Trace_Output On AWS_Auth On AWS_Region <region> AWS_Service_Name aoss tls On Suppress_Type_Name On

Amazon Data Firehose

Firehose 支持将 OpenSearch 无服务器作为传输目标。有关将数据发送到 OpenSearch 无服务器中的说明,请参阅《Amazon Data Firehose 开发人员指南》中的创建 Kinesis Data Firehose 传输流选择 OpenSearch 无服务器作为您的目标

您提供给 Firehose 用于传输的 IAM 角色,必须在具有目标集合的 aoss:WriteDocument 最低权限的数据访问策略中指定,并且您必须具有预先存在的索引以向其发送数据。有关更多信息,请参阅 所需的最低权限

在将数据发送到 OpenSearch 无服务器之前,您可能需要对数据执行转换。要了解有关使用 Lambda 函数执行此任务的更多信息,请参阅此同一指南中的 Amazon Kinesis Data Firehose 数据转换

Go

以下示例代码使用适用于 Go 的 opensearch-go 客户端与指定的 OpenSearch 无服务器集合建立安全连接,并创建单个索引。必须提供 regionhost 的值。

package main import ( "context" "log" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" opensearch "github.com/opensearch-project/opensearch-go/v2" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2" ) const endpoint = "" // serverless collection endpoint func main() { ctx := context.Background() awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("<AWS_REGION>"), config.WithCredentialsProvider( getCredentialProvider("<AWS_ACCESS_KEY>", "<AWS_SECRET_ACCESS_KEY>", "<AWS_SESSION_TOKEN>"), ), ) if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an AWS request Signer and load AWS configuration using default config folder or env vars. signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss") // "aoss" for Amazon OpenSearch Serverless if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an opensearch client and use the request-signer client, err := opensearch.NewClient(opensearch.Config{ Addresses: []string{endpoint}, Signer: signer, }) if err != nil { log.Fatal("client creation err", err) } indexName := "go-test-index" // define index mapping mapping := strings.NewReader(`{ "settings": { "index": { "number_of_shards": 4 } } }`) // create an index createIndex := opensearchapi.IndicesCreateRequest{ Index: indexName, Body: mapping, } createIndexResponse, err := createIndex.Do(context.Background(), client) if err != nil { log.Println("Error ", err.Error()) log.Println("failed to create index ", err) log.Fatal("create response body read err", err) } log.Println(createIndexResponse) // delete the index deleteIndex := opensearchapi.IndicesDeleteRequest{ Index: []string{indexName}, } deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) if err != nil { log.Println("failed to delete index ", err) log.Fatal("delete index response body read err", err) } log.Println("deleting index", deleteIndexResponse) } func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc { return func(ctx context.Context) (aws.Credentials, error) { c := &aws.Credentials{ AccessKeyID: accessKey, SecretAccessKey: secretAccessKey, SessionToken: token, } return *c, nil } }

Java

以下示例代码使用适用于 Java 的 opensearch-java 客户端与指定的 OpenSearch 无服务器集合建立安全连接,并创建单个索引。必须提供 regionhost 的值。

与 OpenSearch Service 相比,重要区别在于服务名称(aoss 而不是 es)。

// import OpenSearchClient to establish connection to OpenSearch Serverless collection import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); // create an opensearch client and use the request-signer OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); String index = "sample-index"; // create an index CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build(); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); System.out.println("Create index reponse: " + createIndexResponse); // delete the index DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build(); DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest); System.out.println("Delete index reponse: " + deleteIndexResponse); httpClient.close();

以下示例代码会重新建立安全连接,然后搜索索引。

import org.opensearch.client.opensearch.OpenSearchClient; >>>>>>> aoss-slr-update SdkHttpClient httpClient = ApacheHttpClient.builder().build(); OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); Response response = client.generic() .execute( Requests.builder() .endpoint("/" + "users" + "/_search?typed_keys=true") .method("GET") .json("{" + " \"query\": {" + " \"match_all\": {}" + " }" + "}") .build()); httpClient.close();

JavaScript

以下示例代码使用适用于 JavaScript 的 opensearch-js 客户端与指定的 OpenSearch 无服务器集合建立安全连接、创建单个索引、添加文档,并搜索索引。必须提供 noderegion 的值。

与 OpenSearch Service 相比,重要区别在于服务名称(aoss 而不是 es)。

Version 3

此示例使用适用于 JavaScript in Node.js 的 SDK 版本 3

const { defaultProvider } = require('@aws-sdk/credential-provider-node'); const { Client } = require('@opensearch-project/opensearch'); const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws'); async function main() { // create an opensearch client and use the request-signer const client = new Client({ ...AwsSigv4Signer({ region: 'us-west-2', service: 'aoss', getCredentials: () => { const credentialsProvider = defaultProvider(); return credentialsProvider(); }, }), node: '' # // serverless collection endpoint }); const index = 'movies'; // create index if it doesn't already exist if (!(await client.indices.exists({ index })).body) { console.log((await client.indices.create({ index })).body); } // add a document to the index const document = { foo: 'bar' }; const response = await client.index({ id: '1', index: index, body: document, }); console.log(response.body); // delete the index console.log((await client.indices.delete({ index })).body); } main();
Version 2

此示例使用适用于 JavaScript in Node.js 的 SDK 版本 2

const AWS = require('aws-sdk'); const { Client } = require('@opensearch-project/opensearch'); const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws'); async function main() { // create an opensearch client and use the request-signer const client = new Client({ ...AwsSigv4Signer({ region: 'us-west-2', service: 'aoss', getCredentials: () => new Promise((resolve, reject) => { AWS.config.getCredentials((err, credentials) => { if (err) { reject(err); } else { resolve(credentials); } }); }), }), node: '' # // serverless collection endpoint }); const index = 'movies'; // create index if it doesn't already exist if (!(await client.indices.exists({ index })).body) { console.log((await client.indices.create({ index })).body); } // add a document to the index const document = { foo: 'bar' }; const response = await client.index({ id: '1', index: index, body: document, }); console.log(response.body); // delete the index console.log((await client.indices.delete({ index })).body); } main();

Logstash

您可以使用 Logstash OpenSearch 插件,将日志发布到 OpenSearch 无服务器集合。

要使用 Logstash 将数据发送到 OpenSearch 无服务器
  1. 使用 Docker 或 Linux,安装 logstash-output-opensearch 插件的 2.0.0 或更高版本

    Docker

    Docker 将托管已预安装 OpenSearch 输出插件的 Logstash OSS 软件:opensearchproject/logstash-oss-with-opensearch-output-plugin。您可以像任何其他映像一样拉取该映像:

    docker pull opensearchproject/logstash-oss-with-opensearch-output-plugin:latest
    Linux

    首先,请安装最新版本的 Logstash(如果您尚未这样做)。然后,安装版本 2.0.0 的输出插件:

    cd logstash-8.5.0/ bin/logstash-plugin install --version 2.0.0 logstash-output-opensearch

    如果已安装该插件,请将其更新到最新版本:

    bin/logstash-plugin update logstash-output-opensearch

    从该插件的版本 2.0.0 开始,Amazon SDK 使用版本 3。如果您使用的是 8.4.0 之前的 Logstash 版本,则必须移除所有预安装的 Amazon 插件,然后安装 logstash-integration-aws 插件:

    /usr/share/logstash/bin/logstash-plugin remove logstash-input-s3 /usr/share/logstash/bin/logstash-plugin remove logstash-input-sqs /usr/share/logstash/bin/logstash-plugin remove logstash-output-s3 /usr/share/logstash/bin/logstash-plugin remove logstash-output-sns /usr/share/logstash/bin/logstash-plugin remove logstash-output-sqs /usr/share/logstash/bin/logstash-plugin remove logstash-output-cloudwatch /usr/share/logstash/bin/logstash-plugin install --version 0.1.0.pre logstash-integration-aws
  2. 为使 OpenSearch 输出插件能与 OpenSearch 无服务器配合使用,您必须对 logstash.conf 的 opensearch 输出部分进行以下修改:

    • auth_type 下,将 aoss 指定为 service_name

    • hosts 指定您的集合端点。

    • 添加参数 default_server_major_versionlegacy_template。这些参数是该插件与 OpenSearch 无服务器配合使用所必需的。

    output { opensearch { hosts => "collection-endpoint:443" auth_type => { ... service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }

    此示例配置文件从 S3 桶中的文件获取其输入,然后将它们发送到 OpenSearch 无服务器集合:

    input { s3 { bucket => "my-s3-bucket" region => "us-east-1" } } output { opensearch { ecs_compatibility => disabled hosts => "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com:443" index => my-index auth_type => { type => 'aws_iam' aws_access_key_id => 'your-access-key' aws_secret_access_key => 'your-secret-key' region => 'us-east-1' service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }
  3. 然后,使用新配置运行 Logstash,以测试该插件:

    bin/logstash -f config/test-plugin.conf

Python

以下示例代码使用适用于 Python 的 opensearch-py 客户端与指定的 OpenSearch 无服务器集合建立安全连接,并创建单个索引。必须提供 regionhost 的值。

与 OpenSearch Service 相比,重要区别在于服务名称(aoss 而不是 es)。

from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth import boto3 host = '' # serverless collection endpoint, without https:// region = '' # e.g. us-east-1 service = 'aoss' credentials = boto3.Session().get_credentials() auth = AWSV4SignerAuth(credentials, region, service) # create an opensearch client and use the request-signer client = OpenSearch( hosts=[{'host': host, 'port': 443}], http_auth=auth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, pool_maxsize=20, ) # create an index index_name = 'books-index' create_response = client.indices.create( index_name ) print('\nCreating index:') print(create_response) # index a document document = { 'title': 'The Green Mile', 'director': 'Stephen King', 'year': '1996' } response = client.index( index = 'books-index', body = document, id = '1' ) # delete the index delete_response = client.indices.delete( index_name ) print('\nDeleting index:') print(delete_response)

Ruby

opensearch-aws-sigv4 Gem 提供对 OpenSearch 无服务器以及 OpenSearch Service 的即时访问权限。它具有 opensearch-ruby 客户端的所有功能,因为它是这款 Gem 的依赖项。

在实例化 Sigv4 签名程序时,指定 aoss 为服务名称:

require 'opensearch-aws-sigv4' require 'aws-sigv4' signer = Aws::Sigv4::Signer.new(service: 'aoss', region: 'us-west-2', access_key_id: 'key_id', secret_access_key: 'secret') # create an opensearch client and use the request-signer client = OpenSearch::Aws::Sigv4Client.new( { host: 'https://your.amz-opensearch-serverless.endpoint', log: true }, signer) # create an index index = 'prime' client.indices.create(index: index) # insert data client.index(index: index, id: '1', body: { name: 'Amazon Echo', msrp: '5999', year: 2011 }) # query the index client.search(body: { query: { match: { name: 'Echo' } } }) # delete index entry client.delete(index: index, id: '1') # delete the index client.indices.delete(index: index)

与其他客户端签署 HTTP 请求

如果与其他客户端构建 HTTP 请求,在对 OpenSearch 无服务器集合签署请求时,以下要求适用。

  • 必须将服务名称指定为 aoss

  • 所有 Amazon 签名版本 4 请求都需要 x-amz-content-sha256 标头。它将提供请求负载的哈希。如果有请求负载,请将该值设置为其安全哈希算法(SHA)加密哈希(SHA256)。如果没有请求负载,请将该值设置为 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855,它是空字符串的哈希。

使用 cURL 创建索引

以下示例请求使用客户端 URL 请求库(cURL)将单个文档发送到集合中名为 movies-index 的索引:

curl -XPOST \ --user "$AWS_ACCESS_KEY_ID":"$AWS_SECRET_ACCESS_KEY" \ --aws-sigv4 "aws:amz:us-east-1:aoss" \ --header "x-amz-content-sha256: $REQUEST_PAYLOAD_SHA_HASH" \ --header "x-amz-security-token: $AWS_SESSION_TOKEN" \ "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com/movies-index/_doc" \ -H "Content-Type: application/json" -d '{"title": "Shawshank Redemption"}'

使用 Postman 创建索引

下图演示了如何使用 Postman 将请求发送到集合。有关身份验证的说明,请参阅 Authenticate with Amazon Signature authentication workflow in Postman

JSON response showing creation of a "movies-index" with successful result and no shards.