使用 Lambda 函数响应事件 - Amazon Certificate Manager
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

使用 Lambda 函数响应事件

此过程演示如何使用 Amazon Lambda 以监听 CloudWatch Events、使用 Amazon Simple Notification Service (SNS) 创建通知,并将结果发布到 Amazon Security Hub,为管理员和安全团队提供可见性。

设置 Lambda 函数和 IAM 角色

  1. 首先配置 Amazon Identity and Access Management (IAM) 角色并定义 Lambda 函数所需的权限。通过此安全性最佳实践,您可以灵活地指定谁有权调用该函数,并限制授予该用户的权限。不建议直接在用户账户下运行大多数Amazon操作,特别是不要在管理员账户下运行。

    通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 使用 JSON 策略编辑器创建在以下模板中定义的策略。提供您自己的区域和Amazon账户详细信息。有关更多信息,请参阅在“JSON”选项卡上创建策略

    { "Version":"2012-10-17", "Statement":[ { "Sid":"LambdaCertificateExpiryPolicy1", "Effect":"Allow", "Action":"logs:CreateLogGroup", "Resource":"arn:aws:logs:<region>:<AWS-ACCT-NUMBER>:*" }, { "Sid":"LambdaCertificateExpiryPolicy2", "Effect":"Allow", "Action":[ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource":[ "arn:aws:logs:<region>:<AWS-ACCT-NUMBER>:log-group:/aws/lambda/handle-expiring-certificates:*" ] }, { "Sid":"LambdaCertificateExpiryPolicy3", "Effect":"Allow", "Action":[ "acm:DescribeCertificate", "acm:GetCertificate", "acm:ListCertificates", "acm:ListTagsForCertificate" ], "Resource":"*" }, { "Sid":"LambdaCertificateExpiryPolicy4", "Effect":"Allow", "Action":"SNS:Publish", "Resource":"*" }, { "Sid":"LambdaCertificateExpiryPolicy5", "Effect":"Allow", "Action":[ "SecurityHub:BatchImportFindings", "SecurityHub:BatchUpdateFindings", "SecurityHub:DescribeHub" ], "Resource":"*" }, { "Sid":"LambdaCertificateExpiryPolicy6", "Effect":"Allow", "Action":"cloudwatch:ListMetrics", "Resource":"*" } ] }
  3. 创建 IAM 角色并向其附加新策略。有关创建 IAM 角色和附加策略的信息,请参阅为Amazon服务创建角色(控制台)

  4. 打开 Amazon Lambda 控制台,地址:https://console.aws.amazon.com/lambda/

  5. 创建 Lambda 函数。有关更多信息,请参阅使用控制台创建 Lambda 函数。完成以下步骤:

    1. 创建函数页面上,选择 Author from scratch(从头开始创作)选项以创建函数。

    2. Function name(函数名称)字段中指定一个名称,如“handle-expiring-certificates”。

    3. Runtime(运行时)列表中,选择“Python 3.8”。

    4. 展开 Change default execution role(更改默认执行角色),然后选择 Use an existing role(使用现有角色)

    5. Existing role(现有角色)列表中选择您先前创建的角色。

    6. 选择 Create function(创建函数)。

    7. Function code(函数代码)下,插入以下代码。

      # Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. import json import boto3 import os from datetime import datetime, timedelta, timezone # ------------------------------------------- # setup global data # ------------------------------------------- utc = timezone.utc # make today timezone aware today = datetime.now().replace(tzinfo=utc) # set up time window for alert - default to 45 if its missing if os.environ.get('EXPIRY_DAYS') is None: expiry_days = 45 else: expiry_days = int(os.environ['EXPIRY_DAYS']) expiry_window = today + timedelta(days = expiry_days) def lambda_handler(event, context): # if this is coming from the ACM event, its for a single certificate if (event['detail-type'] == "ACM Certificate Approaching Expiration"): response = handle_single_cert(event, context.invoked_function_arn) return { 'statusCode': 200, 'body': response } def handle_single_cert(event, context_arn): cert_client = boto3.client('acm') cert_details = cert_client.describe_certificate(CertificateArn=event['resources'][0]) result = 'The following certificate is expiring within ' + str(expiry_days) + ' days: ' + cert_details['Certificate']['DomainName'] # check the expiry window before logging to Security Hub and sending an SNS if cert_details['Certificate']['NotAfter'] < expiry_window: # This call is the text going into the SNS notification result = result + ' (' + cert_details['Certificate']['CertificateArn'] + ') ' # this call is publishing to SH result = result + ' - ' + log_finding_to_sh(event, cert_details, context_arn) # if there's an SNS topic, publish a notification to it if os.environ.get('SNS_TOPIC_ARN') is None: response = result else: sns_client = boto3.client('sns') response = sns_client.publish(TopicArn=os.environ['SNS_TOPIC_ARN'], Message=result, Subject='Certificate Expiration Notification') return result def log_finding_to_sh(event, cert_details, context_arn): # setup for security hub sh_region = get_sh_region(event['region']) sh_hub_arn = "arn:aws:securityhub:{0}:{1}:hub/default".format(sh_region, event['account']) sh_product_arn = "arn:aws:securityhub:{0}:{1}:product/{1}/default".format(sh_region, event['account']) # check if security hub is enabled, and if the hub arn exists sh_client = boto3.client('securityhub', region_name = sh_region) try: sh_enabled = sh_client.describe_hub(HubArn = sh_hub_arn) # the previous command throws an error indicating the hub doesn't exist or lambda doesn't have rights to it so we'll stop attempting to use it except Exception as error: sh_enabled = None print ('Default Security Hub product doesn\'t exist') response = 'Security Hub disabled' # This is used to generate the URL to the cert in the Security Hub Findings to link directly to it cert_id = right(cert_details['Certificate']['CertificateArn'], 36) if sh_enabled: # set up a new findings list new_findings = [] # add expiring certificate to the new findings list new_findings.append({ "SchemaVersion": "2018-10-08", "Id": cert_id, "ProductArn": sh_product_arn, "GeneratorId": context_arn, "AwsAccountId": event['account'], "Types": [ "Software and Configuration Checks/AWS Config Analysis" ], "CreatedAt": event['time'], "UpdatedAt": event['time'], "Severity": { "Original": '89.0', "Label": 'HIGH' }, "Title": 'Certificate expiration', "Description": 'cert expiry', 'Remediation': { 'Recommendation': { 'Text': 'A new certificate for ' + cert_details['Certificate']['DomainName'] + ' should be imported to replace the existing imported certificate before expiration', 'Url': "https://console.aws.amazon.com/acm/home?region=" + event['region'] + "#/?id=" + cert_id } }, 'Resources': [ { 'Id': event['id'], 'Type': 'ACM Certificate', 'Partition': 'aws', 'Region': event['region'] } ], 'Compliance': {'Status': 'WARNING'} }) # push any new findings to security hub if new_findings: try: response = sh_client.batch_import_findings(Findings=new_findings) if response['FailedCount'] > 0: print("Failed to import {} findings".format(response['FailedCount'])) except Exception as error: print("Error: ", error) raise return json.dumps(response) # function to setup the sh region def get_sh_region(event_region): # security hub findings may need to go to a different region so set that here if os.environ.get('SECURITY_HUB_REGION') is None: sh_region_local = event_region else: sh_region_local = os.environ['SECURITY_HUB_REGION'] return sh_region_local # quick function to trim off right side of a string def right(value, count): # To get right part of string, use negative first index in slice. return value[-count:]
    8. Environment variables(环境变量)下,选择 Edit(编辑)并可选择添加以下变量。

      • (可选)EXPIRY_DAYS

        指定发送证书过期通知之前的准备时间(天数)。此函数默认为 45 天,但您可以指定自定义值。

      • (可选)SNS_TOPIC_ARN

        指定 Amazon SNS 的 ARN。以 arn:aws:sns:<区域>:<账户编号>:<主题名称> 格式提供完整的 ARN。

      • (可选)SECURITY_HUB_REGION

        在不同的区域指定一个 Amazon Security Hub。如果未指定此选项,则使用正在运行的 Lambda 函数的区域。如果该函数在多个区域中运行,则可能需要将所有证书消息都转到单个区域中的 Security Hub。

    9. Basic settings(基本设置)下,将 Timeout(超时)设置为 30 秒。

    10. 在页面顶部,选择 Deploy(部署)

完成以下过程中的任务以开始使用此解决方案。

自动发送过期电子邮件通知

在此示例中,我们在通过 CloudWatch Events 引发事件时,为每个即将过期的证书提供一个电子邮件。预设情况下,ACM 每天都会为距离过期日 45 天或更短时间的证书引发一个事件。(此时间段可以使用 ACM API 的 PutAccountConfiguration 操作自定义。) 这些事件中的每一个都会触发以下级联的自动操作:

ACM raises CloudWatch event → >>>>>>> events Event matches CloudWatch rule → Rule calls Lambda function → Function sends SNS email and logs a Finding in Security Hub
  1. 创建 Lambda 函数并配置权限。(已完成 – 请参阅 设置 Lambda 函数和 IAM 角色)。

  2. 为 Lambda 函数创建用于发送通知的标准 SNS 主题。有关更多信息,请参阅创建 Amazon SNS 主题

  3. 为任何感兴趣的人订阅新 SNS 主题。有关更多信息,请参阅订阅 Amazon SNS 主题

  4. 创建 CloudWatch Events 规则以触发 Lambda 函数。有关更多信息,请参阅创建触发事件的 CloudWatch Events 规则

    在 CloudWatch 控制台(位置:https://console.aws.amazon.com/cloudwatch/)中,导航到事件 > 规则页面,然后选择 Create rule(创建规则)。指定服务名称事件类型Lambda 函数。在事件模式预览编辑器中,粘贴以下代码:

    { "source": [ "aws.acm" ], "detail-type": [ "ACM Certificate Approaching Expiration" ] }

    显示示例事件下会显示如 Lambda 接收这样的事件:

    { "version": "0", "id": "9c95e8e4-96a4-ef3f-b739-b6aa5b193afb", "detail-type": "ACM Certificate Approaching Expiration", "source": "aws.acm", "account": "123456789012", "time": "2020-09-30T06:51:08Z", "region": "us-east-1", "resources": [ "arn:aws:acm:us-east-1:123456789012:certificate/61f50cd4-45b9-4259-b049-d0a53682fa4b" ], "detail": { "DaysToExpiry": 31, "CommonName": "My Awesome Service" } }

清理

一旦您不再需要示例配置或任何配置,最佳实践是删除该配置的所有痕迹,以避免安全问题和以后的意外费用:

  • IAM 策略和角色

  • Lambda 函数

  • CloudWatch Events 规则

  • 与 Lambda 关联的 CloudWatch Logs

  • SNS 主题