用于 Amazon Config 规则 (Python) 的示例 Amazon Lambda 函数 - Amazon Config
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

用于 Amazon Config 规则 (Python) 的示例 Amazon Lambda 函数

Amazon Lambda 将执行函数来响应由 Amazon 服务发布的事件。用于 Amazon Config 自定义 Lambda 规则的函数收到一个由 Amazon Config 发布的事件,然后该函数使用从该事件接收以及它通过 Amazon Config API 检索的数据,来评估是否符合规则。用于 Config 规则的函数的运作方式会因其执行的评估是由配置更改触发还是定期触发而有所不同。

有关 Amazon Lambda 函数中常见模式的信息,请参阅《Amazon Lambda 开发人员指南》中的编程模型

评估由配置更改触发时的示例函数

Amazon Config 检测到自定义规则范围内的资源发生配置更改时,会调用函数示例如下。

如果您使用 Amazon Config 控制台创建与类似此示例的函数关联的规则,请选择 Configuration changes (配置更改) 作为触发器类型。如果您使用 Amazon Config API 或 Amazon CLI 创建规则,请将 MessageType 属性设置为 ConfigurationItemChangeNotificationOversizedConfigurationItemChangeNotification。这些设置可使您的规则在每次 Amazon Config 生成配置项或资源更改导致过大配置项时触发。

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
函数运作

本函数在运行时执行以下操作:

  1. 此函数在 Amazon Lambda 将 event 对象传递给 handler 函数时运行。在此示例中,该函数接受可选 callback 参数,该参数用于向调用方返回信息。Amazon Lambda 还会传递一个 context 对象,其中包含函数在运行时可以使用的信息和方法。请注意,在新版本的 Lambda 中,不再使用上下文。

  2. 此函数检查事件的 messageType 是配置项还是过大配置项,然后返回配置项。

  3. 处理程序调用 isApplicable 函数来确定资源是否已删除。

    注意

    报告已删除资源的规则应返回 NOT_APPLICABLE 的评估结果,以避免不必要的规则评估。

  4. 处理程序调用 evaluateChangeNotificationCompliance 函数并传递 Amazon Config 在事件中发布的 configurationItemruleParameters 对象。

    函数首先评估资源是否为 EC2 实例。如果资源不是 EC2 实例,函数会返回 NOT_APPLICABLE 这一合规性值。

    然后,函数评估配置项中的 instanceType 属性是否与 desiredInstanceType 参数值一致。如果值相等,该函数将返回 COMPLIANT。如果值不相等,该函数将返回 NON_COMPLIANT

  5. 处理程序通过初始化 putEvaluationsRequest 对象来做好向 Amazon Config 发送评估结果的准备。该对象包含 Evaluations 参数,这一参数用于识别受评估资源的合规性结果、资源类型和 ID。putEvaluationsRequest 对象还包含来自事件的结果令牌,该令牌可标识 Amazon Config 的规则和事件。

  6. 处理程序通过向 config 客户端的 putEvaluations 方法传递对象来向 Amazon Config 发送评估结果。

定期评估时的示例函数

Amazon Config 针对定期评估调用的函数示例如下。定期评估按您在 Amazon Config 中定义规则时指定的频率进行。

如果您使用 Amazon Config 控制台创建与类似此示例的函数关联的规则,请选择 Periodic (定期) 作为触发器类型。如果您使用 Amazon Config API 或 Amazon CLI 创建规则,请将 MessageType 属性设置为 ScheduledNotification

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
函数运作

本函数在运行时执行以下操作:

  1. 此函数在 Amazon Lambda 将 event 对象传递给 handler 函数时运行。在此示例中,该函数接受可选 callback 参数,该参数用于向调用方返回信息。Amazon Lambda 还会传递一个 context 对象,其中包含函数在运行时可以使用的信息和方法。请注意,在新版本的 Lambda 中,不再使用上下文。

  2. 为计数指定类型的资源,处理程序会调用 countResourceTypes 函数,而且它传递其从事件收到的 applicableResourceType 参数。countResourceTypes 函数调用 listDiscoveredResources 客户端的 config 方法,该方法返回适用资源的标识符列表。该函数使用此列表的长度来确定适用资源的数量,而且它将此计数返回到处理程序。

  3. 处理程序通过初始化 putEvaluationsRequest 对象来做好向 Amazon Config 发送评估结果的准备。该对象包含 Evaluations 参数,该参数可以识别合规性结果和在事件中发布的 Amazon Web Services 账户。您可以使用 Evaluations 参数将结果应用于 Amazon Config 支持的任何资源类型。putEvaluationsRequest 对象还包含来自事件的结果令牌,该令牌可标识 Amazon Config 的规则和事件。

  4. putEvaluationsRequest 对象中,处理程序调用 evaluateCompliance 函数。此函数测试适用资源的数量是否超出分配给事件所提供的 maxCount 参数的最大值。如果资源的数量超出最大值,函数将返回 NON_COMPLIANT。如果资源的数量没有超出最大值,函数将返回 COMPLIANT

  5. 处理程序通过向 config 客户端的 putEvaluations 方法传递对象来向 Amazon Config 发送评估结果。