安装 SerDe 库 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

安装 SerDe 库

注意

先决条件:完成以下步骤之前,您需要先运行 Amazon Managed Streaming for Apache Kafka(Amazon MSK)或 Apache Kafka 集群。您的创建器和使用器需要在 Java 8 或更高版本上运行。

Serde 库提供用于序列化和反序列化数据的框架。

您将为生成数据的应用程序(统称为“序列化程序”)安装开源序列化程序。序列化程序处理序列化、压缩以及与架构注册表的交互。序列化程序自动从正在写入架构注册表兼容目标(如 Amazon MSK)的记录中提取架构。同样,您将在使用数据的应用程序上安装开源反序列化程序。

在创建器和使用器上安装库:

  1. 在创建器和使用器的 pom.xml 文件中,通过下面的代码添加此依赖关系:

    <dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>

    或者,您可以克隆 Amazon Glue 架构注册表

  2. 使用这些必填属性设置您的创建器:

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"

    如果没有现有架构,则需要打开自动注册(下一步)。如果您确实有要应用的架构,则将“my-schema”替换为您的架构名称。此外,如果架构自动注册处于关闭状态,则必须提供“registry-name”。如果架构在“default-registry”下创建,则可以省略注册表名称。

  3. (可选)设置这些可选的创建器属性。有关详细属性说明,请参阅自述文件

    props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed

    自动注册会在默认注册表(“default-registry”)下注册架构版本。如果上一步未指定 SCHEMA_NAME,则主题名称被推断为 SCHEMA_NAME

    有关兼容性模式的更多信息,请参阅架构版本控制和兼容性

  4. 使用这些必填属性设置您的使用器:

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an Amazon Web Services 区域 props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
  5. (可选)设置这些可选的使用器属性。有关详细属性说明,请参阅自述文件

    properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario