与 AWS Glue 架构注册表集成 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

与 AWS Glue 架构注册表集成

这些部分介绍了与 AWS Glue 架构注册表的集成。

使用案例:将架构注册表连接到 Amazon MSK 或 Apache Kafka

假设您正在将数据写入 Apache Kafka 主题,并且您可以按照以下步骤来开始使用。

  1. 创建包含至少一个主题的 Amazon MSK 或 Apache Kafka 集群。如果创建 Amazon MSK 集群,您可以使用 AWS 管理控制台。按照以下说明操作:https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html.

  2. 按照上面的 安装 SerDe 库 步骤进行操作。

  3. 要创建架构注册表、架构或架构版本,请按照本文档的架构注册表入门部分中的说明进行操作。

  4. 启动创建者和使用者以使用架构注册表在 Amazon MSK 或 Apache Kafka 主题中写入和读取记录。示例创建者和使用者代码可在 Serde 库的 文件ReadMe中找到。创建者上的架构注册表库将自动序列化记录,并使用架构版本 ID 对该记录进行修饰。

  5. 如果输入了此记录的架构,或者启用了自动注册,则架构将在架构注册表中注册。

  6. 使用 AWS Glue 架构注册表库从 Amazon MSK 或 Apache Kafka 主题中读取数据的使用者将自动从架构注册表中查找架构。

使用案例:将 Amazon Kinesis Data Streams 与 AWS Glue 架构注册表集成

此集成要求您有一个现有的 Amazon Kinesis 数据流。有关更多信息,请参阅 Amazon Kinesis Data Streams 入门

有两种方法可以与 Kinesis 数据流中的数据交互。

  • 在 Java 中通过 Kinesis 创建器库 (KPL) 和 Kinesis 客户端库 (KCL) 库。未提供多语言支持。

  • 通过 AWS Java 开发工具包中提供的 PutRecordsPutRecordGetRecords Kinesis Data StreamsAPIs。

如果您当前使用 KPL/KCL 库,我们建议继续使用该方法。更新了集成了架构注册表的 KCL 和 KPL 版本,如示例所示。否则,如果直接使用 KDS AWS Glue,您可以使用示例代码利用 APIs 架构注册表。

架构注册表集成仅适用于 KPL v0.14.2 或更高版本以及 KCL v2.3 或更高版本。

使用 KPL/KCL 库与数据交互

本节介绍如何使用 KPL/KCL 库将 Kinesis Data Streams 与架构注册表集成。有关使用 KPL/KCL 的更多信息,请参阅使用 Amazon Kinesis 创建者库开发创建者

在 KPL 中设置架构注册表

  1. 定义在 AWS Glue 架构注册表中编写的数据、数据格式和架构名称的架构定义。

  2. (可选)配置 GlueSchemaRegistryConfiguration 对象。

  3. 将架构对象传递到 addUserRecord API

    private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n" + " "type": "record",\n" + " "name": "User",\n" + " "fields": [\n" + " {"name": "name", "type": "string"},\n" + " {"name": "favorite_number", "type": ["int", "null"]},\n" + " {"name": "favorite_color", "type": ["string", "null"]}\n" + " ]\n" + "}"; KinesisProducerConfiguration config = new KinesisProducerConfiguration(); config.setRegion("us-west-1") //[Optional] configuration for Schema Registry. GlueSchemaRegistryConfiguration schemaRegistryConfig = new GlueSchemaRegistryConfiguration("us-west-1"); schemaRegistryConfig.setCompression(true); config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig); ///Optional configuration ends. final KinesisProducer producer = new KinesisProducer(config); final ByteBuffer data = getDataToSend(); com.amazonaws.services.schemaregistry.common.Schema gsrSchema = new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema"); ListenableFuture<UserRecordResult> f = producer.addUserRecord( config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema); private static ByteBuffer getDataToSend() { org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION); GenericRecord user = new GenericData.Record(avroSchema); user.put("name", "Emily"); user.put("favorite_number", 32); user.put("favorite_color", "green"); ByteArrayOutputStream outBytes = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null); new GenericDatumWriter<>(avroSchema).write(user, encoder); encoder.flush(); return ByteBuffer.wrap(outBytes.toByteArray()); }

设置 Kinesis 客户端库

您将在 Java 中开发 Kinesis 客户端库使用者。有关更多信息,请参阅在 Java 中开发 Kinesis 客户端库使用者

  1. 通过传递 GlueSchemaRegistryDeserializer 对象来创建 GlueSchemaRegistryConfiguration 的实例。

  2. GlueSchemaRegistryDeserializer 传递到 retrievalConfig.glueSchemaRegistryDeserializer

  3. 通过调用 kinesisClientRecord.getSchema() 访问传入消息的架构。

    GlueSchemaRegistryConfiguration schemaRegistryConfig = new GlueSchemaRegistryConfiguration(this.region.toString()); GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)); retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig ); public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records() .forEach( r -> log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}", r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema())); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting."); Runtime.getRuntime().halt(1); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } private GenericRecord recordToAvroObj(KinesisClientRecord r) { byte[] data = new byte[r.data().remaining()]; r.data().get(data, 0, data.length); org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition()); DatumReader datumReader = new GenericDatumReader<>(schema); BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null); return (GenericRecord) datumReader.read(null, binaryDecoder); }

使用 Kinesis Data Streams 与数据交互APIs

本节介绍如何使用 Kinesis Data Streams APIs 将 Kinesis Data Streams 与架构注册表集成。

  1. 更新以下 Maven 依赖项:

    <dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-bom</artifactId> <version>1.11.884</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-kinesis</artifactId> </dependency> <dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-cbor</artifactId> <version>2.11.3</version> </dependency> </dependencies>
  2. 在创建者中,使用 Kinesis Data Streams 中的 PutRecordsPutRecord API 添加架构标头信息。

    //The following lines add a Schema Header to the record com.amazonaws.services.schemaregistry.common.Schema awsSchema = new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(), schemaName); GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer = new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs())); byte[] recordWithSchemaHeader = glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
  3. 在创建者中,使用 PutRecordsPutRecord API 将记录放入数据流。

  4. 在使用者中,从标头中删除架构记录,然后序列化 Avro 架构记录。

    //The following lines remove Schema Header from record GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs()); byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()]; recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length); com.amazonaws.services.schemaregistry.common.Schema awsSchema = glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes); byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes); //The following lines serialize an AVRO schema record if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) { Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition()); Object genericRecord = convertBytesToRecord(avroSchema, record); System.out.println(genericRecord); }

使用 Kinesis Data Streams 与数据交互APIs

以下是使用 PutRecordsGetRecords APIs 的示例代码。

//Full sample code import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl; import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl; import com.amazonaws.services.schemaregistry.utils.AVROUtils; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.services.glue.model.DataFormat; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class PutAndGetExampleWithEncodedData { static final String regionName = "us-east-2"; static final String streamName = "testStream1"; static final String schemaName = "User-Topic"; static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc"; KinesisApi kinesisApi = new KinesisApi(); void runSampleForPutRecord() throws IOException { Object testRecord = getTestRecord(); byte[] recordAsBytes = convertRecordToBytes(testRecord); String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord); //The following lines add a Schema Header to a record com.amazonaws.services.schemaregistry.common.Schema awsSchema = new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(), schemaName); GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer = new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName)); byte[] recordWithSchemaHeader = glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes); //Use PutRecords api to pass a list of records kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName); //OR //Use PutRecord api to pass single record //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName); } byte[] runSampleForGetRecord() throws IOException { ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName); //The following lines remove the schema registry header GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName)); byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()]; recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length); com.amazonaws.services.schemaregistry.common.Schema awsSchema = glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes); byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes); //The following lines serialize an AVRO schema record if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) { Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition()); Object genericRecord = convertBytesToRecord(avroSchema, record); System.out.println(genericRecord); } return record; } private byte[] convertRecordToBytes(final Object record) throws IOException { ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null); GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record)); datumWriter.write(record, encoder); encoder.flush(); return recordAsBytes.toByteArray(); } private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException { final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema); Decoder decoder = DecoderFactory.get().binaryDecoder(record, null); GenericRecord genericRecord = datumReader.read(null, decoder); return genericRecord; } private Map<String, String> getMetadata() { Map<String, String> metadata = new HashMap<>(); metadata.put("event-source-1", "topic1"); metadata.put("event-source-2", "topic2"); metadata.put("event-source-3", "topic3"); metadata.put("event-source-4", "topic4"); metadata.put("event-source-5", "topic5"); return metadata; } private GlueSchemaRegistryConfiguration getConfigs() { GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName); configs.setSchemaName(schemaName); configs.setAutoRegistration(true); configs.setMetadata(getMetadata()); return configs; } private Object getTestRecord() throws IOException { GenericRecord genericRecord; Schema.Parser parser = new Schema.Parser(); Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE)); genericRecord = new GenericData.Record(avroSchema); genericRecord.put("name", "testName"); genericRecord.put("favorite_number", 99); genericRecord.put("favorite_color", "red"); return genericRecord; } }

Apache Flink 是一种流行的开源框架和分布式处理引擎,用于对未绑定和绑定的数据流进行有状态计算。Amazon Kinesis Data Analytics for Apache Flink 是一项完全托管的 AWS 服务,可让您构建和管理 Apache Flink 应用程序以处理流数据。

开源 Apache Flink 提供了大量源和接收器。例如,预定义数据源包括从文件、目录和套接字中读取数据,以及从集合和迭代器中提取数据。Apache Flink DataStream 连接器为 Apache Flink 提供代码,以便与各种第三方系统(如 Apache Kafka 或 Kinesis 作为源和/或接收器)进行交互。

有关更多信息,请参阅 Amazon Kinesis Data Analytics 开发人员指南

Apache Flink Kafka 连接器

Apache Flink 提供了一个 Apache Kafka 数据流连接器,用于通过“恰好一次”的保证在 Kafka 主题中读取数据以及将数据写入 Kafka 主题。Flink 的 Kafka 使用者 FlinkKafkaConsumer 提供从一个或多个 Kafka 主题读取的访问权限。Apache Flink 的 Kafka 创建器 FlinkKafkaProducer 允许将记录流写入到一个或多个 Kafka 主题。有关更多信息,请参阅 Apache Kafka 连接器

Apache Flink Kinesis Streams 连接器

Kinesis 数据流连接器提供对 Amazon Kinesis Data Streams 的访问。是一个恰好一次的并行流数据源,可订阅同一 AWS 服务区域内的多个 Kinesis 流,并且可以透明地处理作业正在运行时的流的重新分片。FlinkKinesisConsumer使用者的每个子任务负责从多个 Kinesis 分片中提取数据记录。当 Kinesis 关闭并创建分片时,每个子任务提取的分片数将发生变化。使用 Kinesis 创建器库 (KPL) 将数据从 Apache Flink 流放入 Kinesis 流。FlinkKinesisProducer有关更多信息,请参阅 Amazon Kinesis Streams 连接器

有关更多信息,请参阅 架构 Github 存储库AWS Glue。

随架构注册表提供的 SerDes 库与 Apache Flink 集成。要使用 Apache Flink,您需要实施名为 SerializationSchemaDeserializationSchema 接口,您可以将其插入 Apache Flink 连接器中。GlueSchemaRegistryAvroSerializationSchemaGlueSchemaRegistryAvroDeserializationSchema

将 AWS Glue 架构注册表依赖项添加到 Apache Flink 应用程序中

要在 Apache Flink 应用程序中设置对 AWS Glue 架构注册表的集成依赖项,请执行以下操作:

  1. 将依赖项添加到 pom.xml 文件。

    <dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-flink-serde</artifactId> <version>1.0.0</version> </dependency>

将 Kafka 或 MSK 与 Apache Flink 集成

您可以使用适用于 Apache Flink 的 Kinesis Data Analytics,使用 Kafka 作为源或 Kafka 作为接收器。

将 Kafka 作为源

下图显示了将 Kinesis Data Streams 与 Apache Flink 的 Kinesis Data Analytics 集成,使用 Kafka 作为源。


			将 Kafka 作为源。

将 Kafka 作为接收器

下图显示了将 Kinesis Data Streams 与 Apache Flink 的 Kinesis Data Analytics 集成,并将 Kafka 作为接收器。


			将 Kafka 作为接收器。

要将 Kafka (或 MSK) 与 Apache Flink 的 Kinesis Data Analytics 集成,并将 Kafka 作为源或 Kafka 作为接收器,请进行以下代码更改。在类似部分中,将加粗的代码块添加到相应的代码。

如果 Kafka 是源,则使用反串行化器代码 (块 2)。如果 Kafka 是接收器,请使用串行器代码 (块 3)。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String topic = "topic"; Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test"); // block 1 Map<String, Object> configs = new HashMap<>(); configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1"); configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true); configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>( topic, // block 2 GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs), properties); FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>( topic, // block 3 GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs), properties); DataStream<GenericRecord> stream = env.addSource(consumer); stream.addSink(producer); env.execute();

将 Kinesis Data Streams 与 Apache Flink 集成

您可以将 Kinesis Data Analytics for Apache Flink 与 Kinesis Data Streams 结合使用作为源或接收器。

将 Kinesis Data Streams 作为源

下图显示了将 Kinesis Data Streams 与 Apache Flink 的 Kinesis Data Analytics 集成,并将 Kinesis Data Streams 作为源。


			将 Kinesis Data Streams 作为源。

将 Kinesis Data Streams 作为接收器

下图显示了将 Kinesis Data Streams 与 Apache Flink 的 Kinesis Data Analytics 集成,并将 Kinesis Data Streams 作为接收器。


			将 Kinesis Data Streams 作为接收器。

要将 Kinesis Data Streams 与适用于 Apache Flink 的 Kinesis Data Analytics 集成,并将 Kinesis Data Streams 作为源,或者将 Kinesis Data Streams 作为接收器,请进行以下代码更改。在类似部分中,将加粗的代码块添加到相应的代码。

如果 Kinesis Data Streams 是源,请使用反串行化器代码 (块 2)。如果 Kinesis Data Streams 是接收器,请使用串行器代码(块 3)。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String streamName = "stream"; Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); // block 1 Map<String, Object> configs = new HashMap<>(); configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1"); configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true); configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>( streamName, // block 2 GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs), properties); FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>( // block 3 GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs), properties); producer.setDefaultStream(streamName); producer.setDefaultPartition("0"); DataStream<GenericRecord> stream = env.addSource(consumer); stream.addSink(producer); env.execute();

使用案例:与 AWS Lambda 集成

要将 AWS Lambda 函数用作 Apache Kafka/Amazon MSK 使用者并使用 AWS Glue 架构注册表反序列化 Avro 编码的消息,请访问 MSK Labs 页面

使用案例:AWS Glue 数据目录

AWS Glue 表支持您可以手动指定或通过引用 AWS Glue 架构注册表指定的架构。此架构注册表与数据目录集成,允许您在 AWS Glue 中创建或更新 Data Catalog 表或分区时选择性地使用存储在架构注册表中的架构。要在架构注册表中标识架构定义,您至少需要知道其所属的架构的 ARN。架构的架构版本(包含架构定义)可以按其 UUID 或版本号进行引用。始终有一个架构版本,即“最新”版本,可在不知道其版本号或 UUID 的情况下查找。

在调用 CreateTableUpdateTable 操作时,您将传递一个包含 TableInputStorageDescriptor 结构,该结构可能具有指向架构注册表中的现有架构的 SchemaReference。同样,当您调用 GetTableGetPartition APIs 时,响应可能包含架构和 SchemaReference。 当使用架构引用创建表或分区时,数据目录将尝试为该架构引用提取架构。如果无法在架构注册表中找到架构,它会在 GetTable 响应中返回空架构;否则,响应将同时具有架构和架构引用。

您还可以从 AWS Glue 控制台中执行操作。

要执行这些操作并创建、更新或查看架构信息,您必须为调用方 IAM 用户提供针对 GetSchemaVersion API 的权限。

为表添加表或更新架构

从现有架构添加新表会将表绑定到特定的架构版本。在注册新架构版本后,您可以从 AWS Glue 控制台中的“View table (查看表)”页面或使用 UpdateTable 操作 (Python: update_table) API 更新此表定义。

从现有架构添加表

您可以使用 AWS Glue 控制台或 AWS Glue API 从注册表中的架构版本创建 CreateTable 表。

AWS Glue API

调用 CreateTable API 时,您将传递一个包含 TableInputStorageDescriptor,该 具有到架构注册表中的现有架构。SchemaReference

AWS Glue 控制台

要从 AWS Glue 控制台创建表,请执行以下操作:

  1. 登录 AWS 管理控制台,并通过以下网址打开 AWS Glue 控制台:https://console.aws.amazon.com/glue/.

  2. 在导航窗格中的 Data catalog (数据目录) 下,选择 Tables (表)

  3. Add Tables 菜单中,选择 Add table from existing schema

  4. 根据 AWS Glue 开发人员指南配置表属性和数据存储。

  5. Choose a Glue schema (选择 Glue 架构) 页面中,选择架构所在的 Registry (注册表)

  6. 选择 Schema name (架构名称),然后选择要应用的架构的 Version (版本)

  7. 查看架构预览,然后选择 Next (下一步)

  8. 查看并创建表。

应用于表的架构和版本显示在表列表的 Glue schema 列中。您可以查看表以了解更多详细信息。

更新表的架构

当新架构版本变得可用时,您可能需要使用 UpdateTable 操作 (Python: update_table) API 或 AWS Glue 控制台更新表的架构。

重要

在更新手动指定了 AWS Glue 架构的现有表的架构时,架构注册表中引用的新架构可能不兼容。这可能会导致您的作业失败。

AWS Glue API

调用 UpdateTable API 时,您将传递一个包含 TableInputStorageDescriptor,而该 SchemaReference 具有到架构注册表中的现有架构。

AWS Glue 控制台

要从 AWS Glue 控制台更新表的架构,请执行以下操作:

  1. 登录 AWS 管理控制台,并通过以下网址打开 AWS Glue 控制台:https://console.aws.amazon.com/glue/.

  2. 在导航窗格中的 Data catalog (数据目录) 下,选择 Tables (表)

  3. 从表的列表中查看表。

  4. 单击框中的 Update schema (更新架构),以通知您有关新版本的信息。

  5. 查看当前架构和新架构之间的差异。

  6. 选择 Show all schema differences (显示所有架构差异) 以查看更多详细信息。

  7. 选择 Save table (保存表) 以接受新版本。

使用案例:Apache Kafka 流

Apache Kafka Streams API 是一个客户端库,用于处理和分析存储在 Apache Kafka 中的数据。本节介绍 Apache Kafka Streams 与 AWS Glue 架构注册表的集成,这使您可以在数据流应用程序上管理和实施架构。有关 Apache Kafka Streams 的更多信息,请参阅 Apache Kafka Streams

将 SerDes 与 库集成

有一个可用于配置 Streams 应用程序的 AWSKafkaAvroSerDe 类。

Kafka Streams 应用程序示例代码

要在 Apache Kafka Streams 应用程序中使用 AWS Glue 架构注册表,请执行以下操作:

  1. 配置 Kafka Streams 应用程序。

    final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1"); props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
  2. 从主题 avro-input 创建流。

    StreamsBuilder builder = new StreamsBuilder(); final KStream<String, GenericRecord> source = builder.stream("avro-input");
  3. 处理数据记录(该示例筛选出其 favorite_color 值为粉色或数量值为 15 的记录)。

    final KStream<String, GenericRecord> result = source .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color")))); .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
  4. 将结果写回到主题 avro-output。

    result.to("avro-output");
  5. 启动 Apache Kafka Streams 应用程序。

    KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();

实施结果

这些结果显示在步骤 3 中作为“pink”的收藏颜色或值“15.0”筛选掉的记录的筛选过程。

筛选前的记录:

{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"} {"name": "Harry", "favorite_number": 10, "favorite_color": "black"} {"name": "Hermione", "favorite_number": 1, "favorite_color": "red"} {"name": "Ron", "favorite_number": 0, "favorite_color": "pink"} {"name": "Jay", "favorite_number": 0, "favorite_color": "pink"} {"id": "commute_1","amount": 3.5} {"id": "grocery_1","amount": 25.5} {"id": "entertainment_1","amount": 19.2} {"id": "entertainment_2","amount": 105} {"id": "commute_1","amount": 15}

筛选后的记录:

{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"} {"name": "Harry", "favorite_number": 10, "favorite_color": "black"} {"name": "Hermione", "favorite_number": 1, "favorite_color": "red"} {"name": "Ron", "favorite_number": 0, "favorite_color": "pink"} {"id": "commute_1","amount": 3.5} {"id": "grocery_1","amount": 25.5} {"id": "entertainment_1","amount": 19.2} {"id": "entertainment_2","amount": 105}

使用案例:Apache Kafka 连接

将 Apache Kafka Connect 与 AWS Glue 架构注册表集成,您可以从连接器获取架构信息。Apache Kafka 转换器指定 Apache Kafka 中的数据格式以及如何将其转换为 Apache Kafka Connect 数据。当每个 Apache Kafka Connect 用户从 Apache Kafka 加载或存储数据时,他们希望其采用的格式来配置这些转换器。通过这种方法,您可以定义自己的转换器,将 Apache Kafka Connect 数据转换为 AWS Glue 架构注册表中使用的类型(例如:Avro)并使用我们的序列化程序注册其架构并执行序列化。然后,转换器还能够使用我们的反串行化器对从 Apache Kafka 接收的数据进行反串行化,并将其转换回 Apache Kafka Connect 数据。下面给出了一个示例流程图。


			Apache Kafka Connect 工作流程。
  1. 通过克隆 aws-glue-schema-registry 架构注册表的 Github 存储库AWS Glue来安装 项目。

    git clone git@github.com:awslabs/aws-glue-schema-registry.git cd aws-glue-schema-registry mvn clean install mvn dependency:copy-dependencies
  2. 如果您计划在 Standalone 模式下使用 Apache Kafka Connect,请按照此步骤的以下说明更新 connect-standalone.properties。如果您计划在 Distributed 模式下使用 Apache Kafka Connect,请按照相同的说明更新 connect-avro-distributed.properties

    1. 将这些属性也添加到 Apache Kafka 连接属性文件中:

      key.converter.region=us-east-2 value.converter.region=us-east-2 key.converter.schemaAutoRegistrationEnabled=true value.converter.schemaAutoRegistrationEnabled=true key.converter.avroRecordType=GENERIC_RECORD value.converter.avroRecordType=GENERIC_RECORD
    2. 将以下命令添加到 kafka-run-class.sh 下的 Launch mode (启动模式) 部分:

      -cp $CLASSPATH:"<your AWS Glue Schema Registry base directory>/target/dependency/*"
  3. 将以下命令添加到 kafka-run-class.sh 下的 Launch mode (启动模式) 部分

    -cp $CLASSPATH:"<your AWS Glue Schema Registry base directory>/target/dependency/*"

    它应如下所示:

    # Launch mode if [ "x$DAEMON_MODE" = "xtrue" ]; then nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null & else exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" fi
  4. 如果使用 bash,请运行以下命令以在 bash_profile 中设置 CLASSPATH。对于任何其他 Shell,请相应地更新环境。

    echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile source ~/.bash_profile
  5. (可选)如果要使用简单的文件源进行测试,请克隆文件源连接器。

    git clone https://github.com/mmolimar/kafka-connect-fs.git cd kafka-connect-fs/
    1. 在源连接器配置下,将数据格式编辑为 Avro,将文件读取器编辑为 AvroFileReader,然后从要读取的文件路径更新示例 Avro 对象。例如:

      vim config/kafka-connect-fs.properties
      fs.uris=<path to a sample avro object> policy.regexp=^.*\.avro$ file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
    2. 安装源连接器。

      mvn clean package echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile source ~/.bash_profile
    3. 更新 <your Apache Kafka installation directory>/config/connect-file-sink.properties 下的接收器属性,更新主题名称和输出文件名。

      file=<output file full path> topics=<my topic>
  6. 启动 Source Connector (在此示例中,它是文件源连接器)。

    $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
  7. 运行接收器连接器(在本示例中,它是文件接收器连接器)。

    $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties

从第三方架构注册表迁移到 AWS Glue 架构注册表

从第三方架构注册表到 AWS Glue 架构注册表的迁移依赖于现有的当前第三方架构注册表。如果 Apache Kafka 主题中有使用第三方架构注册表发送的记录,使用者需要第三方架构注册表来反序列化这些记录。提供了指定辅助反串行化器类的功能,该类指向第三方反串行化器并用于反串行化这些记录。AWSKafkaAvroDeserializer

停用第三方 schema 有两个条件。首先,只有当 和任何使用者不再需要使用第三方架构注册表的 Apache Kafka 主题中的记录时,才能进行停用。其次,停用可能是因 Apache Kafka 主题超时而导致的,具体取决于为这些主题指定的保留期。请注意,如果您的主题具有无限保留,您仍然可以迁移到 AWS Glue 架构注册表,但无法停用第三方架构注册表。作为解决方法,您可以使用应用程序或镜像 Mirror Mirror 2 从当前主题中读取内容,并使用 AWS Glue 架构注册表生成到新主题。

要从第三方架构注册表迁移到 AWS Glue 架构注册表,请执行以下操作:

  1. 在 AWS Glue 架构注册表中创建一个注册表或使用默认注册表。

  2. 停止使用者。对其进行修改以包含 AWS Glue 架构注册表作为主要反串行化器,并将第三方架构注册表作为辅助镜像仓库。

    • 设置使用者属性。在此示例中,secondary_deserializer 设置为不同的解串器。行为如下所示:使用者从 Amazon MSK 检索记录并首先尝试使用 AWSKafkaAvroDeserializer。 如果无法读取包含 AWS Glue 架构注册表架构的 Avro 架构 ID 的幻字节,AWSKafkaAvroDeserializer 将尝试使用辅助反串行化器中提供的反串行化器类。还需要在使用者属性(例如 schema_registry_url_config 和 specific_avro_reader_config)中提供特定于辅助解串器的属性,如下所示。

      consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName()); consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion); consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName()); consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry"); consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
  3. 重新启动使用者。

  4. 停止创建器并将创建器指向 AWS Glue 架构注册表。

    1. 设置创建者属性。在此示例中,创建者将使用默认注册表和自动注册架构版本。

      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName()); producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
  5. (可选)手动将现有架构和架构版本从当前第三方架构注册表移动到 AWS Glue 架构注册表,移动到 AWS Glue 架构注册表中的默认注册表或 AWS Glue 架构注册表中的特定非默认注册表。这可以通过使用 AWS 控制台或 AWS CLI,以 JSON 格式从第三方架构注册表导出架构并在 AWS Glue 架构注册表中创建新架构来完成。

    如果您需要使用 AWS CLI 和 AWS 控制台为新创建的架构版本启用与以前的架构版本的兼容性检查,或者当创建者发送带有新架构的消息并打开架构版本的自动注册时,此步骤可能很重要。

  6. 启动创建者。

架构注册表的示例 AWS CloudFormation 模板

以下是用于在 AWS CloudFormation 中创建架构注册表资源的示例模板。 要在您的账户中创建此堆栈,请将上述模板复制到文件 SampleTemplate.yaml,然后运行以下命令:

aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"

此示例使用 AWS::Glue::Registry 创建注册表,AWS::Glue::Schema 创建架构,AWS::Glue::SchemaVersion 创建架构版本,AWS::Glue::SchemaVersionMetadata 填充架构版本元数据。

Description: "A sample CloudFormation template for creating Schema Registry resources." Resources: ABCRegistry: Type: "AWS::Glue::Registry" Properties: Name: "ABCSchemaRegistry" Description: "ABC Corp. Schema Registry" Tags: - Key: "Project" Value: "Foo" ABCSchema: Type: "AWS::Glue::Schema" Properties: Registry: Arn: !Ref ABCRegistry Name: "TestSchema" Compatibility: "NONE" DataFormat: "AVRO" SchemaDefinition: > {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]} Tags: - Key: "Project" Value: "Foo" SecondSchemaVersion: Type: "AWS::Glue::SchemaVersion" Properties: Schema: SchemaArn: !Ref ABCSchema SchemaDefinition: > {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]} FirstSchemaVersionMetadata: Type: "AWS::Glue::SchemaVersionMetadata" Properties: SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId Key: "Application" Value: "Kinesis" SecondSchemaVersionMetadata: Type: "AWS::Glue::SchemaVersionMetadata" Properties: SchemaVersionId: !Ref SecondSchemaVersion Key: "Application" Value: "Kinesis"