Upgrading a domain (SDK) - Amazon OpenSearch Service
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

Upgrading a domain (SDK)

This sample uses the OpenSearchService low-level Python client from the Amazon SDK for Python (Boto) to check if a domain is eligible for upgrade to a specific version, upgrades it, and continuously checks the upgrade status.

import boto3 from botocore.config import Config import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. DOMAIN_NAME = '' # The name of the domain to upgrade TARGET_VERSION = '' # The version you want to upgrade the domain to. For example, OpenSearch_1.1 my_config = Config( # Optionally lets you specify a Region other than your default. region_name='us-east-1' ) client = boto3.client('opensearch', config=my_config) def check_versions(): """Determine whether domain is eligible for upgrade""" response = client.get_compatible_versions( DomainName=DOMAIN_NAME ) compatible_versions = response['CompatibleVersions'] for i in range(len(compatible_versions)): if TARGET_VERSION in compatible_versions[i]["TargetVersions"]: print('Domain is eligible for upgrade to ' + TARGET_VERSION) upgrade_domain() print(response) else: print('Domain not eligible for upgrade to ' + TARGET_VERSION) def upgrade_domain(): """Upgrades the domain""" response = client.upgrade_domain( DomainName=DOMAIN_NAME, TargetVersion=TARGET_VERSION ) print('Upgrading domain to ' + TARGET_VERSION + '...' + response) time.sleep(5) wait_for_upgrade() def wait_for_upgrade(): """Get the status of the upgrade""" response = client.get_upgrade_status( DomainName=DOMAIN_NAME ) if (response['UpgradeStep']) == 'UPGRADE' and (response['StepStatus']) == 'SUCCEEDED': print('Domain successfully upgraded to ' + TARGET_VERSION) elif (response['StepStatus']) == 'FAILED': print('Upgrade failed. Please try again.') elif (response['StepStatus']) == 'SUCCEEDED_WITH_ISSUES': print('Upgrade succeeded with issues') elif (response['StepStatus']) == 'IN_PROGRESS': time.sleep(30) wait_for_upgrade() def main(): check_versions() if __name__ == "__main__": main()