Amazon Aurora
Aurora 用户指南 (API 版本 2014-10-31)
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

将数据库活动流与 Aurora PostgreSQL 结合使用

监控数据库活动可以帮助您为数据库提供安全保障,并满足合规性和法规要求。借助 与 PostgreSQL 兼容的 Amazon Aurora 监控数据库活动的一种方式是使用数据库活动流。数据库活动流 提供关系数据库中数据库活动的近实时数据流。在将数据库活动流与第三方监控工具集成时,您可以监控和审核数据库活动。

除了外部安全威胁之外,托管数据库还需要提供保护以规避来自数据库管理员 (DBA) 的内部风险。数据库活动流通过控制 DBA 对数据库活动流的访问来保护您的数据库,使其免受内部威胁。因此,数据库活动流的收集、传输、存储和后续处理超出了管理数据库的 DBA 的访问权限。

来自 Aurora PostgreSQL 的数据库活动流将推送到代表您的数据库创建的 Amazon Kinesis 数据流。在 Kinesis 中,数据库活动流随后可由 Amazon CloudWatch 或应用程序使用以进行合规性管理。这些合规性应用程序包括 Imperva 的 SecureSphere Database Audit and Protection、McAfee 的 Data Center Security Suite 或 IBM 的 Infosphere Guardium。这些应用程序可以使用数据库活动流信息生成警报,并提供对 Amazon Aurora 数据库上所有活动的审核。

数据库活动流具有以下限制和要求:

  • 目前,这些流仅受 与 PostgreSQL 兼容的 Aurora 版本 2.3 支持,后者与 PostgreSQL 版本 10.7 兼容。

  • 它们在以下 AWS 区域内不受支持:

    • 中国(北京)区域、cn-north-1

    • 中国 (宁夏) 区域、cn-northwest-1

    • AWS GovCloud(美国东部)、us-gov-east-1

    • AWS GovCloud(美国西部)、us-gov-west-1

  • 它们需要使用 AWS Key Management Service (AWS KMS)。

启动数据库活动流

您在数据库集群级别启动数据库活动流,以监控集群的所有数据库实例的活动。还将自动监控添加到集群的任何数据库实例。

在启动数据库活动流时,每个数据库活动事件(如更改或访问)都会生成一个活动流事件。从 SQL 命令(例如 CONNECTSELECT)生成访问事件。从 SQL 命令(例如 CREATEINSERT)生成更改事件。要使每个活动流事件持久,您可以加密并存储它。您可以选择让数据库会话以同步或异步方式处理数据库活动事件:

  • 同步模式 – 在同步模式下,当数据库会话生成活动流事件时,会话将阻塞,直到该事件变得持久。如果因某个原因无法使事件持久,数据库会话将返回到正常活动。但会发送一个 RDS 事件来指示活动流记录可能会丢失一段时间。在系统恢复到正常运行状态后发送第二个 RDS 事件。

    同步模式可提高数据库活动流的准确性而非数据库性能。

  • 异步模式 – 在异步模式下,当数据库会话生成活动流事件时,会话将立即返回到正常活动。在后台,活动流事件将成为持久记录。如果后台任务出错,则将发送 RDS 事件。此事件指示活动流事件记录可能已丢失的任何时间段的开始和结束时间。

    异步模式可提高数据库性能而非数据库活动流的准确性。

控制台

启动数据库活动流

  1. https://console.amazonaws.cn/rds/ 处打开 Amazon RDS 控制台。

  2. 在导航窗格中,选择 Databases (数据库)

  3. 选择要修改的数据库集群。

  4. 对于 Actions (操作),选择 Start activity stream (启动活动流)。此时将显示 Database Activity Stream (数据库活动流) 窗口。

  5. Database Activity Stream (数据库活动流) 窗口中,输入以下设置:

    • 对于 Master key (主密钥),从 AWS KMS 密钥列表中选择一个密钥。

      主密钥用于加密密钥,而密钥反过来加密记录的数据库活动。您必须选择默认密钥以外的主密钥。有关加密密钥和 AWS KMS 的更多信息,请参阅 AWS Key Management Service Developer Guide 中的什么是 AWS Key Management Service?

    • 对于 Database activity stream mode (数据库活动流模式),选择 Asynchronous (异步)Synchronous (同步)

    • 选择 Apply immediately (立即应用)

      如果您选择 Schedule for the next maintenance window (下一个维护时段的计划),则数据库不会立即重启。相反,它将保持 PENDING REBOOT 状态。在此情况下,数据库活动流将不会启动,直到下一个维护时段或手动重新启动。

    在输入设置后,选择 Continue (继续)

    集群的数据库实例状态显示正在配置数据库活动流。

AWS CLI

要为数据库集群启动数据库活动流,请使用 start-activity-stream AWS CLI 命令配置数据库集群。使用 --region 参数标识数据库集群的 AWS 区域。--apply-immediately 参数是可选的。

针对 Linux、OS X 或 Unix:

aws rds --region us-west-2 \ start-activity-stream \ --mode sync \ --kms-key-id MY_KMS_KEY_ARN \ --resource-arn MY_CLUSTER_ARN \ --apply-immediately \ --profile MY_PROFILE_CREDENTIALS

对于 Windows:

aws rds --region us-west-2 ^ start-activity-stream ^ --mode sync ^ --kms-key-id MY_KMS_KEY_ARN ^ --resource-arn MY_CLUSTER_ARN ^ --apply-immediately ^ --profile MY_PROFILE_CREDENTIALS

获取数据库活动流的状态

您可以使用控制台或 AWS CLI 获取数据库活动流的状态。

控制台

获取数据库集群的数据库活动流的状态

  1. https://console.amazonaws.cn/rds/ 处打开 Amazon RDS 控制台。

  2. 在导航窗格中,选择 Databases (数据库),然后选择数据库集群。

  3. 选择 Configuration (配置) 选项卡,并选择 Database activity stream (数据库活动流) 以查看状态。

AWS CLI

您可以获取数据库集群的数据库活动流配置作为对 describe-db-clusters CLI 请求的响应。在以下示例中,查看 ActivityStreamKinesisStreamNameActivityStreamStatusActivityStreamKmsKeyIdActivityStreamMode 的值。

请求如下:

aws rds --region us-west-2 describe-db-clusters --db-cluster-identifier my-cluster --profile MY_PROFILE_CREDENTIALS

响应包括以下数据库活动流项:

{ "DBClusters": [ { "DBClusterIdentifier": "my-cluster", . . . "ActivityStreamKinesisStreamName": "aws-rds-das-cluster-A6TSYXITZCZXJHIRVFUBZ5LTWY", "ActivityStreamStatus": "starting", "ActivityStreamKmsKeyId": "12345678-abcd-efgh-ijkl-bd041f170262", "ActivityStreamMode": "sync", "DbClusterResourceId": "cluster-ABCD123456" . . . } ] }

停止数据库活动流

您可以使用控制台或 AWS CLI 停止数据库活动流。

控制台

禁用数据库活动流

  1. https://console.amazonaws.cn/rds/ 处打开 Amazon RDS 控制台。

  2. 在导航窗格中,选择 Databases (数据库)

  3. 选择要为其停止数据库活动流的数据库集群。

  4. 对于 Actions (操作),选择 Stop activity stream (停止活动流)。此时将显示 Database Activity Stream (数据库活动流) 窗口。

    1. 选择 Apply immediately (立即应用)

      如果您选择 Schedule for the next maintenance window (下一个维护时段的计划),则数据库不会立即重启。相反,它将保持 PENDING REBOOT 状态。在此情况下,不会禁用数据库活动流,直到下一个维护时段或手动重新启动。

    2. 选择 Continue (继续)

AWS CLI

要为数据库集群停止数据库活动流,请使用 AWS CLI 命令 stop-activity-stream 配置数据库集群。使用 --region 参数标识数据库集群的 AWS 区域。--apply-immediately 参数是可选的。

针对 Linux、OS X 或 Unix:

aws rds --region us-west-2 \ stop-activity-stream \ --resource-arn MY_CLUSTER_ARN \ --apply-immediately \ --profile MY_PROFILE_CREDENTIALS

对于 Windows:

aws rds --region us-west-2 ^ stop-activity-stream ^ --resource-arn MY_CLUSTER_ARN ^ --apply-immediately ^ --profile MY_PROFILE_CREDENTIALS

监控数据库活动流

数据库活动流监控并报告数据库上的所有活动。使用 Amazon Kinesis 收集数据流并将其传输到安全服务器。从 Kinesis 中,其他服务和应用程序可以监控或稍后使用数据流以进行进一步分析。

监控以下类别的活动并将其放入数据库活动流审核日志中:

  • SQL 命令 – 将审核所有 SQL 命令,并准备语句、PostgreSQL 函数和采用 SQL 过程语言 (PL/SQL) 的函数。

  • 其他数据库信息 – 受监控的活动包括完整的 SQL 语句、参数、绑定变量、来自 DML 命令的受影响行的行数、访问的对象以及唯一的数据库名称。

  • 连接信息 – 受监控的活动包括会话和网络信息、服务器进程 ID 和退出代码。

如果数据库活动流在监控数据库实例时出现故障,则会使用 RDS 事件通知您。如果发生故障,您可以决定是关闭数据库实例还是让它继续。

从 Kinesis 访问数据库活动流

在为数据库集群启用数据库活动流时,会为您创建 Kinesis 流。您可以从 Kinesis 实时监控数据库活动。要进一步分析数据库活动,您可以将 Kinesis 流连接到使用者应用程序(例如 Amazon CloudWatch)。您还可以将该流连接到来自 Imperva、McAfee 或 IBM 的合规性管理应用程序。

从 Kinesis 访问数据库活动流

  1. 通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

  2. 从 Kinesis 流列表中选择您的数据库活动流。

    数据库活动流的名称包含前缀 aws-rds-das-,后跟数据库集群的资源 ID。以下是示例。

    aws-rds-das-cluster-NHVOV4PCLWHGF52NP

    要使用 Amazon RDS 控制台查找数据库集群的资源 ID,请从数据库列表中选择您的数据库集群,然后选择 Configuration (配置) 选项卡。

    要使用 AWS CLI 查找数据库活动流的完整 Kinesis 流名称,请使用 describe-db-clusters CLI 请求并记下响应中的 DBActivityStreamKinesisStreamName 的值。

  3. 选择 Monitoring (监控) 以开始观察数据库活动。

有关使用 Amazon Kinesis 的更多信息,请参阅什么是 Amazon Kinesis Data Streams?

审核日志内容和示例

受监控的数据库活动事件在 Kinesis 活动流中表示为 JSON 字符串。结构包含一个 JSON 对象,该对象包含一个 DatabaseActivityMonitoringRecord,后者反过来包含活动事件的 databaseActivityEventList 阵列。

审核日志示例

以下是活动事件记录的已解密 JSON 审核日志示例。

例 CONNECT SQL 语句的活动事件记录

以下是通过 psql 客户端 (clientApplication) 使用 CONNECT SQL 语句 (command) 进行的登录的活动事件记录。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "logTime": "2019-05-23 01:31:28.610198+00", "statementId": 1, "substatementId": 1, "objectType": null, "command": "CONNECT", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "49804", "sessionId": "5ce5f7f0.474b", "rowCount": null, "commandText": null, "paramList": [], "pid": 18251, "clientApplication": "psql", "exitCode": null, "class": "MISC", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record" } ] }

例 CREATE TABLE 语句的活动事件记录

以下是 CREATE TABLE 事件的示例。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "logTime": "2019-05-24 00:36:54.494235+00", "statementId": 2, "substatementId": 1, "objectType": null, "command": "CREATE TABLE", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": null, "commandText": "create table my_table (id serial primary key, name varchar(32));", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "DDL", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record" } ] }

例 SELECT 语句的活动事件记录

以下是 SELECT 事件的示例。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "logTime": "2019-05-24 00:39:49.940668+00", "statementId": 6, "substatementId": 1, "objectType": "TABLE", "command": "SELECT", "objectName": "public.my_table", "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": 10, "commandText": "select * from my_table;", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "READ", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record" } ] }

活动事件记录

审核日志活动事件记录是包含以下信息的 JSON 对象。

JSON 字段 数据类型 描述

type

字符串

JSON 记录的类型。值为 DatabaseActivityMonitoringRecord

clusterId 字符串 数据库集群 ID
instanceId 字符串 数据库实例 ID。

databaseActivityEventList

字符串

一个加密为 base64 字节数组的 JSON 对象。执行以下步骤可解密此内容:

  1. 使用数据库活动流的主密钥解密密钥 JSON 字段中的值。

  2. 使用步骤 1 中的解密密钥来解密 databaseActivityEventList 对象。

databaseActivityEventList JSON 数组

审核日志负载是解密的 databaseActivityEventList JSON 数组。下表按字母顺序列出了审核日志的解密 DatabaseActivityEventList 数组中每个活动事件的字段。

字段 数据类型 描述
class 字符串

活动事件的类。有效值如下所示:

  • ALL

  • CONNECT – 连接或断开连接事件。

  • DDL – 未包含在 ROLE 类的语句列表中的 DDL 语句。

  • FUNCTION – 函数调用或 DO 块。

  • MISC – 其他命令,例如 DISCARDFETCHCHECKPOINTVACUUM

  • NONE

  • READSELECTCOPY 语句(当源为关系或查询时)。

  • ROLE – 与角色或权限关联的语句,包括 GRANTREVOKECREATE/ALTER/DROP ROLE

  • WRITEINSERTUPDATEDELETETRUNCATECOPY 语句(当目标为关系时)。

clientApplication 字符串 客户端报告的其用于连接的应用程序。由于客户端不必提供此信息,因此值可以为 null。
command 字符串 不带任何命令详细信息的 SQL 命令的名称。
commandText 字符串

用户传入的实际 SQL 语句。此字段用于除连接或断开连接记录之外的所有类型的记录,在这种情况下,该值为 null。

敏感数据

可以看到完整 SQL 文本,包括任何敏感数据。但是,如果可以从上下文中确定数据库用户密码,则会对该密码进行修订,如下面的 SQL 语句所示。

ALTER ROLE role-name WITH password
databaseName 字符串 用户连接到的数据库。
dbProtocol 字符串 数据库协议。
dbUserName 字符串 客户端对其进行身份验证的数据库用户。
exitCode int 用于会话退出记录的值。在干净的出口,这包含退出代码。在某些故障场景中,无法始终获得退出代码。例如,如果 PostgreSQL 执行 exit() 或操作者执行 kill -9 等命令。
logTime 字符串 审核代码路径中记录的时间戳。
netProtocol 字符串 网络通信协议。
objectName 字符串 数据库对象的名称(如果正在对一个数据库对象运行 SQL 语句)。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句,则此值为 null。
objectType 字符串 数据库对象类型,例如表、索引、视图等。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句,则此值为 null。包括下列有效值:
  • COMPOSITE TYPE

  • FOREIGN TABLE

  • FUNCTION

  • INDEX

  • MATERIALIZED VIEW

  • SEQUENCE

  • TABLE

  • TOAST TABLE

  • VIEW

  • UNKNOWN

paramList 字符串 传递给 SQL 语句的逗号分隔的参数数组。如果 SQL 语句没有参数,则此值为空数组。
pid int 为向客户端连接提供服务而分配的后端进程的进程 ID。
remoteHost 字符串 客户端 IP 地址或主机名,具体取决于数据库的 log_hostname 参数设置。
remotePort 字符串 客户端的端口号。
rowCount int SQL 语句所影响或检索的表行的数目。此字段仅用于作为数据操作语言 (DML) 语句的 SQL 语句。如果 SQL 语句不是 DML 语句,则此值为 null。
serverHost 字符串 数据库服务器主机 IP 地址。
serverType 字符串 数据库服务器类型,例如 PostgreSQL
serverVersion 字符串 数据库服务器版本。
serviceName 字符串 服务名称,例如 Amazon Aurora PostgreSQL-Compatible edition
sessionId int 伪唯一会话标识符。
statementId int 客户端的 SQL 语句的 ID。计数器处于会话级别,并随客户端输入的每个 SQL 语句递增。
substatementId int SQL 子语句的 ID。此值计算 statementId 字段标识的每个 SQL 语句的包含的子语句。
type 字符串 事件类型。有效值为 recordheartbeat

使用 AWS 开发工具包处理数据库活动流

您可以使用 AWS 开发工具包以编程方式处理数据库活动流。以下是有关如何处理 Kinesis 数据流的功能完善的 Java 和 Python 示例。

JavaPython
Java
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.security.Security; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.zip.GZIPInputStream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.SecretKeySpec; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.CryptoInputStream; import com.amazonaws.encryptionsdk.jce.JceMasterKey; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kms.AWSKMS; import com.amazonaws.services.kms.AWSKMSClientBuilder; import com.amazonaws.services.kms.model.DecryptRequest; import com.amazonaws.services.kms.model.DecryptResult; import com.amazonaws.util.Base64; import com.amazonaws.util.IOUtils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.bouncycastle.jce.provider.BouncyCastleProvider; public class DemoConsumer { private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]"; private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]"; private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]"; private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]"; private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2... private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY); private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS); private static final AwsCrypto CRYPTO = new AwsCrypto(); private static final AWSKMS KMS = AWSKMSClientBuilder.standard() .withRegion(REGION_NAME) .withCredentials(CREDENTIALS_PROVIDER).build(); class Activity { String type; String version; String databaseActivityEvents; String key; } class ActivityEvent { @SerializedName("class") String _class; String clientApplication; String command; String commandText; String databaseName; String dbProtocol; String dbUserName; String exitCode; String logTime; String netProtocol; String objectName; String objectType; List<String> paramList; String pid; String remoteHost; String remotePort; String rowCount; String serverHost; String serverType; String serverVersion; String serviceName; String sessionId; String statementId; String substatementId; String type; } class ActivityRecords { String type; String clusterId; String instanceId; List<ActivityEvent> databaseActivityEventList; } static class RecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new RecordProcessor(); } } static class RecordProcessor implements IRecordProcessor { private static final long BACKOFF_TIME_IN_MILLIS = 3000L; private static final int PROCESSING_RETRIES_MAX = 10; private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; private static final Gson GSON = new GsonBuilder().serializeNulls().create(); private static final Cipher CIPHER; static { Security.insertProviderAt(new BouncyCastleProvider(), 1); try { CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC"); } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) { throw new ExceptionInInitializerError(e); } } private long nextCheckpointTimeInMillis; @Override public void initialize(String shardId) { } @Override public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) { for (final Record record : records) { processSingleBlob(record.getData()); } if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { checkpoint(checkpointer); nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS; } } @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } } private void processSingleBlob(final ByteBuffer bytes) { try { // JSON $Activity final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class); // Base64.Decode final byte[] decoded = Base64.decode(activity.databaseActivityEvents); final byte[] decodedDataKey = Base64.decode(activity.key); Map<String, String> context = new HashMap<>(); context.put("aws:rds:dbc-id", DBC_RESOURCE_ID); // Decrypt final DecryptRequest decryptRequest = new DecryptRequest() .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context); final DecryptResult decryptResult = KMS.decrypt(decryptRequest); final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext())); // GZip Decompress final byte[] decompressed = decompress(decrypted); // JSON $ActivityRecords final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class); // Iterate throught $ActivityEvents for (final ActivityEvent event : activityRecords.databaseActivityEventList) { System.out.println(GSON.toJson(event)); } } catch (Exception e) { // Handle error. e.printStackTrace(); } } private static byte[] decompress(final byte[] src) throws IOException { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); return IOUtils.toByteArray(gzipInputStream); } private void checkpoint(IRecordProcessorCheckpointer checkpointer) { for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) { try { checkpointer.checkpoint(); break; } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). System.out.println("Caught shutdown exception, skipping checkpoint." + se); break; } catch (ThrottlingException e) { // Backoff and re-attempt checkpoint upon transient failures if (i >= (PROCESSING_RETRIES_MAX - 1)) { System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e); break; } else { System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e); } } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e); break; } try { Thread.sleep(BACKOFF_TIME_IN_MILLIS); } catch (InterruptedException e) { System.out.println("Interrupted sleep" + e); } } } } private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException { // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"), "BC", "DataKey", "AES/GCM/NoPadding"); try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) { IOUtils.copy(decryptingStream, out); return out.toByteArray(); } } public static void main(String[] args) throws Exception { final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); final KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId); kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST); kinesisClientLibConfiguration.withRegionName(REGION_NAME); final Worker worker = new Builder() .recordProcessorFactory(new RecordProcessorFactory()) .config(kinesisClientLibConfiguration) .build(); System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId); try { worker.run(); } catch (Throwable t) { System.err.println("Caught throwable while processing data."); t.printStackTrace(); System.exit(1); } System.exit(0); } private static byte[] getByteArray(final ByteBuffer b) { byte[] byteArray = new byte[b.remaining()]; b.get(byteArray); return byteArray; } }
Python
import zlib import boto3 import base64 import json import aws_encryption_sdk from Crypto.Cipher import AES from aws_encryption_sdk import DefaultCryptoMaterialsManager from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType aws_access_key_id="YOUR_ACCESS_KEY" aws_secret_access_key="YOUR_SECRET_KEY" key_id = "your_key_id" stream_name = "YOUR_KINESIS_STREAM_NAME" region_name = 'YOUR_REGION_NAME' cluster_id = "YOUR_CLUSTER_ID" class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, wrapping_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = wrapping_key def _get_raw_key(self, key_id): return self.wrapping_key def decrypt(decoded, plaintext): wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plaintext, wrapping_key_type=EncryptionKeyType.SYMMETRIC) my_key_provider = MyRawMasterKeyProvider(wrapping_key) my_key_provider.add_master_key("DataKey") with aws_encryption_sdk.stream( mode='d', source=decoded, materials_manager=DefaultCryptoMaterialsManager(master_key_provider=my_key_provider) ) as decryptor: for chunk in decryptor: d = zlib.decompressobj(16 + zlib.MAX_WBITS) decompressed_database_stream = d.decompress(chunk) print decompressed_database_stream if __name__ == '__main__': session = boto3.session.Session( aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key ) kms = session.client('kms', region_name=region_name) client = session.client('kinesis', region_name=region_name) response = client.describe_stream(StreamName=stream_name) shard_it = {} for idx, shard in enumerate(response['StreamDescription']['Shards']): shared_iterator = client.get_shard_iterator( StreamName=stream_name, ShardId=response['StreamDescription']['Shards'][idx]['ShardId'], ShardIteratorType='LATEST', )["ShardIterator"] shard_it[idx] = shared_iterator while True: rows = [] for shared_iterator in shard_it: response = client.get_records(ShardIterator=shard_it[shared_iterator], Limit=10000) for record in response['Records']: record_data = record['Data'] record_data = json.loads(record_data) key = record_data['key'] decoded = base64.b64decode(record_data['DatabaseActivityEvents']) decoded_data_key = base64.b64decode(record_data['key']) decrypt_result = kms.decrypt(CiphertextBlob=decoded_data_key, EncryptionContext={"aws:rds:dbc-id": cluster_id}) plaintext = decrypt_result[u'Plaintext'] cipher = AES.new(plaintext, AES.MODE_GCM) decrypt(decoded, decrypt_result[u'Plaintext']) shard_it[shared_iterator] = response["NextShardIterator"]

管理对数据库活动流的访问

对数据库活动流具有适当 AWS Identity and Access Management (IAM) 角色权限的任何用户都可以创建、启动、停止和修改数据库集群的数据库活动流设置。这些操作包含在流的审核日志中。对于最佳合规性实践,我们建议您不要向 DBA 提供这些权限。

您可以使用 IAM 策略设置对数据库活动流的访问权限。有关 Aurora 身份验证的更多信息,请参阅 Amazon Aurora 中的 Identity and Access Management。有关创建 IAM 策略的更多信息,请参阅 创建和使用适用于 IAM 数据库访问的 IAM 策略

例 允许配置数据库活动流的策略

要向用户提供精细访问权限以修改数据库活动流,请在 IAM 策略中使用特定于服务的操作上下文密钥 rds:ConfigureDBActivityStreams。以下 IAM 策略示例允许用户或角色配置数据库活动流。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"ConfigureActivityStreams", "Effect":"Allow", "Action": [ "rds:StartActivityStream", "rds:StopActivityStream" ], "Resource":"*", } ] }

例 允许启动数据库活动流的策略

以下 IAM 策略示例允许用户或角色启动数据库活动流。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"AllowStartActivityStreams", "Effect":"Allow", "Action":"rds:StartActivityStream", "Resource":"*" } ] }

例 允许停止数据库活动流的策略

以下 IAM 策略示例允许用户或角色停止数据库活动流。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"AllowStopActivityStreams", "Effect":"Allow", "Action":"rds:StopActivityStream", "Resource":"*" } ] }

例 拒绝启动数据库活动流的策略

以下 IAM 策略示例拒绝用户或角色启动数据库活动流。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"DenyStartActivityStreams", "Effect":"Deny", "Action":"rds:StartActivityStream", "Resource":"*" } ] }

例 拒绝停止数据库活动流的策略

以下 IAM 策略示例拒绝用户或角色停止数据库活动流。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"DenyStopActivityStreams", "Effect":"Deny", "Action":"rds:StopActivityStream", "Resource":"*" } ] }