在适用于 Apache Flink 的亚马逊托管服务中使用自定义指标 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在适用于 Apache Flink 的亚马逊托管服务中使用自定义指标

适用于 Apache Flink 的托管服务公开了 19 个指标 CloudWatch,包括资源使用量和吞吐量指标。此外,您可以创建自己的指标来跟踪应用程序特定的数据,例如处理事件或访问外部资源。

工作方式

Managed Service for Apache Flink 中的自定义指标使用 Apache Flink 指标系统。Apache Flink 指标具有以下属性:

  • 类型:指标的类型描述了它如何衡量和报告数据。可用的 Apache Flink 指标类型包括计数、计量表、直方图和计量器。有关 Apache Flink 指标类型的更多信息,请参阅指标类型

    注意

    Amazon CloudWatch 指标不支持 Histogram Apache Flink 指标类型。 CloudWatch 只能显示计数、仪表和仪表类型的 Apache Flink 指标。

  • 范围:指标的范围由其标识符和一组键值对组成,这些键值对表示将如何报告该指标。 CloudWatch指标的标识符由以下内容组成:

    有关指标范围的更多信息,请参阅范围

有关 Apache Flink 指标的更多信息,请参阅 Apache Flink 文档中的指标

要在 Managed Service for Apache Flink 中创建自定义指标,您可以从任何通过调用 GetMetricGroup 扩展 RichFunction 的用户函数访问 Apache Flink 指标系统。此方法返回一个可用于创建和注册自定义指标的MetricGroup对象。适用于 Apache 的托管服务 Flink 报告使用组密钥KinesisAnalytics创建的所有指标。 CloudWatch您定义的自定义指标具有以下特征:

  • 您的自定义指标具有指标名称和组名称。这些名称必须由字母数字字符组成。

  • 您在用户范围(KinesisAnalytics指标组除外)中定义的属性将作为 CloudWatch 维度发布。

  • 默认情况下,自定义指标是在该Application级别发布的。

  • 维度(任务/运算符/并行度)将根据应用程序的监控级别添加到指标中。您可以使用操作的参数或CreateApplication操作的或MonitoringConfiguration参数来设置应用程序的UpdateApplication监控级别。MonitoringConfigurationUpdate

查看创建映射类的示例

以下代码示例演示如何创建用于创建和增量自定义指标的映射类,以及如何通过将映射类添加到DataStream对象来在应用程序中实现该映射类。

记录计数自定义指标

以下代码示例演示如何创建映射类,该映射类用于创建对数据流中的记录进行计数的指标(功能与numRecordsIn指标相同):

private static class NoOpMapperFunction extends RichMapFunction<String, String> { private transient int valueToExpose = 0; private final String customMetricName; public NoOpMapperFunction(final String customMetricName) { this.customMetricName = customMetricName; } @Override public void open(Configuration config) { getRuntimeContext().getMetricGroup() .addGroup("KinesisAnalytics") .addGroup("Program", "RecordCountApplication") .addGroup("NoOpMapperFunction") .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose); } @Override public String map(String value) throws Exception { valueToExpose++; return value; } }

在前面的示例中,应用程序处理的每条记录的valueToExpose变量都会递增。

定义映射类后,您将创建一个实现映射的应用程序内部流:

DataStream<String> noopMapperFunctionAfterFilter = kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));

有关此应用程序的完整代码,请参阅记录计数自定义指标应用程序

字数自定义指标

以下代码示例演示如何创建映射类,该映射类用于创建对数据流中的字数进行计数的指标:

private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext().getMetricGroup() .addGroup("KinesisAnalytics") .addGroup("Service", "WordCountApplication") .addGroup("Tokenizer") .counter("TotalWords"); } @Override public void flatMap(String value, Collector<Tuple2<String, Integer>>out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { counter.inc(); out.collect(new Tuple2<>(token, 1)); } } } }

在前面的示例中,应用程序处理的每个字的counter变量都会递增。

定义映射类后,您将创建一个实现映射的应用程序内部流:

// Split up the lines in pairs (2-tuples) containing: (word,1), and // group by the tuple field "0" and sum up tuple field "1" DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1); // Serialize the tuple to string format, and publish the output to kinesis sink wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());

有关此应用程序的完整代码,请参阅字数计数自定义指标应用程序

查看自定义指标

应用程序的自定义指标显示在控制AWS/KinesisAnalytics面板的 CloudWatch Metrics 控制台的应用程序指标组下。