Namespace Amazon.CDK.AWS.MSK.Alpha
Amazon Managed Streaming for Apache Kafka Construct Library
---The APIs of higher level constructs in this module are experimental and under active development.
They are subject to non-backward compatible changes or removal in any future version. These are
not subject to the <a href="https://semver.org/">Semantic Versioning</a> model and breaking changes will be
announced in the release notes. This means that while you may use them, you may need to update
your source code when upgrading to a newer version of this package.
Amazon MSK is a fully managed service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data.
The following example creates an MSK Cluster.
Vpc vpc;
var cluster = new Cluster(this, "Cluster", new ClusterProps {
ClusterName = "myCluster",
KafkaVersion = KafkaVersion.V2_8_1,
Vpc = vpc
});
Allowing Connections
To control who can access the Cluster, use the .connections
attribute. For a list of ports used by MSK, refer to the MSK documentation.
Vpc vpc;
var cluster = new Cluster(this, "Cluster", new ClusterProps {
ClusterName = "myCluster",
KafkaVersion = KafkaVersion.V2_8_1,
Vpc = vpc
});
cluster.Connections.AllowFrom(Peer.Ipv4("1.2.3.4/8"), Port.Tcp(2181));
cluster.Connections.AllowFrom(Peer.Ipv4("1.2.3.4/8"), Port.Tcp(9094));
Cluster Endpoints
You can use the following attributes to get a list of the Kafka broker or ZooKeeper node endpoints
Cluster cluster;
new CfnOutput(this, "BootstrapBrokers", new CfnOutputProps { Value = cluster.BootstrapBrokers });
new CfnOutput(this, "BootstrapBrokersTls", new CfnOutputProps { Value = cluster.BootstrapBrokersTls });
new CfnOutput(this, "BootstrapBrokersSaslScram", new CfnOutputProps { Value = cluster.BootstrapBrokersSaslScram });
new CfnOutput(this, "BootstrapBrokerStringSaslIam", new CfnOutputProps { Value = cluster.BootstrapBrokersSaslIam });
new CfnOutput(this, "ZookeeperConnection", new CfnOutputProps { Value = cluster.ZookeeperConnectionString });
new CfnOutput(this, "ZookeeperConnectionTls", new CfnOutputProps { Value = cluster.ZookeeperConnectionStringTls });
Importing an existing Cluster
To import an existing MSK cluster into your CDK app use the .fromClusterArn()
method.
var cluster = Cluster.FromClusterArn(this, "Cluster", "arn:aws:kafka:us-west-2:1234567890:cluster/a-cluster/11111111-1111-1111-1111-111111111111-1");
Client Authentication
MSK supports the following authentication mechanisms.
TLS
To enable client authentication with TLS set the certificateAuthorityArns
property to reference your ACM Private CA. More info on Private CAs.
using Amazon.CDK.AWS.ACMPCA;
Vpc vpc;
var cluster = new Cluster(this, "Cluster", new ClusterProps {
ClusterName = "myCluster",
KafkaVersion = KafkaVersion.V2_8_1,
Vpc = vpc,
EncryptionInTransit = new EncryptionInTransitConfig {
ClientBroker = ClientBrokerEncryption.TLS
},
ClientAuthentication = ClientAuthentication.Tls(new TlsAuthProps {
CertificateAuthorities = new [] { CertificateAuthority.FromCertificateAuthorityArn(this, "CertificateAuthority", "arn:aws:acm-pca:us-west-2:1234567890:certificate-authority/11111111-1111-1111-1111-111111111111") }
})
});
SASL/SCRAM
Enable client authentication with SASL/SCRAM:
Vpc vpc;
var cluster = new Cluster(this, "cluster", new ClusterProps {
ClusterName = "myCluster",
KafkaVersion = KafkaVersion.V2_8_1,
Vpc = vpc,
EncryptionInTransit = new EncryptionInTransitConfig {
ClientBroker = ClientBrokerEncryption.TLS
},
ClientAuthentication = ClientAuthentication.Sasl(new SaslAuthProps {
Scram = true
})
});
SASL/IAM
Enable client authentication with IAM:
Vpc vpc;
var cluster = new Cluster(this, "cluster", new ClusterProps {
ClusterName = "myCluster",
KafkaVersion = KafkaVersion.V2_8_1,
Vpc = vpc,
EncryptionInTransit = new EncryptionInTransitConfig {
ClientBroker = ClientBrokerEncryption.TLS
},
ClientAuthentication = ClientAuthentication.Sasl(new SaslAuthProps {
Iam = true
})
});
SASL/IAM + TLS
Enable client authentication with IAM
as well as enable client authentication with TLS by setting the certificateAuthorityArns
property to reference your ACM Private CA. More info on Private CAs.
using Amazon.CDK.AWS.ACMPCA;
Vpc vpc;
var cluster = new Cluster(this, "Cluster", new ClusterProps {
ClusterName = "myCluster",
KafkaVersion = KafkaVersion.V2_8_1,
Vpc = vpc,
EncryptionInTransit = new EncryptionInTransitConfig {
ClientBroker = ClientBrokerEncryption.TLS
},
ClientAuthentication = ClientAuthentication.SaslTls(new SaslTlsAuthProps {
Iam = true,
CertificateAuthorities = new [] { CertificateAuthority.FromCertificateAuthorityArn(this, "CertificateAuthority", "arn:aws:acm-pca:us-west-2:1234567890:certificate-authority/11111111-1111-1111-1111-111111111111") }
})
});
Logging
You can deliver Apache Kafka broker logs to one or more of the following destination types: Amazon CloudWatch Logs, Amazon S3, Amazon Kinesis Data Firehose.
To configure logs to be sent to an S3 bucket, provide a bucket in the logging
config.
Vpc vpc;
IBucket bucket;
var cluster = new Cluster(this, "cluster", new ClusterProps {
ClusterName = "myCluster",
KafkaVersion = KafkaVersion.V2_8_1,
Vpc = vpc,
Logging = new BrokerLogging {
S3 = new S3LoggingConfiguration {
Bucket = bucket
}
}
});
When the S3 destination is configured, AWS will automatically create an S3 bucket policy
that allows the service to write logs to the bucket. This makes it impossible to later update
that bucket policy. To have CDK create the bucket policy so that future updates can be made,
the @aws-cdk/aws-s3:createDefaultLoggingPolicy
feature flag can be used. This can be set
in the cdk.json
file.
{
"context": {
"@aws-cdk/aws-s3:createDefaultLoggingPolicy": true
}
}
Storage Mode
You can configure an MSK cluster storage mode using the storageMode
property.
Tiered storage is a low-cost storage tier for Amazon MSK that scales to virtually unlimited storage, making it cost-effective to build streaming data applications.
Visit <a href="https://docs.aws.amazon.com/msk/latest/developerguide/msk-tiered-storage.html">Tiered storage</a>
to see the list of compatible Kafka versions and for more details.
Vpc vpc;
IBucket bucket;
var cluster = new Cluster(this, "cluster", new ClusterProps {
ClusterName = "myCluster",
KafkaVersion = KafkaVersion.V3_6_0,
Vpc = vpc,
StorageMode = StorageMode.TIERED
});
Classes
BrokerLogging | (experimental) Configuration details related to broker logs. |
ClientAuthentication | (experimental) Configuration properties for client authentication. |
ClientBrokerEncryption | (experimental) Indicates the encryption setting for data in transit between clients and brokers. |
Cluster | (experimental) Create a MSK Cluster. |
ClusterConfigurationInfo | (experimental) The Amazon MSK configuration to use for the cluster. |
ClusterMonitoringLevel | (experimental) The level of monitoring for the MSK cluster. |
ClusterProps | (experimental) Properties for a MSK Cluster. |
EbsStorageInfo | (experimental) EBS volume information. |
EncryptionInTransitConfig | (experimental) The settings for encrypting data in transit. |
KafkaVersion | (experimental) Kafka cluster version. |
MonitoringConfiguration | (experimental) Monitoring Configuration. |
S3LoggingConfiguration | (experimental) Details of the Amazon S3 destination for broker logs. |
SaslAuthProps | (experimental) SASL authentication properties. |
SaslTlsAuthProps | (experimental) SASL + TLS authentication properties. |
StorageMode | (experimental) The storage mode for the cluster brokers. |
TlsAuthProps | (experimental) TLS authentication properties. |
Interfaces
IBrokerLogging | (experimental) Configuration details related to broker logs. |
ICluster | (experimental) Represents a MSK Cluster. |
IClusterConfigurationInfo | (experimental) The Amazon MSK configuration to use for the cluster. |
IClusterProps | (experimental) Properties for a MSK Cluster. |
IEbsStorageInfo | (experimental) EBS volume information. |
IEncryptionInTransitConfig | (experimental) The settings for encrypting data in transit. |
IMonitoringConfiguration | (experimental) Monitoring Configuration. |
IS3LoggingConfiguration | (experimental) Details of the Amazon S3 destination for broker logs. |
ISaslAuthProps | (experimental) SASL authentication properties. |
ISaslTlsAuthProps | (experimental) SASL + TLS authentication properties. |
ITlsAuthProps | (experimental) TLS authentication properties. |