自定义 MSK 快速代理配置(读/写访问) - Amazon Managed Streaming for Apache Kafka
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

自定义 MSK 快速代理配置(读/写访问)

可使用 Amazon MSK 的更新配置功能或使用 Apache Kafka 的 AlterConfig API,更新读/写代理配置。Apache Kafka 的代理配置是静态的,也可以是动态的。静态配置需要重新启动代理才能应用配置,而动态配置不需要重启代理。有关配置属性和更新模式的更多信息,请参阅更新代理配置

MSK 快速代理的静态配置

您可以使用 Amazon MSK 来创建自定义 MSK 配置文件,以设置以下静态属性。Amazon MSK 可设置并管理其他所有未设置的属性。可以从 MSK 控制台或使用配置命令创建并更新静态配置文件。

属性 描述 默认值

allow.everyone.if.no.acl.found

如果要将此属性设置为 false,请务必先为集群定义 Apache Kafka ACL。如果将此属性设置为 false,且没有先定义 Apache Kafka ACL,则将失去对集群的访问权限。如果发生这种情况,您可以再次更新配置并将此属性设置为 true,以重新获得对集群的访问权限。

true

auto.create.topics.enable

在服务器上启用主题的自动创建。

false

compression.type

指定给定主题的最终压缩类型。此配置接受以下标准压缩编解码器:gzip、snappy、lz4、zstd。

该配置还接受 uncompressedproducer,前者相当于没有压缩,后者意味着保留由创建器设置的原始压缩编解码器。

Apache Kafka 默认值

connections.max.idle.ms

空闲连接超时(以毫秒为单位)。如果连接的空闲时间超过您为此属性设置的值,服务器套接字处理器线程会关闭这些连接。

Apache Kafka 默认值

delete.topic.enable

启用删除主题操作。如果禁用此设置,则无法通过管理工具删除主题。

Apache Kafka 默认值

group.initial.rebalance.delay.ms

在组协调器执行第一次重新平衡之前,组协调器等待更多数据使用器加入新组的时间。更长的延迟时间意味着重新平衡可能会更少,但这会增加处理开始之前的时间。

Apache Kafka 默认值

group.max.session.timeout.ms

注册使用器的最长会话超时时间。超时时间越长,可供使用器用来处理检测信号之间的消息的时间就越多,但这会导致需要花更多时间来检测故障。

Apache Kafka 默认值

leader.imbalance.per.broker.percentage

各代理允许的领导节点不平衡比率。如果各代理超过了此值,则控制器将触发领导平衡操作。此值以百分比的形式指定。

Apache Kafka 默认值

log.cleanup.policy 超出保留时段的分段的默认清除策略。有效策略的逗号分隔列表。有效策略为 deletecompact。对于启用了分层存储的集群,有效策略为仅 delete Apache Kafka 默认值
log.message.timestamp.after.max.ms

消息时间戳和代理时间戳之间允许的时间戳差。消息时间戳可晚于代理时间戳,也可以为同一时间,允许的最大差值取决于此配置中设置的值。

如果 log.message.timestamp.type=CreateTime,则当时间戳差超过此指定阈值时,消息将被拒绝。如果 log.message.timestamp.type=LogAppendTime,此配置将被忽略。

86400000(24 * 60 * 60 * 1000 毫秒,即 1 天)
log.message.timestamp.before.max.ms

代理时间戳和消息时间戳之间允许的时间戳差。消息时间戳可早于代理时间戳,也可以为同一时间,允许的最大差值取决于此配置中设置的值。

如果 log.message.timestamp.type=CreateTime,则当时间戳差超过此指定阈值时,消息将被拒绝。如果 log.message.timestamp.type=LogAppendTime,此配置将被忽略。

86400000(24 * 60 * 60 * 1000 毫秒,即 1 天)
log.message.timestamp.type 指定消息中的时间戳是消息创建时间还是日志追加时间。允许的值是 CreateTimeLogAppendTime Apache Kafka 默认值
log.retention.bytes 删除日志前的最大日志大小。 Apache Kafka 默认值
log.retention.ms 日志文件删除之前保留的毫秒数。 Apache Kafka 默认值
max.connections.per.ip 每个 IP 地址经许可的最大连接数。如果使用 max.connections.per.ip.overrides 属性配置了覆盖,则可以将其设置为 0。如果达到限制,来自该 IP 地址的新连接将被丢弃。 Apache Kafka 默认值

max.incremental.fetch.session.cache.slots

维护的增量提取会话的最大数量。

Apache Kafka 默认值

message.max.bytes

Kafka 允许的最大记录批处理大小。如果增加此值,并且存在大于 0.10.2 的使用器,则使用器的提取大小也必须增加,以便它们能够提取如此大的记录批处理。

最新的消息格式版本总是将消息分组到批处理中来提高效率。以前的消息格式版本不会将未压缩的记录分组到批处理中,在此情况下,此限制仅适用于单条记录。可使用主题级别 max.message.bytes 配置为每个主题设置此值。

Apache Kafka 默认值

num.partitions

每个主题的默认分区数。

1

offsets.retention.minutes

当一个使用器组丢失其所有使用器(即变空)后,其偏移量将在此保留期内保留,然后被丢弃。对于独立使用器(即,使用手动分配的使用器),偏移量会在最后一次提交时间加上此保留期后过期。

Apache Kafka 默认值

replica.fetch.max.bytes

尝试为每个分区提取的消息的字节数。这不是绝对最大值。如果提取的第一个非空分区中的第一个记录批处理大于此值,则将返回该记录批处理以确保取得进展。message.max.bytes(代理配置)或 max.message.bytes(主题配置)定义代理接受的最大记录批处理大小。

Apache Kafka 默认值

replica.selector.class

实现 ReplicaSelector 的完全限定类名称。代理使用此值来查找首选读取副本。如果您希望允许使用者从最近的副本提取,请将此属性设置为 org.apache.kafka.common.replica.RackAwareReplicaSelector

Apache Kafka 默认值

socket.receive.buffer.bytes

套接字服务器套接字的 SO_RCVBUF 缓冲区。如果值为 -1,则使用操作系统默认值。

102400

socket.request.max.bytes

套接字请求中的最大字节数。

104857600

socket.send.buffer.bytes

套接字服务器套接字的 SO_SNDBUF 缓冲区。如果值为 -1,则使用操作系统默认值。

102400

transaction.max.timeout.ms

事务的最大超时时间。如果客户端请求的事务时间超过此值,则代理会在 InitProducerIdRequest 中返回错误。这可防止客户端的超时时间过长,且此情况可能会导致使用器无法阅读事务中包含的主题。

Apache Kafka 默认值

transactional.id.expiration.ms

事务协调器在其事务 ID 过期之前,等待接收当前事务的任何事务状态更新的时间(以毫秒为单位)。此设置还会影响生成器 ID 的到期时间,因为它会导致生成器 ID 在最后一次使用给定生成器 ID 写入之后过期。如果由于主题的保留设置而删除了生成器 ID 的最后一次写入内容,则生成器 ID 可能会提前过期。此属性的最小值为 1 毫秒。

Apache Kafka 默认值

快速代理的动态配置

可使用 Apache Kafka AlterConfig API 或 Kafka-configs.sh 工具,编辑以下动态配置。Amazon MSK 可设置并管理其他所有未设置的属性。您可以动态设置不要求代理重新启动的集群级别和代理级别的配置属性。

属性 描述 默认值

advertised.listeners

发布以供客户端使用的侦听器(如果与 listeners 配置属性不同)。在 IaaS 环境中,这可能需要与代理绑定的接口不同。如果未指定,则将使用侦听器的值。与侦听器不同,传播 0.0.0.0 元地址是无效的。

此外与 listeners 不同的是,此属性中可能存在重复的端口,因此可配置一个侦听器来传播另一个侦听器的地址。在使用外部负载均衡器的某些情况下,这可能很有用。

此属性是根据每个代理设置的。

null

compression.type

给定主题的最终压缩类型。可以将此属性设置为标准压缩编解码器(gzipsnappylz4zstd)。它还接受 uncompressed。此值等同于不压缩。如果将该值设置为 producer,则意味着保留生成器设置的原始压缩编解码器。

Apache Kafka 默认值
log.cleaner.delete.retention.ms 为日志压缩主题保留删除逻辑标记的时间。此设置还规定了使用者从偏移量 0 开始时必须完成读取的时间限制,以确保获得最后阶段的有效快照。否则,删除逻辑可能会在完成扫描之前进行收集。 86400000(24 * 60 * 60 * 1000 毫秒,即 1 天),Apache Kafka 默认值
log.cleaner.min.compaction.lag.ms 消息在日志中保持未压缩状态的最短时间。此设置仅适用于当前压缩的日志。 0,Apache Kafka 默认值
log.cleaner.max.compaction.lag.ms 消息在日志中一直不符合压缩条件的最长时间。此设置仅适用于当前压缩的日志。该配置将限制在 [7 天,Long.Max] 范围内。 9223372036854775807,Apache Kafka 默认值

log.cleanup.policy

超出保留时段的分段的默认清除策略。有效策略的逗号分隔列表。有效策略为 deletecompact。对于启用了分层存储的集群,有效策略为仅 delete

Apache Kafka 默认值

log.message.timestamp.after.max.ms

消息时间戳和代理时间戳之间允许的时间戳差。消息时间戳可晚于代理时间戳,也可以为同一时间,允许的最大差值取决于此配置中设置的值。如果 log.message.timestamp.type=CreateTime,则当时间戳差超过此指定阈值时,消息将被拒绝。如果 log.message.timestamp.type=LogAppendTime,此配置将被忽略。

86400000(24 * 60 * 60 * 1000 毫秒,即 1 天)

log.message.timestamp.before.max.ms

代理时间戳和消息时间戳之间允许的时间戳差。消息时间戳可早于代理时间戳,也可以为同一时间,允许的最大差值取决于此配置中设置的值。如果 log.message.timestamp.type=CreateTime,则当时间戳差超过此指定阈值时,消息将被拒绝。如果 log.message.timestamp.type=LogAppendTime,此配置将被忽略。

86400000(24 * 60 * 60 * 1000 毫秒,即 1 天)

log.message.timestamp.type

指定消息中的时间戳是消息创建时间还是日志追加时间。允许的值是 CreateTimeLogAppendTime

Apache Kafka 默认值

log.retention.bytes

删除日志前的最大日志大小。

Apache Kafka 默认值

log.retention.ms

日志文件删除之前保留的毫秒数。

Apache Kafka 默认值

max.connection.creation.rate

代理在任何时间经许可的最大连接创建速率。

Apache Kafka 默认值

max.connections

代理在任何时间经许可的最大连接数。此限制叠加在使用 max.connections.per.ip 配置的每个 IP 限制之上。

Apache Kafka 默认值

max.connections.per.ip

每个 IP 地址经许可的最大连接数。如果使用 max.connections.per.ip.overrides 属性配置了覆盖,则可以将其设置为 0。如果达到限制,来自该 IP 地址的新连接将被丢弃。

Apache Kafka 默认值

max.connections.per.ip.overrides

用逗号分隔的列表,支持按 IP 地址或主机名覆盖默认最大连接数。示例值为 hostName:100,127.0.0.1:200

Apache Kafka 默认值

message.max.bytes

Kafka 允许的最大记录批处理大小。如果增加此值,并且存在大于 0.10.2 的使用器,则使用器的提取大小也必须增加,以便它们能够提取如此大的记录批处理。最新的消息格式版本总是将消息分组到批处理中来提高效率。以前的消息格式版本不会将未压缩的记录分组到批处理中,在此情况下,此限制仅适用于单条记录。可使用主题级别 max.message.bytes 配置为每个主题设置此值。

Apache Kafka 默认值

producer.id.expiration.ms

主题分区领导者在生成器 ID 到期之前的等待时间(单位为毫秒)。当与生成器 ID 关联的事务仍在进行时,生成器 ID 将不会过期。请注意,如果由于主题的保留设置而删除了生成器 ID 的最后一次写入内容,则生成器 ID 可能会提前过期。此值设置为等于或大于 delivery.timeout.ms 有助于防止在重试期间过期并避免消息重复,但是对于大多数用例来说,默认值应该是合理的设置。

Apache Kafka 默认值

快速代理上的主题级别配置

您可以使用 Apache Kafka 命令为新主题和现有主题设置或修改主题级别的配置属性。如果无法提供任何主题级别的配置,Amazon MSK 会使用代理默认值。与代理级别的配置一样,Amazon MSK 可保护某些主题级别的配置属性免受更改。例如复制因子 min.insync.replicasunclean.leader.election.enable。如果尝试创建复制因子值不为 3 的主题,Amazon MSK 将默认创建复制因子为 3 的主题。有关主题级别的配置属性以及如何设置这些属性之示例的更多信息,请参阅 Apache Kafka 文档中的 Topic-Level Configs

属性 描述

cleanup.policy

此配置指定要在日志段上使用的保留策略。“删除”策略(默认设置)将在达到保留时间或大小限制时丢弃旧分段。“压缩”策略将启用日志压缩,保留每个键的最新值。也可以在逗号分隔的列表中指定这两个策略(例如,“删除、压缩”)。在这种情况下,将根据保留时间和大小配置丢弃旧分段,而对保留的分段进行压缩。分区中的数据达到 256 MB 后将触发快速代理上的压缩。

compression.type

指定给定主题的最终压缩类型。此配置接受标准压缩编解码器(gzipsnappylz4zstd)。此外,它还接受 uncompressedproducer,前者相当于没有压缩,后者意味着保留由创建器设置的原始压缩编解码器。

delete.retention.ms

为日志压缩主题保留删除逻辑标记的时间。此设置还规定了使用者从偏移量 0 开始时必须完成读取的时间限制,以确保获得最后阶段的有效快照。否则,删除逻辑可能会在完成扫描之前进行收集。

此设置的默认值为 86400000(24 * 60 * 60 * 1000 毫秒,即 1 天),Apache Kafka 默认值

max.message.bytes

Kafka 允许的最大记录批处理大小(启用压缩时的压缩后值)。如果增加此数量,并且存在大于 0.10.2 的使用器,则使用器的提取大小也必须增加,以便它们能够提取如此大的记录批处理。在最新的消息格式版本中,总是将记录分组到批处理中来提高效率。在以前的消息格式版本中,未压缩的记录不会分组到批处理中,在此情况下,此限制仅适用于单条记录。可使用主题级别 max.message.bytes config,根据各个主题进行设置。

message.timestamp.after.max.ms

此配置设置了消息时间戳和代理时间戳之间允许的时间戳差。消息时间戳可晚于代理时间戳,也可以为同一时间,允许的最大差值取决于此配置中设置的值。如果 message.timestamp.type=CreateTime,则当时间戳差超过此指定阈值时,消息将被拒绝。如果 message.timestamp.type=LogAppendTime,此配置将被忽略。

message.timestamp.before.max.ms

此配置设置了代理时间戳和消息时间戳之间允许的时间戳差。消息时间戳可早于代理时间戳,也可以为同一时间,允许的最大差值取决于此配置中设置的值。如果 message.timestamp.type=CreateTime,则当时间戳差超过此指定阈值时,消息将被拒绝。如果 message.timestamp.type=LogAppendTime,此配置将被忽略。

message.timestamp.type

定义消息中的时间戳是消息创建时间还是日志追加时间。该值应该是 CreateTimeLogAppendTime

min.compaction.lag.ms

消息在日志中保持未压缩状态的最短时间。此设置仅适用于当前压缩的日志。

此设置的默认值为 0,Apache Kafka 默认值

max.compaction.lag.ms

消息在日志中一直不符合压缩条件的最长时间。此设置仅适用于当前压缩的日志。该配置将限制在 [7 天,Long.Max] 范围内。

此设置的默认值为 9223372036854775807,Apache Kafka 默认值。

retention.bytes

如果使用“删除”保留策略,此配置可控制分区(由日志段组成)在丢弃旧日志段释放空间之前可以增长到的最大大小。默认没有大小限制,只有时间限制。由于此限制是在分区级别强制执行的,因此将其乘以分区数即可计算出主题保留容量(以字节为单位)。此外,retention.bytes configuration 独立于 segment.mssegment.bytes 配置运行。此外,如果将 retention.bytes 配置为零,则会触发新分段的滚动出现。

retention.ms

如果使用“删除”保留策略,此配置可控制在丢弃旧日志段释放空间之前保留日志的最长时间。这代表使用者必须在多长时间内读取其数据的 SLA。如果设置为 -1,则不应用时间限制。此外,retention.ms 配置独立于 segment.mssegment.bytes 配置运行。此外,如果满足 retention.ms 条件,还会触发新分段的滚动出现。