Getting started with Amazon MSK
The following code example shows how to:
Create an MSK cluster
Create IAM permissions for MSK access
Create a client machine
Get bootstrap brokers
Set up the client machine
Create a topic and produce/consume data
Clean up resources
Bash
Amazon CLI with Bash script
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the Sample developer tutorials repository.

#!/bin/bash

# Amazon MSK Getting Started Tutorial Script - Version 8
# This script automates the steps in the Amazon MSK Getting Started tutorial
# It creates an MSK cluster, sets up IAM permissions, creates a client machine,
# and configures the client to interact with the cluster

# Set up logging
LOG_FILE="msk_tutorial_$(date +%Y%m%d_%H%M%S).log"
exec > >(tee -a "$LOG_FILE") 2>&1

echo "Starting Amazon MSK Getting Started Tutorial Script - Version 8"
echo "Logging to $LOG_FILE"
echo "=============================================="

# Function to handle errors
handle_error() {
    echo "ERROR: $1"
    echo "Resources created so far:"
    if [ -n "$CLUSTER_ARN" ]; then echo "- MSK Cluster: $CLUSTER_ARN"; fi
    if [ -n "$POLICY_ARN" ]; then echo "- IAM Policy: $POLICY_ARN"; fi
    if [ -n "$ROLE_NAME" ]; then echo "- IAM Role: $ROLE_NAME"; fi
    if [ -n "$INSTANCE_PROFILE_NAME" ]; then echo "- IAM Instance Profile: $INSTANCE_PROFILE_NAME"; fi
    if [ -n "$CLIENT_SG_ID" ]; then echo "- Client Security Group: $CLIENT_SG_ID"; fi
    if [ -n "$INSTANCE_ID" ]; then echo "- EC2 Instance: $INSTANCE_ID"; fi
    if [ -n "$KEY_NAME" ]; then echo "- Key Pair: $KEY_NAME"; fi

    echo "Attempting to clean up resources..."
    cleanup_resources
    exit 1
}

# Function to check if a resource exists
resource_exists() {
    local resource_type="$1"
    local resource_id="$2"

    case "$resource_type" in
        "cluster")
            aws kafka describe-cluster --cluster-arn "$resource_id" &>/dev/null
            ;;
        "policy")
            aws iam get-policy --policy-arn "$resource_id" &>/dev/null
            ;;
        "role")
            aws iam get-role --role-name "$resource_id" &>/dev/null
            ;;
        "instance-profile")
            aws iam get-instance-profile --instance-profile-name "$resource_id" &>/dev/null
            ;;
        "security-group")
            aws ec2 describe-security-groups --group-ids "$resource_id" &>/dev/null
            ;;
        "instance")
            aws ec2 describe-instances --instance-ids "$resource_id" --query 'Reservations[0].Instances[0].State.Name' --output text | grep -v "terminated" &>/dev/null
            ;;
        "key-pair")
            aws ec2 describe-key-pairs --key-names "$resource_id" &>/dev/null
            ;;
    esac
}

# Function to remove security group references
remove_security_group_references() {
    local sg_id="$1"

    if [ -z "$sg_id" ]; then
        echo "No security group ID provided for reference removal"
        return
    fi

    echo "Removing security group references for $sg_id"

    # Get all security groups in the VPC that might reference our client security group
    local vpc_security_groups=$(aws ec2 describe-security-groups \
        --filters "Name=vpc-id,Values=$DEFAULT_VPC_ID" \
        --query 'SecurityGroups[].GroupId' \
        --output text 2>/dev/null)

    if [ -n "$vpc_security_groups" ]; then
        for other_sg in $vpc_security_groups; do
            if [ "$other_sg" != "$sg_id" ]; then
                echo "Checking security group $other_sg for references to $sg_id"

                # Get the security group details in JSON format
                local sg_details=$(aws ec2 describe-security-groups \
                    --group-ids "$other_sg" \
                    --output json 2>/dev/null)

                if [ -n "$sg_details" ]; then
                    # Check if our security group is referenced in inbound rules
                    local has_inbound_ref=$(echo "$sg_details" | grep -o "\"GroupId\": \"$sg_id\"" | head -1)

                    if [ -n "$has_inbound_ref" ]; then
                        echo "Found inbound rules in $other_sg referencing $sg_id, removing them..."
                        # Try to remove common rule types
                        echo "Attempting to remove all-traffic rule"
                        aws ec2 revoke-security-group-ingress \
                            --group-id "$other_sg" \
                            --protocol all \
                            --source-group "$sg_id" 2>/dev/null || echo "No all-traffic rule to remove"

                        # Try to remove TCP rules on common ports
                        for port in 22 80 443 9092 9094 9096; do
                            aws ec2 revoke-security-group-ingress \
                                --group-id "$other_sg" \
                                --protocol tcp \
                                --port "$port" \
                                --source-group "$sg_id" 2>/dev/null || true
                        done

                        # Try to remove UDP rules
                        aws ec2 revoke-security-group-ingress \
                            --group-id "$other_sg" \
                            --protocol udp \
                            --source-group "$sg_id" 2>/dev/null || true
                    fi

                    # Check for outbound rules (less common but possible)
                    local has_outbound_ref=$(echo "$sg_details" | grep -A 20 "IpPermissionsEgress" | grep -o "\"GroupId\": \"$sg_id\"" | head -1)

                    if [ -n "$has_outbound_ref" ]; then
                        echo "Found outbound rules in $other_sg referencing $sg_id, removing them..."
                        aws ec2 revoke-security-group-egress \
                            --group-id "$other_sg" \
                            --protocol all \
                            --source-group "$sg_id" 2>/dev/null || echo "No outbound all-traffic rule to remove"
                    fi
                fi
            fi
        done
    fi

    echo "Completed security group reference removal for $sg_id"
}

# Function to clean up resources
cleanup_resources() {
    echo "Cleaning up resources..."

    # Delete EC2 instance if it exists
    if [ -n "$INSTANCE_ID" ] && resource_exists "instance" "$INSTANCE_ID"; then
        echo "Terminating EC2 instance: $INSTANCE_ID"
        aws ec2 terminate-instances --instance-ids "$INSTANCE_ID" || echo "Failed to terminate instance"
        echo "Waiting for instance to terminate..."
        aws ec2 wait instance-terminated --instance-ids "$INSTANCE_ID" || echo "Failed to wait for instance termination"
    fi

    # Delete MSK cluster first (to remove dependencies on security group)
    if [ -n "$CLUSTER_ARN" ] && resource_exists "cluster" "$CLUSTER_ARN"; then
        echo "Deleting MSK cluster: $CLUSTER_ARN"
        aws kafka delete-cluster --cluster-arn "$CLUSTER_ARN" || echo "Failed to delete cluster"

        # Wait a bit for the cluster deletion to start
        echo "Waiting 30 seconds for cluster deletion to begin..."
        sleep 30
    fi

    # Remove security group references before attempting deletion
    if [ -n "$CLIENT_SG_ID" ] && resource_exists "security-group" "$CLIENT_SG_ID"; then
        remove_security_group_references "$CLIENT_SG_ID"

        echo "Deleting security group: $CLIENT_SG_ID"
        # Try multiple times with longer delays to ensure dependencies are removed
        for i in {1..10}; do
            if aws ec2 delete-security-group --group-id "$CLIENT_SG_ID"; then
                echo "Security group deleted successfully"
                break
            fi
            echo "Failed to delete security group (attempt $i/10), retrying in 30 seconds..."
            sleep 30
        done
    fi

    # Delete key pair if it exists
    if [ -n "$KEY_NAME" ] && resource_exists "key-pair" "$KEY_NAME"; then
        echo "Deleting key pair: $KEY_NAME"
        aws ec2 delete-key-pair --key-name "$KEY_NAME" || echo "Failed to delete key pair"
    fi

    # Remove role from instance profile
    if [ -n "$ROLE_NAME" ] && [ -n "$INSTANCE_PROFILE_NAME" ] && resource_exists "instance-profile" "$INSTANCE_PROFILE_NAME"; then
        echo "Removing role from instance profile"
        aws iam remove-role-from-instance-profile \
            --instance-profile-name "$INSTANCE_PROFILE_NAME" \
            --role-name "$ROLE_NAME" || echo "Failed to remove role from instance profile"
    fi

    # Delete instance profile
    if [ -n "$INSTANCE_PROFILE_NAME" ] && resource_exists "instance-profile" "$INSTANCE_PROFILE_NAME"; then
        echo "Deleting instance profile: $INSTANCE_PROFILE_NAME"
        aws iam delete-instance-profile \
            --instance-profile-name "$INSTANCE_PROFILE_NAME" || echo "Failed to delete instance profile"
    fi

    # Detach policy from role
    if [ -n "$ROLE_NAME" ] && [ -n "$POLICY_ARN" ] && resource_exists "role" "$ROLE_NAME"; then
        echo "Detaching policy from role"
        aws iam detach-role-policy \
            --role-name "$ROLE_NAME" \
            --policy-arn "$POLICY_ARN" || echo "Failed to detach policy"
    fi

    # Delete role
    if [ -n "$ROLE_NAME" ] && resource_exists "role" "$ROLE_NAME"; then
        echo "Deleting role: $ROLE_NAME"
        aws iam delete-role --role-name "$ROLE_NAME" || echo "Failed to delete role"
    fi

    # Delete policy
    if [ -n "$POLICY_ARN" ] && resource_exists "policy" "$POLICY_ARN"; then
        echo "Deleting policy: $POLICY_ARN"
        aws iam delete-policy --policy-arn "$POLICY_ARN" || echo "Failed to delete policy"
    fi

    echo "Cleanup completed"
}

# Function to find a suitable subnet and instance type combination
find_suitable_subnet_and_instance_type() {
    local vpc_id="$1"
    local -a subnet_array=("${!2}")

    # List of instance types to try, in order of preference
    local instance_types=("t3.micro" "t2.micro" "t3.small" "t2.small")

    echo "Finding suitable subnet and instance type combination..."

    for instance_type in "${instance_types[@]}"; do
        echo "Trying instance type: $instance_type"

        for subnet_id in "${subnet_array[@]}"; do
            # Get the availability zone for this subnet
            local az=$(aws ec2 describe-subnets \
                --subnet-ids "$subnet_id" \
                --query 'Subnets[0].AvailabilityZone' \
                --output text)

            echo " Checking subnet $subnet_id in AZ $az"

            # Check if this instance type is available in this AZ
            local available=$(aws ec2 describe-instance-type-offerings \
                --location-type availability-zone \
                --filters "Name=location,Values=$az" "Name=instance-type,Values=$instance_type" \
                --query 'InstanceTypeOfferings[0].InstanceType' \
                --output text 2>/dev/null)

            if [ "$available" = "$instance_type" ]; then
                echo " ✓ Found suitable combination: $instance_type in $az (subnet: $subnet_id)"
                SELECTED_SUBNET_ID="$subnet_id"
                SELECTED_INSTANCE_TYPE="$instance_type"
                return 0
            else
                echo " ✗ $instance_type not available in $az"
            fi
        done
    done

    echo "ERROR: Could not find any suitable subnet and instance type combination"
    return 1
}

# Generate unique identifiers
RANDOM_SUFFIX=$(LC_ALL=C tr -dc 'a-z0-9' < /dev/urandom | fold -w 8 | head -n 1)
CLUSTER_NAME="MSKTutorialCluster-${RANDOM_SUFFIX}"
POLICY_NAME="msk-tutorial-policy-${RANDOM_SUFFIX}"
ROLE_NAME="msk-tutorial-role-${RANDOM_SUFFIX}"
INSTANCE_PROFILE_NAME="msk-tutorial-profile-${RANDOM_SUFFIX}"
SG_NAME="MSKClientSecurityGroup-${RANDOM_SUFFIX}"

echo "Using the following resource names:"
echo "- Cluster Name: $CLUSTER_NAME"
echo "- Policy Name: $POLICY_NAME"
echo "- Role Name: $ROLE_NAME"
echo "- Instance Profile Name: $INSTANCE_PROFILE_NAME"
echo "- Security Group Name: $SG_NAME"
echo "=============================================="

# Step 1: Create an MSK Provisioned cluster
echo "Step 1: Creating MSK Provisioned cluster"

# Get the default VPC ID first
echo "Getting default VPC..."
DEFAULT_VPC_ID=$(aws ec2 describe-vpcs \
    --filters "Name=is-default,Values=true" \
    --query "Vpcs[0].VpcId" \
    --output text)

if [ -z "$DEFAULT_VPC_ID" ] || [ "$DEFAULT_VPC_ID" = "None" ]; then
    handle_error "Could not find default VPC. Please ensure you have a default VPC in your region."
fi

echo "Default VPC ID: $DEFAULT_VPC_ID"

# Get available subnets in the default VPC
echo "Getting available subnets in the default VPC..."
SUBNETS=$(aws ec2 describe-subnets \
    --filters "Name=vpc-id,Values=$DEFAULT_VPC_ID" "Name=default-for-az,Values=true" \
    --query "Subnets[0:3].SubnetId" \
    --output text)

# Convert space-separated subnet IDs to an array
read -r -a SUBNET_ARRAY <<< "$SUBNETS"

if [ ${#SUBNET_ARRAY[@]} -lt 3 ]; then
    handle_error "Not enough subnets available in the default VPC. Need at least 3 subnets, found ${#SUBNET_ARRAY[@]}."
fi

# Get default security group for the default VPC
echo "Getting default security group for the default VPC..."
DEFAULT_SG=$(aws ec2 describe-security-groups \
    --filters "Name=group-name,Values=default" "Name=vpc-id,Values=$DEFAULT_VPC_ID" \
    --query "SecurityGroups[0].GroupId" \
    --output text)

if [ -z "$DEFAULT_SG" ] || [ "$DEFAULT_SG" = "None" ]; then
    handle_error "Could not find default security group for VPC $DEFAULT_VPC_ID"
fi

echo "Creating MSK cluster: $CLUSTER_NAME"
echo "Using VPC: $DEFAULT_VPC_ID"
echo "Using subnets: ${SUBNET_ARRAY[0]} ${SUBNET_ARRAY[1]} ${SUBNET_ARRAY[2]}"
echo "Using security group: $DEFAULT_SG"

# Create the MSK cluster with proper error handling
CLUSTER_RESPONSE=$(aws kafka create-cluster \
    --cluster-name "$CLUSTER_NAME" \
    --broker-node-group-info "{\"InstanceType\": \"kafka.t3.small\", \"ClientSubnets\": [\"${SUBNET_ARRAY[0]}\", \"${SUBNET_ARRAY[1]}\", \"${SUBNET_ARRAY[2]}\"], \"SecurityGroups\": [\"$DEFAULT_SG\"]}" \
    --kafka-version "3.6.0" \
    --number-of-broker-nodes 3 \
    --encryption-info "{\"EncryptionInTransit\": {\"InCluster\": true, \"ClientBroker\": \"TLS\"}}" 2>&1)

# Check if the command was successful
if [ $? -ne 0 ]; then
    handle_error "Failed to create MSK cluster: $CLUSTER_RESPONSE"
fi

# Extract the cluster ARN using grep
CLUSTER_ARN=$(echo "$CLUSTER_RESPONSE" | grep -o '"ClusterArn": "[^"]*' | cut -d'"' -f4)

if [ -z "$CLUSTER_ARN" ]; then
    handle_error "Failed to extract cluster ARN from response: $CLUSTER_RESPONSE"
fi

echo "MSK cluster creation initiated. ARN: $CLUSTER_ARN"
echo "Waiting for cluster to become active (this may take 15-20 minutes)..."

# Wait for the cluster to become active
while true; do
    CLUSTER_STATUS=$(aws kafka describe-cluster --cluster-arn "$CLUSTER_ARN" --query "ClusterInfo.State" --output text 2>/dev/null)

    if [ $? -ne 0 ]; then
        echo "Failed to get cluster status. Retrying in 30 seconds..."
        sleep 30
        continue
    fi

    echo "Current cluster status: $CLUSTER_STATUS"

    if [ "$CLUSTER_STATUS" = "ACTIVE" ]; then
        echo "Cluster is now active!"
        break
    elif [ "$CLUSTER_STATUS" = "FAILED" ]; then
        handle_error "Cluster creation failed"
    fi

    echo "Still waiting for cluster to become active... (checking again in 60 seconds)"
    sleep 60
done

echo "=============================================="

# Step 2: Create an IAM role granting access to create topics on the Amazon MSK cluster
echo "Step 2: Creating IAM policy and role"

# Get account ID and region
ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
REGION=$(aws configure get region)

if [ -z "$REGION" ]; then
    REGION=$(aws ec2 describe-availability-zones --query 'AvailabilityZones[0].RegionName' --output text)
fi

if [ -z "$ACCOUNT_ID" ] || [ -z "$REGION" ]; then
    handle_error "Could not determine AWS account ID or region"
fi

echo "Account ID: $ACCOUNT_ID"
echo "Region: $REGION"

# Create IAM policy
echo "Creating IAM policy: $POLICY_NAME"
POLICY_DOCUMENT="{
    \"Version\": \"2012-10-17\",
    \"Statement\": [
        {
            \"Effect\": \"Allow\",
            \"Action\": [
                \"kafka-cluster:Connect\",
                \"kafka-cluster:AlterCluster\",
                \"kafka-cluster:DescribeCluster\"
            ],
            \"Resource\": [
                \"$CLUSTER_ARN\"
            ]
        },
        {
            \"Effect\": \"Allow\",
            \"Action\": [
                \"kafka-cluster:*Topic*\",
                \"kafka-cluster:WriteData\",
                \"kafka-cluster:ReadData\"
            ],
            \"Resource\": [
                \"arn:aws:kafka:$REGION:$ACCOUNT_ID:topic/$CLUSTER_NAME/*\"
            ]
        },
        {
            \"Effect\": \"Allow\",
            \"Action\": [
                \"kafka-cluster:AlterGroup\",
                \"kafka-cluster:DescribeGroup\"
            ],
            \"Resource\": [
                \"arn:aws:kafka:$REGION:$ACCOUNT_ID:group/$CLUSTER_NAME/*\"
            ]
        }
    ]
}"

POLICY_RESPONSE=$(aws iam create-policy \
    --policy-name "$POLICY_NAME" \
    --policy-document "$POLICY_DOCUMENT" 2>&1)

# Check if the command was successful
if [ $? -ne 0 ]; then
    handle_error "Failed to create IAM policy: $POLICY_RESPONSE"
fi

# Extract the policy ARN using grep
POLICY_ARN=$(echo "$POLICY_RESPONSE" | grep -o '"Arn": "[^"]*' | cut -d'"' -f4)

if [ -z "$POLICY_ARN" ]; then
    handle_error "Failed to extract policy ARN from response: $POLICY_RESPONSE"
fi

echo "IAM policy created. ARN: $POLICY_ARN"

# Create IAM role for EC2
echo "Creating IAM role: $ROLE_NAME"
TRUST_POLICY="{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"ec2.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}"

ROLE_RESPONSE=$(aws iam create-role \
    --role-name "$ROLE_NAME" \
    --assume-role-policy-document "$TRUST_POLICY" 2>&1)

# Check if the command was successful
if [ $? -ne 0 ]; then
    handle_error "Failed to create IAM role: $ROLE_RESPONSE"
fi

echo "IAM role created: $ROLE_NAME"

# Attach policy to role
echo "Attaching policy to role"
ATTACH_RESPONSE=$(aws iam attach-role-policy \
    --role-name "$ROLE_NAME" \
    --policy-arn "$POLICY_ARN" 2>&1)

# Check if the command was successful
if [ $? -ne 0 ]; then
    handle_error "Failed to attach policy to role: $ATTACH_RESPONSE"
fi

echo "Policy attached to role"

# Create instance profile and add role to it
echo "Creating instance profile: $INSTANCE_PROFILE_NAME"
PROFILE_RESPONSE=$(aws iam create-instance-profile \
    --instance-profile-name "$INSTANCE_PROFILE_NAME" 2>&1)

# Check if the command was successful
if [ $? -ne 0 ]; then
    handle_error "Failed to create instance profile: $PROFILE_RESPONSE"
fi

echo "Instance profile created"

echo "Adding role to instance profile"
ADD_ROLE_RESPONSE=$(aws iam add-role-to-instance-profile \
    --instance-profile-name "$INSTANCE_PROFILE_NAME" \
    --role-name "$ROLE_NAME" 2>&1)

# Check if the command was successful
if [ $? -ne 0 ]; then
    handle_error "Failed to add role to instance profile: $ADD_ROLE_RESPONSE"
fi

echo "Role added to instance profile"

# Wait a moment for IAM propagation
echo "Waiting 10 seconds for IAM propagation..."
sleep 10

echo "=============================================="

# Step 3: Create a client machine
echo "Step 3: Creating client machine"

# Find a suitable subnet and instance type combination
if ! find_suitable_subnet_and_instance_type "$DEFAULT_VPC_ID" SUBNET_ARRAY[@]; then
    handle_error "Could not find a suitable subnet and instance type combination"
fi

echo "Selected subnet: $SELECTED_SUBNET_ID"
echo "Selected instance type: $SELECTED_INSTANCE_TYPE"

# Verify the subnet is in the same VPC we're using
SUBNET_VPC_ID=$(aws ec2 describe-subnets \
    --subnet-ids "$SELECTED_SUBNET_ID" \
    --query 'Subnets[0].VpcId' \
    --output text)

if [ "$SUBNET_VPC_ID" != "$DEFAULT_VPC_ID" ]; then
    handle_error "Subnet VPC ($SUBNET_VPC_ID) does not match default VPC ($DEFAULT_VPC_ID)"
fi

echo "VPC ID: $SUBNET_VPC_ID"

# Get security group ID from the MSK cluster
echo "Getting security group ID from the MSK cluster"
MSK_SG_ID=$(aws kafka describe-cluster \
    --cluster-arn "$CLUSTER_ARN" \
    --query 'ClusterInfo.BrokerNodeGroupInfo.SecurityGroups[0]' \
    --output text)

if [ -z "$MSK_SG_ID" ] || [ "$MSK_SG_ID" = "None" ]; then
    handle_error "Failed to get security group ID from cluster"
fi

echo "MSK security group ID: $MSK_SG_ID"

# Create security group for client
echo "Creating security group for client: $SG_NAME"
CLIENT_SG_RESPONSE=$(aws ec2 create-security-group \
    --group-name "$SG_NAME" \
    --description "Security group for MSK client" \
    --vpc-id "$DEFAULT_VPC_ID" 2>&1)

# Check if the command was successful
if [ $? -ne 0 ]; then
    handle_error "Failed to create security group: $CLIENT_SG_RESPONSE"
fi

# Extract the security group ID using grep
CLIENT_SG_ID=$(echo "$CLIENT_SG_RESPONSE" | grep -o '"GroupId": "[^"]*' | cut -d'"' -f4)

if [ -z "$CLIENT_SG_ID" ]; then
    handle_error "Failed to extract security group ID from response: $CLIENT_SG_RESPONSE"
fi

echo "Client security group created. ID: $CLIENT_SG_ID"

# Allow SSH access to client from your IP only
echo "Getting your public IP address"
MY_IP=$(curl -s https://checkip.amazonaws.com 2>/dev/null)

if [ -z "$MY_IP" ]; then
    echo "Warning: Could not determine your IP address. Using 0.0.0.0/0 (not recommended for production)"
    MY_IP="0.0.0.0/0"
else
    MY_IP="$MY_IP/32"
    echo "Your IP address: $MY_IP"
fi

echo "Adding SSH ingress rule to client security group"
SSH_RULE_RESPONSE=$(aws ec2 authorize-security-group-ingress \
    --group-id "$CLIENT_SG_ID" \
    --protocol tcp \
    --port 22 \
    --cidr "$MY_IP" 2>&1)

# Check if the command was successful; report success only when the rule was added
if [ $? -ne 0 ]; then
    echo "Warning: Failed to add SSH ingress rule: $SSH_RULE_RESPONSE"
    echo "You may need to manually add SSH access to security group $CLIENT_SG_ID"
else
    echo "SSH ingress rule added"
fi

# Update MSK security group to allow traffic from client security group
echo "Adding ingress rule to MSK security group to allow traffic from client"
MSK_RULE_RESPONSE=$(aws ec2 authorize-security-group-ingress \
    --group-id "$MSK_SG_ID" \
    --protocol all \
    --source-group "$CLIENT_SG_ID" 2>&1)

# Check if the command was successful; report success only when the rule was added
if [ $? -ne 0 ]; then
    echo "Warning: Failed to add ingress rule to MSK security group: $MSK_RULE_RESPONSE"
    echo "You may need to manually add ingress rule to security group $MSK_SG_ID"
else
    echo "Ingress rule added to MSK security group"
fi

# Create key pair
KEY_NAME="MSKKeyPair-${RANDOM_SUFFIX}"
echo "Creating key pair: $KEY_NAME"
KEY_RESPONSE=$(aws ec2 create-key-pair --key-name "$KEY_NAME" --query 'KeyMaterial' --output text 2>&1)

# Check if the command was successful
if [ $? -ne 0 ]; then
    handle_error "Failed to create key pair: $KEY_RESPONSE"
fi

# Save the private key to a file
KEY_FILE="${KEY_NAME}.pem"
echo "$KEY_RESPONSE" > "$KEY_FILE"
chmod 400 "$KEY_FILE"

echo "Key pair created and saved to $KEY_FILE"

# Get the latest Amazon Linux 2 AMI
echo "Getting latest Amazon Linux 2 AMI ID"
AMI_ID=$(aws ec2 describe-images \
    --owners amazon \
    --filters "Name=name,Values=amzn2-ami-hvm-*-x86_64-gp2" "Name=state,Values=available" \
    --query "sort_by(Images, &CreationDate)[-1].ImageId" \
    --output text 2>/dev/null)

if [ -z "$AMI_ID" ] || [ "$AMI_ID" = "None" ]; then
    handle_error "Failed to get Amazon Linux 2 AMI ID"
fi

echo "Using AMI ID: $AMI_ID"

# Launch EC2 instance with the selected subnet and instance type
echo "Launching EC2 instance"
echo "Instance type: $SELECTED_INSTANCE_TYPE"
echo "Subnet: $SELECTED_SUBNET_ID"

INSTANCE_RESPONSE=$(aws ec2 run-instances \
    --image-id "$AMI_ID" \
    --instance-type "$SELECTED_INSTANCE_TYPE" \
    --key-name "$KEY_NAME" \
    --security-group-ids "$CLIENT_SG_ID" \
    --subnet-id "$SELECTED_SUBNET_ID" \
    --iam-instance-profile "Name=$INSTANCE_PROFILE_NAME" \
    --tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=MSKTutorialClient-${RANDOM_SUFFIX}}]" 2>&1)

# Check if the command was successful
if [ $? -ne 0 ]; then
    handle_error "Failed to launch EC2 instance: $INSTANCE_RESPONSE"
fi

# Extract the instance ID using grep
INSTANCE_ID=$(echo "$INSTANCE_RESPONSE" | grep -o '"InstanceId": "[^"]*' | head -1 | cut -d'"' -f4)

if [ -z "$INSTANCE_ID" ]; then
    handle_error "Failed to extract instance ID from response: $INSTANCE_RESPONSE"
fi

echo "EC2 instance launched successfully. ID: $INSTANCE_ID"
echo "Waiting for instance to be running..."

# Wait for the instance to be running
aws ec2 wait instance-running --instance-ids "$INSTANCE_ID"

if [ $? -ne 0 ]; then
    handle_error "Instance failed to reach running state"
fi

# Wait a bit more for the instance to initialize
echo "Instance is running. Waiting 30 seconds for initialization..."
sleep 30

# Get public DNS name of instance
CLIENT_DNS=$(aws ec2 describe-instances \
    --instance-ids "$INSTANCE_ID" \
    --query 'Reservations[0].Instances[0].PublicDnsName' \
    --output text)

if [ -z "$CLIENT_DNS" ] || [ "$CLIENT_DNS" = "None" ]; then
    echo "Warning: Could not get public DNS name for instance. Trying public IP..."
    CLIENT_DNS=$(aws ec2 describe-instances \
        --instance-ids "$INSTANCE_ID" \
        --query 'Reservations[0].Instances[0].PublicIpAddress' \
        --output text)

    if [ -z "$CLIENT_DNS" ] || [ "$CLIENT_DNS" = "None" ]; then
        handle_error "Failed to get public DNS name or IP address for instance"
    fi
fi

echo "Client instance DNS/IP: $CLIENT_DNS"
echo "=============================================="

# Get bootstrap brokers with improved logic
echo "Getting bootstrap brokers"
MAX_RETRIES=10
RETRY_COUNT=0
BOOTSTRAP_BROKERS=""
AUTH_METHOD=""

while [ -z "$BOOTSTRAP_BROKERS" ] || [ "$BOOTSTRAP_BROKERS" = "None" ]; do
    # Get the full bootstrap brokers response
    BOOTSTRAP_RESPONSE=$(aws kafka get-bootstrap-brokers \
        --cluster-arn "$CLUSTER_ARN" 2>/dev/null)

    if [ $? -eq 0 ] && [ -n "$BOOTSTRAP_RESPONSE" ]; then
        # Try to get IAM authentication brokers first using grep
        BOOTSTRAP_BROKERS=$(echo "$BOOTSTRAP_RESPONSE" | grep -o '"BootstrapBrokerStringSaslIam": "[^"]*' | cut -d'"' -f4)
        if [ -n "$BOOTSTRAP_BROKERS" ]; then
            AUTH_METHOD="IAM"
        else
            # Fall back to TLS authentication
            BOOTSTRAP_BROKERS=$(echo "$BOOTSTRAP_RESPONSE" | grep -o '"BootstrapBrokerStringTls": "[^"]*' | cut -d'"' -f4)
            if [ -n "$BOOTSTRAP_BROKERS" ]; then
                AUTH_METHOD="TLS"
            fi
        fi
    fi

    RETRY_COUNT=$((RETRY_COUNT + 1))

    if [ "$RETRY_COUNT" -ge "$MAX_RETRIES" ]; then
        echo "Warning: Could not get bootstrap brokers after $MAX_RETRIES attempts."
        echo "You may need to manually retrieve them later using:"
        echo "aws kafka get-bootstrap-brokers --cluster-arn $CLUSTER_ARN"
        BOOTSTRAP_BROKERS="BOOTSTRAP_BROKERS_NOT_AVAILABLE"
        AUTH_METHOD="UNKNOWN"
        break
    fi

    if [ -z "$BOOTSTRAP_BROKERS" ] || [ "$BOOTSTRAP_BROKERS" = "None" ]; then
        echo "Bootstrap brokers not available yet. Retrying in 30 seconds... (Attempt $RETRY_COUNT/$MAX_RETRIES)"
        sleep 30
    fi
done

echo "Bootstrap brokers: $BOOTSTRAP_BROKERS"
echo "Authentication method: $AUTH_METHOD"
echo "=============================================="

# Create setup script for the client machine
echo "Creating setup script for the client machine"
cat > setup_client.sh << 'EOF'
#!/bin/bash

# Set up logging
LOG_FILE="client_setup_$(date +%Y%m%d_%H%M%S).log"
exec > >(tee -a "$LOG_FILE") 2>&1

echo "Starting client setup"
echo "=============================================="

# Install Java
echo "Installing Java"
sudo yum -y install java-11

# Set environment variables
echo "Setting up environment variables"
export KAFKA_VERSION="3.6.0"
echo "KAFKA_VERSION=$KAFKA_VERSION"

# Download and extract Apache Kafka
echo "Downloading Apache Kafka"
wget https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
if [ $? -ne 0 ]; then
    echo "Failed to download Kafka. Trying alternative mirror..."
    wget https://www.apache.org/dyn/closer.cgi?path=/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
fi

echo "Extracting Kafka"
tar -xzf kafka_2.13-$KAFKA_VERSION.tgz
export KAFKA_ROOT=$(pwd)/kafka_2.13-$KAFKA_VERSION
echo "KAFKA_ROOT=$KAFKA_ROOT"

# Download the MSK IAM authentication package (needed for both IAM and TLS)
echo "Downloading MSK IAM authentication package"
cd $KAFKA_ROOT/libs
wget https://github.com/aws/aws-msk-iam-auth/releases/latest/download/aws-msk-iam-auth-1.1.6-all.jar
if [ $? -ne 0 ]; then
    echo "Failed to download specific version. Trying to get latest version..."
    LATEST_TAG=$(curl -s https://api.github.com/repos/aws/aws-msk-iam-auth/releases/latest | grep -o '"tag_name": "[^"]*' | cut -d'"' -f4)
    # Strip a leading "v" from the release tag, if present, because the JAR file name uses the bare version number
    LATEST_VERSION="${LATEST_TAG#v}"
    wget https://github.com/aws/aws-msk-iam-auth/releases/download/$LATEST_TAG/aws-msk-iam-auth-$LATEST_VERSION-all.jar
    if [ $? -ne 0 ]; then
        echo "Failed to download IAM auth package. Please check the URL and try again."
        exit 1
    fi
    export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-$LATEST_VERSION-all.jar
else
    export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-1.1.6-all.jar
fi
echo "CLASSPATH=$CLASSPATH"

# Create client properties file based on authentication method
echo "Creating client properties file"
cd $KAFKA_ROOT/config

# The AUTH_METHOD_PLACEHOLDER will be replaced by the script
AUTH_METHOD="AUTH_METHOD_PLACEHOLDER"

if [ "$AUTH_METHOD" = "IAM" ]; then
    echo "Configuring for IAM authentication"
    cat > client.properties << 'EOT'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOT
elif [ "$AUTH_METHOD" = "TLS" ]; then
    echo "Configuring for TLS authentication"
    cat > client.properties << 'EOT'
security.protocol=SSL
EOT
else
    echo "Unknown authentication method. Creating basic TLS configuration."
    cat > client.properties << 'EOT'
security.protocol=SSL
EOT
fi

echo "Client setup completed"
echo "=============================================="

# Create a script to set up environment variables
cat > ~/setup_env.sh << 'EOT'
#!/bin/bash
export KAFKA_VERSION="3.6.0"
export KAFKA_ROOT=~/kafka_2.13-$KAFKA_VERSION
export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-1.1.6-all.jar
export BOOTSTRAP_SERVER="BOOTSTRAP_SERVER_PLACEHOLDER"
export AUTH_METHOD="AUTH_METHOD_PLACEHOLDER"

echo "Environment variables set:"
echo "KAFKA_VERSION=$KAFKA_VERSION"
echo "KAFKA_ROOT=$KAFKA_ROOT"
echo "CLASSPATH=$CLASSPATH"
echo "BOOTSTRAP_SERVER=$BOOTSTRAP_SERVER"
echo "AUTH_METHOD=$AUTH_METHOD"
EOT

chmod +x ~/setup_env.sh

echo "Created environment setup script: ~/setup_env.sh"
echo "Run 'source ~/setup_env.sh' to set up your environment"
EOF

# Replace placeholders in the setup script
if [ -n "$BOOTSTRAP_BROKERS" ] && [ "$BOOTSTRAP_BROKERS" != "None" ] && [ "$BOOTSTRAP_BROKERS" != "BOOTSTRAP_BROKERS_NOT_AVAILABLE" ]; then
    sed -i "s|BOOTSTRAP_SERVER_PLACEHOLDER|$BOOTSTRAP_BROKERS|g" setup_client.sh
else
    # If bootstrap brokers are not available, provide instructions to get them
    sed -i "s|BOOTSTRAP_SERVER_PLACEHOLDER|\$(aws kafka get-bootstrap-brokers --cluster-arn $CLUSTER_ARN --query 'BootstrapBrokerStringTls' --output text)|g" setup_client.sh
fi

# Replace auth method placeholder
sed -i "s|AUTH_METHOD_PLACEHOLDER|$AUTH_METHOD|g" setup_client.sh

echo "Setup script created"
echo "=============================================="

# Display summary of created resources
echo ""
echo "=============================================="
echo "RESOURCE SUMMARY"
echo "=============================================="
echo "MSK Cluster ARN: $CLUSTER_ARN"
echo "MSK Cluster Name: $CLUSTER_NAME"
echo "Authentication Method: $AUTH_METHOD"
echo "IAM Policy ARN: $POLICY_ARN"
echo "IAM Role Name: $ROLE_NAME"
echo "IAM Instance Profile: $INSTANCE_PROFILE_NAME"
echo "Client Security Group: $CLIENT_SG_ID"
echo "EC2 Instance ID: $INSTANCE_ID"
echo "EC2 Instance Type: $SELECTED_INSTANCE_TYPE"
echo "EC2 Instance DNS: $CLIENT_DNS"
echo "Key Pair: $KEY_NAME (saved to $KEY_FILE)"
echo "Bootstrap Brokers: $BOOTSTRAP_BROKERS"
echo "=============================================="
echo ""

# Instructions for connecting to the instance and setting up the client
echo "NEXT STEPS:"
echo "1. Connect to your EC2 instance:"
echo " ssh -i $KEY_FILE ec2-user@$CLIENT_DNS"
echo ""
echo "2. Upload the setup script to your instance:"
echo " scp -i $KEY_FILE setup_client.sh ec2-user@$CLIENT_DNS:~/"
echo ""
echo "3. Run the setup script on your instance:"
echo " ssh -i $KEY_FILE ec2-user@$CLIENT_DNS 'chmod +x ~/setup_client.sh && ~/setup_client.sh'"
echo ""
echo "4. Source the environment setup script:"
echo " source ~/setup_env.sh"
echo ""

# Provide different instructions based on authentication method
if [ "$AUTH_METHOD" = "IAM" ]; then
    echo "5. Create a Kafka topic (using IAM authentication):"
    echo " \$KAFKA_ROOT/bin/kafka-topics.sh --create \\"
    echo " --bootstrap-server \$BOOTSTRAP_SERVER \\"
    echo " --command-config \$KAFKA_ROOT/config/client.properties \\"
    echo " --replication-factor 3 \\"
    echo " --partitions 1 \\"
    echo " --topic MSKTutorialTopic"
    echo ""
    echo "6. Start a producer:"
    echo " \$KAFKA_ROOT/bin/kafka-console-producer.sh \\"
    echo " --broker-list \$BOOTSTRAP_SERVER \\"
    echo " --producer.config \$KAFKA_ROOT/config/client.properties \\"
    echo " --topic MSKTutorialTopic"
    echo ""
    echo "7. Start a consumer:"
    echo " \$KAFKA_ROOT/bin/kafka-console-consumer.sh \\"
    echo " --bootstrap-server \$BOOTSTRAP_SERVER \\"
    echo " --consumer.config \$KAFKA_ROOT/config/client.properties \\"
    echo " --topic MSKTutorialTopic \\"
    echo " --from-beginning"
elif [ "$AUTH_METHOD" = "TLS" ]; then
    echo "5. Create a Kafka topic (using TLS authentication):"
    echo " \$KAFKA_ROOT/bin/kafka-topics.sh --create \\"
    echo " --bootstrap-server \$BOOTSTRAP_SERVER \\"
    echo " --command-config \$KAFKA_ROOT/config/client.properties \\"
    echo " --replication-factor 3 \\"
    echo " --partitions 1 \\"
    echo " --topic MSKTutorialTopic"
    echo ""
    echo "6. Start a producer:"
    echo " \$KAFKA_ROOT/bin/kafka-console-producer.sh \\"
    echo " --broker-list \$BOOTSTRAP_SERVER \\"
    echo " --producer.config \$KAFKA_ROOT/config/client.properties \\"
    echo " --topic MSKTutorialTopic"
    echo ""
    echo "7. Start a consumer:"
    echo " \$KAFKA_ROOT/bin/kafka-console-consumer.sh \\"
    echo " --bootstrap-server \$BOOTSTRAP_SERVER \\"
    echo " --consumer.config \$KAFKA_ROOT/config/client.properties \\"
    echo " --topic MSKTutorialTopic \\"
    echo " --from-beginning"
else
    echo "5. Manually retrieve bootstrap brokers and configure authentication:"
    echo " aws kafka get-bootstrap-brokers --cluster-arn $CLUSTER_ARN"
fi

echo ""
echo "8. Verify the topic was created:"
echo " \$KAFKA_ROOT/bin/kafka-topics.sh --list \\"
echo " --bootstrap-server \$BOOTSTRAP_SERVER \\"
echo " --command-config \$KAFKA_ROOT/config/client.properties"
echo "=============================================="
echo ""

# Ask if user wants to clean up resources
echo ""
echo "==========================================="
echo "CLEANUP CONFIRMATION"
echo "==========================================="
echo "Do you want to clean up all created resources? (y/n): "
read -r CLEANUP_CHOICE

if [[ $CLEANUP_CHOICE =~ ^[Yy]$ ]]; then
    cleanup_resources
    echo "All resources have been cleaned up."
else
    echo "Resources will not be cleaned up. You can manually clean them up later."
    echo "To clean up resources, run this script again and choose 'y' when prompted."
fi

echo "Script completed successfully!"
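The script waits for the cluster in a `while true` loop, which never gives up if the cluster stays in a non-terminal state. As a minimal sketch of one alternative, the helper below polls a status command a bounded number of times. The name `poll_until` and its argument order are hypothetical and are not part of the tutorial script or the CLI.

```shell
#!/bin/bash
# poll_until (hypothetical helper): run a status command until it prints the wanted value.
# Usage: poll_until <wanted> <failed> <max_attempts> <delay_seconds> <command...>
# Returns 0 when the wanted value is seen, 1 when the failure value is seen,
# and 2 when max_attempts is reached without either.
poll_until() {
    local wanted="$1" failed="$2" max_attempts="$3" delay="$4"
    shift 4
    local attempt=1 status

    while [ "$attempt" -le "$max_attempts" ]; do
        # A failing status command is treated as "not ready yet".
        status=$("$@" 2>/dev/null) || status=""

        if [ "$status" = "$wanted" ]; then
            return 0
        elif [ -n "$failed" ] && [ "$status" = "$failed" ]; then
            return 1
        fi

        attempt=$((attempt + 1))
        sleep "$delay"
    done

    return 2
}

# Possible usage with the cluster created by the script above (not executed here):
# poll_until ACTIVE FAILED 40 60 \
#     aws kafka describe-cluster --cluster-arn "$CLUSTER_ARN" \
#     --query "ClusterInfo.State" --output text
```

With 40 attempts and a 60-second delay the wait is capped at roughly 40 minutes; both numbers are illustrative and can be tuned to the expected creation time.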
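For API details, see the Amazon CLI Command Reference topics for the commands this script calls, including aws kafka create-cluster, aws kafka describe-cluster, aws kafka get-bootstrap-brokers, aws kafka delete-cluster, aws iam create-policy, aws iam create-role, aws iam create-instance-profile, aws ec2 create-security-group, aws ec2 create-key-pair, and aws ec2 run-instances.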
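The script pulls values such as ClusterArn, Arn, and GroupId out of JSON responses with `grep -o` and `cut`. The sketch below wraps that same pattern in a hypothetical helper, `json_field`, to show what each stage does. It only handles simple string values formatted as `"Key": "value"`; where that is too fragile, the `--query` and `--output text` options the script already uses elsewhere avoid text parsing altogether.

```shell
#!/bin/bash
# json_field (hypothetical helper): read JSON on stdin and print the first
# string value stored under the given key. Prints nothing if the key is absent.
json_field() {
    local key="$1"
    # grep -o keeps only the matched text:  "Key": "value   (no closing quote)
    # head -n 1 keeps the first match when the key appears more than once
    # cut splits on double quotes, so field 4 is the value itself
    grep -o "\"$key\": \"[^\"]*" | head -n 1 | cut -d'"' -f4
}

# Example with a made-up response body (the ARN below is illustrative only)
SAMPLE_RESPONSE='{"ClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo/abc", "ClusterName": "demo"}'
echo "$SAMPLE_RESPONSE" | json_field ClusterArn
```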
For a complete list of Amazon SDK developer guides and code examples, see Create Amazon EC2 resources using an Amazon SDK. This topic also includes information about getting started and details about previous SDK versions.
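The script fills BOOTSTRAP_SERVER_PLACEHOLDER and AUTH_METHOD_PLACEHOLDER in setup_client.sh with `sed -i`, which edits the file in place. As a sketch of a variant that leaves the template untouched and does not depend on in-place editing, the hypothetical function `render_template` below writes the substituted text to a separate output file. It assumes the values contain no `|`, `&`, or backslash characters, since those are special inside this sed expression.

```shell
#!/bin/bash
# render_template (hypothetical helper): copy a template to an output file,
# replacing the two placeholders used by the tutorial script.
# Usage: render_template <template> <output> <bootstrap_brokers> <auth_method>
render_template() {
    local template="$1" output="$2" brokers="$3" auth="$4"

    # Refuse values that would be misread by the sed expression below.
    case "$brokers$auth" in
        *'|'*|*'&'*|*'\'*)
            echo "render_template: value contains a character sed would misinterpret" >&2
            return 1
            ;;
    esac

    sed -e "s|BOOTSTRAP_SERVER_PLACEHOLDER|$brokers|g" \
        -e "s|AUTH_METHOD_PLACEHOLDER|$auth|g" \
        "$template" > "$output"
}

# Possible usage (file names are illustrative, not executed here):
# render_template setup_client.sh.tmpl setup_client.sh "$BOOTSTRAP_BROKERS" "$AUTH_METHOD"
```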