Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

Using an OpenSearch Ingestion pipeline with Amazon Managed Streaming for Apache Kafka

You can use the Kafka plugin to ingest data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) into your OpenSearch Ingestion pipeline. With Amazon MSK, you can build and run applications that use Apache Kafka to process streaming data. OpenSearch Ingestion uses Amazon PrivateLink to connect to Amazon MSK.


Before you create your OpenSearch Ingestion pipeline, perform the following steps:

  1. Create an Amazon MSK cluster by following the steps in Creating a cluster in the Amazon Managed Streaming for Apache Kafka Developer Guide.

    • For Cluster type, choose Provisioned. OpenSearch Ingestion doesn't support Serverless MSK clusters.

    • For Broker type, choose any option except for t3 types, as these aren't supported by OpenSearch Ingestion.

  2. After the cluster has an Active status, follow the steps in Turn on multi-VPC connectivity.

  3. Follow the steps in Attach a cluster policy to the MSK cluster to attach one of the following policies, depending on whether your cluster and pipeline are in the same Amazon Web Services account. This policy allows OpenSearch Ingestion to create an Amazon PrivateLink connection to your Amazon MSK cluster and read data from Kafka topics. Make sure that you update the resource with your own ARN.

    The following policy applies when your cluster and pipeline are in the same Amazon Web Services account:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": ""
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id"
        },
        {
          "Effect": "Allow",
          "Principal": {
            "Service": ""
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:GetBootstrapBrokers",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id"
        }
      ]
    }

    If your Amazon MSK cluster is in a different Amazon Web Services account than your pipeline, attach the following policy instead. The ARN for the Amazon principal should be the ARN for the same pipeline role that you provide in your pipeline YAML configuration:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": ""
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id"
        },
        {
          "Effect": "Allow",
          "Principal": {
            "Service": ""
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:GetBootstrapBrokers",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id"
        },
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role"
          },
          "Action": [
            "kafka-cluster:*",
            "kafka:*"
          ],
          "Resource": [
            "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id",
            "arn:aws:kafka:us-east-1:{msk-account-id}:topic/cluster-name/cluster-id/*",
            "arn:aws:kafka:us-east-1:{msk-account-id}:group/cluster-name/*"
          ]
        }
      ]
    }
  4. Create a Kafka topic by following the steps in Create a topic. Make sure that BootstrapServerString is one of the private endpoint (single-VPC) bootstrap URLs. The value for --replication-factor should be 2 or 3, based on the number of zones your Amazon MSK cluster has. The value for --partitions should be at least 10.

  5. Produce and consume data by following the steps in Produce and consume data. Again, make sure that BootstrapServerString is one of your private endpoint (single-VPC) bootstrap URLs.
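If you attach the cluster policy from step 3 with the CLI (for example, with aws kafka put-cluster-policy) rather than the console, you may find it convenient to generate the JSON programmatically. The following sketch mirrors the same-account policy shown above; the cluster ARN is a placeholder, and the service principal is a parameter because its value is not shown in the policy text:

```python
import json

# Placeholder cluster ARN -- replace with your own values.
CLUSTER_ARN = "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id"

def same_account_cluster_policy(cluster_arn, service_principal):
    """Build the same-account MSK cluster policy from step 3 as a JSON string.
    The service principal is a parameter rather than hardcoded; supply the
    value from your copy of the policy."""
    def statement(actions):
        return {
            "Effect": "Allow",
            "Principal": {"Service": service_principal},
            "Action": actions,
            "Resource": cluster_arn,
        }

    policy = {
        "Version": "2012-10-17",
        "Statement": [
            statement(["kafka:CreateVpcConnection", "kafka:DescribeClusterV2"]),
            statement([
                "kafka:CreateVpcConnection",
                "kafka:GetBootstrapBrokers",
                "kafka:DescribeClusterV2",
            ]),
        ],
    }
    return json.dumps(policy, indent=2)
```

You can write the returned string to a file and pass it to the CLI, for example with --policy file://policy.json.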

Step 1: Configure the pipeline role

After you have your Amazon MSK cluster set up, add the following Kafka permissions in the pipeline role that you want to use in your pipeline configuration:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeCluster",
        "kafka:DescribeClusterV2",
        "kafka:GetBootstrapBrokers"
      ],
      "Resource": [
        "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*Topic*",
        "kafka-cluster:ReadData"
      ],
      "Resource": [
        "arn:aws:kafka:us-east-1:{account-id}:topic/cluster-name/cluster-id/topic-name"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
      ],
      "Resource": [
        "arn:aws:kafka:us-east-1:{account-id}:group/cluster-name/*"
      ]
    }
  ]
}
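All three Resource ARNs in this policy derive from the cluster ARN. As a small illustrative sketch (the helper name and example account ID are assumptions, not part of any AWS API), you can generate the topic and consumer-group patterns instead of hand-editing each entry:

```python
def msk_resource_arns(cluster_arn):
    """Derive the topic and consumer-group ARN patterns used in the
    pipeline-role policy from an MSK cluster ARN of the form
    arn:aws:kafka:<region>:<account-id>:cluster/<cluster-name>/<cluster-id>."""
    prefix, _, name_and_id = cluster_arn.partition(":cluster/")
    cluster_name, _, cluster_id = name_and_id.partition("/")
    return {
        "cluster": cluster_arn,
        "topics": f"{prefix}:topic/{cluster_name}/{cluster_id}/*",
        "groups": f"{prefix}:group/{cluster_name}/*",
    }

arns = msk_resource_arns(
    "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
)
print(arns["topics"])
# -> arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/*
```

Narrow the wildcard in the topic pattern to a specific topic name if, as in the policy above, you want to grant access to a single topic only.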

Step 2: Create the pipeline

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies Kafka as the source:

version: "2"
log-pipeline:
  source:
    kafka:
      acknowledgments: true
      topics:
        - name: "topic-name"
          group_id: "group-id"
      aws:
        msk:
          arn: "arn:aws:iam::{account-id}:role/cluster-role"
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"
  processor:
    - grok:
        match:
          message:
            - "%{COMMONAPACHELOG}"
    - date:
        destination: "@timestamp"
        from_time_received: true
  sink:
    - opensearch:
        hosts: [""]
        index: "index_name"
        aws_sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"
        aws_region: "us-east-1"
        aws_sigv4: true
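The grok processor in this pipeline matches log lines against the %{COMMONAPACHELOG} pattern. As a rough illustration of what that pattern extracts (a simplified stand-in, not the exact grok definition that OpenSearch Ingestion uses), consider this equivalent regular expression:

```python
import re

# Simplified stand-in for grok's %{COMMONAPACHELOG} pattern (illustration only;
# the real grok pattern is more permissive about field formats).
COMMON_LOG = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) (?P<httpversion>[^"]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+|-)'
)

line = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326'
m = COMMON_LOG.match(line)
print(m.group("clientip"), m.group("verb"), m.group("response"))
# -> 127.0.0.1 GET 200
```

Each named group becomes a field on the event, which the date processor and the sink can then reference (as @timestamp does above).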

You can use a preconfigured Amazon MSK blueprint to create this pipeline. For more information, see Using blueprints to create a pipeline.

Step 3: (Optional) Use the Amazon Glue Schema Registry

When you use OpenSearch Ingestion with Amazon MSK, you can use the AVRO data format for schemas hosted in the Amazon Glue Schema Registry. With the Amazon Glue Schema Registry, you can centrally discover, control, and evolve data stream schemas.

To use this option, enable the schema type in your pipeline configuration:

schema:
  type: "aws_glue"

You must also provide Amazon Glue with read access permissions in your pipeline role. You can use the Amazon managed policy called AWSGlueSchemaRegistryReadonlyAccess. Additionally, your registry must be in the same Amazon Web Services account and Region as your OpenSearch Ingestion pipeline.
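For context, the schema block belongs with the Kafka source options. Assuming the pipeline from Step 2, the placement would look like the following sketch (other source options elided):

```yaml
log-pipeline:
  source:
    kafka:
      # ... topics, aws, and other options as shown in Step 2 ...
      schema:
        type: "aws_glue"
```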

Step 4: (Optional) Configure recommended compute units (OCUs) for the Amazon MSK pipeline

Each compute unit has one consumer per topic. Brokers balance partitions among these consumers for a given topic. However, when the number of partitions is greater than the number of consumers, Amazon MSK assigns multiple partitions to each consumer. OpenSearch Ingestion has built-in auto scaling that scales the pipeline up or down based on CPU usage or the number of pending records in the pipeline.

For optimal performance, distribute your partitions across many compute units for parallel processing. If a topic has a large number of partitions (for example, more than 96, which is the maximum number of OCUs per pipeline), we recommend that you configure the pipeline with 1–96 OCUs, because the pipeline automatically scales as needed. If a topic has a low number of partitions (for example, fewer than 96), set the maximum number of compute units equal to the number of partitions.

When a pipeline has more than one topic, use the topic with the highest number of partitions as the reference for configuring maximum compute units. By adding another pipeline with a new set of OCUs to the same topic and consumer group, you can scale the throughput almost linearly.
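The sizing guidance above reduces to a simple rule of thumb. The sketch below (the function name is illustrative, and the 96-OCU cap is the per-pipeline maximum cited in the text) picks a maximum OCU setting from per-topic partition counts:

```python
MAX_OCUS_PER_PIPELINE = 96  # per-pipeline maximum cited in the text above

def recommended_max_ocus(partitions_per_topic):
    """Pick a maximum OCU setting from the partition counts of all topics
    the pipeline reads: size against the topic with the most partitions,
    capped at the per-pipeline maximum (illustrative helper, not an API)."""
    busiest_topic = max(partitions_per_topic)
    return min(busiest_topic, MAX_OCUS_PER_PIPELINE)

print(recommended_max_ocus([12, 48]))  # -> 48 (partition count of busiest topic)
print(recommended_max_ocus([200]))     # -> 96 (capped at the pipeline maximum)
```

Auto scaling then operates within this range, so setting the maximum higher than the busiest topic's partition count leaves consumers idle.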