监控数据库活动流 - Amazon Aurora
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

监控数据库活动流

数据库活动流监控并报告数据库上的活动,如下所述。

收集活动流并将其传输到 Amazon Kinesis。从 Kinesis 中,您可以监控活动流,或者其他服务和应用程序可以使用活动流进行进一步分析。您可以使用 Amazon CLI 命令 describe-db-clusters 或 RDS API DescribeDBClusters 操作查找底层 Kinesis 流名称。

Aurora 为您管理 Kinesis 流。

  • 您不使用现有的 Kinesis 流。Aurora 自动创建具有 24 小时保留期的 Kinesis 流。

  • 如有必要,Aurora 会扩展 Kinesis 流。

  • 如果您停止了数据库活动流或删除了数据库集群,则 Aurora 将删除 Kinesis 流。

监控以下类别的活动并将其放入活动流审核日志中:

  • SQL 命令 – 将审核所有 SQL 命令,以及预编译语句、内置函数和采用 SQL 过程语言 (PL/SQL) 的函数。将审核对存储过程的调用。还将审核在存储过程或函数中发出的任何 SQL 语句。

  • 其他数据库信息 – 受监控的活动包括完整的 SQL 语句、来自 DML 命令的受影响行的行数、访问的对象以及唯一的数据库名称。对于 Aurora PostgreSQL,数据库活动流还监控绑定变量和存储过程参数。

    重要

    每个语句的完整 SQL 文本在活动流审核日志中可见,包括任何敏感数据。但是,如果 Aurora 可以从上下文中确定数据库用户密码,则会对该密码进行修订,如下面的 SQL 语句所示。

    ALTER ROLE role-name WITH password
  • 连接信息 – 受监控的活动包括会话和网络信息、服务器进程 ID 和退出代码。

如果在监控数据库实例时活动流发生故障,则会使用 RDS 事件通知您。如果发生故障,您可以关闭数据库实例或让它继续。

从 Kinesis 中访问活动流

在为数据库集群启用活动流时,将会为您创建一个 Kinesis 流。您可以从 Kinesis 实时监控数据库活动。要进一步分析数据库活动,您可以将 Kinesis 流连接到使用者应用程序。您还可以将该流连接到合规性管理应用程序。

从 Kinesis 中访问活动流

  1. 通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

  2. 从 Kinesis 流列表中选择您的活动流。

    活动流的名称包含 aws-rds-das- 前缀,后跟数据库集群的资源 ID。以下是示例。

    aws-rds-das-cluster-NHVOV4PCLWHGF52NP

    要使用 Amazon RDS 控制台查找数据库集群的资源 ID,请从数据库列表中选择您的数据库集群,然后选择 Configuration (配置) 选项卡。

    要使用 Amazon CLI 查找活动流的完整 Kinesis 流名称,请使用 describe-db-clusters CLI 请求并记下响应中的 ActivityStreamKinesisStreamName 值。

  3. 选择监控以开始观察数据库活动。

有关使用 Amazon Kinesis 的更多信息,请参阅什么是 Amazon Kinesis Data Streams?

审计日志内容和示例

受监控的数据库活动事件在 Kinesis 活动流中表示为 JSON 字符串。结构包含一个 JSON 对象,该对象包含一个 DatabaseActivityMonitoringRecord,后者反过来包含活动事件的 databaseActivityEventList 阵列。

数据库活动流审计日志的示例

以下是活动事件记录的已解密 JSON 审核日志示例。

例 Aurora PostgreSQL CONNECT SQL 语句的活动事件记录

以下是通过 psql 客户端 (clientApplication) 使用 CONNECT SQL 语句 (command) 进行的登录的活动事件记录。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-10-30 00:39:49.940668+00", "logTime": "2019-10-30 00:39:49.990579+00", "statementId": 1, "substatementId": 1, "objectType": null, "command": "CONNECT", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "49804", "sessionId": "5ce5f7f0.474b", "rowCount": null, "commandText": null, "paramList": [], "pid": 18251, "clientApplication": "psql", "exitCode": null, "class": "MISC", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }

例 Aurora MySQL CONNECT SQL 语句的活动事件记录

以下是通过 mysql 客户端 (clientApplication) 使用 CONNECT SQL 语句 (command) 进行的登录的活动事件记录。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:13.267214+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"rdsadmin", "databaseName":"", "remoteHost":"localhost", "remotePort":"11053", "command":"CONNECT", "commandText":"", "paramList":null, "objectType":"TABLE", "objectName":"", "statementId":0, "substatementId":1, "exitCode":"0", "sessionId":"725121", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:13.267207+00", "endTime":"2020-05-22 18:07:13.267213+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

例 Aurora PostgreSQL CREATE TABLE 语句的活动事件记录

以下是 Aurora PostgreSQL 的 CREATE TABLE 事件的示例。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:36:54.403455+00", "logTime": "2019-05-24 00:36:54.494235+00", "statementId": 2, "substatementId": 1, "objectType": null, "command": "CREATE TABLE", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": null, "commandText": "create table my_table (id serial primary key, name varchar(32));", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "DDL", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }

例 Aurora MySQL CREATE TABLE 语句的活动事件记录

以下是 Aurora MySQL 的 CREATE TABLE 语句的示例。操作表示为两个单独的事件记录。一个事件具有 "class":"MAIN"。另一个事件具有 "class":"AUX"。消息可能按任何顺序到达。MAIN 事件的 logTime 字段始终早于任何对应 AUX 事件的 logTime 字段。

以下示例显示 class 值为 MAIN 的事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.250221+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"CREATE TABLE test1 (id INT)", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":1, "exitCode":"0", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.250222+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

以下示例显示 class 值为 AUX 的相应事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.247182+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"CREATE", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":2, "exitCode":"", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.247182+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }

例 Aurora PostgreSQL SELECT 语句的活动事件记录

以下是 SELECT 事件的示例。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:39:49.920564+00", "logTime": "2019-05-24 00:39:49.940668+00", "statementId": 6, "substatementId": 1, "objectType": "TABLE", "command": "SELECT", "objectName": "public.my_table", "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": 10, "commandText": "select * from my_table;", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "READ", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }

例 Aurora MySQL SELECT 语句的活动事件记录

以下是 SELECT 事件的示例。

以下示例显示 class 值为 MAIN 的事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986467+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"SELECT * FROM test1 WHERE id < 28", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":1, "exitCode":"0", "sessionId":"726571", "rowCount":2, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986467+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

以下示例显示 class 值为 AUX 的相应事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986399+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"READ", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":2, "exitCode":"", "sessionId":"726571", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986399+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }

数据库活动监控记录 JSON 对象

数据库活动事件记录位于包含以下信息的 JSON 对象中。

JSON 字段 数据类型 描述

type

字符串

JSON 记录的类型。值为 DatabaseActivityMonitoringRecords

version 字符串 数据库活动监控记录的版本。所生成数据库活动记录的版本取决于数据库集群的引擎版本。
  • 对于运行引擎版本 10.10 和更高次要版本以及引擎版本 11.5 和更高版本的 Aurora PostgreSQL 数据库集群,生成版本 1.1 的数据库活动记录。

  • 对于运行引擎版本 10.7 和 11.4 的 Aurora PostgreSQL 数据库集群,生成版本 1.0 的数据库活动记录。

除非另有注明,否则版本 1.0 和版本 1.1 中均包含以下所有字段。

databaseActivityEvents

字符串

包含活动事件的 JSON 对象。

key 字符串 用于解密 databaseActivityEventList databaseActivityEventList JSON 数组的加密密钥。

databaseActivityEvents JSON 对象

databaseActivityEvents JSON 对象包含以下信息。

JSON 记录中的顶级字段

审核日志中的每个事件都包装在 JSON 格式的记录中。此记录包含以下字段。

类型

此字段始终具有值 DatabaseActivityMonitoringRecords

版本

此字段表示数据库活动流数据协议或合同的版本。它定义哪些字段可用。版本 1.0 表示对 Aurora PostgreSQL 10.7 和 11.4 版的原始数据活动流支持。版本 1.1 表示对 Aurora PostgreSQL 10.10 版及更高版本和 Aurora PostgreSQL 11.5 版及更高版本的数据活动流支持。版本 1.1 包括其他字段 errorMessagestartTime。版本 1.2 表示对 Aurora MySQL 2.08 及更高版本的数据活动流支持。版本 1.2 包括其他字段 endTimetransactionId

databaseActivityEvents

表示一个或多个活动事件的加密字符串。它表示为 base64 字节数组。解密字符串时,结果是 JSON 格式的记录,其中包含字段,如本节中的示例所示。

key

用于加密 databaseActivityEvents 字符串的加密数据密钥。这与您在启动数据库活动流时提供的 Amazon KMS 客户主密钥 (CMK) 相同。

以下示例显示了此记录的格式。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents":"encrypted audit records", "key":"encrypted key" }

执行以下步骤来解密 databaseActivityEvents 字段的内容:

  1. 使用您在启动数据库活动流时提供的 Amazon KMS CMK 解密 key JSON 字段中的值。这样做将以明文形式返回数据加密密钥。

  2. databaseActivityEvents JSON 字段中的值进行 Base64 解码,以获取审核负载的二进制格式的密文。

  3. 使用您在第一步中解码的数据加密密钥解密二进制密文。

  4. 解压解已解密的负载。

    • 已加密的负载在 databaseActivityEvents 字段中。

    • databaseActivityEventList 字段包含审核记录数组。此数组中的 type 字段可以是 recordheartbeat

审核日志活动事件记录是包含以下信息的 JSON 对象。

JSON 字段 数据类型 描述

type

字符串

JSON 记录的类型。值为 DatabaseActivityMonitoringRecord

clusterId 字符串 数据库集群资源标识符。它对应于数据库集群属性 DbClusterResourceId
instanceId 字符串 数据库实例资源标识符。它对应于数据库实例属性 DbiResourceId

databaseActivityEventList

字符串

活动审核记录或检测信号消息的数组。

databaseActivityEventList JSON 数组

审核日志负载是解密的 databaseActivityEventList JSON 数组。下表按字母顺序列出了审核日志的解密 DatabaseActivityEventList 数组中每个活动事件的字段。这些字段会有所不同,具体取决于您使用的是 Aurora PostgreSQL 还是 Aurora MySQL。请参阅适用于您的数据库引擎的表。

重要

事件结构可能会发生变化。Aurora 可能会将新字段添加到未来的活动事件中。在解析 JSON 数据的应用程序中,请确保您的代码可以忽略未知字段名称或对其采取适当操作。

Aurora PostgreSQL 的 databaseActivityEventList 字段
字段 数据类型 描述
class 字符串

活动事件的类。Aurora PostgreSQL 的有效值如下所示:

  • ALL

  • CONNECT – 连接或断开连接事件。

  • DDL – 未包含在 ROLE 类的语句列表中的 DDL 语句。

  • FUNCTION – 函数调用或 DO 块。

  • MISC – 其他命令,例如 DISCARDFETCHCHECKPOINTVACUUM

  • NONE

  • READSELECTCOPY 语句(当源为关系或查询时)。

  • ROLE – 与角色或权限关联的语句,包括 GRANTREVOKECREATE/ALTER/DROP ROLE

  • WRITEINSERTUPDATEDELETETRUNCATECOPY 语句(当目标为关系时)。

clientApplication 字符串 客户端报告的其用于连接的应用程序。由于客户端不必提供此信息,因此值可以为 null。
command 字符串 不带任何命令详细信息的 SQL 命令的名称。
commandText 字符串

用户传入的实际 SQL 语句。对于 Aurora PostgreSQL,该值与原始 SQL 语句相同。此字段用于除连接或断开连接记录之外的所有类型的记录,在这种情况下,该值为 null。

重要

每个语句的完整 SQL 文本在活动流审核日志中可见,包括任何敏感数据。但是,如果 Aurora 可以从上下文中确定数据库用户密码,则会对该密码进行修订,如下面的 SQL 语句所示。

ALTER ROLE role-name WITH password
databaseName 字符串 用户连接到的数据库。
dbProtocol 字符串 数据库协议,例如 Postgres 3.0
dbUserName 字符串 客户端对其进行身份验证的数据库用户。
errorMessage 字符串

如果出现任何错误,则使用数据库服务器生成的错误消息填充此字段。对于未导致错误的普通语句,errorMessage 值为 null。仅 1.1 版数据库活动记录中使用此字段。

错误定义为生成客户端可见 PostgreSQL 错误日志事件(其严重性级别为 ERROR 或更高)的任何活动。有关更多信息,请参阅 PostgreSQL 消息严重性级别。例如,语法错误和查询取消会生成错误消息。

内部 PostgreSQL 服务器错误(例如后台检查指针进程错误)不会生成错误消息。但是,无论如何设置日志严重性级别,此类事件的记录仍会发出。这样可防止攻击者关闭日志记录以尝试避开检测。

另请参阅 exitCode 字段。

exitCode int 用于会话退出记录的值。在干净的出口,这包含退出代码。在某些故障场景中,无法始终获得退出代码。例如,如果 PostgreSQL 执行 exit() 或操作者执行 kill -9 等命令。

如果存在任何错误,则 exitCode 字段将显示 SQL 错误代码 SQLSTATE,如 PostgreSQL 错误代码中所列。

另请参阅 errorMessage 字段。

logTime 字符串 审核代码路径中记录的时间戳。这表示 SQL 语句执行结束时间。另请参阅 startTime 字段。
netProtocol 字符串 网络通信协议。
objectName 字符串 数据库对象的名称(如果正在对一个数据库对象运行 SQL 语句)。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句,则此值为 null。
objectType 字符串 数据库对象类型,例如表、索引、视图等。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句,则此值为 null。包括下列有效值:
  • COMPOSITE TYPE

  • FOREIGN TABLE

  • FUNCTION

  • INDEX

  • MATERIALIZED VIEW

  • SEQUENCE

  • TABLE

  • TOAST TABLE

  • VIEW

  • UNKNOWN

paramList 字符串 传递给 SQL 语句的逗号分隔的参数数组。如果 SQL 语句没有参数,则此值为空数组。
pid int 为向客户端连接提供服务而分配的后端进程的进程 ID。
remoteHost 字符串 客户端 IP 地址或主机名。对于 Aurora PostgreSQL,使用哪一个取决于数据库的 log_hostname 参数设置。
remotePort 字符串 客户端的端口号。
rowCount int SQL 语句返回的行数。例如,如果 SELECT 语句返回 10 行,则 rowCount 为 10。对于 INSERT 或 UPDATE 语句,则 rowCount 为 0。
serverHost 字符串 数据库服务器主机 IP 地址。
serverType 字符串 数据库服务器类型,例如 PostgreSQL
serverVersion 字符串 数据库服务器版本,例如对于 Aurora PostgreSQL 为 2.3.1
serviceName 字符串 服务名称,例如 Amazon Aurora PostgreSQL-Compatible edition
sessionId int 伪唯一会话标识符。
startTime 字符串

SQL 语句开始执行的时间。仅 1.1 版数据库活动记录中使用此字段。

要计算 SQL 语句的近似执行时间,请使用 logTime - startTime。另请参阅 logTime 字段。

statementId int 客户端的 SQL 语句的标识符。计数器处于会话级别,并随客户端输入的每个 SQL 语句递增。
substatementId int SQL 子语句的标识符。此值计算 statementId 字段标识的每个 SQL 语句的包含的子语句。
type 字符串 事件类型。有效值为 recordheartbeat
Aurora MySQL 的 databaseActivityEventList 字段
字段 数据类型 描述
class 字符串

活动事件的类。

Aurora MySQL 的有效值如下所示:

  • MAIN – 表示 SQL 语句的主事件。

  • AUX – 包含其他详细信息的补充事件。例如,重命名对象的语句可能具有反映新名称的类为 AUX 的事件。

    要查找对应于同一语句的 MAINAUX 事件,请检查具有相同的 pid 字段值和 statementId 字段值的不同事件。

clientApplication 字符串 客户端报告的其用于连接的应用程序。由于客户端不必提供此信息,因此值可以为 null。
command 字符串

SQL 语句的常规类别。此字段的值取决于 class 的值。

classMAIN 时的值包括以下值:

  • CONNECT – 连接客户端会话时。

  • QUERY – SQL 语句。附带一个或多个 class 值为 AUX 的事件。

  • DISCONNECT – 客户端会话断开连接时。

  • FAILED_CONNECT – 客户端尝试连接但无法连接时。

  • CHANGEUSER – 属于 MySQL 网络协议的一部分但不是来自您发出的语句的状态更改。

classAUX 时的值包括以下值:

  • READSELECTCOPY 语句(当源为关系或查询时)。

  • WRITEINSERTUPDATEDELETETRUNCATECOPY 语句(当目标为关系时)。

  • DROP – 删除对象。

  • CREATE – 创建对象。

  • RENAME – 重命名对象。

  • ALTER – 更改对象的属性。

commandText 字符串

对于 class 值为 MAIN 的事件,此字段表示用户传入的实际 SQL 语句。此字段用于除连接或断开连接记录之外的所有类型的记录,在这种情况下,该值为 null。

对于 class 值为 AUX 的事件,此字段包含有关事件中涉及的对象的补充信息。

对于 Aurora MySQL,字符(如引号)前面有反斜杠,表示转义字符。

重要

每个语句的完整 SQL 文本在审核日志中可见,包括任何敏感数据。但是,如果 Aurora 可以从上下文中确定数据库用户密码,则会对该密码进行修订,如下面的 SQL 语句所示。

mysql> SET PASSWORD = 'my-password';
databaseName 字符串 用户连接到的数据库。
dbProtocol 字符串 数据库协议。目前,对于 Aurora MySQL,此值始终为 MySQL
dbUserName 字符串 客户端对其进行身份验证的数据库用户。
endTime 字符串

SQL 语句执行结束的时间。它以协调世界时 (UTC) 格式表示。仅 1.2 版数据库活动记录中存在此字段。

要计算 SQL 语句的执行时间,请使用 endTime - startTime。另请参阅 startTime 字段。

errorMessage 字符串

如果出现任何错误,则使用数据库服务器生成的错误消息填充此字段。对于未导致错误的普通语句,errorMessage 值为 null。仅 1.1 版数据库活动记录中使用此字段。

错误定义为生成客户端可见 MySQL 错误日志事件(其严重性级别为 ERROR 或更高)的任何活动。有关更多信息,请参阅 MySQL 参考手册 中的错误日志。例如,语法错误和查询取消会生成错误消息。

内部 MySQL 服务器错误(例如后台检查指针进程错误)不会生成错误消息。但是,无论如何设置日志严重性级别,此类事件的记录仍会发出。这样可防止攻击者关闭日志记录以尝试避开检测。

另请参阅 exitCode 字段。

exitCode int 用于会话退出记录的值。在干净的出口,这包含退出代码。在某些故障场景中,无法始终获得退出代码。在此类情况下,此值可能为零,也可能为空。
logTime 字符串 审核代码路径中记录的时间戳。它以协调世界时 (UTC) 格式表示。有关计算语句持续时间的最准确方法,请参阅 startTimeendTime 字段。
netProtocol 字符串 网络通信协议。目前,对于 Aurora MySQL,此值始终为 TCP
objectName 字符串 数据库对象的名称(如果正在对一个数据库对象运行 SQL 语句)。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句,则此值为空。要构造对象的完全限定名称,请将 databaseNameobjectName 组合起来。如果查询涉及多个对象,则此字段可以是名称的逗号分隔列表。
objectType 字符串

数据库对象类型,例如表、索引等。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句,则此值为 null。

Aurora MySQL 的有效值包括:

  • INDEX

  • TABLE

  • UNKNOWN

paramList 字符串 此字段不用于 Aurora MySQL 且始终为 null。
pid int 为向客户端连接提供服务而分配的后端进程的进程 ID。当数据库服务器重新启动时,pid 会发生更改,并且 statementId 字段的计数器会重新开始。
remoteHost 字符串 发出 SQL 语句的客户端的 IP 地址或主机名。对于 Aurora MySQL,使用哪一个取决于数据库的 skip_name_resolve 参数设置。值 localhost 指示来自 rdsadmin 特殊用户的活动。
remotePort 字符串 客户端的端口号。
rowCount int SQL 语句所影响或检索的表行的数目。此字段仅用于作为数据操作语言 (DML) 语句的 SQL 语句。如果 SQL 语句不是 DML 语句,则此值为 null。
serverHost 字符串 数据库服务器实例标识符。对于 Aurora MySQL 与 Aurora PostgreSQL,此值的表示方式不同。Aurora PostgreSQL 使用 IP 地址而不是标识符。
serverType 字符串 数据库服务器类型,例如 MySQL
serverVersion 字符串 数据库服务器版本。目前,对于 Aurora MySQL,此值始终为 MySQL 5.7.12
serviceName 字符串 服务的名称。目前,对于 Aurora MySQL,此值始终为 Amazon Aurora MySQL
sessionId int 伪唯一会话标识符。
startTime 字符串

SQL 语句开始执行的时间。它以协调世界时 (UTC) 格式表示。仅 1.1 版数据库活动记录中使用此字段。

要计算 SQL 语句的执行时间,请使用 endTime - startTime。另请参阅 endTime 字段。

statementId int 客户端的 SQL 语句的标识符。计数器随客户端输入的每个 SQL 语句递增。在重新启动数据库实例时,将重置计数器。
substatementId int SQL 子语句的标识符。对于类为 MAIN 的事件,此值为 1;对于类为 AUX 的事件,此值为 2。使用 statementId 字段标识同一语句生成的所有事件。
transactionId 字符串 事务的标识符。仅 1.2 版数据库活动记录中使用此字段。
type 字符串 事件类型。有效值为 recordheartbeat

使用 Amazon 开发工具包处理活动流

您可以使用 Amazon 开发工具包以编程方式处理活动流。以下是有关如何处理 Kinesis 数据流的功能完善的 Java 和 Python 示例。

Java
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.security.Security; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.zip.GZIPInputStream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.SecretKeySpec; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.CryptoInputStream; import com.amazonaws.encryptionsdk.jce.JceMasterKey; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kms.AWSKMS; import com.amazonaws.services.kms.AWSKMSClientBuilder; import com.amazonaws.services.kms.model.DecryptRequest; import com.amazonaws.services.kms.model.DecryptResult; import com.amazonaws.util.Base64; import com.amazonaws.util.IOUtils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.bouncycastle.jce.provider.BouncyCastleProvider; public class DemoConsumer { private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]"; private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]"; private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]"; private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]"; private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2... private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY); private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS); private static final AwsCrypto CRYPTO = new AwsCrypto(); private static final AWSKMS KMS = AWSKMSClientBuilder.standard() .withRegion(REGION_NAME) .withCredentials(CREDENTIALS_PROVIDER).build(); class Activity { String type; String version; String databaseActivityEvents; String key; } class ActivityEvent { @SerializedName("class") String _class; String clientApplication; String command; String commandText; String databaseName; String dbProtocol; String dbUserName; String endTime; String errorMessage; String exitCode; String logTime; String netProtocol; String objectName; String objectType; List<String> paramList; String pid; String remoteHost; String remotePort; String rowCount; String serverHost; String serverType; String serverVersion; String serviceName; String sessionId; String startTime; String statementId; String substatementId; String transactionId; String type; } class ActivityRecords { String type; String clusterId; String instanceId; List<ActivityEvent> databaseActivityEventList; } static class RecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new RecordProcessor(); } } static class RecordProcessor implements IRecordProcessor { private static final long BACKOFF_TIME_IN_MILLIS = 3000L; private static final int PROCESSING_RETRIES_MAX = 10; private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; private static final Gson GSON = new GsonBuilder().serializeNulls().create(); private static final Cipher CIPHER; static { Security.insertProviderAt(new BouncyCastleProvider(), 1); try { CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC"); } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) { throw new ExceptionInInitializerError(e); } } private long nextCheckpointTimeInMillis; @Override public void initialize(String shardId) { } @Override public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) { for (final Record record : records) { processSingleBlob(record.getData()); } if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { checkpoint(checkpointer); nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS; } } @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } } private void processSingleBlob(final ByteBuffer bytes) { try { // JSON $Activity final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class); // Base64.Decode final byte[] decoded = Base64.decode(activity.databaseActivityEvents); final byte[] decodedDataKey = Base64.decode(activity.key); Map<String, String> context = new HashMap<>(); context.put("aws:rds:dbc-id", DBC_RESOURCE_ID); // Decrypt final DecryptRequest decryptRequest = new DecryptRequest() .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context); final DecryptResult decryptResult = KMS.decrypt(decryptRequest); final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext())); // GZip Decompress final byte[] decompressed = decompress(decrypted); // JSON $ActivityRecords final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class); // Iterate throught $ActivityEvents for (final ActivityEvent event : activityRecords.databaseActivityEventList) { System.out.println(GSON.toJson(event)); } } catch (Exception e) { // Handle error. e.printStackTrace(); } } private static byte[] decompress(final byte[] src) throws IOException { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); return IOUtils.toByteArray(gzipInputStream); } private void checkpoint(IRecordProcessorCheckpointer checkpointer) { for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) { try { checkpointer.checkpoint(); break; } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). System.out.println("Caught shutdown exception, skipping checkpoint." + se); break; } catch (ThrottlingException e) { // Backoff and re-attempt checkpoint upon transient failures if (i >= (PROCESSING_RETRIES_MAX - 1)) { System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e); break; } else { System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e); } } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e); break; } try { Thread.sleep(BACKOFF_TIME_IN_MILLIS); } catch (InterruptedException e) { System.out.println("Interrupted sleep" + e); } } } } private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException { // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"), "BC", "DataKey", "AES/GCM/NoPadding"); try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) { IOUtils.copy(decryptingStream, out); return out.toByteArray(); } } public static void main(String[] args) throws Exception { final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); final KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId); kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST); kinesisClientLibConfiguration.withRegionName(REGION_NAME); final Worker worker = new Builder() .recordProcessorFactory(new RecordProcessorFactory()) .config(kinesisClientLibConfiguration) .build(); System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId); try { worker.run(); } catch (Throwable t) { System.err.println("Caught throwable while processing data."); t.printStackTrace(); System.exit(1); } System.exit(0); } private static byte[] getByteArray(final ByteBuffer b) { byte[] byteArray = new byte[b.remaining()]; b.get(byteArray); return byteArray; } }
Python
import base64 import json import zlib import aws_encryption_sdk from aws_encryption_sdk import CommitmentPolicy from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType import boto3 REGION_NAME = '<region>' # us-east-1 RESOURCE_ID = '<external-resource-id>' # cluster-ABCD123456 STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID # aws-rds-das-cluster-ABCD123456 enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT) class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, plain_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC) def _get_raw_key(self, key_id): return self.wrapping_key def decrypt_payload(payload, data_key): my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") decrypted_plaintext, header = enc_client.decrypt( source=payload, materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)) return decrypted_plaintext def decrypt_decompress(payload, key): decrypted = decrypt_payload(payload, key) return zlib.decompress(decrypted, zlib.MAX_WBITS + 16) def main(): session = boto3.session.Session() kms = session.client('kms', region_name=REGION_NAME) kinesis = session.client('kinesis', region_name=REGION_NAME) response = kinesis.describe_stream(StreamName=STREAM_NAME) shard_iters = [] for shard in response['StreamDescription']['Shards']: shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'], ShardIteratorType='LATEST') shard_iters.append(shard_iter_response['ShardIterator']) while len(shard_iters) > 0: next_shard_iters = [] for shard_iter in shard_iters: response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000) for record in response['Records']: record_data = record['Data'] record_data = json.loads(record_data) payload_decoded = base64.b64decode(record_data['databaseActivityEvents']) data_key_decoded = base64.b64decode(record_data['key']) data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded, EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID}) print decrypt_decompress((payload_decoded, data_key_decrypt_result['Plaintext'])) if 'NextShardIterator' in response: next_shard_iters.append(response['NextShardIterator']) shard_iters = next_shard_iters if __name__ == '__main__': main()