Amazon Elasticsearch Service
Developer Guide (API Version 2015-01-01)

Using Curator to Rotate Data in Amazon Elasticsearch Service

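This chapter has sample code for using AWS Lambda and Curator to manage indices and snapshots. Curator offers numerous filters to help you identify indices and snapshots that meet certain criteria, such as indices created more than 60 days ago or snapshots that failed to complete.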

Although Curator is often used as a command line interface (CLI), it also has a Python API, which means that you can use it within Lambda functions.

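For information about configuring Lambda functions and creating deployment packages, see Loading Streaming Data into Amazon ES from Amazon S3. For more information, see the AWS Lambda Developer Guide. This chapter contains only sample code, basic settings, triggers, and permissions.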

Sample Code

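The following sample code uses Curator and the official Python Elasticsearch client to delete any index whose name contains a time stamp indicating that the data is more than 30 days old. For example, if an index name is my-logs-2014.03.02, the index is deleted. Deletion occurs even if you created the index today, because this filter uses the name of the index, not its creation date, to determine the age of the data.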

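The code also contains commented-out examples of other common filters, including one that determines age by creation date. The AWS SDK for Python (Boto3) and the requests-aws4auth library sign the requests to Amazon ES.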

Warning

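Both code samples in this section delete data, potentially a lot of data. Modify and test each sample on a non-critical domain until you are satisfied with its behavior.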

Index Deletion

import boto3
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
import curator

host = '' # For example, search-my-domain.region.es.amazonaws.com
region = '' # For example, us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

# Lambda execution starts here.
def lambda_handler(event, context):

    # Build the Elasticsearch client.
    es = Elasticsearch(
        hosts = [{'host': host, 'port': 443}],
        http_auth = awsauth,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection
    )

    # A test document.
    document = {
        "title": "Moneyball",
        "director": "Bennett Miller",
        "year": "2011"
    }

    # Index the test document so that we have an index that matches the timestring pattern.
    # You can delete this line and the test document if you already created some test indices.
    es.index(index="movies-2017.01.31", doc_type="movie", id="1", body=document)

    index_list = curator.IndexList(es)

    # Filters by age, anything with a time stamp older than 30 days in the index name.
    index_list.filter_by_age(source='name', direction='older', timestring='%Y.%m.%d', unit='days', unit_count=30)

    # Filters by naming prefix.
    # index_list.filter_by_regex(kind='prefix', value='my-logs-2017')

    # Filters by age, anything created more than one month ago.
    # index_list.filter_by_age(source='creation_date', direction='older', unit='months', unit_count=1)

    print("Found %s indices to delete" % len(index_list.indices))

    # If our filtered list contains any indices, delete them.
    if index_list.indices:
        curator.DeleteIndices(index_list).do_action()

You must update the values for host and region.
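To make the name-based age check more concrete, the following is a minimal sketch in plain Python of the idea behind filtering on a time stamp in the index name. It is not Curator's implementation, and Curator's exact handling of boundaries may differ. The helper name is_older_than and the sample index names are hypothetical and exist only for illustration.

```python
from datetime import datetime, timedelta
import re

# Regular expression that matches dates written in the '%Y.%m.%d' style, such as 2017.01.31.
TIMESTAMP_PATTERN = re.compile(r'\d{4}\.\d{2}\.\d{2}')


def is_older_than(index_name, days, now):
    """Return True if the date embedded in index_name is more than `days` days before `now`."""
    match = TIMESTAMP_PATTERN.search(index_name)
    if match is None:
        # No time stamp in the name, so a name-based check cannot judge its age.
        return False
    try:
        index_date = datetime.strptime(match.group(0), '%Y.%m.%d')
    except ValueError:
        # Digits that look like a date but are not one (for example, month 13).
        return False
    return index_date < now - timedelta(days=days)


# A fixed "now" keeps the example deterministic.
now = datetime(2017, 3, 15)
names = ['movies-2017.01.31', 'movies-2017.03.10', 'kibana']
expired = [name for name in names if is_older_than(name, 30, now)]
print(expired)  # ['movies-2017.01.31']
```

Only the name is inspected, which is why an index named with an old date is selected even if it was created moments ago.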

The next code sample deletes any snapshot that is more than two weeks old. It also takes a new snapshot.

Snapshot Deletion

import boto3
from datetime import datetime
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
import logging
import curator

# Adding a logger isn't strictly required, but helps with understanding Curator's requests and debugging.
logger = logging.getLogger('curator')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)

host = '' # For example, search-my-domain.region.es.amazonaws.com
region = '' # For example, us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

repository_name = 'my-repo'

# Lambda execution starts here.
def lambda_handler(event, context):

    # Build the snapshot name inside the handler so that every invocation gets a fresh
    # time stamp, even when Lambda reuses a warm container.
    # Hyphens keep colons out of the URL.
    snapshot_name = 'my-snapshot-prefix-' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

    # Build the Elasticsearch client.
    es = Elasticsearch(
        hosts = [{'host': host, 'port': 443}],
        http_auth = awsauth,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection,
        timeout = 120 # Deleting snapshots can take a while, so keep the connection open for long enough to get a response.
    )

    try:
        # Get all snapshots in the repository.
        snapshot_list = curator.SnapshotList(es, repository=repository_name)

        # Filter by age, any snapshot older than two weeks.
        # Without this filter, the delete action below applies to every snapshot in the repository.
        snapshot_list.filter_by_age(source='creation_date', direction='older', unit='weeks', unit_count=2)

        # Delete the old snapshots.
        curator.DeleteSnapshots(snapshot_list, retry_interval=30, retry_count=3).do_action()
    except (curator.exceptions.SnapshotInProgress, curator.exceptions.NoSnapshots, curator.exceptions.FailedExecution) as e:
        print(e)

    # Split into two try blocks. We still want to try and take a snapshot if deletion failed.
    try:
        # Get the list of indices.
        # You can filter this list if you don't want to snapshot all indices.
        index_list = curator.IndexList(es)

        # Take a new snapshot. This operation can take a while, so we don't want to wait for it to complete.
        curator.Snapshot(index_list, repository=repository_name, name=snapshot_name, wait_for_completion=False).do_action()
    except (curator.exceptions.SnapshotInProgress, curator.exceptions.FailedExecution) as e:
        print(e)

You must update the values for host, region, snapshot_name, and repository_name. If the output is too verbose for your taste, you can change logging.INFO to logging.WARN.

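Because taking and deleting snapshots can take a while, this code is more sensitive to connection and Lambda timeouts, which is the reason for the extra logging code. In the Elasticsearch client, you can see that we set the timeout to 120 seconds. If the DeleteSnapshots function takes longer than that to get a response from the Amazon ES domain, you might need to increase this value. You must also increase the Lambda function timeout from its default value of three seconds. For a recommended value, see Basic Settings.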
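One way to reason about the interaction between the two timeouts is to check how much time the invocation has left before starting a slow operation. The following is a minimal sketch, not part of the original sample. The helper has_time_for and the FakeContext class are hypothetical names used for illustration; get_remaining_time_in_millis is the method that the Lambda context object provides in the Python runtime.

```python
def has_time_for(context, needed_seconds):
    """Return True if the invocation has at least needed_seconds left before Lambda times out."""
    remaining_seconds = context.get_remaining_time_in_millis() / 1000.0
    return remaining_seconds >= needed_seconds


class FakeContext(object):
    """Stand-in for the Lambda context object, for local experiments only."""

    def __init__(self, remaining_millis):
        self._remaining_millis = remaining_millis

    def get_remaining_time_in_millis(self):
        return self._remaining_millis


# With the default 3-second Lambda timeout, a 120-second client timeout cannot be honored.
print(has_time_for(FakeContext(3000), 120))    # False
# With a 3-minute Lambda timeout, there is room for one 120-second request.
print(has_time_for(FakeContext(180000), 120))  # True
```

Inside a handler, a check such as has_time_for(context, 120) could guard the snapshot deletion so that the function logs a clear message instead of being cut off mid-request.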

Basic Settings

We recommend the following settings for the code samples in this chapter.

Sample Code          Memory    Timeout
Index deletion       128 MB    10 seconds
Snapshot deletion    128 MB    3 minutes

Triggers

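Rather than reacting to some event (such as a file upload to Amazon S3), these functions are meant to run on a schedule. You might prefer to run them more or less frequently than the examples shown here.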

Sample Code          Service              Rule Type              Example Expression
Index deletion       CloudWatch Events    Schedule expression    rate(1 day)
Snapshot deletion    CloudWatch Events    Schedule expression    rate(4 hours)
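If you prefer to create the schedule programmatically rather than in the console, the following sketch shows one possible approach with Boto3. The function schedule_function, the rule name, and the statement ID suffix are hypothetical choices for illustration. The sketch takes the clients as arguments and relies on the Boto3 operations put_rule and put_targets (CloudWatch Events client) and add_permission (Lambda client).

```python
def schedule_function(events_client, lambda_client, rule_name, schedule_expression, function_arn):
    """Create a scheduled CloudWatch Events rule and point it at a Lambda function."""
    # Create (or update) the rule with a schedule expression such as 'rate(1 day)'.
    rule_arn = events_client.put_rule(
        Name=rule_name,
        ScheduleExpression=schedule_expression,
        State='ENABLED'
    )['RuleArn']

    # Allow CloudWatch Events to invoke the function when this rule fires.
    # Note: this call fails if a statement with the same ID already exists.
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId=rule_name + '-invoke',
        Action='lambda:InvokeFunction',
        Principal='events.amazonaws.com',
        SourceArn=rule_arn
    )

    # Register the function as the target of the rule.
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{'Id': '1', 'Arn': function_arn}]
    )
    return rule_arn


# Example usage (requires AWS credentials; the names and ARN are placeholders):
# import boto3
# schedule_function(boto3.client('events'), boto3.client('lambda'),
#                   'index-deletion-daily', 'rate(1 day)',
#                   'arn:aws:lambda:us-west-1:123456789012:function:your-lambda-function')
```

Passing the clients in, instead of creating them inside the function, keeps the sketch easy to try against stub objects before you run it against a real account.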

Permissions

Both Lambda functions in this chapter need the basic logging permissions that all Lambda functions need, plus HTTP method permissions for the Amazon ES domain:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "logs:CreateLogGroup",
      "Resource": "arn:aws:logs:us-west-1:123456789012:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:us-west-1:123456789012:log-group:/aws/lambda/your-lambda-function:*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpGet",
        "es:ESHttpPut",
        "es:ESHttpDelete"
      ],
      "Resource": "arn:aws:es:us-west-1:123456789012:domain/my-domain/*"
    }
  ]
}
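Because the policy embeds a Region, an account ID, a function name, and a domain name, it can be convenient to generate it rather than edit the ARNs by hand. The following is a small sketch under that assumption. The function build_policy is a hypothetical helper, and the sketch assumes the standard aws partition in the ARNs.

```python
import json


def build_policy(region, account_id, function_name, domain_name):
    """Return the policy document for one Lambda function and one Amazon ES domain."""
    # Assumes the standard 'aws' partition; other partitions use a different ARN prefix.
    logs_arn = 'arn:aws:logs:{}:{}'.format(region, account_id)
    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Effect': 'Allow',
                'Action': 'logs:CreateLogGroup',
                'Resource': logs_arn + ':*'
            },
            {
                'Effect': 'Allow',
                'Action': ['logs:CreateLogStream', 'logs:PutLogEvents'],
                'Resource': [
                    '{}:log-group:/aws/lambda/{}:*'.format(logs_arn, function_name)
                ]
            },
            {
                'Effect': 'Allow',
                'Action': ['es:ESHttpPost', 'es:ESHttpGet', 'es:ESHttpPut', 'es:ESHttpDelete'],
                'Resource': 'arn:aws:es:{}:{}:domain/{}/*'.format(region, account_id, domain_name)
            }
        ]
    }


policy = build_policy('us-west-1', '123456789012', 'your-lambda-function', 'my-domain')
print(json.dumps(policy, indent=2))
```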