一.引言

kafka是广泛使用的流处理组件,我们知道怎么使用它,也知道它的实现原理。但是更重要的部分是它的设计理念,即kafka设计者当时是如何考量各种方案的,了解这些,对提升我们的设计能力非常有帮助。

二.动机

我们将 Kafka 设计为一个统一平台,来处理大型公司可能拥有的所有实时数据流。 为此,我们必须考虑相当广泛的用例集。

  • 它必须具有高吞吐量,才能支持大容量事件流,例如实时日志聚合。
  • 它需要优雅地处理大量积压数据,以便能够支持离线系统的周期性数据负载。
  • 系统必须保证low-latency delivery,才能处理更传统的消息传递用例。
  • 我们希望支持分区、分布式、实时处理,基于旧的事件流创建新的事件流。 这激发了我们的分区和consumer模型。
  • 最后,在将流送入其他数据系统进行服务的情况下,系统必须能够在出现机器故障时保证容错性。

支持这些用途使我们做出具有许多独特元素的设计,与传统的消息传递系统相比,它更类似于数据库日志。 我们将在以下部分概述设计的一些元素。

三.持久化

不要害怕文件系统!
Kafka 严重依赖文件系统来存储和缓存消息。 人们普遍认为“磁盘很慢”,这让人们怀疑持久化结构能否提供有竞争力的性能。 事实上,磁盘比人们预期的要慢得多,也快得多,这取决于它们的使用方式。 一个设计得当的磁盘结构通常可以和网络一样快。

关于磁盘性能的关键事实是,在过去十年中,硬盘驱动器的吞吐量与磁盘寻道的延迟有所不同。 在6个 7200rpm SATA RAID-5 阵列的 JBOD 配置上,顺序写入的性能约为 600MB/秒,而随机写入的性能仅为约 100k/秒——相差超过 6000 倍。 这些顺序读取和写入是所有使用模式中最可预测的,并且由操作系统进行了大量优化。 现代操作系统提供read-ahead和write-behind技术,以大的数据块预取数据,并将较小的逻辑写入分组为大的物理写入。 可以在这篇 ACM 队列文章ACM Queue article;找到对这个问题的进一步讨论;他们实际上发现顺序磁盘访问在某些情况下比随机内存访问更快!

tips:read-ahead

Linux的文件预读readahead,指Linux系统内核将指定文件的某 区域预读进页缓存起来,便于接下来对该区域进行读取时,不会因缺页(page fault)而阻塞。因为从内存读取比从磁盘读取要快很多。预读可以有效的减少磁盘的寻道次数和应用程序的I/O等待时间,是改进磁盘读I/O性能的重要 优化手段之一。

为了弥补这种性能差异,现代操作系统越来越积极地使用内存进行磁盘缓存。 现代操作系统会倾向于将所有空闲内存用作磁盘缓存,而性能损失很小。 所有的磁盘读写都会经过这个统一的缓存。 如果不使用直接 I/O,则无法轻易关闭此功能。因此即使进程缓存了数据,该数据也可能会在操作系统page cache中复制,将其存储两次。

此外,当构建在 JVM 之上,任何花时间研究 Java 内存使用的人都知道两件事:

  1. 对象的内存开销非常高,通常会使存储的数据空间翻倍(或更糟)。
  2. 随着堆内数据的增加,Java GC变得越来越繁琐和缓慢。

tips: Java对象构成

Java 的对象包括对象头,类指针,数组长度(可选),数据,比如只有一个int类型字段的对象需要96bit的内存。

由于这些因素,使用文件系统和依赖page cache优于在内存中维护缓存或其他结构——至少,可以用作缓存的内存翻倍,并且可能通过存储紧凑的字节结构而不是单个对象再次翻倍。 这样做将导致 32GB 机器上的缓存高达 28-30GB, 并且没有GC问题。 此外,即使服务重新启动,此缓存仍将保持预热状态,而进程内缓存将需要在内存中重建(对于 10GB 的缓存可能需要 10 分钟),否则它将需要从一个完全未加载的缓存开始(这可能意味着糟糕的初始性能)。 这也大大简化了代码,因为维护缓存和文件系统之间一致性的所有逻辑现在都在操作系统中,这往往比one-off 进程内尝试更有效、更正确。 如果您的磁盘使用有利于顺序读取,那么read-ahead实际上是在每次磁盘读取时使用有用的数据预先填充此缓存。

这表明了一种非常简单的设计:与其在内存中保留尽可能多的内容,并在我们用完空间时,恐慌地将其全部刷新到文件系统,不如将其反转。 所有数据都立即写入文件系统上的持久日志中,而不必刷新到磁盘。 实际上这只是意味着它被转移到内核的page cache中。

这种以page cache为中心的设计风格在此处有关 Varnish 设计的文章article 中进行了描述(以及适度的自大)。

 

常数时间性能
消息系统中使用的持久数据结构通常是以每个consumer队列为单位,带有关联的 BTree 或其他通用随机访问数据结构,以维护有关消息的元数据。 BTree 是可用的最通用的数据结构,可以在消息系统中支持各种事务性和非事务性语义。 不过,它们确实带来了相当高的成本:Btree 操作是 O(log N)。 通常 O(log N) 被认为本质上等同于常数时间,但对于磁盘操作而言并非如此。 磁盘寻道的速度是10 毫秒一次,并且每个磁盘一次只能进行一次寻道,因此并行性受到限制。 因此,即使很少的磁盘寻道也会导致非常高的开销。 由于存储系统混合了非常快的缓存操作和非常慢的物理磁盘操作,固定缓存数据增加时,树结构的性能通常是超线性的——即数据加倍会使性能变得慢两倍不止。

直观地说,持久队列可以建立在简单的读取和追加到文件的基础上,就像日志记录解决方案的一样。 这种结构的优点是所有操作都是 O(1) 并且读取不会阻塞彼此的写入。 这具有明显的性能优势,因为性能与数据大小完全分离——一台服务器现在可以充分利用许多廉价、低转速的 1+TB SATA 驱动器。 尽管它们的寻道性能较差,但这些驱动器对于大容量读写具有可接受的性能,而且价格是前者的 1/3,容量是前者的 3 倍。

在没有任何性能损失的情况下访问几乎无限的磁盘空间意味着我们可以提供一些消息系统中不常见的功能。 例如,在 Kafka 中,我们可以将消息保留相对较长的时间(比如一周),而不是尝试在消息被消费后立即删除。 正如我们将要描述的,这为consumer带来了很大的灵活性。

四.效率

我们在效率方面付出了巨大的努力。 我们的主要用例之一是处理网络活动数据,该数据量非常大:每次页面浏览都可能产生数十次写入。 此外,我们假设发布的每条消息都被至少一个consumer(通常是很多)消费,因此我们努力使消费尽可能高效。

我们还发现,根据构建和运行大量类似系统的经验,效率是有效多租户操作的关键。 如果下游基础设施服务很容易因为应用程序使用量的小波动而成为瓶颈,那么这种小的变化通常会产生问题。 必须非常快,才能确保应用程序不会因为基础架构出现问题。 当尝试在集中式集群上运行支持数十或数百个应用程序的集中式服务时,这一点尤为重要,因为使用模式的变化几乎每天都会发生。

 

我们在上一节中讨论了磁盘效率。 一旦消除了不良的磁盘访问模式,在这种类型的系统中有两个常见的低效率原因:太多的小 I/O 操作和过多的字节复制。

小I/O问题既发生在客户端和服务器之间,也发生在服务器自身的持久化操作中。

为避免这种情况,我们的协议是围绕“消息集”抽象构建的,该抽象将消息自然地分组在一起。 这允许网络请求将消息组合在一起并分摊网络往返的开销,而不是一次发送一条消息。 服务器依次将消息块一次性附加到其日志中,而consumer一次获取大的线性块。

这个简单的优化使性能得到巨大的提升。 批处理导致更大的网络数据包、更大的顺序磁盘操作、连续的内存块等,所有这些都允许 Kafka 将突发的随机消息写入流转换为流向consumer的线性写入。

 

另一个低效率是字节复制。 在低消息速率下这不是问题,但在负载下影响很大。 为了避免这种情况,我们采用了一种由producer、broker和consumer共享的标准化二进制消息格式(因此数据块可以在它们之间传输而无需修改)。

broker维护的消息日志本身只是一个文件目录,每个文件由一系列消息集填充,这些消息集以producer和consumer使用的相同格式写入磁盘。 维护这种通用格式可以优化最重要的操作:持久日志块的网络传输。 现代 unix 操作系统提供高度优化的代码路径,用于将数据从页面缓存传输到套接字; 在 Linux 中,这是通过 sendfile 系统调用完成的。

要了解 sendfile 的影响,了解从文件到套接字传输数据的通用数据路径:

  1. 操作系统从磁盘读取数据到内核空间的page cache
  2. 应用程序从内核空间读取数据到用户空间缓冲区
  3. 应用程序将数据写回内核空间的套接字缓冲区
  4. 操作系统将数据从套接字缓冲区复制到 NIC 缓冲区,并在此处通过网络发送

这显然是低效的,有四个副本和两个系统调用。 使用 sendfile,通过允许操作系统将数据从页面缓存直接发送到网络,可以避免这种重新复制。 所以在这个优化路径中,只需要最终拷贝到网卡缓冲区。

我们期望一个共同的用例是一个topic的多个consumer。 使用上面的零拷贝优化,数据只被复制到页面缓存中一次并在每次消费时重复使用,而不是存储在内存中并在每次读取时复制到用户空间。 这允许以接近网络连接限制的速率使用消息。

page cache 和 sendfile 的这种组合意味着在 Kafka 集群上,consumer大部分时间不会落后,你将看不到磁盘上的任何读取活动,因为它们将完全从缓存中提供数据。

TLS/SSL 库在用户空间运行(Kafka 目前不支持内核中的 SSL_sendfile)。 由于此限制,启用 SSL 时不使用 sendfile。 启用S​​SL配置,参考security.protocol和security.inter.broker.protocol

有关 Java 中的 sendfile 和zero copy支持的更多背景信息,请参阅本文article

tips: send file 图解

 

端到端批量压缩
在某些情况下,瓶颈实际上不是 CPU 或磁盘,而是网络带宽。 对于需要通过广域网在数据中心之间发送消息的数据管道来说尤其如此。 当然,用户总是可以在不需要 Kafka 的任何支持的情况下一次压缩一条消息,但这可能会导致非常差的压缩率,因为大部分冗余是由于相同类型的消息之间的重复(例如字段名称在JSON 或 Web 日志中的用户agent或常见字符串值)。 高效压缩需要将多条消息一起压缩,而不是单独压缩每条消息。

Kafka 通过高效的批处理格式支持这一点。 一批消息可以聚集在一起压缩并以这种形式发送到服务器。 这批消息将以压缩形式写入,并在日志中保持压缩状态,只会被consumer解压。

Kafka 支持 GZIP、Snappy、LZ4 和 ZStandard 压缩协议。 有关压缩的更多详细信息,请参见此处here.。

五. producer

负载均衡
producer直接将数据发送到作为分区leader的broker,而无需任何中间路由层。 为了帮助producer做到这一点,所有 Kafka 节点都可以在任何给定时间回答有关哪些服务器处于活动状态以及topic 分区的leader所在的元数据请求,以允许producer适当地定向其请求。

客户端控制将消息发布到哪个分区。 这可以随机完成,实现一种随机负载平衡,也可以通过某种语义分区函数来完成。 我们通过允许用户指定一个键来进行分区并使用它来散列到一个分区,来自定义语义分区的接口(如果需要,还有一个选项可以覆盖分区函数)。 例如,如果选择的键是用户 ID,则给定用户的所有数据都将发送到同一分区。 这反过来将允许consumer对他们的消费做出本地假设。 这种分区方式明确设计为允许在consumer中进行本地敏感处理程序。

 tips: 如果我们想保证订单消费的顺序性,可以将同一用户的订单发送给同一分区,一个分区只会同时被一个consumer消费,并且在consumer中进行单线程执行。

异步发送
批处理是效率的重要驱动因素之一,为了启用批处理,Kafka producer将尝试在内存中累积数据并在单个请求中发送更大的批次。 批处理可以配置为累积不超过固定数量的消息,并且等待时间不超过某个固定的延迟限制(比如 64k 或 10 毫秒)。 这允许积累更多的字节来发送,并且在服务器上执行少量且更大的 I/O 操作。 这种缓冲是可配置的,并提供了一种机制来权衡少量的额外延迟以获得更好的吞吐量。

有关producer的配置 configuration 和 api  api 的详细信息可以在文档的其他地方找到。

tips

批处理的设计思想在很多其他分布式组件中出现过,比如es中的批量插入。

六.consumer

Kafka consumer通过向引导它想要消费的分区的broker发出“获取”请求来工作。 consumer在每个请求中指定其在日志中的offset,并从该位置开始接收回一大块日志。 因此,consumer对该位置有很大的控制权,并且可以在需要时回退它以重新使用数据。

 

推 vs. 拉
我们最初考虑的一个问题是,consumer是应该从broker那里提取数据,还是broker应该将数据push给consumer。 在这方面,Kafka 遵循大多数消息系统共享的更传统的设计,其中数据从producer push到broker,并由consumer从broker pull。 一些以日志为中心的系统,例如 Scribe 和 Apache Flume,遵循一种非常不同的基于push的路径,其中数据被push到下游。 两种方法各有利弊。 然而,基于push的系统难以处理各种consumer,因为broker控制数据传输的速率。 目标通常是让consumer能够以最大可能的速度消费; 不幸的是,在push系统中,这意味着当consumer的消费率低于生产率时,consumer往往会不知所措(本质上是DDoS攻击)。 基于pull的系统具有更好的特性,即consumer落后后,在可能的时候赶上来。 这可以通过某种退避协议来缓解,consumer可以通过该协议表明它不堪重负,但是让传输速率充分利用(但不要过度利用)比看起来更棘手。 以前以这种方式构建系统的尝试使我们采用了更传统的拉模型。

基于pull的系统的另一个优点是它有助于将发送给consumer的数据积极地分批处理。 基于push的系统必须选择立即发送请求或积累更多数据然后在不知道下游consumer是否能够立即处理它的情况下发送它。 如果针对低延迟进行了调整,这将导致一次只发送一条消息,但无论如何传输最终都会被缓冲,这是一种浪费。 基于pull的设计解决了这个问题,因为consumer总是在其在日志中的当前位置(或达到某个可配置的最大大小)之后拉取所有可用消息。 因此,可以在不引入不必要延迟的情况下获得最佳批处理。

pull系统的不足之处在于,如果broker没有数据,consumer可能会进行空转等待数据到达。 为避免这种情况,我们在pull请求中设置了参数,允许consumer请求在“长轮询”中阻塞,等待数据到达(并可选择等待给定字节数可用以确保较大的传输大小)。

您可以想象其他可能的设计,这些设计只会端到端地pull。 producer将在本地写入本地日志,而broker将从中提取数据,而consumer则从中提取数据。 通常会提出一种类似类型的“存储转发”producer。 这很有趣,但我们觉得不太适合我们拥有数千个producer的目标用例。 我们大规模运行持久性数据系统的经验让我们感到,在系统中涉及许多应用程序的数千个磁盘实际上不会使事情变得更可靠,而且操作起来会是一场噩梦。 在实践中,我们发现我们可以大规模运行具有强大 SLA 的管道,而无需producer持久性。

 

consumer位置

令人惊讶的是,跟踪已消费的内容是消息传递系统的关键性能点之一。
大多数消息传递系统都保留有关在broker上使用了哪些消息的元数据。 也就是说,当消息被分发给consumer时,broker要么立即在本地记录该事实,要么等待consumer的确认。 这是一个相当直观的选择,而且对于单机服务器来说确实不清楚这个状态还能怎么设计。 由于许多消息传递系统中用于存储的数据结构扩展性很差,这也是一个务实的选择——因为broker知道消费了什么,它可以立即删除它,从而保持数据量较小。

可能不明显的是,让broker和consumer就已消费的内容达成一致并不是一个微不足道的问题。 如果broker在每次通过网络分发消息时立即将消息记录为已消费,那么如果consumer未能处理该消息(比如因为它崩溃或请求超时或其他原因),该消息将丢失。 为了解决这个问题,很多消息系统都增加了确认功能,即消息在发送时只标记为已发送而不是被消费; broker等待来自consumer的特定确认以将消息记录为已消费。 这种策略解决了丢失消息的问题,但又产生了新的问题。 首先,如果consumer处理消息但在发送确认之前失败,那么消息将被消费两次。 第二个问题是关于性能的,现在broker必须记录关于每条消息的多个状态(首先锁定它以免它被第二次发出,然后将其标记为永久消费以便它可以被删除)。 必须处理棘手的问题,例如如何处理已发送但从未确认的消息。

Kafka 以不同的方式处理这个问题。 我们的topic分为一组完全有序的分区,每个分区在任何给定时间都由每个订阅consumer组中的一个consumer消费。 这意味着consumer在每个分区中的位置只是一个整数,即下一条要消费的消息的偏移量。 这使得关于已消耗内容的状态非常小,每个分区只有一个数字。 可以定期检查此状态。 这使得消息确认非常廉价。

这个决定有一个附带的好处。 consumer可以故意回退到旧的偏移量并重新消费数据。 这违反了队列的共同约定,但事实证明这是许多consumer的基本特征。 比如consumer代码有bug,在消费了一些消息后被发现,修复bug后consumer可以重新消费这些消息。

 

离线数据加载
可扩展的持久性允许consumer,只定期消费的可能性,例如批量数据加载,周期性地将数据批量加载到离线系统,如 Hadoop 或关系数据仓库。
在 Hadoop 的情况下,我们通过将负载拆分到各个map任务来并行化数据加载,每个映射任务对应一个节点/topic/分区组合,从而允许加载中的完全并行。 Hadoop 提供了任务管理,失败的任务可以重新启动而没有重复数据的危险——它们只是从原来的位置重新启动。

 

static membership
静态成员旨在提高流应用程序、consumer组和其他构建等在rebalance协议之上的应用程序的可用性。 rebalance协议依赖group coordinator将实体 ID 分配给组成员。 这些生成的 ID 是短暂的,并且会在成员重新启动和重新加入时更改。 对于基于consumer的应用程序,这种“dynamic membership”可能会导致在代码部署、配置更新和定期重启等管理操作期间将大部分任务重新分配给不同的实例。 对于大型状态应用程序,打乱的任务需要很长时间才能恢复到之前的状态,并导致应用程序部分或完全不可用。 受此观察的启发,Kafka 的group management protocol允许组成员提供持久实体 ID。 基于这些 id,组成员资格保持不变,因此不会触发rebalance。
如果你想使用static membership,

  1. 将broker集群和客户端应用程序升级到 2.3 或更高版本,并确保升级后的broker使用 2.3 或更高版本的 inter.broker.protocol.version。
  2. 将配置 ConsumerConfig#GROUP_INSTANCE_ID_CONFIG 设置为一组下每个consumer实例的唯一值。
  3. 对于 Kafka Streams 应用程序,为每个 KafkaStreams 实例设置唯一的 ConsumerConfig#GROUP_INSTANCE_ID_CONFIG 就足够了,与实例使用的线程数无关。

如果您的broker版本低于 2.3,但您选择在客户端设置 ConsumerConfig#GROUP_INSTANCE_ID_CONFIG,应用程序将检测broker版本,然后抛出 UnsupportedException。 如果您不小心为不同的实例配置了重复的 ID,broker端的防护机制将通过触发 org.apache.kafka.common.errors.FencedInstanceIdException 通知您的重复客户端立即关闭。 有关详细信息,请参阅 KIP-345

七.消息语义

现在我们对producer和consumer的工作方式有了一些了解,让我们讨论一下 Kafka 在producer和consumer之间提供的语义保证。 显然,可以提供多种可能的消息传递保证:

At most once——消息可能会丢失,但永远不会重新传递。
At least once——消息永远不会丢失,但可以重新传递。
Exactly once——这才是人们真正想要的,每条消息只传递一次。
值得注意的是,这分为两个问题:发布消息的持久性保证和消费消息时的保证。


许多系统声称提供“Exactly once”交付语义,但阅读细则很重要,这些声明中的大多数都是误导性的(即它们不会转化为consumer或producer可能失败的情况,有多个consumer进程的情况,或写入磁盘的数据可能丢失的情况)。

Kafka 的语义是直截了当的。 发布消息时,我们有消息被“提交”到日志的概念。 一旦发布的消息被提交,只要复制该消息写入的分区的一个broker保持“存活”状态,它就不会丢失。 提交消息的定义、活动分区以及我们尝试处理的故障类型的描述将在下一节中更详细地描述。 现在让我们假设一个完美的、无损的broker,并尝试理解对producer和consumer的保证。 如果producer尝试发布消息并遇到网络错误,则无法确定此错误是发生在消息提交之前还是之后。 这类似于使用自动生成的键插入数据库表的语义。

在 0.11.0.0 之前,如果producer未能收到指示消息已提交的响应,它别无选择,只能重新发送消息。 这提供了At least once传递语义,因为如果原始请求实际上已经成功,则消息可能会在重新发送期间再次写入日志。 从 0.11.0.0 开始,Kafka producer还支持幂等传递选项,保证重新发送不会导致日志中出现重复条目​​。 为此,broker为每个producer分配一个 ID,并使用producer随每条消息发送的序列号对消息进行重复数据删除。 同样从 0.11.0.0 开始,producer支持使用类似事务的语义将消息发送到多个topic分区的能力:即 要么所有消息都已成功写入,要么都没有。 主要用例是 Kafka topic之间的一次性处理(如下所述)。

并非所有用例都需要如此强大的保证。 对于对延迟敏感的用途,我们允许producer指定其所需的持久性级别。 如果producer指定它想要等待提交的消息,这可能需要 10 毫秒的数量级。 然而,producer也可以指定它想要完全异步地执行发送,或者它只想等到leader(但不一定是follower)收到消息。

 

现在让我们从consumer的角度来描述语义。 所有副本都有完全相同的日志和相同的偏移量。 consumer控制其在此日志中的位置。 如果consumer从未崩溃,它可以将这个位置存储在内存中,但如果consumer失败并且我们希望这个topic分区被另一个进程接管,新进程将需要选择一个合适的位置来开始处理。 假设consumer读取了一些消息——它有几个选项来处理消息和更新它的位置。

  • 它可以读取消息,然后保存它在日志中的位置,最后处理消息。 在这种情况下,consumer进程有可能在保存其位置之后但在保存其消息处理的输出之前崩溃。 在这种情况下,接管处理的进程将从保存的位置开始,即使该位置之前的一些消息还没有被处理。 这对应于“At most once”语义,因为在consumer失败消息的情况下可能不会被处理。
  • 它可以读取消息、处理消息并最终保存其位置。 在这种情况下,consumer进程有可能在处理消息之后但在保存其位置之前崩溃。 在这种情况下,当新进程接管它接收到的前几条消息时,它已经被处理过了。 这对应于consumer失败情况下的“At least once”语义。 在许多情况下,消息有一个主键,因此更新是幂等的(两次接收相同的消息只是用它自己的另一个副本覆盖记录)。

那么 exactly once 语义(即你真正想要的东西)呢? 当从 Kafka topic消费并生产到另一个topic时(如在 Kafka Streams 应用程序中),我们可以利用上面提到的 0.11.0.0 中的新事务producer功能。 consumer的位置作为消息存储在topic中,因此我们可以在与接收处理数据的输出topic相同的事务中将偏移量写入 Kafka。 如果交易被中止,consumer的位置将恢复到它的旧值,并且输出topic的产生的数据将对其他consumer不可见,这取决于他们的“隔离级别”。 在默认的“read_uncommitted”隔离级别中,所有消息对consumer都是可见的,即使它们是中止事务的一部分,但在“read_committed”中,consumer将只返回来自已提交事务的消息(以及任何不属于这部分的消息)。

tips:kafka事务示例

写入外部系统时,限制在于需要将consumer的位置与实际存储为输出的内容相协调。 实现这一点的经典方法是在consumer位置的存储和consumer输出的存储之间引入两阶段提交。 但这可以通过让consumer将其偏移量存储在与其输出相同的位置来更简单和更普遍地处理。 这更好,因为consumer可能想要写入的许多输出系统不支持两阶段提交。 作为这方面的一个例子,考虑一个 Kafka Connect 连接器,它在 HDFS 中填充数据以及它读取的数据的偏移量,这样可以保证数据和偏移量都被更新,或者两者都不更新。 我们对许多其他数据系统遵循类似的模式,这些系统需要这些更强的语义并且消息没有主键以允许重复数据删除。

因此Kafka有效地支持Kafka Streams中的exactly-once delivery,在Kafka topics之间传输和处理数据时,一般可以使用事务性producer/consumer来提供exactly-once交付。 其他目标系统的 Exactly-once 交付通常需要与此类系统合作,但 Kafka 提供了偏移量,这使得实现这一点变得可行(另请参阅 Kafka Connect)。 否则,Kafka 默认保证At least once交付,并允许用户通过在处理一批消息之前禁用对producer的重试并在consumer中提交偏移量来实现At most once交付。

八.复制

Kafka 在可配置数量的服务器上为每个topic的分区复制日志(您可以逐个topic地设置此复制因子)。 这允许在集群中的服务器发生故障时自动故障转移到这些副本,因此消息在出现故障时仍然可用。

其他消息系统提供了一些与复制相关的功能,但是,在我们(完全有偏见的)看来,这似乎是一个附加的东西,没有被大量使用,并且有很大的缺点:副本不活跃,吞吐量受到严重影响,它需要繁琐的手动配置等, Kafka 旨在默认与复制一起使用——事实上,我们将未复制的topic实现为复制因子为 1 的复制topic。

复制单元是topic分区。 在非故障情况下,Kafka 中的每个分区都有一个leader和0个或多个follower。 包括leader在内的副本总数构成复制因子。 所有写入都到分区的leader,从分区的leader或follower读。 通常,分区比broker多得多,leader平均分布在broker之间。 follower上的日志与leader的日志相同——都具有相同的偏移量和相同顺序的消息(当然,在任何给定时间,leader可能在其日志末尾有一些尚未复制的消息).

follower像普通的 Kafka consumer一样消费来自leader的消息,并将它们应用到自己的日志中。 让follower从leader那里pull有一个很好的特性,那就是允许follower自然地将他们正在应用到他们日志的时候进行批处理。

与大多数分布式系统一样,自动处理故障需要精确定义节点“存活”的含义。 在 Kafka 中,一个称为“controller”的特殊节点负责管理集群中broker的注册。 Broker 活跃度有两个条件:

  1. broker必须与controller保持活跃的会话,以便接收定期的元数据更新。
  2. 作为follower的broker必须复制leader的数据,而不是落后“太远”。

“活动会话”的含义取决于集群配置。 对于 KRaft 集群,通过向controller发送定期心跳来维持活动会话。 如果controller在 broker.session.timeout.ms 配置的超时到期之前未能收到心跳,则该节点被视为离线。

对于使用 Zookeeper 的集群,活性是通过broker在其 Zookeeper 会话初始化时创建的临时节点的存在间接确定的。 如果broker在 zookeeper.session.timeout.ms 到期之前未能向 Zookeeper 发送心跳后丢失其会话,则该节点将被删除。 然后,controller会通过 Zookeeper 监视通知节点删除,并将broker标记为离线。

我们将满足这两个条件的节点称为“同步”,以避免“活着”或“失败”的含糊不清。 leader跟踪一组“同步”副本,称为 ISR。 如果这些条件中的任何一个未能满足,则broker将从 ISR 中移除。 例如,如果一个follower宕机,那么controller将通过丢失会话来通知失败,并将从 ISR 中删除broker。 另一方面,如果 follower 落后于 leader 太远但仍有活动会话,则 leader 也可以将其从 ISR 中移除。 滞后副本的确定是通过 replica.lag.time.max.ms 配置来控制的。 无法在此配置设置的最长时间内赶上leader日志末尾的副本将从 ISR 中删除。

在分布式系统术语中,我们只尝试处理“失败/恢复”故障模型,其中节点突然停止工作然后恢复(可能不知道它们已经宕机)。 Kafka 不处理所谓的“拜占庭式”故障,在这种情况下,节点会产生任意或恶意的响应(可能是由于错误或犯规)。

我们现在可以更精确地定义,当该分区的 ISR 中的所有副本都将一条消息应用到它们的日志时,该消息被视为已提交。 只有提交的消息才会发送给leader。 这意味着leader不必担心如果leader失败可能会看到一条可能丢失的消息。 另一方面,producer可以选择是否等待消息被提交,这取决于他们对延迟和持久性之间权衡的偏好。 此首选项由producer使用的 acks 设置控制。 请注意,topic具有同步副本“最小数量”的设置,当producer请求确认消息已写入完整的同步副本集时,将检查该设置。 如果producer请求不那么严格的确认,消息可以被提交和消费,即使同步副本的数量低于最小值(例如,它可以与leader一样低)。

Kafka 提供的保证是提交的消息不会丢失,只要至少有一个同步副本始终处于存活状态。

在短暂的故障转移期后,Kafka 将在出现节点故障时保持可用,但在出现网络分区时可能无法保持可用。

 

复制日志:Quorums、ISR 和状态机
Kafka 分区的核心是复制的日志。 复制日志是分布式数据系统中最基本的原语之一,实现它的方法有很多种。 复制的日志可以被其他系统用作以状态机样式实现其他分布式系统的原语。
复制的日志模拟了对一系列值的顺序达成共识的过程(通常将日志条目编号为 0、1、2、...)。 有很多方法可以实现这一点,但最简单和最快的是使用一个leader来选择提供给它的值的排序。 只要leader还活着,所有的follower只需要复制leader选择的值和顺序。

当然,如果leader没有失败,我们就不需要follower了! 当leader宕机时,我们需要从follower中选择一个新的leader。 但是follower本身可能会落后或崩溃,所以我们必须确保我们选择了一个最新的follower。 日志复制算法必须提供的基本保证是,如果我们告诉客户端一条消息已提交,而leader失败了,我们选出的新leader也必须拥有该消息。 这会产生一个权衡:如果leader等待更多的follower在获取信息之前提交一条信息,那么将会有更多的选举leader。

如果您选择所需的确认数量和必须比较的日志数量以选择leader,从而保证有重叠,那么这称为Quorum。

这种权衡的一种常见方法是对提交决定和leader选举都使用多数表决。 这不是 Kafka 所做的,但无论如何让我们探索它以了解权衡。 假设我们有 2f+1 个副本。 如果 f+1 个副本必须在leader声明提交之前收到一条消息,并且如果我们通过从至少 f+1 个副本中选择具有最完整日志的follower来选举新的leader,那么,不超过 f失败时,leader保证拥有所有已提交的消息。 这是因为在任何 f+1 个副本中,必须至少有一个副本包含所有已提交的消息。 该副本的日志将是最完整的,因此将被选为新的leader。 每个算法都必须处理许多剩余的细节(例如精确定义什么使日志更完整,确保leader失败期间日志的一致性或更改副本集中的服务器集)但我们现在将忽略这些。

这种多数表决方法有一个非常好的特性:延迟仅取决于最快的服务器。 也就是说,如果复制因子为 3,则延迟由较低的follower而不是较高的follower决定。

这个家族中有丰富多样的算法,包括 ZooKeeper 的 Zab、Raft 和 Viewstamped Replication。 据我们所知,与 Kafka 的实际实现最相似的学术出版物是来自 Microsoft 的 PacificA。

多数表决的不利之处在于,您无需多次失败就会失去可选举的leader。 容忍一次故障需要三份数据,容忍两次故障需要五份数据。 根据我们的经验,只有足够的冗余来容忍单个故障对于实际系统来说是不够的,但是每次写入五次,磁盘空间要求是 5 倍,吞吐量是 1/5,对于大容量数据问题来说并不是很实用。 这可能就是为什么Quorum算法更常出现在 ZooKeeper 等共享集群配置中,但不太常见于主数据存储的原因。 例如,在 HDFS 中,namenode 的高可用性功能是建立在基于多数投票的日志上的,但这种更昂贵的方法并不用于数据本身。

Kafka 采用略微不同的方法来选择其Quorum集。 Kafka 不是多数表决,而是动态维护一组同步到leader的同步副本(ISR)。 只有这个集合中的成员才有资格被选为leader。 在所有同步副本都收到写入之前,不会将对 Kafka 分区的写入视为已提交。 只要 ISR 集发生变化,它就会保留在集群元数据中。 正因为如此,ISR 中的任何副本都有资格被选为leader。 这是 Kafka 的使用模型的一个重要因素,其中有许多分区并且确保leader平衡很重要。 使用此 ISR 模型和 f+1 个副本,Kafka topic可以容忍 f 次故障而不会丢失已提交的消息。

对于我们希望处理的大多数用例,我们认为这种权衡是合理的。 实际上,为了容忍 f 次失败,多数投票和 ISR 方法都将在提交消息之前等待相同数量的副本确认(例如,为了在一次失败中幸存下来,多数群体需要三个副本和一个确认,而 ISR 方法需要两个副本和一个确认)。 在没有最慢服务器的情况下提交的能力是多数表决方法的一个优势。 但是,我们认为通过允许客户端选择是否阻塞消息提交来改进它,并且由于所需的复制因子较低而带来的额外吞吐量和磁盘空间是值得的。

另一个重要的设计区别是 Kafka 不要求崩溃的节点恢复时所有数据都完好无损。 在这个空间中,复制算法依赖于“稳定存储”的存在并不少见,这种存储在任何故障恢复场景中都不会丢失,而不会出现潜在的一致性违规。 这个假设有两个主要问题。 首先,磁盘错误是我们在持久数据系统的实际操作中观察到的最常见问题,它们通常不会让数据保持完整。 其次,即使这不是问题,我们也不希望在每次写入时都使用 fsync 来保证一致性,因为这会使性能降低两到三个数量级。 我们允许副本重新加入 ISR 的协议,确保在重新加入之前,它必须再次完全重新同步,即使它在崩溃中丢失了未刷新的数据。

tips: es使用的是Quorum算法。

unclean leader选举:如果他们都宕机了怎么办?
请注意,Kafka 对数据丢失的保证基于至少一个副本保持同步。 如果复制分区的所有节点都宕机,则此保证不再有效。
然而,当所有副本都宕机时,实际系统需要做一些合理的事情。 如果您不幸遇到这种情况,请务必考虑会发生什么。 有两种行为可以实现:

  1. 等待 ISR 中的一个副本恢复并选择这个副本作为leader(希望它仍然拥有所有数据)。
  2. 选择第一个的副本(不一定在 ISR 中)作为leader。

这是可用性和一致性之间的简单权衡。 如果我们在 ISR 中等待副本,那么只要这些副本关闭,我们就将保持不可用状态。 如果此类副本被破坏或数据丢失,那么我们将永久关闭。 另一方面,如果一个不同步的副本恢复并且我们允许它成为leader,那么它的日志就会成为真实的来源,即使它不能保证有每条提交的消息。 默认情况下,从 0.11.0.0 版本开始,Kafka 选择第一种策略并倾向于等待一致的副本。 可以使用配置属性 unclean.leader.election.enable 更改此行为,以支持正常运行时间优于一致性的用例。

这种困境并不是 Kafka 特有的。 它存在于任何基于群体的方案中。 例如,在多数表决方案中,如果大多数服务器遭受永久性故障,那么您必须选择丢失 100% 的数据,或者通过将现有服务器上保留的内容作为新的真实来源来违反一致性。

 

可用性和耐用性保证
写入 Kafka 时,producer可以选择是等待消息被 0,1 还是所有 (-1) 个副本确认。 请注意,“所有副本的确认”并不能保证所有分配的副本都已收到消息。 默认情况下,当 acks=all 时,一旦所有当前同步副本都收到消息,就会发生确认。 例如,如果一个topic只配置了两个副本并且一个失败了(即只剩下一个同步副本),那么指定 acks=all 的写入将成功。 但是,如果其余副本也发生故障,这些写入可能会丢失。 虽然这确保了分区的最大可用性,但对于一些更喜欢持久性而不是可用性的用户来说,这种行为可能是不受欢迎的。 因此,我们提供了两个topic级配置,可用于使消息持久性优于可用性:

  1. 禁用unclean的leader选举——如果所有副本都不可用,那么分区将保持不可用状态,直到最近的leader再次可用。 这实际上更倾向于不可用而不是消息丢失的风险。 请参阅上一节关于 Unclean Leader Election 的说明。
  2. 指定最小 ISR 大小 - 如果 ISR 的大小超过某个最小值,分区将仅接受写入,以防止丢失仅写入单个副本的消息,该副本随后变得不可用。 此设置仅在producer使用 acks=all 并保证消息将被至少这么多同步副本确认时才生效。 此设置提供了一致性和可用性之间的权衡。 最小 ISR 大小的较高设置可保证更好的一致性,因为可以保证将消息写入更多副本,从而降低丢失消息的可能性。 但是,它会降低可用性,因为如果同步副本的数量低于最小阈值,分区将不可用于写入。

 

副本管理
上面关于复制日志的讨论实际上只涵盖了一个日志,即 一个topic分区。 但是,Kafka 集群将管理成百上千个这样的分区。 我们尝试以循环方式平衡集群内的分区,以避免将高容量topic分区聚集在少量节点上。 同样,我们尝试平衡leader,以便每个节点包含一定比率的leader。
优化leader选举过程也很重要,因为这是不可用的关键窗口。 leader选举的简单实现最终会在节点失败时为该节点托管的所有分区运行每个分区的选举。 正如上面关于复制的部分所讨论的,Kafka 集群有一个特殊的角色,称为“controller”,负责管理broker的注册。 如果 controller 检测到 broker 发生故障,它会负责选举 ISR 的剩余成员之一作为新的 leader。 结果是我们能够将许多所需的leader层变更通知在一起批处理,这使得大量分区的选举过程成本更低、速度更快。 如果controller本身出现故障,则将选举另一个controller。

九.日志压缩

日志压缩确保 Kafka 始终至少保留单个topic分区数据日志中每个消息键的最后一个已知值。 它解决了如下用例和场景,例如在应用程序崩溃或系统故障后恢复状态,或在操作维护期间应用程序重启后重新加载缓存。 让我们更详细地研究这些用例,然后描述压缩的工作原理。
到目前为止,我们只描述了更简单的数据保留方法,即在固定时间段后或当日志达到某个预定大小时丢弃旧日志数据。 这适用于时间事件数据,例如每条记录独立的日志记录。 然而,一类重要的数据流是对给予键可变数据的变更日志(例如,对数据库表的更改)。

让我们讨论这种流的具体示例。 假设我们有一个包含用户电子邮件地址的topic; 每次用户更新他们的电子邮件地址时,我们都会使用他们的用户 ID 作为主键向该topic发送一条消息。 现在假设我们在一段时间内为 ID 为 123 的用户发送以下消息,每条消息对应于电子邮件地址的更改(省略其他 ID 的消息):

123 => bill@microsoft.com
        .
        .
        .
123 => bill@gatesfoundation.org
        .
        .
        .
123 => bill@gmail.com

日志压缩为我们提供了更细粒度的保留机制,因此我们保证至少保留每个主键的最后更新(例如 bill@gmail.com)。 通过这样做,我们保证日志包含每个键的最终值的完整快照,而不仅仅是最近更改的键。 这意味着下游leader可以在这个topic之外恢复他们自己的状态,而我们不必保留所有更改的完整日志。
让我们先看看一些有用的用例,然后再看看如何使用它。

  1. 数据库更改订阅。 通常需要在多个数据系统中拥有一个数据集,并且这些系统中的一个通常是某种数据库(RDBMS 或者可能是其他的键值存储)。 例如,您可能有一个数据库、一个缓存、一个搜索集群和一个 Hadoop 集群。 对数据库的每次更改都需要反映在缓存、搜索集群中,并最终反映在 Hadoop 中。 在只处理实时更新的情况下,您只需要最近的日志。 但是,如果您希望能够重新加载缓存或恢复失败的搜索节点,您可能需要一个完整的数据集。
  2. Event sourcing。 这是一种应用程序设计风格,它将查询处理与应用程序设计放在一起,并使用更改日志作为应用程序的主要存储。
  3. 日志记录以实现高可用性。 一个执行本地计算的进程可以通过注销它对其本地状态所做的更改来实现容错,这样另一个进程可以重新加载这些更改并在它失败时继续执行。 一个具体的例子是在流查询系统中处理计数、聚合和其他类似“分组依据”的处理。 Samza 是一个实时流处理框架,正是为此目的而使用此功能。

tips:mysql中的redo log,es中的transaction log,redis中的aof持久化都是日志记录以实现高可用性的例子。这里的日志压缩类比redis中的aof日志重写。

在这些情况中的每一种情况下,都需要主要处理实时变化,但偶尔,当机器崩溃或数据需要重新加载或重新处理时,需要进行完全加载。 日志压缩可以同时支持这两个用例。 这篇博文this blog post中更详细地描述了这种日志的使用方式。
总体思路很简单。 如果我们有无限的日志保留,并且我们记录了上述情况下的每个更改,那么我们将捕获系统从第一次开始时开始的每次状态。 使用这个完整的日志,我们可以通过重放日志中的前 N ​​条记录来恢复到任何时间点。 这个假设的完整日志对于多次更新单个记录的系统不是很实用,因为即使对于稳定的数据集,日志也会无限增长。 丢弃旧更新的简单日志保留机制将限制空间,但日志不再是恢复当前状态的一种方式——现在从日志开头恢复不再重新创建当前状态,因为旧更新可能根本无法捕获.

日志压缩是一种提供更细粒度的保留每条记录的机制,而不是粗粒度的基于时间的保留。 我们的想法是有选择地删除我们使用相同主键进行更新的记录。 这样可以保证日志至少具有每个键的最后状态。

可以为每个topic设置此保留策略,因此单个集群可以有一些topic,其中保留是通过大小或时间强制执行的,而其他topic的保留是通过压缩强制执行的。

此功能的灵感来自 LinkedIn 最古老、最成功的基础设施之一——称为 Databus 的数据库变更日志缓存服务。 与大多数日志结构存储系统不同,Kafka 是为订阅而构建的,并组织数据以实现快速线性读写。 与 Databus 不同,Kafka 充当真实来源存储,因此即使在上游数据源无法重放的情况下,它也很有用。

 

日志压缩基础
这是一张high-level图片,显示了 Kafka 日志的逻辑结构以及每条消息的偏移量。

日志的头部与传统的 Kafka 日志相同。 它具有密集的、顺序的偏移量并保留所有消息。 日志压缩添加了一个用于处理日志尾部的选项。 上图显示了带有压缩尾部 的 log。 请注意,日志尾部的消息保留了首次写入时分配的原始偏移量——永远不会改变。 另请注意,即使具有该偏移量的消息已被压缩,所有偏移量仍会保留在日志中的有效位置; 在这种情况下,该位置与日志中出现的下一个最高偏移量无法区分。 例如,在上图中,偏移量 36、37 和 38 都是等效位置,从这些偏移量中的任何一个开始的读取都将返回以 38 开头的消息集。

压缩也允许删除。 带有键和空负载的消息将被视为从日志中删除。 这样的记录有时被称为tombstone。 此删除标记将导致删除任何具有该键的先前消息(就像任何具有该键的新消息一样),但删除标记是特殊的,因为它们将在一段时间后从日志中清除以释放空间. 不再保留删除的时间点在上图中标记为“删除保留点”。

压缩是通过定期重新复制日志段在后台完成的。 清理不会阻止读取,并且可以限制使用不超过可配置的 I/O 吞吐量,以避免影响producer和leader。 压缩日志段的实际过程如下所示:

 

日志压缩提供什么保证?
日志压缩保证以下内容:

  1. 任何停留在日志头部的leader都会看到写入的每条消息; 这些消息将具有顺序偏移量。 topic的 min.compaction.lag.ms 可用于保证消息写入后必须经过的最短时间长度才能被压缩。 IE。 它提供了每条消息将在(未压缩的)头部保留多长时间的下限。 topic的 max.compaction.lag.ms 可用于保证消息写入时间和消息符合压缩条件的时间之间的最大延迟。
  2. 始终保持消息的顺序。 压缩永远不会重新排序消息,只是删除一些。
  3. 消息的偏移量永远不会改变。 它是日志中某个位置的永久标识符。
  4. 任何从日志开头开始的leader都将至少看到所有记录的最终状态,按照它们的写入顺序。 此外,如果leader在小于topic的 delete.retention.ms 设置(默认为 24​​ 小时)的时间段内到达日志头部,则会看到所有已删除记录的删除标记。 换句话说:由于删除标记的删除与读取同时发生,因此如果leader滞后超过 delete.retention.ms,则它有可能错过删除标记。

日志压缩详细信息
日志压缩由日志清理器处理,这是一个重新复制日志段文件的后台线程池,删除其键出现在日志头部的记录。 每个压缩器线程的工作方式如下:

  1. 它选择日志头与日志尾的比率最高的日志
  2. 它为日志头部的每个键创建最后一个偏移量的简洁摘要
  3. 它从头到尾复制日志,删除日志中稍后出现的键。 新的、干净的段会立即交换到日志中,因此所需的额外磁盘空间只是一个额外的日志段(不是日志的完整副本)。
  4. 日志头的摘要本质上只是一个空间紧凑的哈希表。 每个条目正好使用 24 个字节。 结果,使用 8GB 的​​清理器缓冲区,一次清理器迭代可以清理大约 366GB 的日志头(假设有 1k 条消息)。

 

配置日志清理器
默认情况下启用日志清理器。 这将启动cleaner线程池。 要针对特定​​topic启用日志清理,请添加特定于日志的属性
log.cleanup.policy=compact
log.cleanup.policy 属性是在broker的 server.properties 文件中定义的broker配置设置; 它会影响集群中所有没有配置覆盖的topic,如此处所述。 日志清理器可以配置为保留最少量的未压缩的日志“头”。 这是通过设置压缩时间延迟来实现的。
log.cleaner.min.compaction.lag.ms
这可用于防止比最小消息年龄更新的消息被压缩。 如果未设置,则所有日志段都符合压缩条件,但最后一段除外,即 当前正在写入的那个。 活动段不会被压缩,即使它的所有消息都早于最小压缩时间延迟。 可以配置日志清理器以确保最大延迟,在该延迟之后,日志的未压缩“头”有资格进行日志压缩。
log.cleaner.max.compaction.lag.ms
这可用于防止低生产率的日志在无限制的持续时间内不符合压缩条件。 如果未设置,则不压缩不超过 min.cleanable.dirty.ratio 的日志。 请注意,这个压缩截止日期并不是硬性保证,因为它仍然取决于日志清理器线程的可用性和实际压缩时间。 您将需要监控 uncleanable-partitions-count、max-clean-time-secs 和 max-compaction-delay-secs 指标。
此处here.描述了更cleanner的配置。

十.配额

Kafka 集群能够对请求强制执行配额,以控制客户端使用的broker资源。 Kafka broker可以为共享配额的每组客户端强制执行两种类型的客户端配额:

  1. 网络带宽配额定义字节速率阈值(自 0.9 起)
  2. 请求率配额将 CPU 利用率阈值定义为网络和 I/O 线程的百分比(自 0.11 起)

tips,这里的配额可以类比为常见的限流器

为什么需要配额?
producer和leader有可能生产/消费非常大量的数据或以非常高的速率生成请求,从而垄断broker资源,导致网络饱和,并且通常 DOS 其他客户端和broker本身。 拥有配额可以防止这些问题,并且在大型多租户集群中尤为重要,在这种情况下,一小部分行为不端的客户端可能会降低行为良好的用户体验。 事实上,当将 Kafka 作为服务运行时,这甚至可以根据商定的合同强制执行 API 限制。

 

client group
Kafka 客户端的身份是用户主体,代表安全集群中经过身份验证的用户。 在支持未经身份验证的客户端的集群中,用户主体是broker使用可配置的 PrincipalBuilder 选择的一组未经身份验证的用户。 Client-id 是客户端的逻辑分组,具有由客户端应用程序选择的有意义的名称。 元组 (user, client-id) 定义了共享用户主体和客户端 id 的安全逻辑客户端组。
配额可以应用于(用户、客户端 ID)、用户或客户端 ID 组。 对于给定的连接,将应用与该连接匹配的最具体的配额。 配额组的所有连接共享为该组配置的配额。 例如,如果 (user="test-user", client-id="test-client") 的生产配额为 10MB/秒,这将在用户“test-user”的所有producer实例与客户端之间共享- ID“测试客户端”。

 

配额配置
可以为(用户、客户端 ID)、用户和客户端 ID 组定义配额配置。 可以在任何需要更高(或更低)配额的配额级别覆盖默认配额。 该机制类似于每个topic的日志配置覆盖。 用户和 (user, client-id) 配额覆盖写入 /config/users 下的 ZooKeeper,client-id 配额覆盖写入 /config/clients 下。 这些覆盖被所有broker读取并立即生效。 这让我们无需滚动重启整个集群即可更改配额。 有关详细信息,请参见此处 here 。 每个组的默认配额也可以使用相同的机制动态更新。

配额配置的优先顺序是:

  1. /config/users/<user>/clients/<client-id>
  2. /config/users/<user>/clients/<default>
  3. /config/users/<user>
  4. /config/users/<default>/clients/<client-id>
  5. /config/users/<default>/clients/<default>
  6. /config/users/<default>
  7. /config/clients/<client-id>
  8. /config/clients/<default>

 

网络带宽配额
网络带宽配额定义为共享配额的每组客户端的字节速率阈值。 默认情况下,每个唯一的客户端组都会收到由集群配置的以字节/秒为单位的固定配额。 此配额是基于每个broker定义的。 在客户端受到限制之前,每组客户端最多可以发布/获取每个broker的 X 字节/秒。

 

请求速率配额
请求率配额定义为客户端可以在配额窗口内使用每个broker的请求处理程序 I/O 线程和网络线程的时间百分比。 n% 的配额表示一个线程的 n%,因此配额超出总容量 ((num.io.threads + num.network.threads) * 100)%。 在被限制之前,每组客户端可以在配额窗口中的所有 I/O 和网络线程中使用总百分比最高为 n% 的百分比。 由于为 I/O 和网络线程分配的线程数通常基于broker主机上可用的内核数,因此请求率配额代表每组共享配额的客户端可能使用的 CPU 的总百分比。

 

执行细节
默认情况下,每个唯一的客户端组都会收到集群配置的固定配额。 此配额是基于每个broker定义的。 在受到限制之前,每个客户端都可以使用每个broker的这个配额。 我们决定为每个broker定义这些配额比为每个客户端设置固定的集群带宽要好得多,因为这需要一种机制来在所有broker之间共享客户端配额使用情况。 这可能比配额实施本身更难做到!

broker在检测到配额违规时如何反应? 在我们的解决方案中,broker首先计算将违规客户置于其配额之下所需的延迟量,并立即返回包含延迟的响应。 在获取请求的情况下,响应将不包含任何数据。 然后,broker将到客户端的通道静音,不再处理来自客户端的请求,直到延迟结束。 在收到具有非零延迟持续时间的响应后,Kafka 客户端还将避免在延迟期间向broker发送进一步的请求。 因此,来自受限客户端的请求会被双方有效阻止。 即使使用不尊重broker延迟响应的旧客户端实现,broker通过静音其套接字通道施加的背压仍然可以处理行为不端的客户端的节流。 那些向受限通道发送进一步请求的客户端只有在延迟结束后才会收到响应。

在多个小窗口(例如 30 个窗口,每个 1 秒)上测量字节速率和线程利用率,以便快速检测和纠正配额违规。 通常,具有较大的测量窗口(例如 10 个窗口,每个窗口 30 秒)会导致大量流量突发,随后出现长时间延迟,这在用户体验方面并不是很好。

 

十一.参考

https://kafka.apache.org/documentation/

内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/darcy-yuan/p/17330215.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!