监控数据库活动流 - Amazon Relational Database Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

监控数据库活动流

数据库活动流监控并报告活动。收集活动流并将其传输到 Amazon Kinesis。从 Kinesis 中,您可以监控活动流,或者其他服务和应用程序可以使用活动流进行进一步分析。您可以使用 Amazon CLI 命令 describe-db-instances 或 RDS API DescribeDBInstances 操作查找底层 Kinesis 流名称。

Amazon RDS 为您管理 Kinesis 流,如下所示:

  • Amazon RDS 自动创建具有 24 小时保留期的 Kinesis 流。

  • 如有必要,Amazon RDS 会扩展 Kinesis 流。

  • 如果停止了数据库活动流或删除了数据库实例Amazon RDS 将删除 Kinesis 流。

监控以下类别的活动并将其放入活动流审核日志中:

  • SQL 命令 – 将审计所有 SQL 命令,以及预编译语句、内置函数和采用 PL/SQL 的函数。将审核对存储过程的调用。还将审核在存储过程或函数中发出的任何 SQL 语句。

  • 其他数据库信息 – 受监控的活动包括完整的 SQL 语句、来自 DML 命令的受影响行的行数、访问的对象以及唯一的数据库名称。数据库活动流还监控绑定变量和存储过程参数。

    重要

    每个语句的完整 SQL 文本在活动流审核日志中可见,包括任何敏感数据。但是,如果 Oracle 可以从上下文中确定数据库用户密码,则会对该密码进行修订,如下面的 SQL 语句所示。

    ALTER ROLE role-name WITH password
  • 连接信息 – 受监控的活动包括会话和网络信息、服务器进程 ID 和退出代码。

如果在监控数据库实例时活动流发生故障,则会使用 RDS 事件通知您。

从 Kinesis 中访问活动流

在为数据库启用活动流时,将会为您创建一个 Kinesis 流。您可以从 Kinesis 实时监控数据库活动。要进一步分析数据库活动,您可以将 Kinesis 流连接到使用者应用程序。您还可以将流连接到合规性管理应用程序,例如 IBM 的 Security Guardium 或 Imperva 的 SecureSphere Database Audit and Protection

您可以从 RDS 控制台或 Kinesis 控制台访问您的 Kinesis 流。

使用 RDS 控制台从 Kinesis 中访问活动流
  1. 通过以下网址打开 Amazon RDS 控制台:https://console.aws.amazon.com/rds/

  2. 在导航窗格中,选择 Databases(数据库)。

  3. 选择已启动活动流的 Amazon RDS 数据库实例

  4. 选择配置

  5. Database activity stream(数据库活动流)下,选择 Kinesis stream(Kinesis 流)下的链接。

  6. 在 Kinesis 控制台中,选择 Monitoring(监控)以开始观察数据库活动。

使用 Kinesis 控制台从 Kinesis 中访问活动流
  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 从 Kinesis 流列表中选择您的活动流。

    活动流的名称包含前缀 aws-rds-das-db-,后跟数据库的资源 ID。以下是示例。

    aws-rds-das-db-NHVOV4PCLWHGF52NP

    要使用 Amazon RDS 控制台查找数据库的资源 ID,请从数据库列表中选择数据库实例,然后选择 Configuration(配置)选项卡。

    要使用 Amazon CLI 查找活动流的完整 Kinesis 流名称,请使用 describe-db-instances CLI 请求并记下响应中的 ActivityStreamKinesisStreamName 值。

  3. 选择监控以开始观察数据库活动。

有关使用 Amazon Kinesis 的更多信息,请参阅什么是 Amazon Kinesis Data Streams?

审计日志内容和示例

受监控的事件在数据库活动流中表示为 JSON 字符串。结构包含一个 JSON 对象,该对象包含一个 DatabaseActivityMonitoringRecord,后者反过来包含活动事件的 databaseActivityEventList 阵列。

活动流的审计日志示例

以下是活动事件记录的已解密 JSON 审核日志示例。

CONNECT SQL 语句的活动事件记录

以下活动事件记录显示 JDBC 瘦客户端clientApplication)使用 CONNECT SQL 语句(command针对 Oracle 数据库进行的登录。

{ "class": "Standard", "clientApplication": "JDBC Thin Client", "command": "LOGON", "commandText": null, "dbid": "0123456789", "databaseName": "ORCL", "dbProtocol": "oracle", "dbUserName": "TEST", "endTime": null, "errorMessage": null, "exitCode": 0, "logTime": "2021-01-15 00:15:36.233787", "netProtocol": "tcp", "objectName": null, "objectType": null, "paramList": [], "pid": 17904, "remoteHost": "123.456.789.012", "remotePort": "25440", "rowCount": null, "serverHost": "987.654.321.098", "serverType": "oracle", "serverVersion": "19.0.0.0.ru-2020-01.rur-2020-01.r1.EE.3", "serviceName": "oracle-ee", "sessionId": 987654321, "startTime": null, "statementId": 1, "substatementId": null, "transactionId": "0000000000000000", "engineNativeAuditFields": { "UNIFIED_AUDIT_POLICIES": "TEST_POL_EVERYTHING", "FGA_POLICY_NAME": null, "DV_OBJECT_STATUS": null, "SYSTEM_PRIVILEGE_USED": "CREATE SESSION", "OLS_LABEL_COMPONENT_TYPE": null, "XS_SESSIONID": null, "ADDITIONAL_INFO": null, "INSTANCE_ID": 1, "DBID": 123456789 "DV_COMMENT": null, "RMAN_SESSION_STAMP": null, "NEW_NAME": null, "DV_ACTION_NAME": null, "OLS_PROGRAM_UNIT_NAME": null, "OLS_STRING_LABEL": null, "RMAN_SESSION_RECID": null, "OBJECT_PRIVILEGES": null, "OLS_OLD_VALUE": null, "XS_TARGET_PRINCIPAL_NAME": null, "XS_NS_ATTRIBUTE": null, "XS_NS_NAME": null, "DBLINK_INFO": null, "AUTHENTICATION_TYPE": "(TYPE\u003d(DATABASE));(CLIENT ADDRESS\u003d((ADDRESS\u003d(PROTOCOL\u003dtcp)(HOST\u003d205.251.233.183)(PORT\u003d25440))));", "OBJECT_EDITION": null, "OLS_PRIVILEGES_GRANTED": null, "EXCLUDED_USER": null, "DV_ACTION_OBJECT_NAME": null, "OLS_LABEL_COMPONENT_NAME": null, "EXCLUDED_SCHEMA": null, "DP_TEXT_PARAMETERS1": null, "XS_USER_NAME": null, "XS_ENABLED_ROLE": null, "XS_NS_ATTRIBUTE_NEW_VAL": null, "DIRECT_PATH_NUM_COLUMNS_LOADED": null, "AUDIT_OPTION": null, "DV_EXTENDED_ACTION_CODE": null, "XS_PACKAGE_NAME": null, "OLS_NEW_VALUE": null, "DV_RETURN_CODE": null, "XS_CALLBACK_EVENT_TYPE": null, "USERHOST": "a1b2c3d4e5f6.amazon.com", "GLOBAL_USERID": null, "CLIENT_IDENTIFIER": null, "RMAN_OPERATION": null, "TERMINAL": "unknown", "OS_USERNAME": "sumepate", "OLS_MAX_READ_LABEL": null, "XS_PROXY_USER_NAME": null, "XS_DATASEC_POLICY_NAME": null, "DV_FACTOR_CONTEXT": null, "OLS_MAX_WRITE_LABEL": null, "OLS_PARENT_GROUP_NAME": null, "EXCLUDED_OBJECT": null, "DV_RULE_SET_NAME": null, "EXTERNAL_USERID": null, "EXECUTION_ID": null, "ROLE": null, "PROXY_SESSIONID": 0, "DP_BOOLEAN_PARAMETERS1": null, "OLS_POLICY_NAME": null, "OLS_GRANTEE": null, "OLS_MIN_WRITE_LABEL": null, "APPLICATION_CONTEXTS": null, "XS_SCHEMA_NAME": null, "DV_GRANTEE": null, "XS_COOKIE": null, "DBPROXY_USERNAME": null, "DV_ACTION_CODE": null, "OLS_PRIVILEGES_USED": null, "RMAN_DEVICE_TYPE": null, "XS_NS_ATTRIBUTE_OLD_VAL": null, "TARGET_USER": null, "XS_ENTITY_TYPE": null, "ENTRY_ID": 1, "XS_PROCEDURE_NAME": null, "XS_INACTIVITY_TIMEOUT": null, "RMAN_OBJECT_TYPE": null, "SYSTEM_PRIVILEGE": null, "NEW_SCHEMA": null, "SCN": 5124715 } }

以下活动事件记录显示 SQL Server 数据库的登录失败。

{ "type": "DatabaseActivityMonitoringRecord", "clusterId": "", "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q", "databaseActivityEventList": [ { "class": "LOGIN", "clientApplication": "Microsoft SQL Server Management Studio", "command": "LOGIN FAILED", "commandText": "Login failed for user 'test'. Reason: Password did not match that for the login provided. [CLIENT: local-machine]", "databaseName": "", "dbProtocol": "SQLSERVER", "dbUserName": "test", "endTime": null, "errorMessage": null, "exitCode": 0, "logTime": "2022-10-06 21:34:42.7113072+00", "netProtocol": null, "objectName": "", "objectType": "LOGIN", "paramList": null, "pid": null, "remoteHost": "local machine", "remotePort": null, "rowCount": 0, "serverHost": "172.31.30.159", "serverType": "SQLSERVER", "serverVersion": "15.00.4073.23.v1.R1", "serviceName": "sqlserver-ee", "sessionId": 0, "startTime": null, "statementId": "0x1eb0d1808d34a94b9d3dcf5432750f02", "substatementId": 1, "transactionId": "0", "type": "record", "engineNativeAuditFields": { "target_database_principal_id": 0, "target_server_principal_id": 0, "target_database_principal_name": "", "server_principal_id": 0, "user_defined_information": "", "response_rows": 0, "database_principal_name": "", "target_server_principal_name": "", "schema_name": "", "is_column_permission": false, "object_id": 0, "server_instance_name": "EC2AMAZ-NFUJJNO", "target_server_principal_sid": null, "additional_information": "<action_info "xmlns=\"http://schemas.microsoft.com/sqlserver/2008/sqlaudit_data\"><pooled_connection>0</pooled_connection><error>0x00004818</error><state>8</state><address>local machine</address><PasswordFirstNibbleHash>B</PasswordFirstNibbleHash></action_info>"-->, "duration_milliseconds": 0, "permission_bitmask": "0x00000000000000000000000000000000", "data_sensitivity_information": "", "session_server_principal_name": "", "connection_id": "98B4F537-0F82-49E3-AB08-B9D33B5893EF", "audit_schema_version": 1, "database_principal_id": 0, "server_principal_sid": null, "user_defined_event_id": 0, "host_name": "EC2AMAZ-NFUJJNO" } } ] }
注意

如果未启用数据库活动流,则 JSON 文档中的最后一个字段为 "engineNativeAuditFields": { }

CREATE TABLE 语句的活动事件记录

以下示例显示 Oracle 数据库CREATE TABLE 事件。

{ "class": "Standard", "clientApplication": "sqlplus@ip-12-34-5-678 (TNS V1-V3)", "command": "CREATE TABLE", "commandText": "CREATE TABLE persons(\n person_id NUMBER GENERATED BY DEFAULT AS IDENTITY,\n first_name VARCHAR2(50) NOT NULL,\n last_name VARCHAR2(50) NOT NULL,\n PRIMARY KEY(person_id)\n)", "dbid": "0123456789", "databaseName": "ORCL", "dbProtocol": "oracle", "dbUserName": "TEST", "endTime": null, "errorMessage": null, "exitCode": 0, "logTime": "2021-01-15 00:22:49.535239", "netProtocol": "beq", "objectName": "PERSONS", "objectType": "TEST", "paramList": [], "pid": 17687, "remoteHost": "123.456.789.0", "remotePort": null, "rowCount": null, "serverHost": "987.654.321.01", "serverType": "oracle", "serverVersion": "19.0.0.0.ru-2020-01.rur-2020-01.r1.EE.3", "serviceName": "oracle-ee", "sessionId": 1234567890, "startTime": null, "statementId": 43, "substatementId": null, "transactionId": "090011007F0D0000", "engineNativeAuditFields": { "UNIFIED_AUDIT_POLICIES": "TEST_POL_EVERYTHING", "FGA_POLICY_NAME": null, "DV_OBJECT_STATUS": null, "SYSTEM_PRIVILEGE_USED": "CREATE SEQUENCE, CREATE TABLE", "OLS_LABEL_COMPONENT_TYPE": null, "XS_SESSIONID": null, "ADDITIONAL_INFO": null, "INSTANCE_ID": 1, "DV_COMMENT": null, "RMAN_SESSION_STAMP": null, "NEW_NAME": null, "DV_ACTION_NAME": null, "OLS_PROGRAM_UNIT_NAME": null, "OLS_STRING_LABEL": null, "RMAN_SESSION_RECID": null, "OBJECT_PRIVILEGES": null, "OLS_OLD_VALUE": null, "XS_TARGET_PRINCIPAL_NAME": null, "XS_NS_ATTRIBUTE": null, "XS_NS_NAME": null, "DBLINK_INFO": null, "AUTHENTICATION_TYPE": "(TYPE\u003d(DATABASE));(CLIENT ADDRESS\u003d((PROTOCOL\u003dbeq)(HOST\u003d123.456.789.0)));", "OBJECT_EDITION": null, "OLS_PRIVILEGES_GRANTED": null, "EXCLUDED_USER": null, "DV_ACTION_OBJECT_NAME": null, "OLS_LABEL_COMPONENT_NAME": null, "EXCLUDED_SCHEMA": null, "DP_TEXT_PARAMETERS1": null, "XS_USER_NAME": null, "XS_ENABLED_ROLE": null, "XS_NS_ATTRIBUTE_NEW_VAL": null, "DIRECT_PATH_NUM_COLUMNS_LOADED": null, "AUDIT_OPTION": null, "DV_EXTENDED_ACTION_CODE": null, "XS_PACKAGE_NAME": null, "OLS_NEW_VALUE": null, "DV_RETURN_CODE": null, "XS_CALLBACK_EVENT_TYPE": null, "USERHOST": "ip-10-13-0-122", "GLOBAL_USERID": null, "CLIENT_IDENTIFIER": null, "RMAN_OPERATION": null, "TERMINAL": "pts/1", "OS_USERNAME": "rdsdb", "OLS_MAX_READ_LABEL": null, "XS_PROXY_USER_NAME": null, "XS_DATASEC_POLICY_NAME": null, "DV_FACTOR_CONTEXT": null, "OLS_MAX_WRITE_LABEL": null, "OLS_PARENT_GROUP_NAME": null, "EXCLUDED_OBJECT": null, "DV_RULE_SET_NAME": null, "EXTERNAL_USERID": null, "EXECUTION_ID": null, "ROLE": null, "PROXY_SESSIONID": 0, "DP_BOOLEAN_PARAMETERS1": null, "OLS_POLICY_NAME": null, "OLS_GRANTEE": null, "OLS_MIN_WRITE_LABEL": null, "APPLICATION_CONTEXTS": null, "XS_SCHEMA_NAME": null, "DV_GRANTEE": null, "XS_COOKIE": null, "DBPROXY_USERNAME": null, "DV_ACTION_CODE": null, "OLS_PRIVILEGES_USED": null, "RMAN_DEVICE_TYPE": null, "XS_NS_ATTRIBUTE_OLD_VAL": null, "TARGET_USER": null, "XS_ENTITY_TYPE": null, "ENTRY_ID": 12, "XS_PROCEDURE_NAME": null, "XS_INACTIVITY_TIMEOUT": null, "RMAN_OBJECT_TYPE": null, "SYSTEM_PRIVILEGE": null, "NEW_SCHEMA": null, "SCN": 5133083 } }

以下示例显示 SQL Server 数据库的 CREATE TABLE 事件。

{ "type": "DatabaseActivityMonitoringRecord", "clusterId": "", "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q", "databaseActivityEventList": [ { "class": "SCHEMA", "clientApplication": "Microsoft SQL Server Management Studio - Query", "command": "ALTER", "commandText": "Create table [testDB].[dbo].[TestTable2](\r\ntextA varchar(6000),\r\n textB varchar(6000)\r\n)", "databaseName": "testDB", "dbProtocol": "SQLSERVER", "dbUserName": "test", "endTime": null, "errorMessage": null, "exitCode": 1, "logTime": "2022-10-06 21:44:38.4120677+00", "netProtocol": null, "objectName": "dbo", "objectType": "SCHEMA", "paramList": null, "pid": null, "remoteHost": "local machine", "remotePort": null, "rowCount": 0, "serverHost": "172.31.30.159", "serverType": "SQLSERVER", "serverVersion": "15.00.4073.23.v1.R1", "serviceName": "sqlserver-ee", "sessionId": 84, "startTime": null, "statementId": "0x5178d33d56e95e419558b9607158a5bd", "substatementId": 1, "transactionId": "4561864", "type": "record", "engineNativeAuditFields": { "target_database_principal_id": 0, "target_server_principal_id": 0, "target_database_principal_name": "", "server_principal_id": 2, "user_defined_information": "", "response_rows": 0, "database_principal_name": "dbo", "target_server_principal_name": "", "schema_name": "", "is_column_permission": false, "object_id": 1, "server_instance_name": "EC2AMAZ-NFUJJNO", "target_server_principal_sid": null, "additional_information": "", "duration_milliseconds": 0, "permission_bitmask": "0x00000000000000000000000000000000", "data_sensitivity_information": "", "session_server_principal_name": "test", "connection_id": "EE1FE3FD-EF2C-41FD-AF45-9051E0CD983A", "audit_schema_version": 1, "database_principal_id": 1, "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000", "user_defined_event_id": 0, "host_name": "EC2AMAZ-NFUJJNO" } } ] }
SELECT 语句的活动事件记录

以下示例显示 Oracle 数据库的 SELECT 事件。

{ "class": "Standard", "clientApplication": "sqlplus@ip-12-34-5-678 (TNS V1-V3)", "command": "SELECT", "commandText": "select count(*) from persons", "databaseName": "1234567890", "dbProtocol": "oracle", "dbUserName": "TEST", "endTime": null, "errorMessage": null, "exitCode": 0, "logTime": "2021-01-15 00:25:18.850375", "netProtocol": "beq", "objectName": "PERSONS", "objectType": "TEST", "paramList": [], "pid": 17687, "remoteHost": "123.456.789.0", "remotePort": null, "rowCount": null, "serverHost": "987.654.321.09", "serverType": "oracle", "serverVersion": "19.0.0.0.ru-2020-01.rur-2020-01.r1.EE.3", "serviceName": "oracle-ee", "sessionId": 1080639707, "startTime": null, "statementId": 44, "substatementId": null, "transactionId": null, "engineNativeAuditFields": { "UNIFIED_AUDIT_POLICIES": "TEST_POL_EVERYTHING", "FGA_POLICY_NAME": null, "DV_OBJECT_STATUS": null, "SYSTEM_PRIVILEGE_USED": null, "OLS_LABEL_COMPONENT_TYPE": null, "XS_SESSIONID": null, "ADDITIONAL_INFO": null, "INSTANCE_ID": 1, "DV_COMMENT": null, "RMAN_SESSION_STAMP": null, "NEW_NAME": null, "DV_ACTION_NAME": null, "OLS_PROGRAM_UNIT_NAME": null, "OLS_STRING_LABEL": null, "RMAN_SESSION_RECID": null, "OBJECT_PRIVILEGES": null, "OLS_OLD_VALUE": null, "XS_TARGET_PRINCIPAL_NAME": null, "XS_NS_ATTRIBUTE": null, "XS_NS_NAME": null, "DBLINK_INFO": null, "AUTHENTICATION_TYPE": "(TYPE\u003d(DATABASE));(CLIENT ADDRESS\u003d((PROTOCOL\u003dbeq)(HOST\u003d123.456.789.0)));", "OBJECT_EDITION": null, "OLS_PRIVILEGES_GRANTED": null, "EXCLUDED_USER": null, "DV_ACTION_OBJECT_NAME": null, "OLS_LABEL_COMPONENT_NAME": null, "EXCLUDED_SCHEMA": null, "DP_TEXT_PARAMETERS1": null, "XS_USER_NAME": null, "XS_ENABLED_ROLE": null, "XS_NS_ATTRIBUTE_NEW_VAL": null, "DIRECT_PATH_NUM_COLUMNS_LOADED": null, "AUDIT_OPTION": null, "DV_EXTENDED_ACTION_CODE": null, "XS_PACKAGE_NAME": null, "OLS_NEW_VALUE": null, "DV_RETURN_CODE": null, "XS_CALLBACK_EVENT_TYPE": null, "USERHOST": "ip-12-34-5-678", "GLOBAL_USERID": null, "CLIENT_IDENTIFIER": null, "RMAN_OPERATION": null, "TERMINAL": "pts/1", "OS_USERNAME": "rdsdb", "OLS_MAX_READ_LABEL": null, "XS_PROXY_USER_NAME": null, "XS_DATASEC_POLICY_NAME": null, "DV_FACTOR_CONTEXT": null, "OLS_MAX_WRITE_LABEL": null, "OLS_PARENT_GROUP_NAME": null, "EXCLUDED_OBJECT": null, "DV_RULE_SET_NAME": null, "EXTERNAL_USERID": null, "EXECUTION_ID": null, "ROLE": null, "PROXY_SESSIONID": 0, "DP_BOOLEAN_PARAMETERS1": null, "OLS_POLICY_NAME": null, "OLS_GRANTEE": null, "OLS_MIN_WRITE_LABEL": null, "APPLICATION_CONTEXTS": null, "XS_SCHEMA_NAME": null, "DV_GRANTEE": null, "XS_COOKIE": null, "DBPROXY_USERNAME": null, "DV_ACTION_CODE": null, "OLS_PRIVILEGES_USED": null, "RMAN_DEVICE_TYPE": null, "XS_NS_ATTRIBUTE_OLD_VAL": null, "TARGET_USER": null, "XS_ENTITY_TYPE": null, "ENTRY_ID": 13, "XS_PROCEDURE_NAME": null, "XS_INACTIVITY_TIMEOUT": null, "RMAN_OBJECT_TYPE": null, "SYSTEM_PRIVILEGE": null, "NEW_SCHEMA": null, "SCN": 5136972 } }

以下示例显示 SQL Server 数据库的 SELECT 事件。

{ "type": "DatabaseActivityMonitoringRecord", "clusterId": "", "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q", "databaseActivityEventList": [ { "class": "TABLE", "clientApplication": "Microsoft SQL Server Management Studio - Query", "command": "SELECT", "commandText": "select * from [testDB].[dbo].[TestTable]", "databaseName": "testDB", "dbProtocol": "SQLSERVER", "dbUserName": "test", "endTime": null, "errorMessage": null, "exitCode": 1, "logTime": "2022-10-06 21:24:59.9422268+00", "netProtocol": null, "objectName": "TestTable", "objectType": "TABLE", "paramList": null, "pid": null, "remoteHost": "local machine", "remotePort": null, "rowCount": 0, "serverHost": "172.31.30.159", "serverType": "SQLSERVER", "serverVersion": "15.00.4073.23.v1.R1", "serviceName": "sqlserver-ee", "sessionId": 62, "startTime": null, "statementId": "0x03baed90412f564fad640ebe51f89b99", "substatementId": 1, "transactionId": "4532935", "type": "record", "engineNativeAuditFields": { "target_database_principal_id": 0, "target_server_principal_id": 0, "target_database_principal_name": "", "server_principal_id": 2, "user_defined_information": "", "response_rows": 0, "database_principal_name": "dbo", "target_server_principal_name": "", "schema_name": "dbo", "is_column_permission": true, "object_id": 581577110, "server_instance_name": "EC2AMAZ-NFUJJNO", "target_server_principal_sid": null, "additional_information": "", "duration_milliseconds": 0, "permission_bitmask": "0x00000000000000000000000000000001", "data_sensitivity_information": "", "session_server_principal_name": "test", "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109", "audit_schema_version": 1, "database_principal_id": 1, "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000", "user_defined_event_id": 0, "host_name": "EC2AMAZ-NFUJJNO" } } ] }

DatabaseActivityMonitoringRecords JSON 对象

数据库活动事件记录位于包含以下信息的 JSON 对象中。

JSON 字段 数据类型 描述

type

字符串

JSON 记录的类型。值为 DatabaseActivityMonitoringRecords

version 字符串 数据库活动监控记录的版本。Oracle 数据库使用版本 1.3,SQL Server 使用版本 1.4。这些引擎版本引入了 engineNativeAuditFields JSON 对象。

databaseActivityEvents

字符串

包含活动事件的 JSON 对象。

key 字符串 用于解密 databaseActivityEventList 的加密密钥

databaseActivityEvents JSON 对象

databaseActivityEvents JSON 对象包含以下信息。

JSON 记录中的顶级字段

审核日志中的每个事件都包装在 JSON 格式的记录中。此记录包含以下字段。

类型

此字段始终具有值 DatabaseActivityMonitoringRecords

版本

此字段表示数据库活动流数据协议或合同的版本。它定义了哪些字段可用。

databaseActivityEvents

表示一个或多个活动事件的加密字符串。它表示为 base64 字节数组。解密字符串时,结果是 JSON 格式的记录,其中包含字段,如本节中的示例所示。

key

用于加密 databaseActivityEvents 字符串的加密数据密钥。这与您在启动数据库活动流时提供的 Amazon KMS key 密钥相同。

以下示例显示了此记录的格式。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.3", "databaseActivityEvents":"encrypted audit records", "key":"encrypted key" }
"type":"DatabaseActivityMonitoringRecords", "version":"1.4", "databaseActivityEvents":"encrypted audit records", "key":"encrypted key"

执行以下步骤来解密 databaseActivityEvents 字段的内容:

  1. 使用您在启动数据库活动流时提供的 KMS 密钥解密 key JSON 字段中的值。这样做将以明文形式返回数据加密密钥。

  2. databaseActivityEvents JSON 字段中的值进行 Base64 解码,以获取审核负载的二进制格式的密文。

  3. 使用您在第一步中解码的数据加密密钥解密二进制密文。

  4. 解压解已解密的负载。

    • 已加密的负载在 databaseActivityEvents 字段中。

    • databaseActivityEventList 字段包含审核记录数组。此数组中的 type 字段可以是 recordheartbeat

审核日志活动事件记录是包含以下信息的 JSON 对象。

JSON 字段 数据类型 描述

type

字符串

JSON 记录的类型。值为 DatabaseActivityMonitoringRecord

instanceId 字符串 数据库实例资源标识符。它对应于数据库实例属性 DbiResourceId

databaseActivityEventList

字符串

活动审核记录或检测信号消息的数组。

databaseActivityEventList JSON 数组

审核日志负载是解密的 databaseActivityEventList JSON 数组。以下表列表按字母顺序列出了审计日志的解密 DatabaseActivityEventList 数组中每个活动事件的字段。

如果在 Oracle 数据库中启用了统一审计,审计记录将填充在此新的审计跟踪中。UNIFIED_AUDIT_TRAIL 视图通过从审计跟踪中检索审计记录以表格形式显示审计记录。启动数据库活动流时,UNIFIED_AUDIT_TRAIL 中的一列映射到 databaseActivityEventList 数组中的一个字段。

重要

事件结构可能会发生变化。Amazon RDS 可能会将新字段添加到未来的活动事件中。在解析 JSON 数据的应用程序中,请确保您的代码可以忽略未知字段名称或对其采取适当操作。

Amazon RDS for Oracle 的 databaseActivityEventList 字段
字段 数据类型 描述

class

字符串

UNIFIED_AUDIT_TRAIL 中的 AUDIT_TYPE

活动事件的类。这对应于 UNIFIED_AUDIT_TRAIL 视图中的 AUDIT_TYPE 列。Amazon RDS for Oracle 的有效值如下:

  • Standard

  • FineGrainedAudit

  • XS

  • Database Vault

  • Label Security

  • RMAN_AUDIT

  • Datapump

  • Direct path API

有关更多信息,请参阅 Oracle 文档中的 UNIFIED_AUDIT_TRAIL

clientApplication

字符串

CLIENT_PROGRAM_NAME中的UNIFIED_AUDIT_TRAIL

客户端报告的其用于连接的应用程序。由于客户端不必提供此信息,因此值可以为 null。示例值为 JDBC Thin Client

command

字符串

UNIFIED_AUDIT_TRAIL 中的 ACTION_NAME

用户执行的操作名称。要了解完整操作,请同时阅读命令名称和 AUDIT_TYPE 值。示例值为 ALTER DATABASE

commandText

字符串

UNIFIED_AUDIT_TRAIL 中的 SQL_TEXT

与事件关联的 SQL 语句。示例值为 ALTER DATABASE BEGIN BACKUP

databaseName

字符串

V$DATABASE 中的 NAME

数据库的名称。

dbid

number

UNIFIED_AUDIT_TRAIL 中的 DBID

数据库的数字标识符。示例值为 1559204751

dbProtocol

字符串

不适用

数据库协议。在该测试版中,值为 oracle

dbUserName

字符串

UNIFIED_AUDIT_TRAIL 中的 DBUSERNAME

已审核其操作的数据库用户的名称。示例值为 RDSADMIN

endTime

字符串

不适用

此字段不用于 RDS for Oracle 且始终为 Null。

engineNativeAuditFields

object

UNIFIED_AUDIT_TRAIL

默认情况下,此对象为空。当您使用 --engine-native-audit-fields-included 选项启动活动流时,此对象包括以下列及其值:

ADDITIONAL_INFO APPLICATION_CONTEXTS AUDIT_OPTION AUTHENTICATION_TYPE CLIENT_IDENTIFIER CURRENT_USER DBLINK_INFO DBPROXY_USERNAME DIRECT_PATH_NUM_COLUMNS_LOADED DP_BOOLEAN_PARAMETERS1 DP_TEXT_PARAMETERS1 DV_ACTION_CODE DV_ACTION_NAME DV_ACTION_OBJECT_NAME DV_COMMENT DV_EXTENDED_ACTION_CODE DV_FACTOR_CONTEXT DV_GRANTEE DV_OBJECT_STATUS DV_RETURN_CODE DV_RULE_SET_NAME ENTRY_ID EXCLUDED_OBJECT EXCLUDED_SCHEMA EXCLUDED_USER EXECUTION_ID EXTERNAL_USERID FGA_POLICY_NAME GLOBAL_USERID INSTANCE_ID KSACL_SERVICE_NAME KSACL_SOURCE_LOCATION KSACL_USER_NAME NEW_NAME NEW_SCHEMA OBJECT_EDITION OBJECT_PRIVILEGES OLS_GRANTEE OLS_LABEL_COMPONENT_NAME OLS_LABEL_COMPONENT_TYPE OLS_MAX_READ_LABEL OLS_MAX_WRITE_LABEL OLS_MIN_WRITE_LABEL OLS_NEW_VALUE OLS_OLD_VALUE OLS_PARENT_GROUP_NAME OLS_POLICY_NAME OLS_PRIVILEGES_GRANTED OLS_PRIVILEGES_USED OLS_PROGRAM_UNIT_NAME OLS_STRING_LABEL OS_USERNAME PROTOCOL_ACTION_NAME PROTOCOL_MESSAGE PROTOCOL_RETURN_CODE PROTOCOL_SESSION_ID PROTOCOL_USERHOST PROXY_SESSIONID RLS_INFO RMAN_DEVICE_TYPE RMAN_OBJECT_TYPE RMAN_OPERATION RMAN_SESSION_RECID RMAN_SESSION_STAMP ROLE SCN SYSTEM_PRIVILEGE SYSTEM_PRIVILEGE_USED TARGET_USER TERMINAL UNIFIED_AUDIT_POLICIES USERHOST XS_CALLBACK_EVENT_TYPE XS_COOKIE XS_DATASEC_POLICY_NAME XS_ENABLED_ROLE XS_ENTITY_TYPE XS_INACTIVITY_TIMEOUT XS_NS_ATTRIBUTE XS_NS_ATTRIBUTE_NEW_VAL XS_NS_ATTRIBUTE_OLD_VAL XS_NS_NAME XS_PACKAGE_NAME XS_PROCEDURE_NAME XS_PROXY_USER_NAME XS_SCHEMA_NAME XS_SESSIONID XS_TARGET_PRINCIPAL_NAME XS_USER_NAME

有关更多信息,请参阅 Oracle 数据库文档中的 UNIFIED_AUDIT_TRAIL

errorMessage

字符串

不适用

此字段不用于 RDS for Oracle 且始终为 Null。

exitCode

number

UNIFIED_AUDIT_TRAIL 中的 RETURN_CODE

操作生成的 Oracle 数据库错误代码。如果操作成功,则值为 0

logTime

字符串

UNIFIED_AUDIT_TRAIL 中的 EVENT_TIMESTAMP_UTC

创建审计跟踪条目的时间戳。示例值为 2020-11-27 06:56:14.981404

netProtocol

字符串

UNIFIED_AUDIT_TRAIL 中的 AUTHENTICATION_TYPE

网络通信协议。示例值为 TCP

objectName

字符串

UNIFIED_AUDIT_TRAIL 中的 OBJECT_NAME

受操作影响的对象名称。示例值为 employees

objectType

字符串

UNIFIED_AUDIT_TRAIL 中的 OBJECT_SCHEMA

受操作影响的对象架构名称。示例值为 hr

paramList

list

UNIFIED_AUDIT_TRAIL 中的 SQL_BINDS

SQL_TEXT 关联的绑定变量列表(如有)。示例值为 parameter_1,parameter_2

pid

number

UNIFIED_AUDIT_TRAIL 中的 OS_PROCESS

Oracle 数据库进程的操作系统进程标识符。示例值为 22396

remoteHost

字符串

UNIFIED_AUDIT_TRAIL 中的 AUTHENTICATION_TYPE

客户端 IP 地址或生成会话的主机的名称。示例值为 123.456.789.123

remotePort

字符串

UNIFIED_AUDIT_TRAIL 中的 AUTHENTICATION_TYPE

客户端的端口号。Oracle 数据库环境中的典型值是 1521

rowCount

number

不适用

此字段不用于 RDS for Oracle 且始终为 Null。

serverHost

字符串

数据库主机

数据库服务器主机的 IP 地址。示例值为 123.456.789.123

serverType

字符串

不适用

数据库服务器类型。此值始终为 ORACLE

serverVersion

字符串

数据库主机

Amazon RDS for Oracle 版本、发布更新(RU)和版本更新修订(RUR)。示例值为 19.0.0.0.ru-2020-01.rur-2020-01.r1.EE.3

serviceName

字符串

数据库主机

服务的名称。示例值为 oracle-ee

sessionId

number

UNIFIED_AUDIT_TRAIL 中的 SESSIONID

审计的会话标识符。示例是 1894327130

startTime

字符串

不适用

此字段不用于 RDS for Oracle 且始终为 Null。

statementId

number

UNIFIED_AUDIT_TRAIL 中的 STATEMENT_ID

每个语句运行的数字 ID。一个语句可能会导致多个操作。示例值为 142197

substatementId

不适用

不适用

此字段不用于 RDS for Oracle 且始终为 Null。

transactionId

字符串

UNIFIED_AUDIT_TRAIL 中的 TRANSACTION_ID

在其中修改对象事务的标识符。示例值为 02000800D5030000

Amazon RDS for SQL Server 的 databaseActivityEventList 字段
字段 数据类型 描述

class

字符串

sys.fn_get_audit_file.class_type 映射到 sys.dm_audit_class_type_map.class_type_desc

活动事件的类。有关更多信息,请参阅 Microsoft 文档中的 SQL Server 审计(数据库引擎)

clientApplication

字符串

sys.fn_get_audit_file.application_name

客户端报告的客户端连接的应用程序(SQL Server 版本 14 及更高版本)。在 SQL Server 版本 13 中,此字段为空。

command

字符串

sys.fn_get_audit_file.action_id 映射到 sys.dm_audit_actions.name

SQL 语句的常规类别。此字段的值取决于类的值。

commandText

字符串

sys.fn_get_audit_file.statement

此字段指示 SQL 语句。

databaseName

字符串

sys.fn_get_audit_file.database_name

数据库的名称。

dbProtocol

字符串

不适用

数据库协议。该值为 SQLSERVER

dbUserName

字符串

sys.fn_get_audit_file.server_principal_name

客户端身份验证的数据库用户。

endTime

字符串

不适用

Amazon RDS for SQL Server 未使用此字段,值为空。

engineNativeAuditFields

object

sys.fn_get_audit_file 中此列未列出的每个字段。

默认情况下,此对象为空。当您使用 --engine-native-audit-fields-included 选项启动活动流时,此对象包括其他原生引擎审计字段,此 JSON 映射不返回这些字段。

errorMessage

字符串

不适用

Amazon RDS for SQL Server 未使用此字段,值为空。

exitCode

整数

sys.fn_get_audit_file.succeeded

表示启动事件的操作是否成功。此字段不能为空。对于除登录事件以外的所有事件,此字段报告权限检查是成功还是失败,但不报告操作是成功还是失败。

值包括:

  • 0 – 失败

  • 1 – 成功

logTime

字符串

sys.fn_get_audit_file.event_time

由 SQL Server 记录的事件时间戳。

netProtocol

字符串

不适用

Amazon RDS for SQL Server 未使用此字段,值为空。

objectName

字符串

sys.fn_get_audit_file.object_name

数据库对象的名称(如果正在对某个对象运行 SQL 语句)。

objectType

字符串

sys.fn_get_audit_file.class_type 映射到 sys.dm_audit_class_type_map.class_type_desc

数据库对象类型(如果正在对某个对象类型运行 SQL 语句)。

paramList

字符串

不适用

Amazon RDS for SQL Server 未使用此字段,值为空。

pid

整数

不适用

Amazon RDS for SQL Server 未使用此字段,值为空。

remoteHost

字符串

sys.fn_get_audit_file.client_ip

发出 SQL 语句的客户端的 IP 地址或主机名(SQL Server 版本 14 及更高版本)。在 SQL Server 版本 13 中,此字段为空。

remotePort

整数

不适用

Amazon RDS for SQL Server 未使用此字段,值为空。

rowCount

整数

sys.fn_get_audit_file.affected_rows

SQL 语句所影响的表行的数量(SQL Server 版本 14 及更高版本)。此字段位于 SQL Server 版本 13 中。

serverHost

字符串

数据库主机

主机数据库服务器的 IP 地址。

serverType

字符串

不适用

数据库服务器类型。值为 SQLSERVER

serverVersion

字符串

数据库主机

数据库服务器版本,例如,对于 SQL Server 2017,为 15.00.4073.23.v1.R1。

serviceName

字符串

数据库主机

服务的名称。示例值为 sqlserver-ee

sessionId

整数

sys.fn_get_audit_file.session_id

会话的唯一标识符。

startTime

字符串

不适用

Amazon RDS for SQL Server 未使用此字段,值为空。

statementId

字符串

sys.fn_get_audit_file.sequence_group_id

客户端的 SQL 语句的唯一标识符。对于生成的每个事件,标识符都不同。示例值为 0x38eaf4156267184094bb82071aaab644

substatementId

整数

sys.fn_get_audit_file.sequence_number

用于确定语句的序列号的标识符。当大型记录拆分为多条记录时,此标识符会有所帮助。

transactionId

整数

sys.fn_get_audit_file.transaction_id

事务的标识符。如果没有任何活动的事务,则该值为零。

type

字符串

生成的数据库活动流

事件类型。值为 recordheartbeat

使用 Amazon 开发工具包处理数据库活动流

您可以使用 Amazon 开发工具包以编程方式处理活动流。以下功能完善的 Java 和 Python 示例是关于使用数据库活动流记录实现基于实例的实施。

Java
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.security.Security; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.zip.GZIPInputStream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.SecretKeySpec; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.CryptoInputStream; import com.amazonaws.encryptionsdk.jce.JceMasterKey; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kms.AWSKMS; import com.amazonaws.services.kms.AWSKMSClientBuilder; import com.amazonaws.services.kms.model.DecryptRequest; import com.amazonaws.services.kms.model.DecryptResult; import com.amazonaws.util.Base64; import com.amazonaws.util.IOUtils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.bouncycastle.jce.provider.BouncyCastleProvider; public class DemoConsumer { private static final String STREAM_NAME = "aws-rds-das-[instance-external-resource-id]"; // aws-rds-das-db-ABCD123456 private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]"; private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]"; private static final String RESOURCE_ID = "[external-resource-id]"; // db-ABCD123456 private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2... private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY); private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS); private static final AwsCrypto CRYPTO = new AwsCrypto(); private static final AWSKMS KMS = AWSKMSClientBuilder.standard() .withRegion(REGION_NAME) .withCredentials(CREDENTIALS_PROVIDER).build(); class Activity { String type; String version; String databaseActivityEvents; String key; } class ActivityEvent { @SerializedName("class") String _class; String clientApplication; String command; String commandText; String databaseName; String dbProtocol; String dbUserName; String endTime; String errorMessage; String exitCode; String logTime; String netProtocol; String objectName; String objectType; List<String> paramList; String pid; String remoteHost; String remotePort; String rowCount; String serverHost; String serverType; String serverVersion; String serviceName; String sessionId; String startTime; String statementId; String substatementId; String transactionId; String type; } class ActivityRecords { String type; String clusterId; // note that clusterId will contain an empty string on RDS Oracle and RDS SQL Server String instanceId; List<ActivityEvent> databaseActivityEventList; } static class RecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new RecordProcessor(); } } static class RecordProcessor implements IRecordProcessor { private static final long BACKOFF_TIME_IN_MILLIS = 3000L; private static final int PROCESSING_RETRIES_MAX = 10; private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; private static final Gson GSON = new GsonBuilder().serializeNulls().create(); private static final Cipher CIPHER; static { Security.insertProviderAt(new BouncyCastleProvider(), 1); try { CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC"); } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) { throw new ExceptionInInitializerError(e); } } private long nextCheckpointTimeInMillis; @Override public void initialize(String shardId) { } @Override public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) { for (final Record record : records) { processSingleBlob(record.getData()); } if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { checkpoint(checkpointer); nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS; } } @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } } private void processSingleBlob(final ByteBuffer bytes) { try { // JSON $Activity final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class); // Base64.Decode final byte[] decoded = Base64.decode(activity.databaseActivityEvents); final byte[] decodedDataKey = Base64.decode(activity.key); Map<String, String> context = new HashMap<>(); context.put("aws:rds:db-id", RESOURCE_ID); // Decrypt final DecryptRequest decryptRequest = new DecryptRequest() .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context); final DecryptResult decryptResult = KMS.decrypt(decryptRequest); final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext())); // GZip Decompress final byte[] decompressed = decompress(decrypted); // JSON $ActivityRecords final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class); // Iterate throught $ActivityEvents for (final ActivityEvent event : activityRecords.databaseActivityEventList) { System.out.println(GSON.toJson(event)); } } catch (Exception e) { // Handle error. e.printStackTrace(); } } private static byte[] decompress(final byte[] src) throws IOException { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); return IOUtils.toByteArray(gzipInputStream); } private void checkpoint(IRecordProcessorCheckpointer checkpointer) { for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) { try { checkpointer.checkpoint(); break; } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). System.out.println("Caught shutdown exception, skipping checkpoint." + se); break; } catch (ThrottlingException e) { // Backoff and re-attempt checkpoint upon transient failures if (i >= (PROCESSING_RETRIES_MAX - 1)) { System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e); break; } else { System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e); } } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e); break; } try { Thread.sleep(BACKOFF_TIME_IN_MILLIS); } catch (InterruptedException e) { System.out.println("Interrupted sleep" + e); } } } } private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException { // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"), "BC", "DataKey", "AES/GCM/NoPadding"); try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) { IOUtils.copy(decryptingStream, out); return out.toByteArray(); } } public static void main(String[] args) throws Exception { final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); final KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId); kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST); kinesisClientLibConfiguration.withRegionName(REGION_NAME); final Worker worker = new Builder() .recordProcessorFactory(new RecordProcessorFactory()) .config(kinesisClientLibConfiguration) .build(); System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId); try { worker.run(); } catch (Throwable t) { System.err.println("Caught throwable while processing data."); t.printStackTrace(); System.exit(1); } System.exit(0); } private static byte[] getByteArray(final ByteBuffer b) { byte[] byteArray = new byte[b.remaining()]; b.get(byteArray); return byteArray; } }
Python
import base64 import json import zlib import aws_encryption_sdk from aws_encryption_sdk import CommitmentPolicy from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType import boto3 REGION_NAME = '<region>' # us-east-1 RESOURCE_ID = '<external-resource-id>' # db-ABCD123456 STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID # aws-rds-das-db-ABCD123456 enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT) class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, plain_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC) def _get_raw_key(self, key_id): return self.wrapping_key def decrypt_payload(payload, data_key): my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") decrypted_plaintext, header = enc_client.decrypt( source=payload, materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)) return decrypted_plaintext def decrypt_decompress(payload, key): decrypted = decrypt_payload(payload, key) return zlib.decompress(decrypted, zlib.MAX_WBITS + 16) def main(): session = boto3.session.Session() kms = session.client('kms', region_name=REGION_NAME) kinesis = session.client('kinesis', region_name=REGION_NAME) response = kinesis.describe_stream(StreamName=STREAM_NAME) shard_iters = [] for shard in response['StreamDescription']['Shards']: shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'], ShardIteratorType='LATEST') shard_iters.append(shard_iter_response['ShardIterator']) while len(shard_iters) > 0: next_shard_iters = [] for shard_iter in shard_iters: response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000) for record in response['Records']: record_data = record['Data'] record_data = json.loads(record_data) payload_decoded = base64.b64decode(record_data['databaseActivityEvents']) data_key_decoded = base64.b64decode(record_data['key']) data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded, EncryptionContext={'aws:rds:db-id': RESOURCE_ID}) print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext'])) if 'NextShardIterator' in response: next_shard_iters.append(response['NextShardIterator']) shard_iters = next_shard_iters if __name__ == '__main__': main()