与 集成Amazon Glue架构注册表 - Amazon连接词
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

与 集成Amazon Glue架构注册表

这些部分介绍了与Amazon Glue:Schema 注册表。

使用案例:将架构注册表连接到亚马逊 MSK 或 Apache Kafka

假设您正在向 Apache Kafka 主题写入数据,您可以按照以下步骤开始操作。

  1. 创建具有至少一个主题的亚马逊 MSK 或 Apache Kafka 集群。如果创建 Amazon MSK 集群,您可以使用 AWS 管理控制台。按照以下说明操作:https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html.

  2. 按照安装 SerDe 库步骤。

  3. 要创建架构注册表、架构或架构版本,请按照架构注册表入门部分。

  4. 启动您的生产者和消费者使用架构注册表从亚马逊 MSK 或 Apache Kafka 主题写入和读取记录。示例创建器和消费器代码可以在中找到。自述文件从塞尔德图书馆。生产器上的架构注册表库将自动序列化记录,并使用模式版本 ID 装饰记录。

  5. 如果已输入此记录的架构,或者如果启用了自动注册,则该架构将已在模式注册表中注册。

  6. 消费者从亚马逊 MSK 或阿帕奇卡夫卡主题阅读,使用Amazon Glue模式注册表库,将自动从模式注册表中查找架构。

使用案例:将 Amazon Kinesis Data Streams 与Amazon Glue架构注册表

此集成要求您拥有现有的 Amazon Kinesis 数据流。有关更多信息,请参阅 。Amazon Kinesis Data Streams 入门

您可以通过两种方式与 Kinesis 数据流中的数据进行交互。

  • 通过 Java 中的 Kinesis Producer Library (KPL) 和 Kinesis 客户端库 (KCL) 库。不提供多语言支持。

  • 通过PutRecordsPutRecord, 和GetRecordsAWS Java 开发工具包中提供的 Kinesis Data Streams API。

如果您当前使用 KPL/KCL 库,我们建议您继续使用该方法。有更新的 KCL 和 KPL 版本集成了架构注册表,如示例所示。否则,您可以使用示例代码利用开发创建器Amazon Glue架构注册表(如果直接使用 KDS API)。

架构注册表集成仅适用于 KPL v0.14.2 或更高版本以及 KCL v2.3 或更高版本。

使用KPL/KCL 库与数据交互

本节介绍使用 KPL/KCL 库将 Kinesis 数据流与架构注册表集成。有关使用 KPL/KCL 的更多信息,请参阅使用 Amazon Kinesis 创建器库开发创建器

在 KPL 中设置架构注册表

  1. 定义数据、数据格式和模式名称的架构定义Amazon Glue:Schema 注册表。

  2. (可选)配置GlueSchemaRegistryConfiguration对象。

  3. 将架构对象传递到addUserRecord API

    private static final String SCHEMA_DEFINITION = "{"namespace": "example.avro",\n" + " "type": "record",\n" + " "name": "User",\n" + " "fields": [\n" + " {"name": "name", "type": "string"},\n" + " {"name": "favorite_number", "type": ["int", "null"]},\n" + " {"name": "favorite_color", "type": ["string", "null"]}\n" + " ]\n" + "}"; KinesisProducerConfiguration config = new KinesisProducerConfiguration(); config.setRegion("us-west-1") //[Optional] configuration for Schema Registry. GlueSchemaRegistryConfiguration schemaRegistryConfig = new GlueSchemaRegistryConfiguration("us-west-1"); schemaRegistryConfig.setCompression(true); config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig); ///Optional configuration ends. final KinesisProducer producer = new KinesisProducer(config); final ByteBuffer data = getDataToSend(); com.amazonaws.services.schemaregistry.common.Schema gsrSchema = new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema"); ListenableFuture<UserRecordResult> f = producer.addUserRecord( config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema); private static ByteBuffer getDataToSend() { org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION); GenericRecord user = new GenericData.Record(avroSchema); user.put("name", "Emily"); user.put("favorite_number", 32); user.put("favorite_color", "green"); ByteArrayOutputStream outBytes = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null); new GenericDatumWriter<>(avroSchema).write(user, encoder); encoder.flush(); return ByteBuffer.wrap(outBytes.toByteArray()); }

设置 Kinesis 客户端库

您将在 Java 中开发您的 Kinesis 客户端库使用器。有关更多信息,请参阅 。在 Java 中开发 Kinesis 客户端库用户

  1. 创建的实例GlueSchemaRegistryDeserializer通过传递GlueSchemaRegistryConfiguration对象。

  2. 传递GlueSchemaRegistryDeserializerretrievalConfig.glueSchemaRegistryDeserializer

  3. 访问传入消息的模式,方法是调用kinesisClientRecord.getSchema()

    GlueSchemaRegistryConfiguration schemaRegistryConfig = new GlueSchemaRegistryConfiguration(this.region.toString()); GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)); retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig ); public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records() .forEach( r -> log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}", r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema())); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting."); Runtime.getRuntime().halt(1); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } private GenericRecord recordToAvroObj(KinesisClientRecord r) { byte[] data = new byte[r.data().remaining()]; r.data().get(data, 0, data.length); org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition()); DatumReader datumReader = new GenericDatumReader<>(schema); BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null); return (GenericRecord) datumReader.read(null, binaryDecoder); }

使用 Kinesis 数据流 API 与数据进行交互

本节介绍使用 Kinesis Data Streams API 将 Kinesis Data Streams 与模式注册表集成。

  1. 更新这些 Maven 依赖关系:

    <dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-bom</artifactId> <version>1.11.884</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-kinesis</artifactId> </dependency> <dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-cbor</artifactId> <version>2.11.3</version> </dependency> </dependencies>
  2. 在生产器中,使用PutRecords或者PutRecordAPI 在 Kinesis Data Streams。

    //The following lines add a Schema Header to the record com.amazonaws.services.schemaregistry.common.Schema awsSchema = new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(), schemaName); GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer = new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(getConfigs())); byte[] recordWithSchemaHeader = glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes);
  3. 在生产者中,使用PutRecords或者PutRecordAPI 将记录放入数据流。

  4. 在使用器中,从标头中删除架构记录,然后序列化 Avro 架构记录。

    //The following lines remove Schema Header from record GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), getConfigs()); byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()]; recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length); com.amazonaws.services.schemaregistry.common.Schema awsSchema = glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes); byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes); //The following lines serialize an AVRO schema record if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) { Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition()); Object genericRecord = convertBytesToRecord(avroSchema, record); System.out.println(genericRecord); }

使用 Kinesis 数据流 API 与数据进行交互

以下是使用PutRecordsGetRecordsAPI。

//Full sample code import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerImpl; import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializerImpl; import com.amazonaws.services.schemaregistry.utils.AVROUtils; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.services.glue.model.DataFormat; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class PutAndGetExampleWithEncodedData { static final String regionName = "us-east-2"; static final String streamName = "testStream1"; static final String schemaName = "User-Topic"; static final String AVRO_USER_SCHEMA_FILE = "src/main/resources/user.avsc"; KinesisApi kinesisApi = new KinesisApi(); void runSampleForPutRecord() throws IOException { Object testRecord = getTestRecord(); byte[] recordAsBytes = convertRecordToBytes(testRecord); String schemaDefinition = AVROUtils.getInstance().getSchemaDefinition(testRecord); //The following lines add a Schema Header to a record com.amazonaws.services.schemaregistry.common.Schema awsSchema = new com.amazonaws.services.schemaregistry.common.Schema(schemaDefinition, DataFormat.AVRO.name(), schemaName); GlueSchemaRegistrySerializerImpl glueSchemaRegistrySerializer = new GlueSchemaRegistrySerializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName)); byte[] recordWithSchemaHeader = glueSchemaRegistrySerializer.encode(streamName, awsSchema, recordAsBytes); //Use PutRecords api to pass a list of records kinesisApi.putRecords(Collections.singletonList(recordWithSchemaHeader), streamName, regionName); //OR //Use PutRecord api to pass single record //kinesisApi.putRecord(recordWithSchemaHeader, streamName, regionName); } byte[] runSampleForGetRecord() throws IOException { ByteBuffer recordWithSchemaHeader = kinesisApi.getRecords(streamName, regionName); //The following lines remove the schema registry header GlueSchemaRegistryDeserializerImpl glueSchemaRegistryDeserializer = new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), new GlueSchemaRegistryConfiguration(regionName)); byte[] recordWithSchemaHeaderBytes = new byte[recordWithSchemaHeader.remaining()]; recordWithSchemaHeader.get(recordWithSchemaHeaderBytes, 0, recordWithSchemaHeaderBytes.length); com.amazonaws.services.schemaregistry.common.Schema awsSchema = glueSchemaRegistryDeserializer.getSchema(recordWithSchemaHeaderBytes); byte[] record = glueSchemaRegistryDeserializer.getData(recordWithSchemaHeaderBytes); //The following lines serialize an AVRO schema record if (DataFormat.AVRO.name().equals(awsSchema.getDataFormat())) { Schema avroSchema = new org.apache.avro.Schema.Parser().parse(awsSchema.getSchemaDefinition()); Object genericRecord = convertBytesToRecord(avroSchema, record); System.out.println(genericRecord); } return record; } private byte[] convertRecordToBytes(final Object record) throws IOException { ByteArrayOutputStream recordAsBytes = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().directBinaryEncoder(recordAsBytes, null); GenericDatumWriter datumWriter = new GenericDatumWriter<>(AVROUtils.getInstance().getSchema(record)); datumWriter.write(record, encoder); encoder.flush(); return recordAsBytes.toByteArray(); } private GenericRecord convertBytesToRecord(Schema avroSchema, byte[] record) throws IOException { final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema); Decoder decoder = DecoderFactory.get().binaryDecoder(record, null); GenericRecord genericRecord = datumReader.read(null, decoder); return genericRecord; } private Map<String, String> getMetadata() { Map<String, String> metadata = new HashMap<>(); metadata.put("event-source-1", "topic1"); metadata.put("event-source-2", "topic2"); metadata.put("event-source-3", "topic3"); metadata.put("event-source-4", "topic4"); metadata.put("event-source-5", "topic5"); return metadata; } private GlueSchemaRegistryConfiguration getConfigs() { GlueSchemaRegistryConfiguration configs = new GlueSchemaRegistryConfiguration(regionName); configs.setSchemaName(schemaName); configs.setAutoRegistration(true); configs.setMetadata(getMetadata()); return configs; } private Object getTestRecord() throws IOException { GenericRecord genericRecord; Schema.Parser parser = new Schema.Parser(); Schema avroSchema = parser.parse(new File(AVRO_USER_SCHEMA_FILE)); genericRecord = new GenericData.Record(avroSchema); genericRecord.put("name", "testName"); genericRecord.put("favorite_number", 99); genericRecord.put("favorite_color", "red"); return genericRecord; } }

Apache Flink 是一个流行的开源框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Apache Flink 的 Amazon Kinesis Data Analytics 是一项完全托管的 AWS 服务,可让您轻松构建和管理 Apache Flink 应用程序以处理流式传输数据。

开源 Apache Flink 提供了大量的源和汇。例如,预定义的数据源包括从文件、目录和套接字读取,以及从集合和迭代器中提取数据。Apache Flink 数据流连接器为 Apache Flink 提供代码,以便与各种第三方系统(如 Apache Kafka 或 Kinesis 作为源和/或汇)进行接口。

有关更多信息,请参阅 。Amazon Kinesis Data Analytics 开发人员指南

Apache Flink 卡夫卡连接器

Apache Flink 提供了一个 Apache Kafka 数据流连接器,用于从卡夫卡主题读取数据并将数据写入数据,并提供一次保证。弗林克的卡夫卡消费者FlinkKafkaConsumer,提供读取一个或多个 Kafka 主题的权限。阿帕奇·弗林克的卡夫卡制片人FlinkKafkaProducer,允许将记录流写入一个或多个 Kafka 主题。有关更多信息,请参阅 。Apache Kafka 接头

Apache Flink Kinesis Streams 连接器

Kinesis 数据流连接器可让您访问 Amazon Kinesis 数据流。这些区域有:FlinkKinesisConsumer是一个精确一次的并行流数据源,它订阅同一 AWS 服务区域内的多个 Kinesis 流,并且可以在作业运行期间透明地处理流的重新分片。使用者的每个子任务负责从多个 Kinesis 分片中获取数据记录。随着分片关闭并由 Kinesis 创建,每个子任务获取的分片数量将发生变化。这些区域有:FlinkKinesisProducer使用 Kinesis 生成器库 (KPL) 将来自 Apache Flink 流的数据放入 Kinesis 流中。有关更多信息,请参阅 。Amazon Kinesis Streams 连接器

有关更多信息,请参阅 。Amazon Glue架构 GitHub 存储库

随模式注册表提供的 SerDes 库与 Apache Flink 集成。要使用 Apache Flink,您需要实现SerializationSchemaDeserializationSchema接口称为GlueSchemaRegistryAvroSerializationSchemaGlueSchemaRegistryAvroDeserializationSchema,您可以将其插入 Apache Flink 连接器。

添加Amazon Glue架构注册表依赖关系到 Apache Flink 应用程序

将集成依赖关系设置为Amazon GlueApache Flink 应用程序中的架构注册表:

  1. 将依赖项添加到pom.xml文件。

    <dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-flink-serde</artifactId> <version>1.0.0</version> </dependency>

将卡夫卡或 MSK 与阿帕奇弗林克集成

您可以将 Kinesis Data Analytics 用于 Apache Flink,以卡夫卡作为源或卡夫卡作为接收器。

Kafka 作为源

下图显示了将 Kafka 数据流与 Apache Flink 的 Kinesis Data Analytics 集成在一起,并将 Kinesis 数据流作为源。


			Kafka 作为源。

Kafka 作为水槽

下图显示了将 Kinesis Data Streams 与 Apache Flink 的 Kinesis Data Analytics 集成在一起,并将 Kafka 作为一个汇。


			Kafka 作为水槽。

要将卡夫卡(或 MSK)与阿帕奇弗林克的 Kinesis Data Analytics 集成,将卡夫卡作为源或卡夫卡作为接收器,请在下面进行代码更改。将粗体代码块添加到类似部分中相应的代码中。

如果 Kafka 是源代码,则使用解序列化程序代码(块 2)。如果 Kafka 是接收器,请使用序列化程序代码(块 3)。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String topic = "topic"; Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test"); // block 1 Map<String, Object> configs = new HashMap<>(); configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1"); configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true); configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>( topic, // block 2 GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs), properties); FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>( topic, // block 3 GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs), properties); DataStream<GenericRecord> stream = env.addSource(consumer); stream.addSink(producer); env.execute();

将 Kinesis Data Streams 与 Apache Fink 集成

您可以将运动数据流用作源或汇的 Apache Flink 的 Kinesis Data Analytics。

作为源的 Kinesis Data Streams

下图显示了将 Kinesis Data Streams 与 Apache Flink 的动能数据分析集成,并将 Kinesis Data Streams 作为源。


			作为源的 Kinesis Data Streams。

作为汇的 Kinesis Data Streams

下图显示了将 Kinesis Data Streams 与 Apache Flink 的动能数据分析集成,并将 Kinesis Data Streams 作为汇。


			作为汇的 Kinesis Data Streams。

要将 Kinesis Data Streams 与 Apache Flink 的 Kinesis Data Analytics 集成,并将 Kinesis Data Streams 作为源或 Kinesis 数据流作为汇,请在下面进行代码更改。将粗体代码块添加到类似部分中相应的代码中。

如果 Kinesis Data Streams 是源,请使用反序列化程序代码(块 2)。如果 Kinesis Data Streams 是接收器,请使用序列化程序代码(块 3)。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String streamName = "stream"; Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); // block 1 Map<String, Object> configs = new HashMap<>(); configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1"); configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true); configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); FlinkKinesisConsumer<GenericRecord> consumer = new FlinkKinesisConsumer<>( streamName, // block 2 GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs), properties); FlinkKinesisProducer<GenericRecord> producer = new FlinkKinesisProducer<>( // block 3 GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs), properties); producer.setDefaultStream(streamName); producer.setDefaultPartition("0"); DataStream<GenericRecord> stream = env.addSource(consumer); stream.addSink(producer); env.execute();

使用案例:与 AWS Lambda 集成

要将 AWS Lambda 函数用作 Apache Kafka/亚马逊MSK 消费者,并使用 AWS Glue 架构注册表反序列化 AVRO 编码的消息,请访问MSK 实验页

使用案例:Amazon Glue Data Catalog

Amazon Glue表支持您可以手动指定的模式,也可以通过引用Amazon Glue:Schema 注册表。模式注册表与数据目录集成,以允许您在创建或更新时选择性地使用存储在模式注册表中的架构Amazon Glue数据目录中的表或分区。要标识架构注册表中的架构定义,至少需要知道它所属的架构的 ARN。架构的架构版本(包含架构定义)可以通过其 UUID 或版本号引用。总有一个模式版本,即 “最新” 版本,可以在不知道其版本号或 UUID 的情况下查找。

当调用CreateTable或者UpdateTable操作时,您将传递一个TableInput结构,其中包含StorageDescriptor,它可能有一个SchemaReference添加到架构注册表中的现有架构。同样,当您调用GetTable或者GetPartitionAPI 时,响应可能包含模式和SchemaReference。使用模式引用创建表或分区时,数据目录将尝试为此架构引用提取架构。如果无法在模式注册表中找到架构,它会在GetTable响应;否则响应将具有模式引用和模式引用。

您还可以从Amazon Glue控制台。

要执行这些操作并创建、更新或查看架构信息,您必须为调用者 IAM 用户授予GetSchemaVersionAPI。

添加表或更新表的架构

从现有架构添加新表会将表绑定到特定架构版本。注册新架构版本后,您可以从Amazon Glue控制台或使用UpdateTable 操作 (Python: update_table)API。

从现有架构添加表

您可以创建Amazon Glue表从注册表中的模式版本中使用Amazon Glue控制台或CreateTableAPI。

Amazon Glue API

当调用CreateTableAPI,您将传递一个TableInput,其中包含StorageDescriptor它有一个SchemaReference添加到架构注册表中的现有架构。

Amazon Glue 控制台

若要从Amazon Glue控制台:

  1. 登录 Amazon Web 服务管理控制台,然后打开 Amazon Web Services 管理控制台。Amazon Glue控制台https://console.aws.amazon.com/glue/

  2. 在导航窗格中的数据目录中,选择

  3. 添加表菜单中,选择从现有架构添加表

  4. 配置表属性和数据存储,按Amazon Glue开发人员指南 的第一个版本。

  5. 选择 Glue 架构页面上,选择注册表模式所在的位置。

  6. 选择架构名称,然后选择版本要应用的架构。

  7. 查看架构预览,然后选择下一步

  8. 审核和创建表。

应用于表的架构和版本将显示在Glue 架构列中的列表。您可以查看表格以查看更多详细信息。

更新表的方案

当新架构版本变为可用时,您可能需要使用UpdateTable 操作 (Python: update_table)API 或Amazon Glue控制台。

重要

更新现有表的架构时,该表具有Amazon Glue模式时,模式注册表中引用的新模式可能不兼容。这可能会导致您的作业失败。

Amazon Glue API

当调用UpdateTableAPI,您将传递一个TableInput,其中包含StorageDescriptor它有一个SchemaReference添加到架构注册表中的现有架构。

Amazon Glue 控制台

要更新表的架构,请从Amazon Glue控制台:

  1. 登录 Amazon Web 服务管理控制台,然后打开 Amazon Web Services 管理控制台。Amazon Glue控制台https://console.aws.amazon.com/glue/

  2. 在导航窗格中的数据目录中,选择

  3. 从表的列表中查看表。

  4. 单击更新架构在通知您有关新版本的框中。

  5. 查看当前架构和新架构之间的差异。

  6. 选择显示所有架构差异以查看更多信息。

  7. 选择Save As以接受新版本。

使用案例:Apache Kafka 溪

阿帕奇卡夫卡流 API 是一个客户端库,用于处理和分析存储在阿帕奇卡夫卡中的数据。本节介绍了阿帕奇卡夫卡流与Amazon Glue架构注册表,它允许您在数据流应用程序上管理和强制实施架构。有关 Apache Kafka 流的更多信息,请参阅Apache Kafka 溪

与 SerDes 库集成

有一个AWSKafkaAvroSerDe类,您可以使用它配置 Streams 应用程序。

Kafka 流应用程序示例代码

使用Amazon GlueApache Kafka 流应用程序中的架构注册表:

  1. 配置卡夫卡流应用程序。

    final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1"); props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
  2. 从主题 avro-输入创建流。

    StreamsBuilder builder = new StreamsBuilder(); final KStream<String, GenericRecord> source = builder.stream("avro-input");
  3. 处理数据记录(该示例过滤掉那些其值为粉色或金额值为 15 的记录)。

    final KStream<String, GenericRecord> result = source .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color")))); .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
  4. 将结果写回主题 avro-输出。

    result.to("avro-output");
  5. 启动阿帕奇卡夫卡流应用程序。

    KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();

实现结果

这些结果显示了在步骤 3 中筛选出来的记录的过滤过程,作为 “粉红色” 或值为 “15.0” 的收藏夹颜色。

筛选前的记录:

{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"} {"name": "Harry", "favorite_number": 10, "favorite_color": "black"} {"name": "Hermione", "favorite_number": 1, "favorite_color": "red"} {"name": "Ron", "favorite_number": 0, "favorite_color": "pink"} {"name": "Jay", "favorite_number": 0, "favorite_color": "pink"} {"id": "commute_1","amount": 3.5} {"id": "grocery_1","amount": 25.5} {"id": "entertainment_1","amount": 19.2} {"id": "entertainment_2","amount": 105} {"id": "commute_1","amount": 15}

筛选后的记录:

{"name": "Sansa", "favorite_number": 99, "favorite_color": "white"} {"name": "Harry", "favorite_number": 10, "favorite_color": "black"} {"name": "Hermione", "favorite_number": 1, "favorite_color": "red"} {"name": "Ron", "favorite_number": 0, "favorite_color": "pink"} {"id": "commute_1","amount": 3.5} {"id": "grocery_1","amount": 25.5} {"id": "entertainment_1","amount": 19.2} {"id": "entertainment_2","amount": 105}

使用案例:Apache Kafka Connect

阿帕奇卡夫卡 Connect 与Amazon Glue架构注册表使您能够从连接器获取架构信息。阿帕奇卡夫卡转换器指定了阿帕奇卡夫卡中的数据格式以及如何将其转换为阿帕奇卡夫卡 Connect 数据。每个 Apache Kafka Connect 用户都需要根据他们希望的数据从 Apache Kafka 加载或存储到 Apache Kafka 中时的格式来配置这些转换器。通过这种方式,您可以定义自己的转换器,将 Apache Kafka Connect 数据转换为Amazon Glue模式注册表(例如:Avro),并使用我们的序列化器注册其模式并执行序列化。然后转换器还能够使用我们的反序列化器来反序列化从 Apache Kafka 接收的数据,并将其转换回 Apache Kafka Connect 数据。下面给出了一个示例工作流图。


			阿帕奇卡夫卡 Connect 工作流程.
  1. 安装aws-glue-schema-registry项目,方法是克隆Github 存储库Amazon Glue架构注册表

    git clone git@github.com:awslabs/aws-glue-schema-registry.git cd aws-glue-schema-registry mvn clean install mvn dependency:copy-dependencies
  2. 如果您计划使用阿帕奇卡夫卡 Connect独立模式,更新连接独立。属性使用以下说明执行此步骤。如果您计划使用阿帕奇卡夫卡 Connect分发模式,更新连接 Avro-分布式。属性使用相同的说明。

    1. 也将这些属性添加到 Apache Kafka 连接属性文件中:

      key.converter.region=us-east-2 value.converter.region=us-east-2 key.converter.schemaAutoRegistrationEnabled=true value.converter.schemaAutoRegistrationEnabled=true key.converter.avroRecordType=GENERIC_RECORD value.converter.avroRecordType=GENERIC_RECORD
    2. 将下面的命令添加到Launch mode (启动模式)位置kafka-run-class.sh

      -cp $CLASSPATH:"<your AWS Glue Schema Registry base directory>/target/dependency/*"
  3. 将下面的命令添加到Launch mode (启动模式)位置kafka-run-class.sh

    -cp $CLASSPATH:"<your AWS Glue Schema Registry base directory>/target/dependency/*"

    它应如下所示:

    # Launch mode if [ "x$DAEMON_MODE" = "xtrue" ]; then nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null & else exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" fi
  4. 如果使用 bash,请运行以下命令以在 bash_配置文件中设置您的CLASSPATH。对于任何其他 shell,请相应地更新环境。

    echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile echo 'export GSR_LIB_VERSION=1.0.0' >>~/.bash_profile echo 'export KAFKA_HOME=<your Apache Kafka installation directory>' >>~/.bash_profile echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile source ~/.bash_profile
  5. (可选)如果要使用简单的文件源进行测试,请克隆文件源连接器。

    git clone https://github.com/mmolimar/kafka-connect-fs.git cd kafka-connect-fs/
    1. 在源连接器配置下,将数据格式编辑为 Avro,文件读取器AvroFileReader并从您正在读取的文件路径中更新示例 Avro 对象。例如:

      vim config/kafka-connect-fs.properties
      fs.uris=<path to a sample avro object> policy.regexp=^.*\.avro$ file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
    2. 安装源连接器。

      mvn clean package echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile source ~/.bash_profile
    3. 更新下面的汇属性<your Apache Kafka installation directory>/config/connect-file-sink.properties更新主题名称和输出文件名。

      file=<output file full path> topics=<my topic>
  6. 启动源连接器(在本示例中,它是一个文件源连接器)。

    $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
  7. 运行接收器连接器(在本示例中,它是文件接收器连接器)。

    $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties

从第三方架构注册表迁移到Amazon Glue架构注册表

从第三方架构注册表迁移到Amazon Glue架构注册表依赖于现有的当前第三方架构注册表。如果 Apache Kafka 主题中存在使用第三方模式注册表发送的记录,则使用者需要第三方模式注册表来反序列化这些记录。这些区域有:AWSKafkaAvroDeserializer提供了指定指向第三方反序列化器的辅助反序列化器类的功能,并用于反序列化这些记录。

第三方架构的报废有两个条件。首先,只有在使用第三方架构注册表的 Apache Kafka 主题中的记录不再需要任何使用者,也不再需要任何使用者。其次,退休可以通过将 Apache Kafka 主题老化而发生,具体取决于为这些主题指定的保留期限。请注意,如果您的主题具有无限保留期,您仍然可以迁移到Amazon Glue架构注册表,但您将无法停用第三方模式注册表。作为一种解决方法,您可以使用应用程序或 Mirror Maker 2 从当前主题中读取并使用 AWS Glue 架构注册表生成新主题。

要从第三方架构注册表迁移到Amazon Glue:Schema

  1. 创建注册表Amazon Glue模式注册表,或使用默认注册表。

  2. 停止使用者。修改它以包含Amazon Glue模式注册表作为主反序列化程序,第三方模式注册表作为辅助。

    • 设置使用者属性。在此示例中,次级解序器被设置为不同的反序列化器。行为如下所示:消费者从 Amazon MSK 检索记录,并首先尝试使用AWSKafkaAvroDeserializer。如果无法读取包含Amazon Glue架构注册表架构,AWSKafkaAvroDeserializer然后尝试使用辅助解序列化器中提供的反序列化器类。还需要在使用者属性中提供特定于辅助反序列化器的属性,例如模式注册表配置和特定的文件,如下所示。

      consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName()); consumerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, KafkaClickstreamConsumer.gsrRegion); consumerProps.setProperty(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, KafkaAvroDeserializer.class.getName()); consumerProps.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "URL for third-party schema registry"); consumerProps.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
  3. 重新启动使用器。

  4. 停止生产者并将生产者指向Amazon Glue:Schema 注册表。

    1. 设置生产者属性。在此示例中,创建者将使用默认注册表和自动注册模式版本。

      producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName()); producerProps.setProperty(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); producerProps.setProperty(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); producerProps.setProperty(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
  5. (可选)手动将现有架构和架构版本从当前第三方架构注册表移动到Amazon Glue模式注册表中的默认注册表Amazon Glue模式注册表或特定的非默认注册表Amazon Glue:Schema 注册表。这可以通过以 JSON 格式从第三方模式注册表导出架构并在Amazon Glue使用 AWS 控制台或 AWS CLI 的架构注册器。

    如果您需要使用 AWS CLI 和 AWS 控制台为新创建的架构版本启用与先前架构版本的兼容性检查,或者当生产者发送带有新架构且启用模式版本自动注册的消息时,此步骤可能非常重要。

  6. 启动创建者

适用于架构注册表的 AWS CloudFormation 模板示例

以下是用于在 AWS CloudFormation 中创建架构注册表资源的示例模板。要在您的账户中创建此堆栈,请将上面的模板复制到文件SampleTemplate.yaml,然后运行以下命令:

aws cloudformation create-stack --stack-name ABCSchemaRegistryStack --template-body "'cat SampleTemplate.yaml'"

此示例使用AWS::Glue::Registry创建注册表,AWS::Glue::Schema创建架构AWS::Glue::SchemaVersion创建架构版本,AWS::Glue::SchemaVersionMetadata填充架构版本元数据。

Description: "A sample CloudFormation template for creating Schema Registry resources." Resources: ABCRegistry: Type: "AWS::Glue::Registry" Properties: Name: "ABCSchemaRegistry" Description: "ABC Corp. Schema Registry" Tags: - Key: "Project" Value: "Foo" ABCSchema: Type: "AWS::Glue::Schema" Properties: Registry: Arn: !Ref ABCRegistry Name: "TestSchema" Compatibility: "NONE" DataFormat: "AVRO" SchemaDefinition: > {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]} Tags: - Key: "Project" Value: "Foo" SecondSchemaVersion: Type: "AWS::Glue::SchemaVersion" Properties: Schema: SchemaArn: !Ref ABCSchema SchemaDefinition: > {"namespace":"foo.avro","type":"record","name":"user","fields":[{"name":"status","type":"string", "default":"ON"}, {"name":"name","type":"string"},{"name":"favorite_number","type":"int"}]} FirstSchemaVersionMetadata: Type: "AWS::Glue::SchemaVersionMetadata" Properties: SchemaVersionId: !GetAtt ABCSchema.InitialSchemaVersionId Key: "Application" Value: "Kinesis" SecondSchemaVersionMetadata: Type: "AWS::Glue::SchemaVersionMetadata" Properties: SchemaVersionId: !Ref SecondSchemaVersion Key: "Application" Value: "Kinesis"