将数据库活动流与 Amazon Aurora 结合使用 - Amazon Aurora
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

将数据库活动流与 Amazon Aurora 结合使用

监控数据库活动可以帮助您为数据库提供安全保障,并帮助满足合规性和法规要求。使用 Amazon Aurora 监控数据库活动的一种方法是使用数据库活动流功能。数据库活动流可提供关系数据库中数据库活动的近实时数据流。在将数据库活动流与监控工具集成在一起时,您可以监控和审核数据库活动。

除了外部安全威胁之外,托管数据库还需要提供保护以规避来自数据库管理员 (DBA) 的内部风险。数据库活动流可通过控制 DBA 对数据库活动流的访问来帮助保护您的数据库,使其免受内部威胁。因此,数据库活动流的收集、传输、存储和后续处理超出了管理数据库的 DBA 的访问权限。

数据库活动流从 Aurora 推送到代表您的 Aurora 数据库集群创建的 Amazon Kinesis 数据流。在 Kinesis 中,Amazon Kinesis Data Firehose 和 AWS Lambda 等 AWS 服务或应用程序可以随后使用活动流以进行合规性管理。对于使用 Aurora PostgreSQL 的数据库活动流,这些合规性应用程序包括 IBM 的 Infosphere Guardium、McAfee 的 Data Center Security Suite 和 Imperva 的 SecureSphere Database Audit and Protection。此类应用程序可以使用活动流信息生成警报,并提供对 Aurora 数据库集群上所有活动的审核。

提示

检查网站是否有要与数据库活动流一起使用的特定应用程序。确认对要使用的特定 Aurora 数据库引擎和引擎版本的支持。

数据库活动流具有以下限制和要求:

  • 目前,以下 与 PostgreSQL 兼容的 Aurora 和 Aurora MySQL 版本支持数据库活动流。

    • 对于 Aurora PostgreSQL,数据库活动流需要 2.3 版,该版本与 PostgreSQL 10.7 版兼容。

    • 对于 Aurora MySQL,数据库活动流需要 2.08 版或更高版本,该版本与 MySQL 5.7 版兼容。

  • 数据库活动流支持所有可用 Aurora 数据库实例类的硬件规格中为 Aurora 列出的数据库实例类,但有一些例外:

    • 对于 Aurora PostgreSQL,不能将流与 db.t3.medium 实例类一起使用。

    • 对于 Aurora MySQL,不能将流与任何 db.t2 或 db.t3 实例类一起使用。

  • 以下 AWS 区域不支持数据库活动流:

    • 中国(北京)区域、cn-north-1

    • 中国 (宁夏) 区域、cn-northwest-1

    • AWS GovCloud(美国东部)、us-gov-east-1

    • AWS GovCloud(美国西部)、us-gov-west-1

    • 亚太区域 (大阪当地)、ap-northeast-3

    • 欧洲(米兰)区域、eu-south-1

    • 非洲(开普敦)区域、af-south-1

    • 中东(巴林)区域、me-south-1

  • 数据库活动流需要使用 AWS Key Management Service (AWS KMS)。AWS KMS 是必需的,因为活动流始终是加密的。

Aurora MySQL 数据库活动流的网络先决条件

要将数据库活动流与 Aurora MySQL 结合使用,在 VPC 中使用数据库活动流的所有数据库实例都必须能够访问 AWS KMS 终端节点。在为 Aurora MySQL 集群启用数据库活动流之前,请确保满足此要求。

重要

如果 Aurora MySQL 数据库集群无法访问 AWS KMS 终端节点,活动流将停止运行。在这种情况下,Aurora 会使用 RDS 事件通知您此问题。

如果 Aurora 集群公开可用,则会自动满足此要求。

如果您收到通知,指出您的 Aurora MySQL 集群因无法访问 AWS KMS 终端节点而无法使用数据库活动流,请检查您的 Aurora 数据库集群是公有的还是私有的。如果您的 Aurora 数据库集群是私有的,您必须将其配置为允许连接。

对于要设为公有的 Aurora 数据库集群,必须将其标记为可公开访问。在这种情况下,如果在 AWS 管理控制台中查看数据库集群的详细信息,就会发现公开访问设置为。数据库集群还必须在 Amazon VPC 公有子网中。有关可公开访问的数据库实例的更多信息,请参阅在 VPC 中使用数据库实例。有关公有 Amazon VPC 子网的更多信息,请参阅您的 VPC 和子网

如果您的 Aurora 数据库集群不可公开访问且位于 VPC 公有子网中,则它是私有的。您可能保持 Aurora MySQL 集群是私有的,并将其与数据库活动流一起使用。如果是这样,请配置集群,以便它可以通过网络地址转换 (NAT) 连接到 Internet 地址。有关在 VPC 中配置 NAT 的更多信息,请参阅 NAT 网关

有关配置 VPC 终端节点的更多信息,请参阅 VPC 终端节点

启动数据库活动流

您可以在数据库集群级别启动活动流,以监控集群的所有数据库实例的数据库活动。还将自动监控添加到集群的任何数据库实例。

在启动活动流时,每个数据库活动事件(如更改或访问)都会生成一个活动流事件。从 SQL 命令(例如 CONNECTSELECT)生成访问事件。从 SQL 命令(例如 CREATEINSERT)生成更改事件。为使每个活动流事件持久,Aurora 加密并存储它。

您可以选择让数据库会话以异步或同步方式处理数据库活动事件:

  • 异步模式 – 在异步模式下,当数据库会话生成活动流事件时,会话将立即返回到正常活动。在后台,活动流事件将成为持久记录。如果后台任务出错,则将发送 RDS 事件。此事件指示活动流事件记录可能已丢失的任何时间段的开始和结束时间。

    异步模式可提高数据库性能,而不是活动流的准确性。

    注意

    异步模式适用于 Aurora PostgreSQL 和 Aurora MySQL。

  • 同步模式 – 在同步模式下,当数据库会话生成活动流事件时,会话将阻塞其他活动,直到该事件变得持久。如果因某个原因无法使事件持久,数据库会话将返回到正常活动。但会发送一个 RDS 事件来指示活动流记录可能会丢失一段时间。在系统恢复到正常运行状态后发送第二个 RDS 事件。

    同步模式可提高活动流的准确性,而不是数据库性能。

    注意

    同步模式适用于 Aurora PostgreSQL。您不能将同步模式用于 Aurora MySQL。

启动活动流

  1. https://console.amazonaws.cn/rds/ 处打开 Amazon RDS 控制台。

  2. 在导航窗格中,选择数据库

  3. 选择要修改的数据库集群。

  4. 对于 Actions (操作),选择 Start activity stream (启动活动流)。此时将显示 Database Activity Stream (数据库活动流) 窗口。

  5. Database Activity Stream (数据库活动流) 窗口中,输入以下设置:

    • 对于 Master key (主密钥),从 AWS KMS 密钥列表中选择一个密钥。

      注意

      如果您的 Aurora MySQL 集群无法访问 AWS KMS 密钥,请先按照 Aurora MySQL 数据库活动流的网络先决条件中的说明启用此类访问。

      主密钥用于加密密钥,而密钥反过来加密记录的数据库活动。您必须选择默认密钥以外的主密钥。有关加密密钥和 AWS KMS 的更多信息,请参阅 AWS Key Management Service Developer Guide 中的什么是 AWS Key Management Service?

    • 对于 Database activity stream mode (数据库活动流模式),选择 Asynchronous (异步)Synchronous (同步)

      注意

      目前,此选项仅适用于 Aurora PostgreSQL。对于 Aurora MySQL,您只能使用异步模式。

    • 选择立即应用

      如果您选择 Schedule for the next maintenance window (下一个维护时段的计划),则数据库不会立即重启。它仍然处于 PENDING REBOOT 状态。在这种情况下,数据库活动流不会启动,直到下一个维护时段。

    在输入设置后,选择 Continue (继续)

    集群的数据库实例状态显示正在配置数据库活动流。

要为数据库集群启动数据库活动流,请使用 start-activity-stream AWS CLI 命令配置数据库集群。使用 --region 参数标识数据库集群的 AWS 区域。--apply-immediately 参数是可选的。

对于 Linux、macOS 或 Unix:

aws rds --region MY_REGION \ start-activity-stream \ --mode [sync | async] \ --kms-key-id MY_KMS_KEY_ARN \ --resource-arn MY_CLUSTER_ARN \ --apply-immediately

对于 Windows:

aws rds --region MY_REGION ^ start-activity-stream ^ --mode [sync | async] ^ --kms-key-id MY_KMS_KEY_ARN ^ --resource-arn MY_CLUSTER_ARN ^ --apply-immediately
注意

--mode 参数是必需的。对于 Aurora PostgreSQL,您可以选择任一值。对于 Aurora MySQL,您必须始终指定 async

获取活动流的状态

您可以使用控制台或 AWS CLI 获取活动流的状态。

获取数据库集群的活动流状态

  1. https://console.amazonaws.cn/rds/ 处打开 Amazon RDS 控制台。

  2. 在导航窗格中,选择 Databases (数据库),然后选择数据库集群。

  3. 选择 Configuration (配置) 选项卡,并选择 Database activity stream (数据库活动流) 以查看状态。

您可以获取数据库集群的活动流配置以作为对 describe-db-clusters CLI 请求的响应。在以下示例中,查看 ActivityStreamKinesisStreamNameActivityStreamStatusActivityStreamKmsKeyIdActivityStreamMode 的值。

请求如下。

aws rds --region MY_REGION describe-db-clusters --db-cluster-identifier my-cluster

响应包括以下数据库活动流项。

下面的示例介绍一个 JSON 响应。这些字段对于 Aurora PostgreSQL 和 Aurora MySQL 是相同的,只是对于 Aurora MySQL,ActivityStreamMode 总是 async,而对于 Aurora PostgreSQL,它可能是 syncasync

{ "DBClusters": [ { "DBClusterIdentifier": "my-cluster", ... "ActivityStreamKinesisStreamName": "aws-rds-das-cluster-A6TSYXITZCZXJHIRVFUBZ5LTWY", "ActivityStreamStatus": "starting", "ActivityStreamKmsKeyId": "12345678-abcd-efgh-ijkl-bd041f170262", "ActivityStreamMode": "async", "DbClusterResourceId": "cluster-ABCD123456" ... } ] }

停止活动流

您可以使用控制台或 AWS CLI 停止活动流。

如果您删除数据库集群,则活动流将停止,并且会自动删除底层 Amazon Kinesis 流。

关闭活动流

  1. https://console.amazonaws.cn/rds/ 处打开 Amazon RDS 控制台。

  2. 在导航窗格中,选择数据库

  3. 选择要为其停止数据库活动流的数据库集群。

  4. 对于 Actions (操作),选择 Stop activity stream (停止活动流)。此时将显示 Database Activity Stream (数据库活动流) 窗口。

    1. 选择立即应用

      如果您选择 Schedule for the next maintenance window (下一个维护时段的计划),则数据库不会立即重启。它仍然处于 PENDING REBOOT 状态。在这种情况下,不会禁用数据活动流,直到下一个维护时段。

    2. 选择继续

要为数据库集群停止数据库活动流,请使用 AWS CLI 命令 stop-activity-stream 配置数据库集群。使用 --region 参数标识数据库集群的 AWS 区域。--apply-immediately 参数是可选的。

对于 Linux、macOS 或 Unix:

aws rds --region MY_REGION \ stop-activity-stream \ --resource-arn MY_CLUSTER_ARN \ --apply-immediately

对于 Windows:

aws rds --region MY_REGION ^ stop-activity-stream ^ --resource-arn MY_CLUSTER_ARN ^ --apply-immediately

监控数据库活动流

数据库活动流监控并报告数据库上的活动,如下所述。

收集活动流并将其传输到 Amazon Kinesis。从 Kinesis 中,您可以监控活动流,或者其他服务和应用程序可以使用活动流进行进一步分析。您可以使用 AWS CLI 命令 describe-db-clusters 或 RDS API DescribeDBClusters 操作查找底层 Kinesis 流名称。

Aurora 为您管理 Kinesis 流。

  • 您不使用现有的 Kinesis 流。Aurora 自动创建具有 24 小时保留期的 Kinesis 流。

  • 如有必要,Aurora 会扩展 Kinesis 流。

  • Aurora 将重新创建 Kinesis 流(如果您删除了它)。

  • 如果您停止了数据库活动流或删除了数据库集群,则 Aurora 将删除 Kinesis 流。

监控以下类别的活动并将其放入活动流审核日志中:

  • SQL 命令 – 将审核所有 SQL 命令,以及预编译语句、内置函数和采用 SQL 过程语言 (PL/SQL) 的函数。将审核对存储过程的调用。还将审核在存储过程或函数中发出的任何 SQL 语句。

  • 其他数据库信息 – 受监控的活动包括完整的 SQL 语句、来自 DML 命令的受影响行的行数、访问的对象以及唯一的数据库名称。对于 Aurora PostgreSQL,数据库活动流还监控绑定变量和存储过程参数。

    重要

    每个语句的完整 SQL 文本在活动流审核日志中可见,包括任何敏感数据。但是,如果 Aurora 可以从上下文中确定数据库用户密码,则会对该密码进行修订,如下面的 SQL 语句所示。

    ALTER ROLE role-name WITH password
  • 连接信息 – 受监控的活动包括会话和网络信息、服务器进程 ID 和退出代码。

如果在监控数据库实例时活动流发生故障,则会使用 RDS 事件通知您。如果发生故障,您可以关闭数据库实例或让它继续。

从 Kinesis 中访问活动流

在为数据库集群启用活动流时,将会为您创建一个 Kinesis 流。您可以从 Kinesis 实时监控数据库活动。要进一步分析数据库活动,您可以将 Kinesis 流连接到使用者应用程序。您还可以将该流连接到合规性管理应用程序。

从 Kinesis 中访问活动流

  1. 通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

  2. 从 Kinesis 流列表中选择您的活动流。

    活动流的名称包含 aws-rds-das- 前缀,后跟数据库集群的资源 ID。以下是示例。

    aws-rds-das-cluster-NHVOV4PCLWHGF52NP

    要使用 Amazon RDS 控制台查找数据库集群的资源 ID,请从数据库列表中选择您的数据库集群,然后选择 Configuration (配置) 选项卡。

    要使用 AWS CLI 查找活动流的完整 Kinesis 流名称,请使用 describe-db-clusters CLI 请求并记下响应中的 ActivityStreamKinesisStreamName 值。

  3. 选择监控以开始观察数据库活动。

有关使用 Amazon Kinesis 的更多信息,请参阅什么是 Amazon Kinesis Data Streams?

审核日志内容和示例

受监控的数据库活动事件在 Kinesis 活动流中表示为 JSON 字符串。结构包含一个 JSON 对象,该对象包含一个 DatabaseActivityMonitoringRecord,后者反过来包含活动事件的 databaseActivityEventList 阵列。

数据库活动流审核日志的示例

以下是活动事件记录的已解密 JSON 审核日志示例。

例 Aurora PostgreSQL CONNECT SQL 语句的活动事件记录

以下是通过 psql 客户端 (clientApplication) 使用 CONNECT SQL 语句 (command) 进行的登录的活动事件记录。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-10-30 00:39:49.940668+00", "logTime": "2019-10-30 00:39:49.990579+00", "statementId": 1, "substatementId": 1, "objectType": null, "command": "CONNECT", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "49804", "sessionId": "5ce5f7f0.474b", "rowCount": null, "commandText": null, "paramList": [], "pid": 18251, "clientApplication": "psql", "exitCode": null, "class": "MISC", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }

例 Aurora MySQL CONNECT SQL 语句的活动事件记录

以下是通过 mysql 客户端 (clientApplication) 使用 CONNECT SQL 语句 (command) 进行的登录的活动事件记录。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:13.267214+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"rdsadmin", "databaseName":"", "remoteHost":"localhost", "remotePort":"11053", "command":"CONNECT", "commandText":"", "paramList":null, "objectType":"TABLE", "objectName":"", "statementId":0, "substatementId":1, "exitCode":"0", "sessionId":"725121", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:13.267207+00", "endTime":"2020-05-22 18:07:13.267213+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

例 Aurora PostgreSQL CREATE TABLE 语句的活动事件记录

以下是 Aurora PostgreSQL 的 CREATE TABLE 事件的示例。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:36:54.403455+00", "logTime": "2019-05-24 00:36:54.494235+00", "statementId": 2, "substatementId": 1, "objectType": null, "command": "CREATE TABLE", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": null, "commandText": "create table my_table (id serial primary key, name varchar(32));", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "DDL", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }

例 Aurora MySQL CREATE TABLE 语句的活动事件记录

以下是 Aurora MySQL 的 CREATE TABLE 语句的示例。操作表示为两个单独的事件记录。一个事件具有 "class":"MAIN"。另一个事件具有 "class":"AUX"。消息可能按任何顺序到达。MAIN 事件的 logTime 字段始终早于任何对应 AUX 事件的 logTime 字段。

以下示例显示 class 值为 MAIN 的事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.250221+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"CREATE TABLE test1 (id INT)", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":1, "exitCode":"0", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.250222+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

以下示例显示 class 值为 AUX 的相应事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.247182+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"CREATE", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":2, "exitCode":"", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.247182+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }

例 Aurora PostgreSQL SELECT 语句的活动事件记录

以下是 SELECT 事件的示例。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:39:49.920564+00", "logTime": "2019-05-24 00:39:49.940668+00", "statementId": 6, "substatementId": 1, "objectType": "TABLE", "command": "SELECT", "objectName": "public.my_table", "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": 10, "commandText": "select * from my_table;", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "READ", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }

例 Aurora MySQL SELECT 语句的活动事件记录

以下是 SELECT 事件的示例。

以下示例显示 class 值为 MAIN 的事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986467+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"SELECT * FROM test1 WHERE id < 28", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":1, "exitCode":"0", "sessionId":"726571", "rowCount":2, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986467+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

以下示例显示 class 值为 AUX 的相应事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986399+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"READ", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":2, "exitCode":"", "sessionId":"726571", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986399+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }

数据库活动监视记录 JSON 对象

数据库活动事件记录位于包含以下信息的 JSON 对象中。

JSON 字段 数据类型 说明

type

字符串

JSON 记录的类型。值为 DatabaseActivityMonitoringRecords

version 字符串 数据库活动监控记录的版本。所生成数据库活动记录的版本取决于数据库集群的引擎版本。
  • 对于运行引擎版本 10.10 和更高次要版本以及引擎版本 11.5 和更高版本的 Aurora PostgreSQL 数据库集群,生成版本 1.1 的数据库活动记录。

  • 对于运行引擎版本 10.7 和 11.4 的 Aurora PostgreSQL 数据库集群,生成版本 1.0 的数据库活动记录。

除非另有注明,否则版本 1.0 和版本 1.1 中均包含以下所有字段。

databaseActivityEvents

字符串

包含活动事件的 JSON 对象。

key 字符串 用于解密 databaseActivityEventList databaseActivityEventList JSON 数组的加密密钥。

databaseActivityEvents JSON 对象

databaseActivityEvents JSON 对象包含以下信息。

JSON 记录中的顶级字段

审核日志中的每个事件都包装在 JSON 格式的记录中。此记录包含以下字段。

type

此字段始终具有值 DatabaseActivityMonitoringRecords

version

此字段表示 DAS 数据协议或合同的版本。它定义哪些字段可用。版本 1.0 表示对 Aurora PostgreSQL 10.7 和 11.4 版的原始数据活动流支持。版本 1.1 表示对 Aurora PostgreSQL 10.10 版及更高版本和 Aurora PostgreSQL 11.5 版及更高版本的数据活动流支持。版本 1.1 包括其他字段 errorMessagestartTime。版本 1.2 表示对 Aurora MySQL 2.08 及更高版本的数据活动流支持。版本 1.2 包括其他字段 endTimetransactionId

databaseActivityEvents

表示一个或多个活动事件的加密字符串。它表示为 base64 字节数组。解密字符串时,结果是 JSON 格式的记录,其中包含字段,如本节中的示例所示。

key

用于加密 databaseActivityEvents 字符串的加密数据密钥。这与您在启动数据库活动流时提供的 AWS KMS 密钥相同。

以下示例显示了此记录的格式。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents":"encrypted audit records", "key":"encrypted key" }

执行以下步骤来解密 databaseActivityEvents 字段的内容:

  1. 使用您在启动数据库活动流时提供的 AWS KMS 密钥解密 key JSON 字段中的值。这样做将以明文形式返回数据加密密钥。

  2. databaseActivityEvents JSON 字段中的值进行 Base64 解码,以获取审核负载的二进制格式的密文。

  3. 使用您在第一步中解码的数据加密密钥解密二进制密文。

  4. 解压解已解密的负载。

    • 已加密的负载在 databaseActivityEvents 字段中。

    • databaseActivityEventList 字段包含审核记录数组。此数组中的 type 字段可以是 recordheartbeat

审核日志活动事件记录是包含以下信息的 JSON 对象。

JSON 字段 数据类型 说明

type

字符串

JSON 记录的类型。值为 DatabaseActivityMonitoringRecord

clusterId 字符串 数据库集群资源标识符。它对应于数据库集群属性 DbClusterResourceId
instanceId 字符串 数据库实例资源标识符。它对应于数据库实例属性 DbiResourceId

databaseActivityEventList

字符串

活动审核记录或检测信号消息的数组。

databaseActivityEventList JSON 数组

审核日志负载是解密的 databaseActivityEventList JSON 数组。下表按字母顺序列出了审核日志的解密 DatabaseActivityEventList 数组中每个活动事件的字段。这些字段会有所不同,具体取决于您使用的是 Aurora PostgreSQL 还是 Aurora MySQL。请参阅适用于您的数据库引擎的表。

重要

事件结构可能会发生变化。Aurora 可能会将新字段添加到未来的活动事件中。在解析 JSON 数据的应用程序中,请确保您的代码可以忽略未知字段名称或对其采取适当操作。

Aurora PostgreSQL 的 databaseActivityEventList 字段
字段 数据类型 说明
class 字符串

活动事件的类。Aurora PostgreSQL 的有效值如下所示:

  • ALL

  • CONNECT – 连接或断开连接事件。

  • DDL – 未包含在 ROLE 类的语句列表中的 DDL 语句。

  • FUNCTION – 函数调用或 DO 块。

  • MISC – 其他命令,例如 DISCARDFETCHCHECKPOINTVACUUM

  • NONE

  • READSELECTCOPY 语句(当源为关系或查询时)。

  • ROLE – 与角色或权限关联的语句,包括 GRANTREVOKECREATE/ALTER/DROP ROLE

  • WRITEINSERTUPDATEDELETETRUNCATECOPY 语句(当目标为关系时)。

clientApplication 字符串 客户端报告的其用于连接的应用程序。由于客户端不必提供此信息,因此值可以为 null。
command 字符串 不带任何命令详细信息的 SQL 命令的名称。
commandText 字符串

用户传入的实际 SQL 语句。对于 Aurora PostgreSQL,该值与原始 SQL 语句相同。此字段用于除连接或断开连接记录之外的所有类型的记录,在这种情况下,该值为 null。

重要

每个语句的完整 SQL 文本在活动流审核日志中可见,包括任何敏感数据。但是,如果 Aurora 可以从上下文中确定数据库用户密码,则会对该密码进行修订,如下面的 SQL 语句所示。

ALTER ROLE role-name WITH password
databaseName 字符串 用户连接到的数据库。
dbProtocol 字符串 数据库协议,例如 Postgres 3.0
dbUserName 字符串 客户端对其进行身份验证的数据库用户。
errorMessage

(仅版本 1.1 数据库活动记录)

字符串

如果出现任何错误,则使用数据库服务器生成的错误消息填充此字段。对于未导致错误的普通语句,errorMessage 值为 null。

错误定义为生成客户端可见 PostgreSQL 错误日志事件(其严重性级别为 ERROR 或更高)的任何活动。有关更多信息,请参阅 PostgreSQL 消息严重性级别。例如,语法错误和查询取消会生成错误消息。

内部 PostgreSQL 服务器错误(例如后台检查指针进程错误)不会生成错误消息。但是,无论如何设置日志严重性级别,此类事件的记录仍会发出。这样可防止攻击者关闭日志记录以尝试避开检测。

另请参阅 exitCode 字段。

exitCode int 用于会话退出记录的值。在干净的出口,这包含退出代码。在某些故障场景中,无法始终获得退出代码。例如,如果 PostgreSQL 执行 exit() 或操作者执行 kill -9 等命令。

如果存在任何错误,则 exitCode 字段将显示 SQL 错误代码 SQLSTATE,如 PostgreSQL 错误代码中所列。

另请参阅 errorMessage 字段。

logTime 字符串 审核代码路径中记录的时间戳。这表示 SQL 语句执行结束时间。另请参阅 startTime 字段。
netProtocol 字符串 网络通信协议。
objectName 字符串 数据库对象的名称(如果正在对一个数据库对象运行 SQL 语句)。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句,则此值为 null。
objectType 字符串 数据库对象类型,例如表、索引、视图等。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句,则此值为 null。包括下列有效值:
  • COMPOSITE TYPE

  • FOREIGN TABLE

  • FUNCTION

  • INDEX

  • MATERIALIZED VIEW

  • SEQUENCE

  • TABLE

  • TOAST TABLE

  • VIEW

  • UNKNOWN

paramList 字符串 传递给 SQL 语句的逗号分隔的参数数组。如果 SQL 语句没有参数,则此值为空数组。
pid int 为向客户端连接提供服务而分配的后端进程的进程 ID。
remoteHost 字符串 客户端 IP 地址或主机名。对于 Aurora PostgreSQL,使用哪一个取决于数据库的 log_hostname 参数设置。
remotePort 字符串 客户端的端口号。
rowCount int SQL 语句所影响或检索的表行的数目。此字段仅用于作为数据操作语言 (DML) 语句的 SQL 语句。如果 SQL 语句不是 DML 语句,则此值为 null。
serverHost 字符串 数据库服务器主机 IP 地址。
serverType 字符串 数据库服务器类型,例如 PostgreSQL
serverVersion 字符串 数据库服务器版本,例如对于 Aurora PostgreSQL 为 2.3.1
serviceName 字符串 服务名称,例如 Amazon Aurora PostgreSQL-Compatible edition
sessionId int 伪唯一会话标识符。
startTime

(仅版本 1.1 数据库活动记录)

字符串

SQL 语句开始执行的时间。

要计算 SQL 语句的近似执行时间,请使用 logTime – startTime。另请参阅 logTime 字段。

statementId int 客户端的 SQL 语句的标识符。计数器处于会话级别,并随客户端输入的每个 SQL 语句递增。
substatementId int SQL 子语句的标识符。此值计算 statementId 字段标识的每个 SQL 语句的包含的子语句。
type 字符串 事件类型。有效值为 recordheartbeat
Aurora MySQL 的 databaseActivityEventList 字段
字段 数据类型 说明
class 字符串

活动事件的类。

Aurora MySQL 的有效值如下所示:

  • MAIN – 表示 SQL 语句的主事件。

  • AUX – 包含其他详细信息的补充事件。例如,重命名对象的语句可能具有反映新名称的类为 AUX 的事件。

    要查找对应于同一语句的 MAINAUX 事件,请检查具有相同的 pid 字段值和 statementId 字段值的不同事件。

clientApplication 字符串 客户端报告的其用于连接的应用程序。由于客户端不必提供此信息,因此值可以为 null。
command 字符串

SQL 语句的常规类别。此字段的值取决于 class 的值。

classMAIN 时的值包括以下值:

  • CONNECT – 连接客户端会话时。

  • QUERY – SQL 语句。附带一个或多个 class 值为 AUX 的事件。

  • DISCONNECT – 客户端会话断开连接时。

  • FAILED_CONNECT – 客户端尝试连接但无法连接时。

  • CHANGEUSER – 属于 MySQL 网络协议的一部分但不是来自您发出的语句的状态更改。

classAUX 时的值包括以下值:

  • READSELECTCOPY 语句(当源为关系或查询时)。

  • WRITEINSERTUPDATEDELETETRUNCATECOPY 语句(当目标为关系时)。

  • DROP – 删除对象。

  • CREATE – 创建对象。

  • RENAME – 重命名对象。

  • ALTER – 更改对象的属性。

commandText 字符串

对于 class 值为 MAIN 的事件,此字段表示用户传入的实际 SQL 语句。此字段用于除连接或断开连接记录之外的所有类型的记录,在这种情况下,该值为 null。

对于 class 值为 AUX 的事件,此字段包含有关事件中涉及的对象的补充信息。

对于 Aurora MySQL,字符(如引号)前面有反斜杠,表示转义字符。

重要

每个语句的完整 SQL 文本在审核日志中可见,包括任何敏感数据。但是,如果 Aurora 可以从上下文中确定数据库用户密码,则会对该密码进行修订,如下面的 SQL 语句所示。

mysql> SET PASSWORD = 'my-password';
databaseName 字符串 用户连接到的数据库。
dbProtocol 字符串 数据库协议。目前,对于 Aurora MySQL,此值始终为 MySQL
dbUserName 字符串 客户端对其进行身份验证的数据库用户。
endTime

(仅版本 1.2 数据库活动记录)

字符串

SQL 语句执行结束的时间。它以协调世界时 (UTC) 格式表示。

要计算 SQL 语句的执行时间,请使用 endTime – startTime。另请参阅 startTime 字段。

errorMessage

(仅版本 1.1 数据库活动记录)

字符串

如果出现任何错误,则使用数据库服务器生成的错误消息填充此字段。对于未导致错误的普通语句,errorMessage 值为 null。

错误定义为生成客户端可见 MySQL 错误日志事件(其严重性级别为 ERROR 或更高)的任何活动。有关更多信息,请参阅 MySQL 参考手册 中的错误日志。例如,语法错误和查询取消会生成错误消息。

内部 MySQL 服务器错误(例如后台检查指针进程错误)不会生成错误消息。但是,无论如何设置日志严重性级别,此类事件的记录仍会发出。这样可防止攻击者关闭日志记录以尝试避开检测。

另请参阅 exitCode 字段。

exitCode int 用于会话退出记录的值。在干净的出口,这包含退出代码。在某些故障场景中,无法始终获得退出代码。在此类情况下,此值可能为零,也可能为空。
logTime 字符串 审核代码路径中记录的时间戳。它以协调世界时 (UTC) 格式表示。有关计算语句持续时间的最准确方法,请参阅 startTimeendTime 字段。
netProtocol 字符串 网络通信协议。目前,对于 Aurora MySQL,此值始终为 TCP
objectName 字符串 数据库对象的名称(如果正在对一个数据库对象运行 SQL 语句)。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句,则此值为空。要构造对象的完全限定名称,请将 databaseNameobjectName 组合起来。如果查询涉及多个对象,则此字段可以是名称的逗号分隔列表。
objectType 字符串

数据库对象类型,例如表、索引等。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句,则此值为 null。

Aurora MySQL 的有效值包括:

  • INDEX

  • TABLE

  • UNKNOWN

paramList 字符串 此字段不用于 Aurora MySQL 且始终为 null。
pid int 为向客户端连接提供服务而分配的后端进程的进程 ID。当数据库服务器重新启动时,pid 会发生更改,并且 statementId 字段的计数器会重新开始。
remoteHost 字符串 发出 SQL 语句的客户端的 IP 地址或主机名。对于 Aurora MySQL,使用哪一个取决于数据库的 skip_name_resolve 参数设置。值 localhost 指示来自 rdsadmin 特殊用户的活动。
remotePort 字符串 客户端的端口号。
rowCount int SQL 语句所影响或检索的表行的数目。此字段仅用于作为数据操作语言 (DML) 语句的 SQL 语句。如果 SQL 语句不是 DML 语句,则此值为 null。
serverHost 字符串 数据库服务器实例标识符。对于 Aurora MySQL 与 Aurora PostgreSQL,此值的表示方式不同。Aurora PostgreSQL 使用 IP 地址而不是标识符。
serverType 字符串 数据库服务器类型,例如 MySQL
serverVersion 字符串 数据库服务器版本。目前,对于 Aurora MySQL,此值始终为 MySQL 5.7.12
serviceName 字符串 服务的名称。目前,对于 Aurora MySQL,此值始终为 Amazon Aurora MySQL
sessionId int 伪唯一会话标识符。
startTime

(仅版本 1.1 数据库活动记录)

字符串

SQL 语句开始执行的时间。它以协调世界时 (UTC) 格式表示。

要计算 SQL 语句的执行时间,请使用 endTime – startTime。另请参阅 endTime 字段。

statementId int 客户端的 SQL 语句的标识符。计数器随客户端输入的每个 SQL 语句递增。在重新启动数据库实例时,将重置计数器。
substatementId int SQL 子语句的标识符。对于类为 MAIN 的事件,此值为 1;对于类为 AUX 的事件,此值为 2。使用 statementId 字段标识同一语句生成的所有事件。
transactionId

(仅版本 1.2 数据库活动记录)

int 事务的标识符。
type 字符串 事件类型。有效值为 recordheartbeat

使用 AWS 开发工具包处理活动流

您可以使用 AWS 开发工具包以编程方式处理活动流。以下是有关如何处理 Kinesis 数据流的功能完善的 Java 和 Python 示例。

Java
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.security.Security; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.zip.GZIPInputStream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.SecretKeySpec; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.CryptoInputStream; import com.amazonaws.encryptionsdk.jce.JceMasterKey; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kms.AWSKMS; import com.amazonaws.services.kms.AWSKMSClientBuilder; import com.amazonaws.services.kms.model.DecryptRequest; import com.amazonaws.services.kms.model.DecryptResult; import com.amazonaws.util.Base64; import com.amazonaws.util.IOUtils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.bouncycastle.jce.provider.BouncyCastleProvider; public class DemoConsumer { private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]"; private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]"; private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]"; private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]"; private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2... private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY); private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS); private static final AwsCrypto CRYPTO = new AwsCrypto(); private static final AWSKMS KMS = AWSKMSClientBuilder.standard() .withRegion(REGION_NAME) .withCredentials(CREDENTIALS_PROVIDER).build(); class Activity { String type; String version; String databaseActivityEvents; String key; } class ActivityEvent { @SerializedName("class") String _class; String clientApplication; String command; String commandText; String databaseName; String dbProtocol; String dbUserName; String endTime; String errorMessage; String exitCode; String logTime; String netProtocol; String objectName; String objectType; List<String> paramList; String pid; String remoteHost; String remotePort; String rowCount; String serverHost; String serverType; String serverVersion; String serviceName; String sessionId; String startTime; String statementId; String substatementId; String transactionId; String type; } class ActivityRecords { String type; String clusterId; String instanceId; List<ActivityEvent> databaseActivityEventList; } static class RecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new RecordProcessor(); } } static class RecordProcessor implements IRecordProcessor { private static final long BACKOFF_TIME_IN_MILLIS = 3000L; private static final int PROCESSING_RETRIES_MAX = 10; private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; private static final Gson GSON = new GsonBuilder().serializeNulls().create(); private static final Cipher CIPHER; static { Security.insertProviderAt(new BouncyCastleProvider(), 1); try { CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC"); } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) { throw new ExceptionInInitializerError(e); } } private long nextCheckpointTimeInMillis; @Override public void initialize(String shardId) { } @Override public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) { for (final Record record : records) { processSingleBlob(record.getData()); } if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { checkpoint(checkpointer); nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS; } } @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } } private void processSingleBlob(final ByteBuffer bytes) { try { // JSON $Activity final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class); // Base64.Decode final byte[] decoded = Base64.decode(activity.databaseActivityEvents); final byte[] decodedDataKey = Base64.decode(activity.key); Map<String, String> context = new HashMap<>(); context.put("aws:rds:dbc-id", DBC_RESOURCE_ID); // Decrypt final DecryptRequest decryptRequest = new DecryptRequest() .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context); final DecryptResult decryptResult = KMS.decrypt(decryptRequest); final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext())); // GZip Decompress final byte[] decompressed = decompress(decrypted); // JSON $ActivityRecords final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class); // Iterate throught $ActivityEvents for (final ActivityEvent event : activityRecords.databaseActivityEventList) { System.out.println(GSON.toJson(event)); } } catch (Exception e) { // Handle error. e.printStackTrace(); } } private static byte[] decompress(final byte[] src) throws IOException { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); return IOUtils.toByteArray(gzipInputStream); } private void checkpoint(IRecordProcessorCheckpointer checkpointer) { for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) { try { checkpointer.checkpoint(); break; } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). System.out.println("Caught shutdown exception, skipping checkpoint." + se); break; } catch (ThrottlingException e) { // Backoff and re-attempt checkpoint upon transient failures if (i >= (PROCESSING_RETRIES_MAX - 1)) { System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e); break; } else { System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e); } } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e); break; } try { Thread.sleep(BACKOFF_TIME_IN_MILLIS); } catch (InterruptedException e) { System.out.println("Interrupted sleep" + e); } } } } private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException { // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"), "BC", "DataKey", "AES/GCM/NoPadding"); try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) { IOUtils.copy(decryptingStream, out); return out.toByteArray(); } } public static void main(String[] args) throws Exception { final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); final KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId); kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST); kinesisClientLibConfiguration.withRegionName(REGION_NAME); final Worker worker = new Builder() .recordProcessorFactory(new RecordProcessorFactory()) .config(kinesisClientLibConfiguration) .build(); System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId); try { worker.run(); } catch (Throwable t) { System.err.println("Caught throwable while processing data."); t.printStackTrace(); System.exit(1); } System.exit(0); } private static byte[] getByteArray(final ByteBuffer b) { byte[] byteArray = new byte[b.remaining()]; b.get(byteArray); return byteArray; } }
Python
import base64 import json import zlib import aws_encryption_sdk from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType import boto3 REGION_NAME = '<region>' # us-east-1 RESOURCE_ID = '<external-resource-id>' # cluster-ABCD123456 STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID # aws-rds-das-cluster-ABCD123456 class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, plain_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC) def _get_raw_key(self, key_id): return self.wrapping_key def decrypt_payload(payload, data_key): my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") decrypted_plaintext, header = aws_encryption_sdk.decrypt( source=payload, materials_manager=aws_encryption_sdk.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)) return decrypted_plaintext def decrypt_decompress(payload, key): decrypted = decrypt_payload(payload, key) return zlib.decompress(decrypted, zlib.MAX_WBITS + 16) def main(): session = boto3.session.Session() kms = session.client('kms', region_name=REGION_NAME) kinesis = session.client('kinesis', region_name=REGION_NAME) response = kinesis.describe_stream(StreamName=STREAM_NAME) shard_iters = [] for shard in response['StreamDescription']['Shards']: shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'], ShardIteratorType='LATEST') shard_iters.append(shard_iter_response['ShardIterator']) while len(shard_iters) > 0: next_shard_iters = [] for shard_iter in shard_iters: response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000) for record in response['Records']: record_data = record['Data'] record_data = json.loads(record_data) payload_decoded = base64.b64decode(record_data['databaseActivityEvents']) data_key_decoded = base64.b64decode(record_data['key']) data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded, EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID}) print decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext']) if 'NextShardIterator' in response: next_shard_iters.append(response['NextShardIterator']) shard_iters = next_shard_iters if __name__ == '__main__': main()

管理数据库活动流访问

具有数据库活动流的相应 AWS Identity and Access Management (IAM) 角色权限的任何用户均可以创建、启动、停止和修改数据库集群的活动流设置。这些操作包含在流的审核日志中。对于最佳合规性实践,我们建议您不要向 DBA 提供这些权限。

您可以使用 IAM 策略设置对数据库活动流的访问权限。有关 Aurora 身份验证的更多信息,请参阅 Amazon Aurora 中的 Identity and Access Management。有关创建 IAM 策略的更多信息,请参阅 创建和使用适用于 IAM 数据库访问的 IAM 策略

例 允许配置数据库活动流的策略

要为用户提供精细访问权限以修改活动流,请在 IAM 策略中使用服务特定的操作上下文密钥 rds:StartActivityStreamrds:StopActivityStream 。以下 IAM 策略示例允许用户或角色配置活动流。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"ConfigureActivityStreams", "Effect":"Allow", "Action": [ "rds:StartActivityStream", "rds:StopActivityStream" ], "Resource":"*", } ] }

例 允许启动数据库活动流的策略

以下 IAM 策略示例允许用户或角色启动活动流。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"AllowStartActivityStreams", "Effect":"Allow", "Action":"rds:StartActivityStream", "Resource":"*" } ] }

例 允许停止数据库活动流的策略

以下 IAM 策略示例允许用户或角色停止活动流。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"AllowStopActivityStreams", "Effect":"Allow", "Action":"rds:StopActivityStream", "Resource":"*" } ] }

例 拒绝启动数据库活动流的策略

以下 IAM 策略示例阻止用户或角色启动活动流。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"DenyStartActivityStreams", "Effect":"Deny", "Action":"rds:StartActivityStream", "Resource":"*" } ] }

例 拒绝停止数据库活动流的策略

以下 IAM 策略示例阻止用户或角色停止活动流。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"DenyStopActivityStreams", "Effect":"Deny", "Action":"rds:StopActivityStream", "Resource":"*" } ] }