Build and manage a resilient service using an Amazon SDK
The following code example shows how to create a load-balanced web service that returns book, movie, and song recommendations. The example shows how the service responds to failures, and how to restructure the service for more resilience when failures occur.
- Use an Amazon EC2 Auto Scaling group to create Amazon Elastic Compute Cloud (Amazon EC2) instances based on a launch template and to keep the number of instances in a specified range.
- Handle and distribute HTTP requests with Elastic Load Balancing.
- Monitor the health of instances in an Auto Scaling group and forward requests only to healthy instances.
- Run a Python web server on each EC2 instance to handle HTTP requests. The web server responds with recommendations and health checks.
- Simulate a recommendation service with an Amazon DynamoDB table.
- Control the web server response to requests and health checks by updating Amazon Systems Manager parameters.
SDK for Python (Boto3)
Note
There's more on GitHub. Find the complete example and learn how to set it up and run it in the Amazon Code Examples Repository. Run the interactive scenario at a command prompt.
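For example, after you install the prerequisites described in the repository, you might start the full scenario with a command like the following. The module name runner.py is an assumption; check the repository README for the actual entry point. The --action argument also accepts deploy, demo, or destroy to run a single phase, as shown in the argument parser in main() below.

    python runner.py --action all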
class Runner:
    def __init__(
            self, resource_path, recommendation, autoscaler, loadbalancer, param_helper):
        self.resource_path = resource_path
        self.recommendation = recommendation
        self.autoscaler = autoscaler
        self.loadbalancer = loadbalancer
        self.param_helper = param_helper
        self.protocol = 'HTTP'
        self.port = 80

    def deploy(self):
        recommendations_path = f'{self.resource_path}/recommendations.json'
        startup_script = f'{self.resource_path}/server_startup_script.sh'
        instance_policy = f'{self.resource_path}/instance_policy.json'

        print("\nFor this demo, we'll use the AWS SDK for Python (Boto3) to create several AWS resources\n"
              "to set up a load-balanced web service endpoint and explore some ways to make it resilient\n"
              "against various kinds of failures.\n\n"
              "Some of the resources created by this demo are:\n")
        print("\t* A DynamoDB table that the web service depends on to provide book, movie, and song recommendations.")
        print("\t* An EC2 launch template that defines EC2 instances that each contain a Python web server.")
        print("\t* An EC2 Auto Scaling group that manages EC2 instances across several Availability Zones.")
        print("\t* An Elastic Load Balancing (ELB) load balancer that targets the Auto Scaling group to distribute requests.")
        print('-'*88)
        q.ask("Press Enter when you're ready to start deploying resources.")

        print(f"Creating and populating a DynamoDB table named '{self.recommendation.table_name}'.")
        self.recommendation.create()
        self.recommendation.populate(recommendations_path)
        print('-'*88)

        print(f"Creating an EC2 launch template that runs '{startup_script}' when an instance starts.\n"
              f"This script starts a Python web server defined in the `server.py` script. The web server\n"
              f"listens to HTTP requests on port 80 and responds to requests to '/' and to '/healthcheck'.\n"
              f"For demo purposes, this server is run as the root user. In production, the best practice is to\n"
              f"run a web server, such as Apache, with least-privileged credentials.\n")
        print(f"The template also defines an IAM policy that each instance uses to assume a role that grants\n"
              f"permissions to access the DynamoDB recommendation table and Systems Manager parameters\n"
              f"that control the flow of the demo.\n")
        self.autoscaler.create_template(startup_script, instance_policy)
        print('-'*88)

        print(f"Creating an EC2 Auto Scaling group that maintains three EC2 instances, each in a different\n"
              f"Availability Zone.")
        zones = self.autoscaler.create_group(3)
        print('-'*88)
        print("At this point, you have EC2 instances created. Once each instance starts, it listens for\n"
              "HTTP requests. You can see these instances in the console or continue with the demo.")
        print('-'*88)
        q.ask("Press Enter when you're ready to continue.")

        print("\nCreating an Elastic Load Balancing target group and load balancer. The target group\n"
              "defines how the load balancer connects to instances. The load balancer provides a\n"
              "single endpoint where clients connect and dispatches requests to instances in the group.\n")
        vpc = self.autoscaler.get_default_vpc()
        subnets = self.autoscaler.get_subnets(vpc['VpcId'], zones)
        target_group = self.loadbalancer.create_target_group(self.protocol, self.port, vpc['VpcId'])
        self.loadbalancer.create_load_balancer([subnet['SubnetId'] for subnet in subnets], target_group)
        self.autoscaler.attach_load_balancer_target_group(target_group)
        print(f"Verifying access to the load balancer endpoint...")
        lb_success = self.loadbalancer.verify_load_balancer_endpoint()
        if not lb_success:
            print("Couldn't connect to the load balancer, verifying that the port is open...")
            current_ip_address = requests.get('http://checkip.amazonaws.com').text.strip()
            sec_group, port_is_open = self.autoscaler.verify_inbound_port(vpc, self.port, current_ip_address)
            if not port_is_open:
                print("For this example to work, the default security group for your default VPC must\n"
                      "allow access from this computer. You can either add it automatically from this\n"
                      "example or add it yourself using the AWS Management Console.\n")
                if q.ask(f"Do you want to add a rule to security group {sec_group['GroupId']} to allow\n"
                         f"inbound traffic on port {self.port} from your computer's IP address of {current_ip_address}? (y/n) ",
                         q.is_yesno):
                    self.autoscaler.open_inbound_port(sec_group['GroupId'], self.port, current_ip_address)
            lb_success = self.loadbalancer.verify_load_balancer_endpoint()
        if lb_success:
            print("Your load balancer is ready. You can access it by browsing to:\n")
            print(f"\thttp://{self.loadbalancer.endpoint()}\n")
        else:
            print("Couldn't get a successful response from the load balancer endpoint. Troubleshoot by\n"
                  "manually verifying that your VPC and security group are configured correctly and that\n"
                  "you can successfully make a GET request to the load balancer endpoint:\n")
            print(f"\thttp://{self.loadbalancer.endpoint()}\n")
        print('-'*88)
        q.ask("Press Enter when you're ready to continue with the demo.")

    def demo_choices(self):
        actions = [
            'Send a GET request to the load balancer endpoint.',
            'Check the health of load balancer targets.',
            'Go to the next part of the demo.']
        choice = 0
        while choice != 2:
            print('-'*88)
            print("\nSee the current state of the service by selecting one of the following choices:\n")
            choice = q.choose("\nWhich action would you like to take? ", actions)
            print('-'*88)
            if choice == 0:
                print("Request:\n")
                print(f"GET http://{self.loadbalancer.endpoint()}")
                response = requests.get(f'http://{self.loadbalancer.endpoint()}')
                print("\nResponse:\n")
                print(f"{response.status_code}")
                if response.headers.get('content-type') == 'application/json':
                    pp(response.json())
            elif choice == 1:
                print("\nChecking the health of load balancer targets:\n")
                health = self.loadbalancer.check_target_health()
                for target in health:
                    state = target['TargetHealth']['State']
                    print(f"\tTarget {target['Target']['Id']} on port {target['Target']['Port']} is {state}")
                    if state != 'healthy':
                        print(f"\t\t{target['TargetHealth']['Reason']}: {target['TargetHealth']['Description']}\n")
                print(f"\nNote that it can take a minute or two for the health check to update\n"
                      f"after changes are made.\n")
            elif choice == 2:
                print("\nOkay, let's move on.")
                print('-'*88)

    def demo(self):
        ssm_only_policy = f'{self.resource_path}/ssm_only_policy.json'

        print("\nResetting parameters to starting values for demo.\n")
        self.param_helper.reset()

        print("\nThis part of the demonstration shows how to toggle different parts of the system\n"
              "to create situations where the web service fails, and shows how using a resilient\n"
              "architecture can keep the web service running in spite of these failures.")
        print('-'*88)

        print("At the start, the load balancer endpoint returns recommendations and reports that all targets are healthy.")
        self.demo_choices()

        print(f"The web service running on the EC2 instances gets recommendations by querying a DynamoDB table.\n"
              f"The table name is contained in a Systems Manager parameter named '{self.param_helper.table}'.\n"
              f"To simulate a failure of the recommendation service, let's set this parameter to name a non-existent table.\n")
        self.param_helper.put(self.param_helper.table, 'this-is-not-a-table')
        print("\nNow, sending a GET request to the load balancer endpoint returns a failure code. But, the service reports as\n"
              "healthy to the load balancer because shallow health checks don't check for failure of the recommendation service.")
        self.demo_choices()

        print(f"Instead of failing when the recommendation service fails, the web service can return a static response.\n"
              f"While this is not a perfect solution, it presents the customer with a somewhat better experience than failure.\n")
        self.param_helper.put(self.param_helper.failure_response, 'static')
        print(f"\nNow, sending a GET request to the load balancer endpoint returns a static response.\n"
              f"The service still reports as healthy because health checks are still shallow.\n")
        self.demo_choices()

        print("Let's reinstate the recommendation service.\n")
        self.param_helper.put(self.param_helper.table, self.recommendation.table_name)
        print("\nLet's also substitute bad credentials for one of the instances in the target group so that it can't\n"
              "access the DynamoDB recommendation table.\n")
        self.autoscaler.create_instance_profile(
            ssm_only_policy,
            self.autoscaler.bad_creds_policy_name,
            self.autoscaler.bad_creds_role_name,
            self.autoscaler.bad_creds_profile_name,
            ['AmazonSSMManagedInstanceCore'])
        instances = self.autoscaler.get_instances()
        bad_instance_id = instances[0]
        instance_profile = self.autoscaler.get_instance_profile(bad_instance_id)
        print(f"\nReplacing the profile for instance {bad_instance_id} with a profile that contains\n"
              f"bad credentials...\n")
        self.autoscaler.replace_instance_profile(
            bad_instance_id,
            self.autoscaler.bad_creds_profile_name,
            instance_profile['AssociationId'])
        print("Now, sending a GET request to the load balancer endpoint returns either a recommendation or a static response,\n"
              "depending on which instance is selected by the load balancer.\n")
        self.demo_choices()

        print("\nLet's implement a deep health check. For this demo, a deep health check tests whether\n"
              "the web service can access the DynamoDB table that it depends on for recommendations. Note that\n"
              "the deep health check is only for ELB routing and not for Auto Scaling instance health.\n"
              "This kind of deep health check is not recommended for Auto Scaling instance health, because it\n"
              "risks accidental termination of all instances in the Auto Scaling group when a dependent service fails.\n")
        print("By implementing deep health checks, the load balancer can detect when one of the instances is failing\n"
              "and take that instance out of rotation.\n")
        self.param_helper.put(self.param_helper.health_check, 'deep')
        print(f"\nNow, checking target health indicates that the instance with bad credentials ({bad_instance_id})\n"
              f"is unhealthy. Note that it might take a minute or two for the load balancer to detect the unhealthy\n"
              f"instance. Sending a GET request to the load balancer endpoint always returns a recommendation, because\n"
              "the load balancer takes unhealthy instances out of its rotation.\n")
        self.demo_choices()

        print("\nBecause the instances in this demo are controlled by an auto scaler, the simplest way to fix an unhealthy\n"
              "instance is to terminate it and let the auto scaler start a new instance to replace it.\n")
        self.autoscaler.terminate_instance(bad_instance_id)
        print("\nEven while the instance is terminating and the new instance is starting, sending a GET\n"
              "request to the web service continues to get a successful recommendation response because\n"
              "the load balancer routes requests to the healthy instances. After the replacement instance\n"
              "starts and reports as healthy, it is included in the load balancing rotation.\n"
              "\nNote that terminating and replacing an instance typically takes several minutes, during which time you\n"
              "can see the changing health check status until the new instance is running and healthy.\n")
        self.demo_choices()

        print("\nIf the recommendation service fails now, deep health checks mean all instances report as unhealthy.\n")
        self.param_helper.put(self.param_helper.table, 'this-is-not-a-table')
        print("\nWhen all instances are unhealthy, the load balancer continues to route requests even to\n"
              "unhealthy instances, allowing them to fail open and return a static response rather than fail\n"
              "closed and report failure to the customer.")
        self.demo_choices()
        self.param_helper.reset()

    def destroy(self):
        print("This concludes the demo of how to build and manage a resilient service.\n"
              "To keep things tidy and to avoid unwanted charges on your account, we can clean up all AWS resources\n"
              "that were created for this demo.")
        if q.ask("Do you want to clean up all demo resources? (y/n) ", q.is_yesno):
            self.loadbalancer.delete_load_balancer()
            self.loadbalancer.delete_target_group()
            self.autoscaler.delete_group()
            self.autoscaler.delete_template()
            self.autoscaler.delete_instance_profile(
                self.autoscaler.bad_creds_profile_name, self.autoscaler.bad_creds_role_name)
            self.recommendation.destroy()
        else:
            print("Okay, we'll leave the resources intact.\n"
                  "Don't forget to delete them when you're done with them or you might incur unexpected charges.")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--action', required=True, choices=['all', 'deploy', 'demo', 'destroy'],
        help="The action to take for the demo. When 'all' is specified, resources are\n"
             "deployed, the demo is run, and resources are destroyed.")
    parser.add_argument(
        '--resource_path', default='../../../workflows/resilient_service/resources',
        help="The path to resource files used by this example, such as IAM policies and\n"
             "instance scripts.")
    args = parser.parse_args()

    print('-'*88)
    print("Welcome to the demonstration of How to Build and Manage a Resilient Service!")
    print('-'*88)

    prefix = 'doc-example-resilience'
    recommendation = RecommendationService.from_client('doc-example-recommendation-service')
    autoscaler = AutoScaler.from_client(prefix)
    loadbalancer = LoadBalancer.from_client(prefix)
    param_helper = ParameterHelper.from_client(recommendation.table_name)
    runner = Runner(args.resource_path, recommendation, autoscaler, loadbalancer, param_helper)
    actions = [args.action] if args.action != 'all' else ['deploy', 'demo', 'destroy']
    for action in actions:
        if action == 'deploy':
            runner.deploy()
        elif action == 'demo':
            runner.demo()
        elif action == 'destroy':
            runner.destroy()

    print('-'*88)
    print("Thanks for watching!")
    print('-'*88)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
    main()
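The server_startup_script.sh and server.py files that the launch template references are part of the repository resources and aren't shown on this page. The following is a minimal, hypothetical sketch of the kind of web server the demo describes, not the actual server.py from the repository: it reads the Systems Manager control parameters defined by ParameterHelper later in this example, queries the DynamoDB recommendation table, serves '/' and '/healthcheck' on the given port, and fails open with a static response when configured to do so.

    # Illustrative sketch only -- not the actual server.py from the example repository.
    # Assumes the Systems Manager parameter names defined by ParameterHelper below.
    import json
    import sys
    from http.server import BaseHTTPRequestHandler, HTTPServer
    import boto3

    TABLE_PARAM = 'doc-example-resilient-architecture-table'
    FAILURE_PARAM = 'doc-example-resilient-architecture-failure-response'
    HEALTH_PARAM = 'doc-example-resilient-architecture-health-check'


    class SketchHandler(BaseHTTPRequestHandler):
        def _param(self, name):
            # Read a demo control parameter from Systems Manager.
            return self.server.ssm.get_parameter(Name=name)['Parameter']['Value']

        def _recommendation(self):
            # Query the recommendation table; raises if the table doesn't exist
            # or the instance's credentials can't access it.
            table = self._param(TABLE_PARAM)
            return self.server.ddb.scan(TableName=table, Limit=1)['Items']

        def do_GET(self):
            if self.path == '/healthcheck':
                healthy = True
                if self._param(HEALTH_PARAM) == 'deep':
                    # Deep health check: report unhealthy when the dependency fails.
                    try:
                        self._recommendation()
                    except Exception:
                        healthy = False
                self.send_response(200 if healthy else 503)
                self.end_headers()
            else:
                try:
                    body = self._recommendation()
                    code = 200
                except Exception:
                    if self._param(FAILURE_PARAM) == 'static':
                        # Fail open with a canned recommendation.
                        body = [{'MediaType': {'S': 'Book'}, 'Title': {'S': 'Static fallback'}}]
                        code = 200
                    else:
                        body, code = [], 503
                self.send_response(code)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps(body).encode())


    if __name__ == '__main__':
        port, region = int(sys.argv[1]), sys.argv[2]
        server = HTTPServer(('0.0.0.0', port), SketchHandler)
        server.ssm = boto3.client('ssm', region_name=region)
        server.ddb = boto3.client('dynamodb', region_name=region)
        server.serve_forever()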
Create a class that wraps Auto Scaling and Amazon EC2 actions.
class AutoScaler:
    """
    Encapsulates Amazon EC2 Auto Scaling and EC2 management actions.
    """
    def __init__(
            self, resource_prefix, inst_type, ami_param, autoscaling_client,
            ec2_client, ssm_client, iam_client):
        """
        :param resource_prefix: The prefix for naming AWS resources that are created by this class.
        :param inst_type: The type of EC2 instance to create, such as t3.micro.
        :param ami_param: The Systems Manager parameter used to look up the AMI for the
                          instances that are created.
        :param autoscaling_client: A Boto3 EC2 Auto Scaling client.
        :param ec2_client: A Boto3 EC2 client.
        :param ssm_client: A Boto3 Systems Manager client.
        :param iam_client: A Boto3 IAM client.
        """
        self.inst_type = inst_type
        self.ami_param = ami_param
        self.autoscaling_client = autoscaling_client
        self.ec2_client = ec2_client
        self.ssm_client = ssm_client
        self.iam_client = iam_client
        self.launch_template_name = f"{resource_prefix}-template"
        self.group_name = f"{resource_prefix}-group"
        self.instance_policy_name = f"{resource_prefix}-pol"
        self.instance_role_name = f"{resource_prefix}-role"
        self.instance_profile_name = f"{resource_prefix}-prof"
        self.bad_creds_policy_name = f"{resource_prefix}-bc-pol"
        self.bad_creds_role_name = f"{resource_prefix}-bc-role"
        self.bad_creds_profile_name = f"{resource_prefix}-bc-prof"

    @classmethod
    def from_client(cls, resource_prefix):
        """
        Creates this class from Boto3 clients.

        :param resource_prefix: The prefix for naming AWS resources that are created by this class.
        """
        as_client = boto3.client('autoscaling')
        ec2_client = boto3.client('ec2')
        ssm_client = boto3.client('ssm')
        iam_client = boto3.client('iam')
        return cls(
            resource_prefix, 't3.micro',
            '/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2',
            as_client, ec2_client, ssm_client, iam_client)

    def create_instance_profile(
            self, policy_file, policy_name, role_name, profile_name, aws_managed_policies=()):
        """
        Creates a policy, role, and profile that is associated with instances created by
        this class. An instance's associated profile defines a role that is assumed by the
        instance. The role has attached policies that specify the AWS permissions granted to
        clients that run on the instance.

        :param policy_file: The name of a JSON file that contains the policy definition to
                            create and attach to the role.
        :param policy_name: The name to give the created policy.
        :param role_name: The name to give the created role.
        :param profile_name: The name to give the created profile.
        :param aws_managed_policies: Additional AWS-managed policies that are attached to the
                                     role, such as AmazonSSMManagedInstanceCore to grant use of
                                     Systems Manager to send commands to the instance.
        :return: The ARN of the profile that is created.
        """
        assume_role_doc = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "ec2.amazonaws.com"},
                "Action": "sts:AssumeRole"}]}
        with open(policy_file) as file:
            instance_policy_doc = file.read()
        policy_arn = None
        try:
            pol_response = self.iam_client.create_policy(
                PolicyName=policy_name, PolicyDocument=instance_policy_doc)
            policy_arn = pol_response['Policy']['Arn']
            log.info("Created policy with ARN %s.", policy_arn)
        except ClientError as err:
            if err.response['Error']['Code'] == 'EntityAlreadyExists':
                log.info("Policy %s already exists, nothing to do.", policy_name)
                list_pol_response = self.iam_client.list_policies(Scope='Local')
                for pol in list_pol_response['Policies']:
                    if pol['PolicyName'] == policy_name:
                        policy_arn = pol['Arn']
                        break
            if policy_arn is None:
                raise AutoScalerError(f"Couldn't create policy {policy_name}: {err}")
        try:
            self.iam_client.create_role(
                RoleName=role_name, AssumeRolePolicyDocument=json.dumps(assume_role_doc))
            self.iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
            for aws_policy in aws_managed_policies:
                self.iam_client.attach_role_policy(
                    RoleName=role_name,
                    PolicyArn=f'arn:aws:iam::aws:policy/{aws_policy}')
            log.info("Created role %s and attached policy %s.", role_name, policy_arn)
        except ClientError as err:
            if err.response['Error']['Code'] == 'EntityAlreadyExists':
                log.info("Role %s already exists, nothing to do.", role_name)
            else:
                raise AutoScalerError(f"Couldn't create role {role_name}: {err}")
        try:
            profile_response = self.iam_client.create_instance_profile(
                InstanceProfileName=profile_name)
            waiter = self.iam_client.get_waiter('instance_profile_exists')
            waiter.wait(InstanceProfileName=profile_name)
            time.sleep(10)  # wait a little longer
            profile_arn = profile_response['InstanceProfile']['Arn']
            self.iam_client.add_role_to_instance_profile(
                InstanceProfileName=profile_name, RoleName=role_name)
            log.info("Created profile %s and added role %s.", profile_name, role_name)
        except ClientError as err:
            if err.response['Error']['Code'] == 'EntityAlreadyExists':
                prof_response = self.iam_client.get_instance_profile(
                    InstanceProfileName=profile_name)
                profile_arn = prof_response['InstanceProfile']['Arn']
                log.info("Instance profile %s already exists, nothing to do.", profile_name)
            else:
                raise AutoScalerError(
                    f"Couldn't create profile {profile_name} and attach it to role\n"
                    f"{role_name}: {err}")
        return profile_arn

    def get_instance_profile(self, instance_id):
        """
        Gets data about the profile associated with an instance.

        :param instance_id: The ID of the instance to look up.
        :return: The profile data.
        """
        try:
            response = self.ec2_client.describe_iam_instance_profile_associations(
                Filters=[{'Name': 'instance-id', 'Values': [instance_id]}])
        except ClientError as err:
            raise AutoScalerError(
                f"Couldn't get instance profile association for instance {instance_id}: {err}")
        else:
            return response['IamInstanceProfileAssociations'][0]

    def replace_instance_profile(
            self, instance_id, new_instance_profile_name, profile_association_id):
        """
        Replaces the profile associated with a running instance. After the profile is
        replaced, the instance is rebooted to ensure that it uses the new profile. When
        the instance is ready, Systems Manager is used to restart the Python web server.

        :param instance_id: The ID of the instance to update.
        :param new_instance_profile_name: The name of the new profile to associate with the
                                          specified instance.
        :param profile_association_id: The ID of the existing profile association for the
                                       instance.
        """
        try:
            self.ec2_client.replace_iam_instance_profile_association(
                IamInstanceProfile={'Name': new_instance_profile_name},
                AssociationId=profile_association_id)
            log.info(
                "Replaced instance profile for association %s with profile %s.",
                profile_association_id, new_instance_profile_name)
            time.sleep(5)
            inst_ready = False
            tries = 0
            while not inst_ready:
                if tries % 6 == 0:
                    self.ec2_client.reboot_instances(InstanceIds=[instance_id])
                    log.info(
                        "Rebooting instance %s and waiting for it to be ready.", instance_id)
                tries += 1
                time.sleep(10)
                response = self.ssm_client.describe_instance_information()
                for info in response['InstanceInformationList']:
                    if info['InstanceId'] == instance_id:
                        inst_ready = True
            self.ssm_client.send_command(
                InstanceIds=[instance_id],
                DocumentName='AWS-RunShellScript',
                Parameters={'commands': ['cd / && sudo python3 server.py 80 us-west-2']})
            log.info("Restarted the Python web server on instance %s.", instance_id)
        except ClientError as err:
            raise AutoScalerError(
                f"Couldn't replace instance profile for association {profile_association_id}: {err}")

    def delete_instance_profile(self, profile_name, role_name):
        """
        Detaches a role from an instance profile, detaches policies from the role, and
        deletes all the resources.

        :param profile_name: The name of the profile to delete.
        :param role_name: The name of the role to delete.
        """
        try:
            self.iam_client.remove_role_from_instance_profile(
                InstanceProfileName=profile_name, RoleName=role_name)
            self.iam_client.delete_instance_profile(InstanceProfileName=profile_name)
            log.info("Deleted instance profile %s.", profile_name)
            attached_policies = self.iam_client.list_attached_role_policies(RoleName=role_name)
            for pol in attached_policies['AttachedPolicies']:
                self.iam_client.detach_role_policy(RoleName=role_name, PolicyArn=pol['PolicyArn'])
                if not pol['PolicyArn'].startswith('arn:aws:iam::aws'):
                    self.iam_client.delete_policy(PolicyArn=pol['PolicyArn'])
                log.info("Detached and deleted policy %s.", pol['PolicyName'])
            self.iam_client.delete_role(RoleName=role_name)
            log.info("Deleted role %s.", role_name)
        except ClientError as err:
            if err.response['Error']['Code'] == 'NoSuchEntity':
                log.info("Instance profile %s doesn't exist, nothing to do.", profile_name)
            else:
                raise AutoScalerError(
                    f"Couldn't delete instance profile {profile_name} or detach "
                    f"policies and delete role {role_name}: {err}")

    def create_template(self, server_startup_script_file, instance_policy_file):
        """
        Creates an Amazon EC2 launch template to use with Amazon EC2 Auto Scaling. The
        launch template specifies a Bash script in its user data field that runs after
        the instance is started. This script installs Python packages and starts a Python
        web server on the instance.

        :param server_startup_script_file: The path to a Bash script file that is run when
                                           an instance starts.
        :param instance_policy_file: The path to a file that defines a permissions policy to
                                     create and attach to the instance profile.
        :return: Information about the newly created template.
        """
        template = {}
        try:
            self.create_instance_profile(
                instance_policy_file, self.instance_policy_name, self.instance_role_name,
                self.instance_profile_name)
            with open(server_startup_script_file) as file:
                start_server_script = file.read()
            ami_latest = self.ssm_client.get_parameter(Name=self.ami_param)
            ami_id = ami_latest['Parameter']['Value']
            lt_response = self.ec2_client.create_launch_template(
                LaunchTemplateName=self.launch_template_name,
                LaunchTemplateData={
                    'InstanceType': self.inst_type,
                    'ImageId': ami_id,
                    'IamInstanceProfile': {'Name': self.instance_profile_name},
                    'UserData': base64.b64encode(
                        start_server_script.encode(encoding='utf-8')).decode(encoding='utf-8')})
            template = lt_response['LaunchTemplate']
            log.info(
                "Created launch template %s for AMI %s on %s.",
                self.launch_template_name, ami_id, self.inst_type)
        except ClientError as err:
            if err.response['Error']['Code'] == 'InvalidLaunchTemplateName.AlreadyExistsException':
                log.info(
                    "Launch template %s already exists, nothing to do.",
                    self.launch_template_name)
            else:
                raise AutoScalerError(
                    f"Couldn't create launch template {self.launch_template_name}: {err}.")
        return template

    def delete_template(self):
        """
        Deletes a launch template.
        """
        try:
            self.ec2_client.delete_launch_template(LaunchTemplateName=self.launch_template_name)
            self.delete_instance_profile(self.instance_profile_name, self.instance_role_name)
            log.info("Launch template %s deleted.", self.launch_template_name)
        except ClientError as err:
            if err.response['Error']['Code'] == 'InvalidLaunchTemplateName.NotFoundException':
                log.info(
                    "Launch template %s does not exist, nothing to do.",
                    self.launch_template_name)
            else:
                raise AutoScalerError(
                    f"Couldn't delete launch template {self.launch_template_name}: {err}.")

    def get_availability_zones(self):
        """
        Gets a list of Availability Zones in the AWS Region of the Amazon EC2 client.

        :return: The list of Availability Zones for the client Region.
        """
        try:
            response = self.ec2_client.describe_availability_zones()
            zones = [zone['ZoneName'] for zone in response['AvailabilityZones']]
        except ClientError as err:
            raise AutoScalerError(f"Couldn't get availability zones: {err}.")
        else:
            return zones

    def create_group(self, group_size):
        """
        Creates an EC2 Auto Scaling group with the specified size.

        :param group_size: The number of instances to set for the minimum and maximum in
                           the group.
        :return: The list of Availability Zones specified for the group.
        """
        zones = []
        try:
            zones = self.get_availability_zones()
            self.autoscaling_client.create_auto_scaling_group(
                AutoScalingGroupName=self.group_name,
                AvailabilityZones=zones,
                LaunchTemplate={
                    'LaunchTemplateName': self.launch_template_name, 'Version': '$Default'},
                MinSize=group_size,
                MaxSize=group_size)
            log.info(
                "Created EC2 Auto Scaling group %s with availability zones %s.",
                self.group_name, zones)
        except ClientError as err:
            if err.response['Error']['Code'] == 'AlreadyExists':
                log.info(
                    "EC2 Auto Scaling group %s already exists, nothing to do.", self.group_name)
            else:
                raise AutoScalerError(
                    f"Couldn't create EC2 Auto Scaling group {self.group_name}: {err}")
        return zones

    def get_instances(self):
        """
        Gets data about the instances in the EC2 Auto Scaling group.

        :return: Data about the instances.
        """
        try:
            as_response = self.autoscaling_client.describe_auto_scaling_groups(
                AutoScalingGroupNames=[self.group_name])
            instance_ids = [
                i['InstanceId'] for i in as_response['AutoScalingGroups'][0]['Instances']]
        except ClientError as err:
            raise AutoScalerError(
                f"Couldn't get instances for Auto Scaling group {self.group_name}: {err}")
        else:
            return instance_ids

    def terminate_instance(self, instance_id):
        """
        Terminates an instance in an EC2 Auto Scaling group. After an instance is
        terminated, it can no longer be accessed.

        :param instance_id: The ID of the instance to terminate.
        """
        try:
            self.autoscaling_client.terminate_instance_in_auto_scaling_group(
                InstanceId=instance_id, ShouldDecrementDesiredCapacity=False)
            log.info("Terminated instance %s.", instance_id)
        except ClientError as err:
            raise AutoScalerError(f"Couldn't terminate instance {instance_id}: {err}")

    def attach_load_balancer_target_group(self, lb_target_group):
        """
        Attaches an Elastic Load Balancing (ELB) target group to this EC2 Auto Scaling group.
        The target group specifies how the load balancer forwards requests to the instances
        in the group.

        :param lb_target_group: Data about the ELB target group to attach.
        """
        try:
            self.autoscaling_client.attach_load_balancer_target_groups(
                AutoScalingGroupName=self.group_name,
                TargetGroupARNs=[lb_target_group['TargetGroupArn']])
            log.info(
                "Attached load balancer target group %s to auto scaling group %s.",
                lb_target_group['TargetGroupName'], self.group_name)
        except ClientError as err:
            raise AutoScalerError(
                f"Couldn't attach load balancer target group {lb_target_group['TargetGroupName']}\n"
                f"to auto scaling group {self.group_name}: {err}")

    def _try_terminate_instance(self, inst_id):
        stopping = False
        log.info(f"Stopping {inst_id}.")
        while not stopping:
            try:
                self.autoscaling_client.terminate_instance_in_auto_scaling_group(
                    InstanceId=inst_id, ShouldDecrementDesiredCapacity=True)
                stopping = True
            except ClientError as err:
                if err.response['Error']['Code'] == 'ScalingActivityInProgress':
                    log.info("Scaling activity in progress for %s. Waiting...", inst_id)
                    time.sleep(10)
                else:
                    raise AutoScalerError(f"Couldn't stop instance {inst_id}: {err}.")

    def _try_delete_group(self):
        """
        Tries to delete the EC2 Auto Scaling group. If the group is in use or a scaling
        activity is in progress, the function waits and retries until the group is
        successfully deleted.
        """
        stopped = False
        while not stopped:
            try:
                self.autoscaling_client.delete_auto_scaling_group(
                    AutoScalingGroupName=self.group_name)
                stopped = True
                log.info("Deleted EC2 Auto Scaling group %s.", self.group_name)
            except ClientError as err:
                if (err.response['Error']['Code'] == 'ResourceInUse'
                        or err.response['Error']['Code'] == 'ScalingActivityInProgress'):
                    log.info("Some instances are still running. Waiting for them to stop...")
                    time.sleep(10)
                else:
                    raise AutoScalerError(f"Couldn't delete group {self.group_name}: {err}.")

    def delete_group(self):
        """
        Terminates all instances in the group, then deletes the EC2 Auto Scaling group.
        """
        try:
            response = self.autoscaling_client.describe_auto_scaling_groups(
                AutoScalingGroupNames=[self.group_name])
            groups = response.get('AutoScalingGroups', [])
            if len(groups) > 0:
                self.autoscaling_client.update_auto_scaling_group(
                    AutoScalingGroupName=self.group_name, MinSize=0)
                instance_ids = [inst['InstanceId'] for inst in groups[0]['Instances']]
                for inst_id in instance_ids:
                    self._try_terminate_instance(inst_id)
                self._try_delete_group()
            else:
                log.info("No groups found named %s, nothing to do.", self.group_name)
        except ClientError as err:
            raise AutoScalerError(f"Couldn't delete group {self.group_name}: {err}.")

    def get_default_vpc(self):
        """
        Gets the default VPC for the account.

        :return: Data about the default VPC.
        """
        try:
            response = self.ec2_client.describe_vpcs(
                Filters=[{'Name': 'is-default', 'Values': ['true']}])
        except ClientError as err:
            raise AutoScalerError(f"Couldn't get default VPC: {err}")
        else:
            return response['Vpcs'][0]

    def verify_inbound_port(self, vpc, port, ip_address):
        """
        Verify that the default security group of the specified VPC allows ingress from this
        computer. This can be done by allowing ingress from this computer's IP address. In
        some situations, such as connecting from a corporate network, you must instead specify
        a prefix list ID. You can also temporarily open the port to any IP address while
        running this example. If you do, be sure to remove public access when you're done.

        :param vpc: The VPC used by this example.
        :param port: The port to verify.
        :param ip_address: This computer's IP address.
        :return: The default security group of the specified VPC, and a value that indicates
                 whether the specified port is open.
        """
        try:
            response = self.ec2_client.describe_security_groups(
                Filters=[
                    {'Name': 'group-name', 'Values': ['default']},
                    {'Name': 'vpc-id', 'Values': [vpc['VpcId']]}])
            sec_group = response['SecurityGroups'][0]
            port_is_open = False
            log.info("Found default security group %s.", sec_group['GroupId'])
            for ip_perm in sec_group['IpPermissions']:
                if ip_perm.get('FromPort', 0) == port:
                    log.info("Found inbound rule: %s", ip_perm)
                    for ip_range in ip_perm['IpRanges']:
                        cidr = ip_range.get('CidrIp', '')
                        if cidr.startswith(ip_address) or cidr == '0.0.0.0/0':
                            port_is_open = True
                    if ip_perm['PrefixListIds']:
                        port_is_open = True
                    if not port_is_open:
                        log.info(
                            "The inbound rule does not appear to be open to either this computer's IP\n"
                            "address of %s, to all IP addresses (0.0.0.0/0), or to a prefix list ID.",
                            ip_address)
                    else:
                        break
        except ClientError as err:
            raise AutoScalerError(
                f"Couldn't verify inbound rule for port {port} for VPC {vpc['VpcId']}: {err}")
        else:
            return sec_group, port_is_open

    def open_inbound_port(self, sec_group_id, port, ip_address):
        """
        Add an ingress rule to the specified security group that allows access on the
        specified port from the specified IP address.

        :param sec_group_id: The ID of the security group to modify.
        :param port: The port to open.
        :param ip_address: The IP address that is granted access.
        """
        try:
            self.ec2_client.authorize_security_group_ingress(
                GroupId=sec_group_id, CidrIp=f'{ip_address}/32', FromPort=port, ToPort=port,
                IpProtocol='tcp')
            log.info(
                "Authorized ingress to %s on port %s from %s.", sec_group_id, port, ip_address)
        except ClientError as err:
            raise AutoScalerError(
                f"Couldn't authorize ingress to {sec_group_id} on port {port} from {ip_address}: {err}")

    def get_subnets(self, vpc_id, zones):
        """
        Gets the default subnets in a VPC for a specified list of Availability Zones.

        :param vpc_id: The ID of the VPC to look up.
        :param zones: The list of Availability Zones to look up.
        :return: The list of subnets found.
        """
        try:
            response = self.ec2_client.describe_subnets(
                Filters=[
                    {'Name': 'vpc-id', 'Values': [vpc_id]},
                    {'Name': 'availability-zone', 'Values': zones},
                    {'Name': 'default-for-az', 'Values': ['true']}])
            subnets = response['Subnets']
            log.info("Found %s subnets for the specified zones.", len(subnets))
        except ClientError as err:
            raise AutoScalerError(f"Couldn't get subnets: {err}")
        else:
            return subnets
Create a class that wraps Elastic Load Balancing actions.
class LoadBalancer:
    """Encapsulates Elastic Load Balancing (ELB) actions."""
    def __init__(self, target_group_name, load_balancer_name, elb_client):
        """
        :param target_group_name: The name of the target group associated with the load balancer.
        :param load_balancer_name: The name of the load balancer.
        :param elb_client: A Boto3 Elastic Load Balancing client.
        """
        self.target_group_name = target_group_name
        self.load_balancer_name = load_balancer_name
        self.elb_client = elb_client
        self._endpoint = None

    @classmethod
    def from_client(cls, resource_prefix):
        """
        Creates this class from a Boto3 client.

        :param resource_prefix: The prefix to give to AWS resources created by this class.
        """
        elb_client = boto3.client('elbv2')
        return cls(f"{resource_prefix}-tg", f"{resource_prefix}-lb", elb_client)

    def endpoint(self):
        """
        Gets the HTTP endpoint of the load balancer.

        :return: The endpoint.
        """
        if self._endpoint is None:
            try:
                response = self.elb_client.describe_load_balancers(Names=[self.load_balancer_name])
                self._endpoint = response['LoadBalancers'][0]['DNSName']
            except ClientError as err:
                raise LoadBalancerError(
                    f"Couldn't get the endpoint for load balancer {self.load_balancer_name}: {err}")
        return self._endpoint

    def create_target_group(self, protocol, port, vpc_id):
        """
        Creates an Elastic Load Balancing target group. The target group specifies how the
        load balancer forwards requests to instances in the group and how instance health
        is checked.

        To speed up this demo, the health check is configured with shortened times and lower
        thresholds. In production, you might want to decrease the sensitivity of your health
        checks to avoid unwanted failures.

        :param protocol: The protocol to use to forward requests, such as 'HTTP'.
        :param port: The port to use to forward requests, such as 80.
        :param vpc_id: The ID of the VPC in which the load balancer exists.
        :return: Data about the newly created target group.
        """
        try:
            response = self.elb_client.create_target_group(
                Name=self.target_group_name,
                Protocol=protocol,
                Port=port,
                HealthCheckPath='/healthcheck',
                HealthCheckIntervalSeconds=10,
                HealthCheckTimeoutSeconds=5,
                HealthyThresholdCount=2,
                UnhealthyThresholdCount=2,
                VpcId=vpc_id)
            target_group = response['TargetGroups'][0]
            log.info("Created load balancing target group %s.", self.target_group_name)
        except ClientError as err:
            raise LoadBalancerError(
                f"Couldn't create load balancing target group {self.target_group_name}: {err}")
        else:
            return target_group

    def delete_target_group(self):
        """
        Deletes the target group.
        """
        done = False
        while not done:
            try:
                response = self.elb_client.describe_target_groups(Names=[self.target_group_name])
                tg_arn = response['TargetGroups'][0]['TargetGroupArn']
                self.elb_client.delete_target_group(TargetGroupArn=tg_arn)
                log.info("Deleted load balancing target group %s.", self.target_group_name)
                done = True
            except ClientError as err:
                if err.response['Error']['Code'] == 'TargetGroupNotFound':
                    log.info(
                        "Load balancer target group %s not found, nothing to do.",
                        self.target_group_name)
                    done = True
                elif err.response['Error']['Code'] == 'ResourceInUse':
                    log.info("Target group not yet released from load balancer, waiting...")
                    time.sleep(10)
                else:
                    raise LoadBalancerError(
                        f"Couldn't delete load balancing target group {self.target_group_name}: {err}")

    def create_load_balancer(self, subnet_ids, target_group):
        """
        Creates an Elastic Load Balancing load balancer that uses the specified subnets
        and forwards requests to the specified target group.

        :param subnet_ids: A list of subnets to associate with the load balancer.
        :param target_group: An existing target group that is added as a listener to the
                             load balancer.
        :return: Data about the newly created load balancer.
        """
        try:
            response = self.elb_client.create_load_balancer(
                Name=self.load_balancer_name, Subnets=subnet_ids)
            load_balancer = response['LoadBalancers'][0]
            log.info("Created load balancer %s.", self.load_balancer_name)
            waiter = self.elb_client.get_waiter('load_balancer_available')
            log.info("Waiting for load balancer to be available...")
            waiter.wait(Names=[self.load_balancer_name])
            log.info("Load balancer is available!")
            self.elb_client.create_listener(
                LoadBalancerArn=load_balancer['LoadBalancerArn'],
                Protocol=target_group['Protocol'],
                Port=target_group['Port'],
                DefaultActions=[{
                    'Type': 'forward',
                    'TargetGroupArn': target_group['TargetGroupArn']}])
            log.info(
                "Created listener to forward traffic from load balancer %s to target group %s.",
                self.load_balancer_name, target_group['TargetGroupName'])
        except ClientError as err:
            raise LoadBalancerError(
                f"Failed to create load balancer {self.load_balancer_name} "
                f"and add a listener for target group {target_group['TargetGroupName']}: {err}")
        else:
            self._endpoint = load_balancer['DNSName']
            return load_balancer

    def delete_load_balancer(self):
        """
        Deletes a load balancer.
        """
        try:
            response = self.elb_client.describe_load_balancers(Names=[self.load_balancer_name])
            lb_arn = response['LoadBalancers'][0]['LoadBalancerArn']
            self.elb_client.delete_load_balancer(LoadBalancerArn=lb_arn)
            log.info("Deleted load balancer %s.", self.load_balancer_name)
            waiter = self.elb_client.get_waiter('load_balancers_deleted')
            log.info("Waiting for load balancer to be deleted...")
            waiter.wait(Names=[self.load_balancer_name])
        except ClientError as err:
            if err.response['Error']['Code'] == 'LoadBalancerNotFound':
                log.info("Load balancer %s does not exist, nothing to do.", self.load_balancer_name)
            else:
                raise LoadBalancerError(
                    f"Couldn't delete load balancer {self.load_balancer_name}: {err}")

    def verify_load_balancer_endpoint(self):
        """
        Verify this computer can successfully send a GET request to the load balancer endpoint.
        """
        success = False
        retries = 3
        while not success and retries > 0:
            try:
                lb_response = requests.get(f'http://{self.endpoint()}')
                log.info("Got response %s from load balancer endpoint.", lb_response.status_code)
                if lb_response.status_code == 200:
                    success = True
                else:
                    retries = 0
            except requests.exceptions.ConnectionError:
                log.info("Got connection error from load balancer endpoint, retrying...")
                retries -= 1
                time.sleep(10)
        return success

    def check_target_health(self):
        """
        Checks the health of the instances in the target group.

        :return: The health status of the target group.
        """
        try:
            tg_response = self.elb_client.describe_target_groups(Names=[self.target_group_name])
            health_response = self.elb_client.describe_target_health(
                TargetGroupArn=tg_response['TargetGroups'][0]['TargetGroupArn'])
        except ClientError as err:
            raise LoadBalancerError(
                f"Couldn't check health of {self.target_group_name} targets: {err}")
        else:
            return health_response['TargetHealthDescriptions']
Create a class that uses DynamoDB to simulate a recommendation service.
class RecommendationService:
    """
    Encapsulates a DynamoDB table to use as a service that recommends books, movies,
    and songs.
    """
    def __init__(self, table_name, dynamodb_client):
        """
        :param table_name: The name of the DynamoDB recommendations table.
        :param dynamodb_client: A Boto3 DynamoDB client.
        """
        self.table_name = table_name
        self.dynamodb_client = dynamodb_client

    @classmethod
    def from_client(cls, table_name):
        """
        Creates this class from a Boto3 client.

        :param table_name: The name of the DynamoDB recommendations table.
        """
        ddb_client = boto3.client('dynamodb')
        return cls(table_name, ddb_client)

    def create(self):
        """
        Creates a DynamoDB table to use as a recommendation service. The table has a
        hash key named 'MediaType' that defines the type of media recommended, such as
        Book or Movie, and a range key named 'ItemId' that, combined with the MediaType,
        forms a unique identifier for the recommended item.

        :return: Data about the newly created table.
        """
        try:
            response = self.dynamodb_client.create_table(
                TableName=self.table_name,
                AttributeDefinitions=[
                    {'AttributeName': 'MediaType', 'AttributeType': 'S'},
                    {'AttributeName': 'ItemId', 'AttributeType': 'N'}],
                KeySchema=[
                    {'AttributeName': 'MediaType', 'KeyType': 'HASH'},
                    {'AttributeName': 'ItemId', 'KeyType': 'RANGE'}],
                ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5})
            log.info("Creating table %s...", self.table_name)
            waiter = self.dynamodb_client.get_waiter('table_exists')
            waiter.wait(TableName=self.table_name)
            log.info("Table %s created.", self.table_name)
        except ClientError as err:
            if err.response['Error']['Code'] == 'ResourceInUseException':
                log.info("Table %s exists, nothing to do.", self.table_name)
            else:
                raise RecommendationServiceError(
                    self.table_name, f"ClientError when creating table: {err}.")
        else:
            return response

    def populate(self, data_file):
        """
        Populates the recommendations table from a JSON file.

        :param data_file: The path to the data file.
        """
        try:
            with open(data_file) as data:
                items = json.load(data)
            batch = [{'PutRequest': {'Item': item}} for item in items]
            self.dynamodb_client.batch_write_item(RequestItems={self.table_name: batch})
            log.info("Populated table %s with items from %s.", self.table_name, data_file)
        except ClientError as err:
            raise RecommendationServiceError(
                self.table_name, f"Couldn't populate table from {data_file}: {err}")

    def destroy(self):
        """
        Deletes the recommendations table.
        """
        try:
            self.dynamodb_client.delete_table(TableName=self.table_name)
            log.info("Deleting table %s...", self.table_name)
            waiter = self.dynamodb_client.get_waiter('table_not_exists')
            waiter.wait(TableName=self.table_name)
            log.info("Table %s deleted.", self.table_name)
        except ClientError as err:
            if err.response['Error']['Code'] == 'ResourceNotFoundException':
                log.info("Table %s does not exist, nothing to do.", self.table_name)
            else:
                raise RecommendationServiceError(
                    self.table_name, f"ClientError when deleting table: {err}.")
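Because populate passes the items from recommendations.json straight to batch_write_item, the file must contain items in DynamoDB JSON format. The following is a hypothetical example of what that data might look like, shown as the equivalent Python structure that json.load would produce; only MediaType and ItemId are required by the table's key schema, and the Title attribute is an illustrative assumption.

    # Hypothetical contents of recommendations.json, not the actual data file from the repository.
    items = [
        {"MediaType": {"S": "Book"}, "ItemId": {"N": "1"}, "Title": {"S": "An example book title"}},
        {"MediaType": {"S": "Movie"}, "ItemId": {"N": "1"}, "Title": {"S": "An example movie title"}},
        {"MediaType": {"S": "Song"}, "ItemId": {"N": "1"}, "Title": {"S": "An example song title"}},
    ]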
Create a class that wraps Systems Manager actions.
class ParameterHelper:
    """
    Encapsulates Systems Manager parameters. This example uses these parameters to drive
    the demonstration of resilient architecture, such as failure of a dependency or how
    the service responds to a health check.
    """
    table = 'doc-example-resilient-architecture-table'
    failure_response = 'doc-example-resilient-architecture-failure-response'
    health_check = 'doc-example-resilient-architecture-health-check'

    def __init__(self, table_name, ssm_client):
        """
        :param table_name: The name of the DynamoDB table that is used as a recommendation
                           service.
        :param ssm_client: A Boto3 Systems Manager client.
        """
        self.ssm_client = ssm_client
        self.table_name = table_name

    @classmethod
    def from_client(cls, table_name):
        ssm_client = boto3.client('ssm')
        return cls(table_name, ssm_client)

    def reset(self):
        """
        Resets the Systems Manager parameters to starting values for the demo.
        These are the name of the DynamoDB recommendation table, no response when a
        dependency fails, and shallow health checks.
        """
        self.put(self.table, self.table_name)
        self.put(self.failure_response, 'none')
        self.put(self.health_check, 'shallow')

    def put(self, name, value):
        """
        Sets the value of a named Systems Manager parameter.

        :param name: The name of the parameter.
        :param value: The new value of the parameter.
        """
        try:
            self.ssm_client.put_parameter(Name=name, Value=value, Overwrite=True)
            log.info("Setting demo parameter %s to '%s'.", name, value)
        except ClientError as err:
            raise ParameterHelperError(
                f"Couldn't set parameter {name} to {value}: {err}")
For API details about the actions used in this example, see the Amazon SDK for Python (Boto3) API Reference.
For a complete list of Amazon SDK developer guides and code examples, see Using this service with an Amazon SDK. This topic also includes information about getting started and details about previous SDK versions.