本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将数据摄取到 Amazon OpenSearch 无服务器集合中
这些章节详细介绍了支持将数据摄取到 Amazon OpenSearch 无服务器集合中的摄取管线。它们还介绍了一些您可用于与 OpenSearch API 操作进行交互的客户端。您的客户端应该与 OpenSearch 2.x 兼容,以便与 OpenSearch 无服务器集成。
主题
所需的最低权限
为将数据摄取到 OpenSearch 无服务器集合中,必须在数据访问策略中为写入数据的主体分配以下最低权限:
[ { "Rules":[ { "ResourceType":"index", "Resource":[ "index/
target-collection
/logs
" ], "Permission":[ "aoss:CreateIndex", "aoss:WriteDocument", "aoss:UpdateIndex" ] } ], "Principal":[ "arn:aws:iam::123456789012
:user/my-user
" ] } ]
如果您计划写入到其他索引,则权限可以更广。例如,您可以允许针对所有索引 (index/target-collection
/*) 或索引子集 (index/target-collection
/logs*
) 的权限,而不是指定单个目标索引。
有关所有可用 OpenSearch API 操作及其相关权限的参考,请参阅Amazon OpenSearch 无服务器中受支持的操作和插件。
OpenSearch Ingestion
您可以使用 Amazon OpenSearch Ingestion,而不是使用第三方客户端将数据直接发送到 OpenSearch 无服务器集合。您可以配置数据生成器以将数据发送到 OpenSearch Ingestion,后者自动将数据传输到您指定的集合。您还可以配置 OpenSearch Ingestion 以在传输之前转换数据。有关更多信息,请参阅 Amazon OpenSearch Ingestion。
OpenSearch Ingestion 管道需要权限才能写入配置为其接收器的 OpenSearch 无服务器集合。这些权限包括能够描述集合以及向其发送 HTTP 请求。有关使用 OpenSearch Ingestion 向集合添加数据的说明,请参阅向 Amazon OpenSearch Ingestion 管道授予访问集合的权限。
要开始使用 OpenSearch Ingestion,请参阅 教程:使用 Amazon OpenSearch Ingestion 将数据摄取到集合。
Fluent Bit
您可以使用适用于 Fluent Bit 映像的 Amazon
注意
您必须有 2.30.0 或更高版本的适用于 Fluent Bit 映像的 Amazon 才能与 OpenSearch 无服务器集成。
示例配置:
配置文件的此示例输出部分显示了如何使用 OpenSearch 无服务器集合作为目标。添加 AWS_Service_Name
参数(即 aoss
)十分重要。Host
是集合端点。
[OUTPUT] Name opensearch Match * Host
collection-endpoint
.us-west-2
.aoss.amazonaws.com Port 443 Indexmy_index
Trace_Error On Trace_Output On AWS_Auth On AWS_Region<region>
AWS_Service_Name aoss tls On Suppress_Type_Name On
Amazon Data Firehose
Firehose 支持将 OpenSearch 无服务器作为传输目标。有关将数据发送到 OpenSearch 无服务器中的说明,请参阅《Amazon Data Firehose 开发人员指南》中的创建 Kinesis Data Firehose 传输流和选择 OpenSearch 无服务器作为您的目标。
您提供给 Firehose 用于传输的 IAM 角色,必须在具有目标集合的 aoss:WriteDocument
最低权限的数据访问策略中指定,并且您必须具有预先存在的索引以向其发送数据。有关更多信息,请参阅 所需的最低权限。
在将数据发送到 OpenSearch 无服务器之前,您可能需要对数据执行转换。要了解有关使用 Lambda 函数执行此任务的更多信息,请参阅此同一指南中的 Amazon Kinesis Data Firehose 数据转换。
Go
以下示例代码使用适用于 Go 的 opensearch-goregion
和 host
的值。
package main import ( "context" "log" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" opensearch "github.com/opensearch-project/opensearch-go/v2" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2" ) const endpoint = "" // serverless collection endpoint func main() { ctx := context.Background() awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("<AWS_REGION>"), config.WithCredentialsProvider( getCredentialProvider("<AWS_ACCESS_KEY>", "<AWS_SECRET_ACCESS_KEY>", "<AWS_SESSION_TOKEN>"), ), ) if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an AWS request Signer and load AWS configuration using default config folder or env vars. signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss") // "aoss" for Amazon OpenSearch Serverless if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an opensearch client and use the request-signer client, err := opensearch.NewClient(opensearch.Config{ Addresses: []string{endpoint}, Signer: signer, }) if err != nil { log.Fatal("client creation err", err) } indexName := "go-test-index" // define index mapping mapping := strings.NewReader(`{ "settings": { "index": { "number_of_shards": 4 } } }`) // create an index createIndex := opensearchapi.IndicesCreateRequest{ Index: indexName, Body: mapping, } createIndexResponse, err := createIndex.Do(context.Background(), client) if err != nil { log.Println("Error ", err.Error()) log.Println("failed to create index ", err) log.Fatal("create response body read err", err) } log.Println(createIndexResponse) // delete the index deleteIndex := opensearchapi.IndicesDeleteRequest{ Index: []string{indexName}, } deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) if err != nil { log.Println("failed to delete index ", err) log.Fatal("delete index response body read err", err) } log.Println("deleting index", deleteIndexResponse) } func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc { return func(ctx context.Context) (aws.Credentials, error) { c := &aws.Credentials{ AccessKeyID: accessKey, SecretAccessKey: secretAccessKey, SessionToken: token, } return *c, nil } }
Java
以下示例代码使用适用于 Java 的 opensearch-javaregion
和 host
的值。
与 OpenSearch Service 域相比,重要区别在于服务名称(aoss
而不是 es
)。
// import OpenSearchClient to establish connection to OpenSearch Serverless collection import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); // create an opensearch client and use the request-signer OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); String index = "sample-index"; // create an index CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build(); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); System.out.println("Create index reponse: " + createIndexResponse); // delete the index DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build(); DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest); System.out.println("Delete index reponse: " + deleteIndexResponse); httpClient.close();
以下示例代码会重新建立安全连接,然后搜索索引。
import org.opensearch.client.opensearch.OpenSearchClient; >>>>>>> aoss-slr-update SdkHttpClient httpClient = ApacheHttpClient.builder().build(); OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); Response response = client.generic() .execute( Requests.builder() .endpoint("/" + "users" + "/_search?typed_keys=true") .method("GET") .json("{" + " \"query\": {" + " \"match_all\": {}" + " }" + "}") .build()); httpClient.close();
JavaScript
以下示例代码使用适用于 JavaScript 的 opensearch-jsnode
和 region
的值。
与 OpenSearch Service 域相比,重要区别在于服务名称(aoss
而不是 es
)。
Logstash
您可以使用 Logstash OpenSearch 插件
要使用 Logstash 将数据发送到 OpenSearch 无服务器
-
使用 Docker 或 Linux,安装 logstash-output-opensearch
插件的 2.0.0 或更高版本。 -
为使 OpenSearch 输出插件能与 OpenSearch 无服务器配合使用,您必须对 logstash.conf 的
opensearch
输出部分进行以下修改:-
在
auth_type
下,将aoss
指定为service_name
。 -
为
hosts
指定您的集合端点。 -
添加参数
default_server_major_version
和legacy_template
。这些参数是该插件与 OpenSearch 无服务器配合使用所必需的。
output { opensearch { hosts => "
collection-endpoint
:443" auth_type => { ... service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }此示例配置文件从 S3 桶中的文件获取其输入,然后将它们发送到 OpenSearch 无服务器集合:
input { s3 { bucket => "
my-s3-bucket
" region => "us-east-1
" } } output { opensearch { ecs_compatibility => disabled hosts => "https://my-collection-endpoint
.us-east-1
.aoss.amazonaws.com:443" index =>my-index
auth_type => { type => 'aws_iam' aws_access_key_id => 'your-access-key
' aws_secret_access_key => 'your-secret-key
' region => 'us-east-1
' service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } } -
-
然后,使用新配置运行 Logstash,以测试该插件:
bin/logstash -f config/
test-plugin
.conf
Python
以下示例代码使用适用于 Python 的 opensearch-pyregion
和 host
的值。
与 OpenSearch Service 域相比,重要区别在于服务名称(aoss
而不是 es
)。
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth import boto3 host = '' # serverless collection endpoint, without https:// region = '' # e.g. us-east-1 service = 'aoss' credentials = boto3.Session().get_credentials() auth = AWSV4SignerAuth(credentials, region, service) # create an opensearch client and use the request-signer client = OpenSearch( hosts=[{'host': host, 'port': 443}], http_auth=auth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, pool_maxsize=20, ) # create an index index_name = 'books-index' create_response = client.indices.create( index_name ) print('\nCreating index:') print(create_response) # index a document document = { 'title': 'The Green Mile', 'director': 'Stephen King', 'year': '1996' } response = client.index( index = 'books-index', body = document, id = '1' ) # delete the index delete_response = client.indices.delete( index_name ) print('\nDeleting index:') print(delete_response)
Ruby
opensearch-aws-sigv4
Gem 提供对 OpenSearch 无服务器以及 OpenSearch Service 的即时访问权限。它具有 opensearch-ruby
在实例化 Sigv4 签名程序时,指定 aoss
为服务名称:
require 'opensearch-aws-sigv4' require 'aws-sigv4' signer = Aws::Sigv4::Signer.new(service: 'aoss', region: 'us-west-2', access_key_id: 'key_id', secret_access_key: 'secret') # create an opensearch client and use the request-signer client = OpenSearch::Aws::Sigv4Client.new( { host: 'https://your.amz-opensearch-serverless.endpoint', log: true }, signer) # create an index index = 'prime' client.indices.create(index: index) # insert data client.index(index: index, id: '1', body: { name: 'Amazon Echo', msrp: '5999', year: 2011 }) # query the index client.search(body: { query: { match: { name: 'Echo' } } }) # delete index entry client.delete(index: index, id: '1') # delete the index client.indices.delete(index: index)
与其他客户端签署 HTTP 请求
如果与其他客户端构建 HTTP 请求,在对 OpenSearch 无服务器集合签署请求时,以下要求适用。
-
必须将服务名称指定为
aoss
。 -
所有 Amazon 签名版本 4 请求都需要
x-amz-content-sha256
标头。它将提供请求负载的哈希。如果有请求负载,请将该值设置为其安全哈希算法(SHA)加密哈希(SHA256)。如果没有请求负载,请将该值设置为e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
,它是空字符串的哈希。
使用 cURL 创建索引
以下示例请求使用客户端 URL 请求库(cURL)将单个文档发送到集合中名为 movies-index
的索引:
curl -XPOST \ --user "$AWS_ACCESS_KEY_ID":"$AWS_SECRET_ACCESS_KEY" \ --aws-sigv4 "aws:amz:
us-east-1
:aoss" \ --header "x-amz-content-sha256: $REQUEST_PAYLOAD_SHA_HASH" \ --header "x-amz-security-token: $AWS_SESSION_TOKEN" \ "https://my-collection-endpoint
.us-east-1
.aoss.amazonaws.com/movies-index
/_doc" \ -H "Content-Type: application/json" -d '{"title": "Shawshank Redemption"}'
使用 Postman 创建索引
下图演示了如何使用 Postman 将请求发送到集合。有关身份验证的说明,请参阅 Authenticate with Amazon Signature authentication workflow in Postman