Example Amazon Lambda Functions for Amazon Config Rules (Python)
Amazon Lambda executes functions in response to events that are published by Amazon services. The function for an Amazon Config Custom Lambda rule receives an event that is published by Amazon Config, and the function then uses data that it receives from the event and that it retrieves from the Amazon Config API to evaluate the compliance of the rule. The operations in a function for a Config rule differ depending on whether it performs an evaluation that is triggered by configuration changes or triggered periodically.
For information about common patterns within Amazon Lambda functions, see Programming Model in the Amazon Lambda Developer Guide.
Contents
Example Function for Evaluations Triggered by Configuration Changes
Amazon Config will invoke a function like the following example when it detects a configuration change for a resource that is within a custom rule's scope.
If you use the Amazon Config console to create a rule that is associated with a function like
this example, choose Configuration changes as the trigger type. If
you use the Amazon Config API or Amazon CLI to create the rule, set the MessageType
attribute to ConfigurationItemChangeNotification
and
OversizedConfigurationItemChangeNotification
. These settings enable
your rule to be triggered whenever Amazon Config generates a configuration item or an oversized
configuration item as a result of a resource change.
import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
Function Operations
The function performs the following operations at runtime:
-
The function runs when Amazon Lambda passes the
event
object to thehandler
function. In this example, the function accepts the optionalcallback
parameter, which it uses to return information to the caller. Amazon Lambda also passes acontext
object, which contains information and methods that the function can use while it runs. Note that in newer versions of Lambda, context is no longer used. -
The function checks whether the
messageType
for the event is a configuration item or an oversized configuration item, and then returns the configuration item. -
The handler calls the
isApplicable
function to determine whether the resource was deleted.Note
Rules reporting on deleted resources should return the evaluation result of
NOT_APPLICABLE
in order to avoid unnecessary rule evaluations. -
The handler calls the
evaluateChangeNotificationCompliance
function and passes theconfigurationItem
andruleParameters
objects that Amazon Config published in the event.The function first evaluates whether the resource is an EC2 instance. If the resource is not an EC2 instance, the function returns a compliance value of
NOT_APPLICABLE
.The function then evaluates whether the
instanceType
attribute in the configuration item is equal to thedesiredInstanceType
parameter value. If the values are equal, the function returnsCOMPLIANT
. If the values are not equal, the function returnsNON_COMPLIANT
. -
The handler prepares to send the evaluation results to Amazon Config by initializing the
putEvaluationsRequest
object. This object includes theEvaluations
parameter, which identifies the compliance result, the resource type, and the ID of the resource that was evaluated. TheputEvaluationsRequest
object also includes the result token from the event, which identifies the rule and the event for Amazon Config. -
The handler sends the evaluation results to Amazon Config by passing the object to the
putEvaluations
method of theconfig
client.
Example Function for Periodic Evaluations
Amazon Config will invoke a function like the following example for periodic evaluations. Periodic evaluations occur at the frequency that you specify when you define the rule in Amazon Config.
If you use the Amazon Config console to create a rule that is associated with a function like
this example, choose Periodic as the trigger type. If you use the
Amazon Config API or Amazon CLI to create the rule, set the MessageType
attribute to
ScheduledNotification
.
import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
Function Operations
The function performs the following operations at runtime:
-
The function runs when Amazon Lambda passes the
event
object to thehandler
function. In this example, the function accepts the optionalcallback
parameter, which it uses to return information to the caller. Amazon Lambda also passes acontext
object, which contains information and methods that the function can use while it runs. Note that in newer versions of Lambda, context is no longer used. -
To count the resources of the specified type, the handler calls the
countResourceTypes
function, and it passes theapplicableResourceType
parameter that it received from the event. ThecountResourceTypes
function calls thelistDiscoveredResources
method of theconfig
client, which returns a list of identifiers for the applicable resources. The function uses the length of this list to determine the number of applicable resources, and it returns this count to the handler. -
The handler prepares to send the evaluation results to Amazon Config by initializing the
putEvaluationsRequest
object. This object includes theEvaluations
parameter, which identifies the compliance result and the Amazon Web Services account that was published in the event. You can use theEvaluations
parameter to apply the result to any resource type that is supported by Amazon Config. TheputEvaluationsRequest
object also includes the result token from the event, which identifies the rule and the event for Amazon Config. -
Within the
putEvaluationsRequest
object, the handler calls theevaluateCompliance
function. This function tests whether the number of applicable resources exceeds the maximum assigned to themaxCount
parameter, which was provided by the event. If the number of resources exceeds the maximum, the function returnsNON_COMPLIANT
. If the number of resources does not exceed the maximum, the function returnsCOMPLIANT
. -
The handler sends the evaluation results to Amazon Config by passing the object to the
putEvaluations
method of theconfig
client.