

# Monitoring Flink Kubernetes operator and Flink jobs
<a name="jobruns-flink-monitoring"></a>

This section describes several ways that you can monitor your Flink jobs with Amazon EMR on EKS. These include integrating Flink with the Amazon Managed Service for Prometheus, using the *Flink Web Dashboard*, which provides job status and metrics, or using a monitoring configuration to send log data to Amazon S3 and Amazon CloudWatch.

**Topics**
+ [Use Amazon Managed Service for Prometheus to monitor Flink jobs](jobruns-flink-monitoring-prometheus.md)
+ [Use the Flink UI to monitor Flink jobs](jobruns-flink-monitoring-ui.md)
+ [Use monitoring configuration to monitor Flink Kubernetes operator and Flink jobs](jobruns-flink-monitoring-configuration.md)

# Use Amazon Managed Service for Prometheus to monitor Flink jobs
<a name="jobruns-flink-monitoring-prometheus"></a>

You can integrate Apache Flink with Amazon Managed Service for Prometheus. Amazon Managed Service for Prometheus supports ingesting metrics from Prometheus servers in clusters running on Amazon EKS, and it works together with a Prometheus server that is already running on your Amazon EKS cluster. Installing the Amazon EMR Flink operator with Amazon Managed Service for Prometheus integration automatically deploys and configures a Prometheus server that integrates with Amazon Managed Service for Prometheus.

1. [Create an Amazon Managed Service for Prometheus workspace](https://docs.amazonaws.cn/prometheus/latest/userguide/AMP-onboard-create-workspace.html). This workspace serves as an ingestion endpoint. You will need the remote write URL later.

1. Set up IAM roles for service accounts.

   For this method of onboarding, use IAM roles for the service accounts in the Amazon EKS cluster where the Prometheus server is running. These roles are also called *service roles*.

   If you don't already have the roles, [set up service roles for the ingestion of metrics from Amazon EKS clusters](https://docs.amazonaws.cn/prometheus/latest/userguide/set-up-irsa.html).

   Before you continue, create an IAM role called `amp-iamproxy-ingest-role`.
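
   For reference, the trust policy on `amp-iamproxy-ingest-role` might look like the following sketch. The account ID, Region, OIDC provider ID, and namespace are placeholders that depend on your cluster; the service account name matches the one used in the Helm override in the next step.

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Federated": "arn:aws:iam::<AWS_ACCOUNT_ID>:oidc-provider/oidc.eks.<AWS_REGION>.amazonaws.com/id/<OIDC_PROVIDER_ID>"
               },
               "Action": "sts:AssumeRoleWithWebIdentity",
               "Condition": {
                   "StringEquals": {
                       "oidc.eks.<AWS_REGION>.amazonaws.com/id/<OIDC_PROVIDER_ID>:sub": "system:serviceaccount:<namespace>:amp-iamproxy-ingest-service-account"
                   }
               }
           }
       ]
   }
   ```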

1. Install the Amazon EMR Flink Operator with Amazon Managed Service for Prometheus.

Now that you have an Amazon Managed Service for Prometheus workspace, a dedicated IAM role for Amazon Managed Service for Prometheus, and the necessary permissions, you can install the Amazon EMR Flink operator.

Create an `enable-amp.yaml` file. This file lets you use a custom configuration to override Amazon Managed Service for Prometheus settings. Make sure to use your own roles.

```
kube-prometheus-stack:
  prometheus:
    serviceAccount:
      create: true
      name: "amp-iamproxy-ingest-service-account"
      annotations:
        eks.amazonaws.com/role-arn: "arn:aws:iam::<AWS_ACCOUNT_ID>:role/amp-iamproxy-ingest-role"
    remoteWrite:
      - url: <AMAZON_MANAGED_PROMETHEUS_REMOTE_WRITE_URL>
        sigv4:
          region: <AWS_REGION>
        queueConfig:
          maxSamplesPerSend: 1000
          maxShards: 200
          capacity: 2500
```

Use the following [helm](https://helm.sh/docs/helm/helm_install/) command to pass the overrides to the `flink-kubernetes-operator` chart.

```
helm upgrade -n <namespace> flink-kubernetes-operator \
   oci://public.ecr.aws/emr-on-eks/flink-kubernetes-operator \
   --set prometheus.enabled=true \
   -f enable-amp.yaml
```

This command automatically installs a Prometheus reporter in the operator on port 9999. Any future `FlinkDeployment` also exposes a `metrics` port on 9249.
+ Flink operator metrics appear in Prometheus with the prefix `flink_k8soperator_`.
+ Flink TaskManager metrics appear in Prometheus with the prefix `flink_taskmanager_`.
+ Flink JobManager metrics appear in Prometheus with the prefix `flink_jobmanager_`.
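
As a quick check, you can run PromQL queries against your Amazon Managed Service for Prometheus workspace, for example from a Grafana data source that points at the workspace. The metric names below are illustrative examples based on the prefixes above; the exact names available depend on your Flink version and reporter configuration.

```
# Number of TaskManagers registered with the JobManager
flink_jobmanager_numRegisteredTaskManagers

# JVM CPU load per TaskManager, averaged over 5 minutes
avg_over_time(flink_taskmanager_Status_JVM_CPU_Load[5m])
```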

# Use the Flink UI to monitor Flink jobs
<a name="jobruns-flink-monitoring-ui"></a>

To monitor the health and performance of a running Flink application, use the *Flink Web Dashboard*. This dashboard provides information about the status of the job, the number of TaskManagers, and the metrics and logs for the job. It also lets you view and modify the configuration of the Flink job and interact with the Flink cluster to submit or cancel jobs.

To access the Flink Web Dashboard for a running Flink application on Kubernetes:

1. Use the `kubectl port-forward` command to forward a local port to the port on which the Flink Web Dashboard is running in the Flink application's JobManager pod. By default, this is port 8081. Replace *deployment-name* with the name of the Flink application deployment, which you can find with the `kubectl get deployments` command.

   ```
   kubectl get deployments -n namespace
   ```

   Example output:

   ```
   kubectl get deployments -n flink-namespace
   NAME                        READY   UP-TO-DATE   AVAILABLE   AGE
   basic-example               1/1     1            1           11m
   flink-kubernetes-operator   1/1     1            1           21h
   ```

   ```
   kubectl port-forward deployments/deployment-name 8081 -n namespace
   ```

1. If you want to use a different port locally, replace `8081` with *local-port*`:8081`.

   ```
   kubectl port-forward -n flink deployments/basic-example 8080:8081
   ```

1. In a web browser, navigate to `http://localhost:8081` (or `http://localhost:local-port` if you used a custom local port) to access the Flink Web Dashboard. This dashboard shows information about the running Flink application, such as the status of the job, the number of TaskManagers, and the metrics and logs for the job.  
![\[Sample Flink Dashboard UI\]](http://docs.amazonaws.cn/en_us/emr/latest/EMR-on-EKS-DevelopmentGuide/images/sample-flink-dashboard-ui.png)

# Use monitoring configuration to monitor Flink Kubernetes operator and Flink jobs
<a name="jobruns-flink-monitoring-configuration"></a>

Monitoring configuration lets you set up archiving of your Flink application and operator logs to Amazon S3, Amazon CloudWatch, or both. Enabling it adds a Fluentd sidecar to your JobManager and TaskManager pods, which forwards those components' logs to your configured sinks.

**Note**  
To use this feature, you must set up IAM roles for service accounts (IRSA) for your Flink operator and your Flink jobs, because log forwarding interacts with other Amazon Web Services services. For setup steps, see [Setting up the Flink Kubernetes operator for Amazon EMR on EKS](jobruns-flink-kubernetes-operator-setup.md).
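
As an illustration, an IRSA-enabled service account for a Flink job is a Kubernetes `ServiceAccount` annotated with the IAM role that holds the S3 and CloudWatch permissions described below. The names and ARN here are placeholders, not fixed values.

```
apiVersion: v1
kind: ServiceAccount
metadata:
  name: <flink-job-service-account>
  namespace: <namespace>
  annotations:
    eks.amazonaws.com/role-arn: "arn:aws:iam::<AWS_ACCOUNT_ID>:role/<job-execution-role>"
```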

## Flink application logs
<a name="jobruns-flink-monitoring-configuration-application-logs"></a>

You can define this configuration in the following way.

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: FLINK IMAGE TAG
  imagePullPolicy: Always
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  executionRoleArn: JOB EXECUTION ROLE
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
  monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri: S3 BUCKET
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG GROUP NAME
      logStreamNamePrefix: LOG GROUP STREAM PREFIX
    sideCarResources:
      limits:
        cpuLimit: 500m
        memoryLimit: 250Mi
    containerLogRotationConfiguration:
        rotationSize: 2GB
        maxFilesToKeep: 10
```

The following are configuration options.
+ `s3MonitoringConfiguration` – configuration key to set up forwarding to S3
  + `logUri` (required) – the S3 bucket path where you want to store your logs.
  + Once the logs are uploaded, the path on S3 looks like the following.
    + No log rotation enabled:

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```
    + Log rotation enabled. Both rotated files and a current file (the one without the date stamp) are uploaded. The current file uses the following format.

      ```
      s3://${logUri}/${POD NAME}/STDOUT or STDERR.gz
      ```

      Rotated files use the following format, where *index* is an incrementing number.

      ```
      s3://${logUri}/${POD NAME}/stdout_YYYYMMDD_index.gz
      ```
  + The following IAM permissions are required to use this forwarder.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject"
        ],
        "Resource": [
           "S3_BUCKET_URI/*",
           "S3_BUCKET_URI"
        ]
    }
    ```
+ `cloudWatchMonitoringConfiguration` – configuration key to set up forwarding to CloudWatch.
  + `logGroupName` (required) – name of the CloudWatch log group that you want to send logs to. The group is created automatically if it doesn't exist.
  + `logStreamNamePrefix` (optional) – name of the log stream that you want to send logs into. The default value is an empty string. The log stream name format is as follows:

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
  + The following IAM permissions are required to use this forwarder.

    ```
    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}:*",
            "arn:aws:logs:REGION:ACCOUNT-ID:log-group:{YOUR_LOG_GROUP_NAME}"
        ]
    }
    ```
+ `sideCarResources` (optional) – the configuration key to set resource limits on the launched Fluentd sidecar container.
  + `memoryLimit` (optional) – the default value is 512Mi. Adjust according to your needs.
  + `cpuLimit` (optional) – this option doesn't have a default. Adjust according to your needs.
+ `containerLogRotationConfiguration` (optional) – controls the container log rotation behavior. It is enabled by default.
  + `rotationSize` (required) – specifies the file size for the log rotation. The range of possible values is from 2KB to 2GB. Decimal values aren't supported, so to specify a rotation size of 1.5GB, for example, use the value 1500MB. The default is 2GB.
  + `maxFilesToKeep` (required) – specifies the maximum number of files to retain in the container after rotation has taken place. The minimum value is 1, and the maximum value is 50. The default is 10.

## Flink operator logs
<a name="jobruns-flink-monitoring-configuration-operator-logs"></a>

You can also enable log archiving for the operator by setting the following options in the `values.yaml` file of your Helm chart installation. You can enable S3, CloudWatch, or both.

```
monitoringConfiguration: 
  s3MonitoringConfiguration:
    logUri: "S3-BUCKET"
    totalFileSize: "1G"
    uploadTimeout: "1m"
  cloudWatchMonitoringConfiguration:
    logGroupName: "flink-log-group"
    logStreamNamePrefix: "example-job-prefix-test-2"
  sideCarResources:
    limits:
      cpuLimit: 1
      memoryLimit: 800Mi
  memoryBufferLimit: 700M
```

The following are the available configuration options under `monitoringConfiguration`.
+ `s3MonitoringConfiguration` – set this option to archive to S3.
  + `logUri` (required) – the S3 bucket path where you want to store your logs.
  + Once the logs are uploaded, the S3 bucket paths look like the following.
    + No log rotation enabled:

      ```
      s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
      ```
    + Log rotation enabled. Both rotated files and a current file (the one without the date stamp) are uploaded. The current file uses the following format.

      ```
      s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/STDOUT or STDERR.gz
      ```

      Rotated files use the following format, where *index* is an incrementing number.

      ```
      s3://${logUri}/${POD NAME}/OPERATOR or WEBHOOK/stdout_YYYYMMDD_index.gz
      ```
+ `cloudWatchMonitoringConfiguration` – the configuration key to set up forwarding to CloudWatch.
  + `logGroupName` (required) – name of the CloudWatch log group that you want to send logs to. The group automatically gets created if it doesn't exist.
  + `logStreamNamePrefix` (optional) – name of the log stream that you want to send logs into. The default value is an empty string. The format in CloudWatch is as follows:

    ```
    ${logStreamNamePrefix}/${POD NAME}/STDOUT or STDERR
    ```
+ `sideCarResources` (optional) – the configuration key to set resource limits on the launched Fluentd sidecar container.
  + `memoryLimit` (optional) – the memory limit. Adjust according to your needs. The default is 512Mi.
  + `cpuLimit` (optional) – the CPU limit. Adjust according to your needs. No default value.
+ `containerLogRotationConfiguration` (optional) – controls the container log rotation behavior. It is enabled by default.
  + `rotationSize` (required) – specifies the file size for the log rotation. The range of possible values is from 2KB to 2GB. Decimal values aren't supported, so to specify a rotation size of 1.5GB, for example, use the value 1500MB. The default is 2GB.
  + `maxFilesToKeep` (required) – specifies the maximum number of files to retain in the container after rotation has taken place. The minimum value is 1, and the maximum value is 50. The default is 10.