Example Amazon Lambda Functions for Amazon Config Rules (Python) - Amazon Config
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

Example Amazon Lambda Functions for Amazon Config Rules (Python)

Amazon Lambda executes functions in response to events that are published by Amazon services. The function for an Amazon Config Custom Lambda rule receives an event that is published by Amazon Config, and the function then uses data that it receives from the event and that it retrieves from the Amazon Config API to evaluate the compliance of the rule. The operations in a function for a Config rule differ depending on whether it performs an evaluation that is triggered by configuration changes or triggered periodically.

For information about common patterns within Amazon Lambda functions, see Programming Model in the Amazon Lambda Developer Guide.

Example Function for Evaluations Triggered by Configuration Changes

Amazon Config will invoke a function like the following example when it detects a configuration change for a resource that is within a custom rule's scope.

If you use the Amazon Config console to create a rule that is associated with a function like this example, choose Configuration changes as the trigger type. If you use the Amazon Config API or Amazon CLI to create the rule, set the MessageType attribute to ConfigurationItemChangeNotification and OversizedConfigurationItemChangeNotification. These settings enable your rule to be triggered whenever Amazon Config generates a configuration item or an oversized configuration item as a result of a resource change.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
Function Operations

The function performs the following operations at runtime:

  1. The function runs when Amazon Lambda passes the event object to the handler function. In this example, the function accepts the optional callback parameter, which it uses to return information to the caller. Amazon Lambda also passes a context object, which contains information and methods that the function can use while it runs. Note that in newer versions of Lambda, context is no longer used.

  2. The function checks whether the messageType for the event is a configuration item or an oversized configuration item, and then returns the configuration item.

  3. The handler calls the isApplicable function to determine whether the resource was deleted.

    Note

    Rules reporting on deleted resources should return the evaluation result of NOT_APPLICABLE in order to avoid unnecessary rule evaluations.

  4. The handler calls the evaluateChangeNotificationCompliance function and passes the configurationItem and ruleParameters objects that Amazon Config published in the event.

    The function first evaluates whether the resource is an EC2 instance. If the resource is not an EC2 instance, the function returns a compliance value of NOT_APPLICABLE.

    The function then evaluates whether the instanceType attribute in the configuration item is equal to the desiredInstanceType parameter value. If the values are equal, the function returns COMPLIANT. If the values are not equal, the function returns NON_COMPLIANT.

  5. The handler prepares to send the evaluation results to Amazon Config by initializing the putEvaluationsRequest object. This object includes the Evaluations parameter, which identifies the compliance result, the resource type, and the ID of the resource that was evaluated. The putEvaluationsRequest object also includes the result token from the event, which identifies the rule and the event for Amazon Config.

  6. The handler sends the evaluation results to Amazon Config by passing the object to the putEvaluations method of the config client.

Example Function for Periodic Evaluations

Amazon Config will invoke a function like the following example for periodic evaluations. Periodic evaluations occur at the frequency that you specify when you define the rule in Amazon Config.

If you use the Amazon Config console to create a rule that is associated with a function like this example, choose Periodic as the trigger type. If you use the Amazon Config API or Amazon CLI to create the rule, set the MessageType attribute to ScheduledNotification.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
Function Operations

The function performs the following operations at runtime:

  1. The function runs when Amazon Lambda passes the event object to the handler function. In this example, the function accepts the optional callback parameter, which it uses to return information to the caller. Amazon Lambda also passes a context object, which contains information and methods that the function can use while it runs. Note that in newer versions of Lambda, context is no longer used.

  2. To count the resources of the specified type, the handler calls the countResourceTypes function, and it passes the applicableResourceType parameter that it received from the event. The countResourceTypes function calls the listDiscoveredResources method of the config client, which returns a list of identifiers for the applicable resources. The function uses the length of this list to determine the number of applicable resources, and it returns this count to the handler.

  3. The handler prepares to send the evaluation results to Amazon Config by initializing the putEvaluationsRequest object. This object includes the Evaluations parameter, which identifies the compliance result and the Amazon Web Services account that was published in the event. You can use the Evaluations parameter to apply the result to any resource type that is supported by Amazon Config. The putEvaluationsRequest object also includes the result token from the event, which identifies the rule and the event for Amazon Config.

  4. Within the putEvaluationsRequest object, the handler calls the evaluateCompliance function. This function tests whether the number of applicable resources exceeds the maximum assigned to the maxCount parameter, which was provided by the event. If the number of resources exceeds the maximum, the function returns NON_COMPLIANT. If the number of resources does not exceed the maximum, the function returns COMPLIANT.

  5. The handler sends the evaluation results to Amazon Config by passing the object to the putEvaluations method of the config client.