Kafka 知识汇总

简介

Kafka 被定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。Kafka 在现代的系统中主要承担三大角色:

  • 消息系统:Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
  • 存储系统:Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于Kafka 的消息持久化功能和多副本机制,我们可以把Kafka作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
  • 流式处理平台:Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。

基本概念

一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群。其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。

kafka基本概念

  • Producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其投递到Kafka中。

  • Consumer:消费者,也就是接收消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。

  • Broker:服务代理节点。对于Kafka而言,Broker可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个Kafka实例。一个或多个 Broker 组成了一个 Kafka 集群。一般而言,习惯使用首字母小写的 broker 来表示服务代理节点。

在 Kafka 中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka 中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序

kafka 消息写入

每一条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器 I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。

Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用。

如下图所示,Kafka 集群中有 4 个 broker,某个主题中有 3 个分区,且副本因子(即副本个数)也为 3,如此每个分区便有 1 个 leader 副本和 2 个 follower 副本。生产者和消费者只与 leader 副本进行交互,而 follower 副本只负责消息的同步,很多时候 follower 副本中的消息相对 leader 副本而言会有一定的滞后。

kafka多副本机制

Kafka 消费端也具备一定的容灾能力。Consumer 使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失

ISR 机制

分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成 ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。消息会先发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步,同步期间内 follower 副本相对于 leader 副本而言会有一定程度的滞后。前面所说的“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。与 leader 副本同步滞后过多的副本(不包括leader副本)组成 OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合为空。

leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中有 follower 副本“追上”了leader副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。默认情况下,当 leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。ISR 与 HW 和 LEO 也有紧密的关系。HW 是 High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息

如下图所示,它代表一个日志文件,这个日志文件中有 9 条消息,第一条消息的 offset(LogStartOffset)为 0,最后一条消息的 offset 为 8,offset 为 9 的消息用虚线框表示,代表下一条待写入的消息。日志文件的 HW 为 6,表示消费者只能拉取到 offset 在 0 至 5 之间的消息,而 offset 为 6 的消息对消费者而言是不可见的。

日志文件格式

LEO 是 Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的 offset,图中 offset 为 9 的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加1。分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息

如下图所示,假设某个分区的 ISR 集合中有 3 个副本,即一个 leader 副本和 2 个 follower 副本,此时分区的 LEO 和 HW 都为 3:

ISR机制-1

消息 3 和消息 4 从生产者发出之后会被先存入 leader 副本:

ISR机制-2

在消息写入 leader 副本之后,follower 副本会发送拉取请求来拉取消息 3 和消息 4 以进行消息同步。

在同步过程中,不同的 follower 副本的同步效率也不尽相同。如下图所示,在某一时刻 follower1 完全跟上了 leader 副本而 follower2 只同步了消息 3,如此 leader 副本的 LEO 为5,follower1 的 LEO 为5,follower2 的 LEO 为 4,那么当前分区的 HW 取最小值4,此时消费者可以消费到offset为0至3之间的消息。

ISR机制-3

写入消息(情形4)如下所示,所有的副本都成功写入了消息3和消息4,整个分区的HW和LEO都变为5,因此消费者可以消费到offset为4的消息了。

ISR机制-4

Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 副本都复制完,这条消息才会被确认为已成功提交,这种复制方式极大地影响了性能。而在异步复制方式下,follower 副本异步地从 leader 副本中复制数据,数据只要被 leader 副本写入就被认为已经成功提交。在这种情况下,如果 follower 副本都还没有复制完而落后于 leader 副本,突然 leader 副本宕机,则会造成数据丢失。Kafka 使用的这种ISR的方式则有效地权衡了数据可靠性和性能之间的关系。

相关参数

这里列举几个常用的参数,具体其他的所有参数信息可以在 Kafka 官网查看。

zookeeper.connect

该参数指明 broker 要连接的 ZooKeeper 集群的服务地址(包含端口号),没有默认值,且此参数为必填项。可以配置为 localhost:2181,如果 ZooKeeper 集群中有多个节点,则可以用逗号将每个节点隔开,类似于 localhost1:2181localhost2:2181localhost3:2181 这种格式。

最佳的实践方式是再加一个 chroot 路径,这样既可以明确指明该 chroot 路径下的节点是为 Kafka 所用的,也可以实现多个 Kafka 集群复用一套 ZooKeeper 集群,这样可以节省更多的硬件资源。包含 chroot 路径的配置类似于 localhost1:2181localhost2:2181localhost3:2181/kafka这种,如果不指定 chroot,那么默认使用 ZooKeeper 的根路径。

listeners

该参数指明 broker 监听客户端连接的地址列表,即为客户端要连接 broker 的入口地址列表,配置格式为 protocol1://hostname1:port1protocol2://hostname2:port2,其中 protocol 代表协议类型,Kafka 当前支持的协议类型有 PLAINTEXT、SSL、SASL_SSL 等,如果未开启安全认证,则使用简单的 PLAINTEXT 即可。hostname 代表主机名,port代表服务端口,此参数的默认值为 null。比如此参数配置为 PLAINTEXT://198.162.0.2:9092,如果有多个地址,则中间以逗号隔开。如果不指定主机名,则表示绑定默认网卡,注意有可能会绑定到 127.0.0.1,这样无法对外提供服务,所以主机名最好不要为空;如果主机名是0.0.0.0,则表示绑定所有的网卡。与此参数关联的还有 advertised.listeners,作用和 listeners 类似,默认值也为 null。不过 advertised.listeners 主要用于 IaaS(Infrastructure as a Service)环境,比如公有云上的机器通常配备有多块网卡,即包含私网网卡和公网网卡,对于这种情况而言,可以设置 advertised.listeners 参数绑定公网IP供外部客户端使用,而配置 listeners 参数来绑定私网 IP 地址供 broker 间通信使用。

broker.id

该参数用来指定 Kafka 集群中 broker 的唯一标识,默认值为 -1。如果没有设置,那么 Kafka 会自动生成一个。这个参数还和 meta.properties 文件及服务端参数 broker.id.generation.enablereserved.broker.max.id 有关。

log.dir 和 log.dirs

Kafka 把所有的消息都保存在磁盘上,而这两个参数用来配置 Kafka 日志文件存放的根目录。一般情况下,log.dir 用来配置单个根目录,而 log.dirs 用来配置多个根目录(以逗号分隔),但是 Kafka 并没有对此做强制性限制,也就是说,log.dirlog.dirs 都可以用来配置单个或多个根目录。log.dirs 的优先级比 log.dir 高,但是如果没有配置 log.dirs,则会以 log.dir 配置为准。默认情况下只配置了 log.dir 参数,其默认值为 /tmp/kafka-logs

message.max.bytes

该参数用来指定 broker 所能接收消息的最大值,默认值为 1000012(B),约等于 976.6 KB。如果 Producer 发送的消息大于这个参数所设置的值,那么(Producer)就会报出 RecordTooLargeException 的异常。如果需要修改这个参数,那么还要考虑 max.request.size (客户端参数)、max.message.bytes(topic端参数)等参数的影响。为了避免修改此参数而引起级联的影响,建议在修改此参数之前考虑分拆消息的可行性。

生产者

生产者的客户端使用不同语言不一样,这里就不写了,主要看看生产者客户端的内部实现原理。

消息在真正发往 Kafka 之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,最后才执行发送操作。

整体架构

生产者整体架构

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为 60000,即 60 秒。

主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;

Sender 读取消息时,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而 ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。

消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在 Kafka 生产者客户端中,通过 java.io.ByteBuffer 实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在 RecordAccumulator 的内部还有一个 BufferPool,它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用。不过 BufferPool 只针对特定大小的 ByteBuffer 进行管理,而其他大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小由 batch.size 参数来指定,默认值为16384B,即16KB。可以适当地调大 batch.size 参数以便多缓存一些消息。

ProducerBatch 的大小和 batch.size 参数也有着密切的关系。当一条消息(ProducerRecord)流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch。

在新建 ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建 ProducerBatch,这段内存区域不会被复用。

Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本 <分区,Deque<ProducerBatch>> 的保存形式转变成 <Node,List<ProducerBatch> 的形式,其中 Node 表示 Kafka 集群的 broker 节点。对于网络连接来说,生产者客户端是与具体的 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。

在转换成 <Node,List<ProducerBatch>> 的形式之后,Sender 还会进一步封装成 <Node,Request> 的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest。

请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.fight.requests.per.connection,默认值为 5,即每个连接最多只能缓存 5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较 Deque<Request> 的 size 与这个参数的大小来判断对应的 Node 中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

元数据更新

上面提及的 InFlightRequests 还可以获得 leastLoadedNode,即所有 Node 中负载最小的那一个。这里的负载最小是通过每个 NodeInFlightRequests 中还未确认的请求决定的,未确认的请求越多则认为负载越大。

对于下图的 InFlightRequests 来说,图中展示了三个节点 Node0、Node1 和 Node2,很明显 Node1 的负载最小。也就是说,Node1 为当前的 leastLoadedNode。选择 leastLoadedNode 发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode 的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互

kafka元数据更新

Producer 客户端在创建一条消息时,只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer 要将此消息追加到指定主题的某个分区所对应的 leader 副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后 KafkaProducer 需要知道目标分区的 leader 副本所在的 broker 节点的地址、端口等信息才能建立连接,最终才能将消息发送到 Kafka,在这一过程中所需要的信息都属于元数据信息

而实际上在使用时,只需要配置部分的 broker 节点的地址即可,不需要配置所有 broker 节点的地址,因为客户端可以自己发现其他 broker 节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及 leader 副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。

元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的 leader 副本分配在哪个节点上,follower 副本分配在哪些节点上,哪些副本在 AR、ISR 等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过 metadata.max.age.ms 时间没有更新元数据都会引起元数据的更新操作。客户端参数 metadata.max.age.ms 的默认值为300000,即 5 分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出 leastLoadedNode,然后向这个 Node 发送 MetadataRequest 请求来获取具体的元数据信息。这个更新操作是由 Sender 线程发起的,在创建完 MetadataRequest 之后同样会存入 InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由 Sender 线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过 synchronized 和 final 关键字来保障。

相关参数

必填参数

在 Kafka 生产者客户端中,有 3 个参数是必填的:

  • bootstrap.servers:该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为host1:port1host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的 broker 地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。
  • key.serializervalue.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。在发往 broker 之前需要将消息中对应的 key 和 value 做相应的序列化操作来转换成字节数组。key.serializervalue.serializer 这两个参数分别用来指定 key 和 value 序列化操作的序列化器,这两个参数无默认值。

接下来再说几个非必填,但比较重要的参数。

acks

这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks 参数有3种类型的值(都是字符串类型)。

  • acks=1。默认值即为1。生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入 leader 副本,比如在 leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入 leader 副本并返回成功响应给生产者,且在被其他 follower 副本拉取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息。acks 设置为1,是消息可靠性和吞吐量之间的折中方案。
  • acks=-1 或 acks=all。生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1(all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为 ISR 中可能只有 leader 副本,这样就退化成了 acks=1 的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动。

max.request.size

这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即 1 MB。一般情况下,这个默认值就可以满足大多数的应用场景了。并不建议盲目地增大这个参数的配置值,尤其是在对 Kafka 整体脉络没有足够把控的时候。因为这个参数还涉及一些其他参数的联动,比如 broker 端的 message.max.bytes 参数,如果配置错误可能会引起一些不必要的异常。比如将 broker 端的 message.max.bytes 参数配置为10,而 max.request.size参数配置为20,那么当发送一条大小为 15B 的消息时,生产者客户端就会报出异常。

retries 和 retry.backoff.ms

retries 参数用来配置生产者重试的次数,默认值为 0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于 0 的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过 max.request.size 参数配置的值时,这种方式就不可行了。重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为 100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retriesretry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。

Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。对于某些应用来说,顺序性非常重要,比如 MySQL 的 binlog 传输,如果出现错误就会造成非常严重的后果。如果将 acks 参数配置为非零值,并且 max.in.fight.requests.per.connection 参数配置为大于 1 的值,那么就会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序的场合建议把参数 max.in.fight.requests.per.connection 配置为1,而不是把 acks 配置为 0,不过这样也会影响整体的吞吐。

compression.type

这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。该参数还可以配置为“gzip”“snappy”和“lz4”。对消息进行压缩可以极大地减少网络传输量、降低网络I/O,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。

connections.max.idle.ms

这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。

linger.ms

这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过 linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。这个 linger.ms 参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。

receive.buffer.bytes

这个参数用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B),即32KB。如果设置为-1,则使用操作系统的默认值。如果 Producer 与 Kafka 处于不同的机房,则可以适地调大这个参数值。

send.buffer.bytes

这个参数用来设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。

request.timeout.ms

这个参数用来配置 Producer 等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重试而引起的消息重复的概率。

消费者

消费者与消费组

消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

如下图所示,某个主题中共有4个分区(Partition):P0、P1、P2、P3。有两个消费组 A 和 B 都订阅了这个主题,消费组 A 中有 4 个消费者(C0、C1、C2和C3),消费组 B 中有2个消费者(C4 和 C5)。按照 Kafka 默认的规则,最后的分配结果是消费组 A 中的每一个消费者分配到 1 个分区,消费组 B 中的每一个消费者分配到 2 个分区,两个消费组之间互不影响。每个消费者只能消费所分配到的分区中的消息。换言之,每一个分区只能被一个消费组中的一个消费者所消费

消费者与消费者组-1

再看一下消费组内的消费者个数变化时所对应的分区分配的演变。假设目前某消费组内只有一个消费者 C0,订阅了一个主题,这个主题包含 7 个分区:P0、P1、P2、P3、P4、P5、P6。也就是说,这个消费者 C0 订阅了 7 个分区,具体分配情形下图:

消费者与消费者组-2

此时消费组内又加入了一个新的消费者 C1,按照既定的逻辑,需要将原来消费者 C0 的部分分区分配给消费者 C1 消费,如图所示。消费者 C0 和 C1 各自负责消费所分配到的分区,彼此之间并无逻辑上的干扰:

消费者与消费者组-3

紧接着消费组内又加入了一个新的消费者 C2,消费者 C0、C1 和 C2 按照下图中的方式各自负责消费所分配到的分区:

消费者与消费者组-4

消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,可以增加(或减少)消费者的个数来提高(或降低)整体的消费能力。对于分区数固定的情况,一味地增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。参考下图,一共有8个消费者,7个分区,那么最后的消费者 C7 由于分配不到任何分区而无法消费任何消息:

消费者与消费者组-5

以上分配逻辑都是基于默认的分区分配策略进行分析的,可以通过消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。

对于消息中间件而言,一般有两种消息投递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。Kafka 同时支持两种消息投递模式,而这正是得益于消费者与消费组模型的契合

  • 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。

  • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数 group.id 来配置,默认值为空字符串。

消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同的机器上。

一个正常的消费逻辑需要具备以下几个步骤:

  • (1)配置消费者客户端参数及创建相应的消费者实例。
  • (2)订阅主题。
  • (3)拉取消息并消费。
  • (4)提交消费位移。
  • (5)关闭消费者实例。

消费者客户端相关参数

  • bootstrap.servers:该参数的释义和生产者客户端 KafkaProducer 中的相同,用来指定连接 Kafka 集群所需的 broker 地址清单,具体内容形式为 host1:port1host2:post,可以设置一个或多个地址,中间用逗号隔开,此参数的默认值为“”。注意这里并非需要设置集群中全部的 broker 地址,消费者会从现有的配置中查找到全部的Kafka集群成员。这里设置两个以上的 broker 地址信息,当其中任意一个宕机时,消费者仍然可以连接到 Kafka 集群上。

  • group.id:消费者隶属的消费组的名称,默认值为“”。如果设置为空,则会报出异常:Exception in thread "main"org.apache.kafka.common.errors.InvalidGroupIdException:The configuredgroupId is invalid。一般而言,这个参数需要设置成具有一定的业务意义的名称。

  • key.deserializervalue.deserializer:与生产者客户端 KafkaProducer 中的 key.serializervalue.serializer 参数对应。消费者从 broker 端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。这两个参数分别用来指定消息中 key 和 value 所需反序列化操作的反序列化器,这两个参数无默认值。

  • client.id:这个参数用来设定 KafkaConsumer 对应的客户端 id,默认值也为“”。如果客户端不设置,则 KafkaConsumer 会自动生成一个非空字符串,内容形式如“consumer-1”“consumer-2”,即字符串“consumer-”与数字的拼接。

订阅主题和分区

通过使用 subscribe() 方法可以订阅了一个主题,对于这个方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。

如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。在 Kafka 和其他系统之间进行数据复制时,这种正则表达式的方式就显得很常见

消费者不仅可以通过 subscribe()方法订阅主题,还可以直接订阅某些主题的特定分区,在 KafkaConsumer 中还提供了一个 assign() 方法来实现这些功能。

这个方法只接受一个参数 partitions,用来指定需要订阅的分区集合。

如果事先并不知道主题中有多少个分区,还可以通过 partitionsFor() 方法来查询指定主题的元数据信息。

元数据信息中的属性 topic 表示主题名称,partition 代表分区编号,leader 代表分区的 leader 副本所在的位置,replicas 代表分区的AR集合,inSyncReplicas 代表分区的 ISR 集合,offineReplicas 代表分区的 OSR 集合。通过partitionFor()方法的协助,可以通过 assign()方法来实现订阅主题(全部分区)的功能。

如果需要取消订阅,可以通过 subscribe(Collection)方式实现的订阅,也可以取消通过 subscribe(Pattern) 方式实现的订阅,还可以取消通过 assign(Collection) 方式实现的订阅。

如果将 subscribe(Collection)assign(Collection) 中的集合参数设置为空集合,那么作用等同于 unsubscribe() 方法。

如果没有订阅任何主题或分区,那么再继续执行消费程序的时候会报异常。

集合订阅的方式 subscribe(Collection)、正则表达式订阅的方式 subscribe(Pattern) 和指定分区的订阅方式 assign(Collection) 分表代表了三种不同的订阅状态:AUTO_TOPICSAUTO_PATTERNUSER_ASSIGNED(如果没有订阅,那么订阅状态为 NONE)。然而这三种状态是互斥的,在一个消费者中只能使用其中的一种,否则会报出异常。

通过 subscribe() 方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过 assign() 方法订阅分区时,是不具备消费者自动均衡的功能的。

消息消费

Kafka 中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。

Kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll() 方法,而 poll() 方法返回的是所订阅的主题(分区)上的一组消息。

对于 poll() 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;如果订阅的所有分区中都没有可供消费的消息,那么 poll() 方法返回为空的消息集合。

poll() 方法里还有一个超时时间参数timeout,用来控制poll()方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。timeout 的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将 timeout 设置为 0,这样 poll() 方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息,则可以将这个参数设置为最大值 Long.MAX_VALUE

消费者消费到的每条消息的类型为 ConsumerRecord,它具有以下字段信息:

  • topicpartition 这两个字段分别代表消息所属主题的名称和所在分区的编号。
  • offset 表示消息在所属分区的偏移量。
  • timestamp 表示时间戳,与此对应的 timestampType 表示时间戳的类型。timestampType 有两种类型: CreateTimeLogAppendTime,分别代表消息创建的时间戳和消息追加到日志的时间戳。
  • headers 表示消息的头部内容。
  • keyvalue 分别表示消息的键和消息的值,一般业务应用要读取的就是 value。
  • checksum 是 CRC32 的校验值。

poll() 方法的返回值类型是 ConsumerRecords,它用来表示一次拉取操作所获得的消息集,内部包含了若干 ConsumerRecord

ConsumerRecords 类提供了一个 records(TopicPartition) 方法来获取消息集中指定分区的消息。

到目前为止,可以简单地认为 poll() 方法只是拉取一下消息而已,但就其内部逻辑而言并不简单,它涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容。

位移提交

对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。

在每次调用 poll() 方法时,它返回的是还没有被消费过的消息集(当然这个前提是消息已经存储在Kafka 中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题 __consumer_offsets 中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

如下图的消费位移,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么就可以说消费者的消费位移为 x,图中也用了 lastConsumedOffset 这个单词来标识它。

kafka 位移提交-1

不过需要非常明确的是,当前消费者需要提交的消费位移并不是 x,而是 x+1,对应于图中的 position,它表示下一条需要拉取的消息的位置。在消费者中还有一个 committed offset 的概念,它表示已经提交过的消费位移。

KafkaConsumer 类提供了 position(TopicPartition)committed(TopicPartition) 两个方法来分别获取上面所说的 positioncommitted offset 的值。

对于位移提交的具体时机的把握也很有讲究,有可能会造成重复消费和消息丢失的现象。如下图所示,当前一次 poll() 操作所拉取的消息集为 [x+2,x+7]x+2 代表上一次提交的消费位移,说明已经完成了 x+1 及之前的所有消息的消费,x+5 表示当前正在处理的位置。如果拉取到消息之后就进行了位移提交,即提交了 x+8,那么当前消费 x+5 的时候遇到了异常,在故障恢复之后,重新拉取的消息是从 x+8 开始的。也就是说,x+5x+7 之间的消息并未能被消费,如此便发生了消息丢失的现象。

kafka 位移提交-2

再考虑另外一种情形,位移提交的动作是在消费完所有拉取到的消息之后才执行的,那么当消费 x+5 的时候遇到了异常,在故障恢复之后,重新拉取的消息是从 x+2 开始的。也就是说,x+2x+4 之间的消息又重新消费了一遍,故而又发生了重复消费的现象。

而实际情况还会有比这两种更加复杂的情形,比如第一次的位移提交的位置为 x+8,而下一次的位移提交的位置为 x+4

自动提交

在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为5秒,此参数生效的前提是 enable.auto.commit 参数为true。

在默认的方式下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移

在 Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁

按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?看一下下图中的情形。

kafka 自动提交

拉取线程A不断地拉取消息并存入本地缓存,另一个处理线程 B 从缓存中读取消息并进行相应的逻辑处理。假设目前进行到了第 y+1 次拉取,以及第 m 次位移提交的时候,也就是 x+6 之前的位移已经确认提交了,处理线程 B 却还正在消费 x+3 的消息。此时如果处理线程 B 发生了异常,待其恢复之后会从第 m 此位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3 至 x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。

自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免,与此同时,自动位移提交也无法做到精确的位移管理。在 Kafka 中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为false。

手动提交

手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()commitAsync() 两种类型的方法。

同步提交

commitSync() 方法会根据 poll() 方法拉取的最新位移来进行提交(注意提交的值对应于前面图中 position 的位置),只要没有发生不可恢复的错误(Unrecoverable Error),它就会阻塞消费者线程直至位移提交完成。

对于采用 commitSync() 的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用 commitSync() 的另一个含参方法,该方法提供了一个 offsets 参数,用来提交指定分区的位移。无参的 commitSync() 方法只能提交当前批次对应的 position 值。如果需要提交一个中间值,比如业务每消费一条消息就提交一次位移,那么就可以使用这种方式。

异步提交

commitSync() 方法相反,异步提交的方式(commitAsync())在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。

commitAsync 提供了一个可选的 callback 参数,用来提供一个异步提交的回调方法,当位移提交完成后会回调 OffsetCommitCallback 中的 onComplete() 方法。

commitAsync()提交的时候同样会有失败的情况发生,那么应该怎么处理呢?有可能想到的是重试,问题的关键也就在这里了。如果某一次异步提交的消费位移为 x,但是提交失败了,然后下一次又异步提交了消费位移为 x+y,这次成功了。如果这里引入了重试机制,前一次的异步提交的消费位移在重试的时候提交成功了,那么此时的消费位移又变为了 x。如果此时发生异常(或者再均衡),那么恢复之后的消费者(或者新的消费者)就会从 x 处开始消费消息,这样就发生了重复消费的问题。

为此可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的值。在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号的值的大小,如果前者小于后者,则说明有更大的位移已经提交了,不需要再进行本次重试;如果两者相同,则说明可以进行重试提交。除非程序编码错误,否则不会出现前者大于后者的情况。

如果位移提交失败的情况经常发生,那么说明系统肯定出现了故障,在一般情况下,位移提交失败的情况很少发生,不重试也没有关系,后面的提交也会有成功的。重试会增加代码逻辑的复杂度,不重试会增加重复消费的概率。如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移;如果消费者正常退出或发生再均衡的情况,那么可以在退出或再均衡执行之前使用同步提交的方式做最后的把关

控制和关闭消费

KafkaConsumer 提供了对消费速度进行控制的方法,在有些应用场景下可能需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。KafkaConsumer 中使用 pause()resume() 方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。另外,还提供了一个无参的 paused() 方法来返回被暂停的分区集合。

指定位移消费

一个新的消费组建立的时候,它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。当 __consumer_offsets 主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。

在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。参考下图,按照默认的配置,消费者会从 9 开始进行消费(9 是下一条要写入消息的位置),更加确切地说是从 9 开始拉取消息。如果将 auto.offset.reset 参数配置为 “earliest”,那么消费者会从起始处,也就是 0 开始消费。

kafka 指定位移提交

除了查找不到消费位移,位移越界也会触发 auto.offset.reset 参数的执行。

auto.offset.reset 参数还有一个可配置的值—“none”,配置为此值就意味着出现查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出异常。

到目前为止,可以知道消息的拉取是根据 poll() 方法中的逻辑来处理的,这个 poll() 方法中的逻辑对于普通的开发人员而言是一个黑盒,无法精确地掌控其消费的起始位置。提供的 auto.offset.reset 参数也只能在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。有些时候,需要一种更细粒度的掌控,可以从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek() 方法正好提供了这个功能,让我们得以追前消费或回溯消费。

seek() 方法中的参数 partition 表示分区,而 offset 参数用来指定从分区的哪个位置开始消费。seek() 方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll() 方法的调用过程中实现的。也就是说,在执行 seek() 方法之前需要先执行一次 poll() 方法,等到分配到分区之后才可以重置消费位置。在调用 poll() 方法后,可以通过 KafkaConsumer 的 assignment() 方法来判定是否分配到了相应的分区。

如果对未分配到的分区执行 seek() 方法,那么会报出异常。

KafkaConsumer 中还提供了 seekToBeginning() 方法和 seekToEnd() 方法来实现从分区的开头或末尾开始消费。

有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如想要消费昨天 8 点之后的消息,这个需求更符合正常的思维逻辑。此时无法直接使用 seek() 方法来追溯到相应的位置。KafkaConsumer 同样考虑到了这种情况,它提供了一个 offsetsForTimes() 方法,通过 timestamp 来查询与此对应的分区位置。

前面提到了 Kafka 中的消费位移是存储在一个内部主题中的,而 seek() 方法可以突破这一限制:消费位移可以保存在任意的存储介质中,例如数据库、文件系统等。以数据库为例,将消费位移保存在其中的一个表中,在下次消费的时候可以读取存储在数据表中的消费位移并通过 seek() 方法指向这个具体的位置。

seek() 方法提供了从特定位置读取消息的能力,可以通过这个方法来向前跳过若干消息,也可以通过这个方法来向后回溯若干消息,这样为消息的消费提供了很大的灵活性。seek() 方法也为提供了将消费位移保存在外部存储介质中的能力,还可以配合再均衡监听器来提供更加精准的消费能力。

再均衡

再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的再均衡的发生。

在消费者订阅的时候,提供了再均衡监听器,用来设定发生再均衡动作前后的一些准备或收尾的动作。onPartitionsRevoked 这个方法会在再均衡开始之前和消费者停止读取消息之后被调用,可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。onPartitionsAssigned 这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。

多线程实现

KafkaProducer 是线程安全的,然而 KafkaConsumer 却是非线程安全的。KafkaConsumer 中定义了一个 acquire() 方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出异常。

KafkaConsumer 中的每个公用方法在执行所要执行的动作之前都会调用这个 acquire() 方法,只有 wakeup() 方法是个例外。

acquire() 方法和通常所说的锁(synchronized、Lock等)不同,它不会造成阻塞等待,可以将其看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release() 方法成对出现,表示相应的加锁和解锁操作。

cquire() 方法和 release() 方法都是私有方法,因此在实际应用中不需要显式地调用,但了解其内部的机理之后可以促使正确、有效地编写相应的程序逻辑。

KafkaConsumer 非线程安全并不意味着在消费消息的时候只能以单线程的方式执行。如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。除此之外,由于 Kafka 中消息保留机制的作用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。

可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。

多线程的实现方式有多种,

  • 第一种也是最常见的方式:线程封闭,即为每个线程实例化一个 KafkaConsumer 对象。

kafka消费者多线程实现-1

一个线程对应一个 KafkaConsumer 实例,可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。

  • 多线程消费同一分区

与此对应的第二种方式是多个消费线程同时消费同一个分区,这个通过 assign()seek() 等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力。不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少,也并不推荐。一般而言,分区是消费线程的最小划分单位。

上面所说的第一种这种多线程的实现方式和开启多个消费进程的方式没有本质上的区别,它的优点是每个线程可以按顺序消费各个分区中的消息。缺点也很明显,每个消费线程都要维护一个独立的 TCP 连接,如果分区数和 consumerThreadNum 的值都很大,那么会造成不小的系统开销。

  • 将处理消息模块改成多线程的实现方式

当每次拉去到消息后,如果这里对消息的处理非常迅速,那么 poll() 拉取的频次也会更高,进而整体消费的性能也会提升;相反,如果对消息的处理缓慢,比如进行一个事务性操作,或者等待一个RPC的同步响应,那么 poll() 拉取的频次也会随之下降,进而造成整体消费性能的下降。一般而言,poll() 拉取消息的速度是相当快的,而整体消费的瓶颈也正是在处理消息这一块,如果通过一定的方式来改进这一部分,那么就能带动整体消费性能的提升。因此有了第三种实现方式。

kafka消费者多线程实现-2

第三种实现方式相比第一种实现方式而言,除了横向扩展的能力,还可以减少TCP连接对系统资源的消耗,不过缺点就是对于消息的顺序处理就比较困难了,在具体实现的时候还需要考虑位移提交的情况。对于第一种实现方式而言,如果要做具体的位移提交,直接实现即可。而对于第三种实现方式,这里引入一个共享变量 offsets 来参与提交。

kafka消费者多线程实现-3

每一个处理消息的 RecordHandler 类在处理完消息之后都将对应的消费位移保存到共享变量 offsets 中,KafkaConsumerThread 在每一次 poll() 方法之后都读取 offsets 中的内容并对其进行位移提交。注意在实现的过程中对 offsets 读写需要加锁处理,防止出现并发问题。

但是在写入 offsets 的时候需要注意位移覆盖的问题。对于同一个分区中的消息,假设一个处理线程RecordHandler1 正在处理 offset 为 0~99 的消息,而另一个处理线程 RecordHandler2 已经处理完了 offset 为100~199 的消息并进行了位移提交,此时如果 RecordHandler1 发生异常,则之后的消费只能从 200 开始而无法再次消费 0~99 的消息,从而造成了消息丢失的现象。对此就要引入更加复杂的处理机制,这里提供一种解决思路,参考下图:

kafka消费者多线程实现-4

总体结构上是基于滑动窗口实现的。对于第三种实现方式而言,它所呈现的结构是通过消费者拉取分批次的消息,然后提交给多线程进行处理,而这里的滑动窗口式的实现方式是将拉取到的消息暂存起来,多个消费线程可以拉取暂存的消息,这个用于暂存消息的缓存大小即为滑动窗口的大小,总体上而言没有太多的变化,不同的是对于消费位移的把控。每一个方格代表一个批次的消息,一个滑动窗口包含若干方格,startOffset 标注的是当前滑动窗口的起始位置,endOffset 标注的是末尾位置。每当 startOffset 指向的方格中的消息被消费完成,就可以提交这部分的位移,与此同时,窗口向前滑动一格,删除原来 startOffset 所指方格中对应的消息,并且拉取新的消息进入窗口。滑动窗口的大小固定,所对应的用来暂存消息的缓存大小也就固定了,这部分内存开销可控。方格大小和滑动窗口的大小同时决定了消费线程的并发数:一个方格对应一个消费线程,对于窗口大小固定的情况,方格越小并行度越高;对于方格大小固定的情况,窗口越大并行度越高。不过,若窗口设置得过大,不仅会增大内存的开销,而且在发生异常(比如Crash)的情况下也会引起大量的重复消费,同时还考虑线程切换的开销,建议根据实际情况设置一个合理的值,不管是对于方格还是窗口而言,过大或过小都不合适。

如果一个方格内的消息无法被标记为消费完成,那么就会造成 startOffset 的悬停。为了使窗口能够继续向前滑动,那么就需要设定一个阈值,当 startOffset 悬停一定的时间后就对这部分消息进行本地重试消费,如果重试失败就转入重试队列,如果还不奏效就转入死信队列。真实应用中无法消费的情况极少,一般是由业务代码的处理逻辑引起的,比如消息中的内容格式与业务处理的内容格式不符,无法对这条消息进行决断,这种情况可以通过优化代码逻辑或采取丢弃策略来避免。如果需要消息高度可靠,也可以将无法进行业务逻辑的消息(这类消息可以称为死信)存入磁盘、数据库或 Kafka,然后继续消费下一条消息以保证整体消费进度合理推进,之后可以通过一个额外的处理任务来分析死信进而找出异常的原因。

其他相关参数

除了上面提到的客户端相关参数,还有一些可选的用于性能调优和故障排除的参数。

fetch.min.bytes

该参数用来配置 Consumer 在一次拉取请求(调用 poll()方法)中能从 Kafka 中拉取的最小数据量,默认值为 1(B)。Kafka 在收到 Consumer 的拉取请求时,如果返回给 Consumer 的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。

fetch.max.bytes

该参数与 fetch.max.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从 Kafka 中拉取的最大数据量,默认值为 52428800(B),也就是 50 MB。

如果这个参数设置的值比任何一条写入 Kafka 中的消息要小,那么会不会造成无法消费呢?比如一条消息的大小为10B,而这个参数的值是1(B),既然此参数设定的值是一次拉取请求中所能拉取的最大数据量,那么显然 1B<10B,所以无法拉取。这个观点是错误的,该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该值,那么该消息将仍然返回,以确保消费者继续工作。也就是说,上面问题的答案是可以正常消费。与此相关的,Kafka 中所能接收的最大消息的大小通过服务端参数 message.max.bytes(对应于主题端参数 max.message.bytes)来设置。

fetch.max.wait.ms

这个参数也和 fetch.min.bytes 参数有关,如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 Kafka 的等待时间,默认值为500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500ms。这个参数的设定和 Consumer 与 Kafka 之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。

max.partition.fetch.bytes

这个参数用来配置从每个分区里返回给 Consumer 的最大数据量,默认值为 1048576(B),即 1 MB。这个参数与 fetch.max.bytes 参数相似,只不过 max.partition.fetch.bytes 用来限制一次拉取中每个分区的消息大小,而 fetch.max.bytes 用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造成无法消费,Kafka 为了保持消费逻辑的正常运转不会对此做强硬的限制。

max.poll.records

这个参数用来配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为 500(条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。

connections.max.idle.ms

这个参数用来指定在多久之后关闭限制的连接,默认值是 540000(ms),即 9 分钟。

exclude.internal.topics

Kafka中有两个内部的主题:__consumer_offsets__transaction_stateexclude.internal.topics 用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没有这个限制。

receive.buffer.bytes

这个参数用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 65536(B),即 64 KB。如果设置为 -1,则使用操作系统的默认值。如果 Consumer 与 Kafka 处于不同的机房,则可以适当调大这个参数值。

send.buffer.bytes

这个参数用来设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 131072(B),即 128 KB。与receive.buffer.bytes 参数一样,如果设置为 -1,则使用操作系统的默认值。

request.timeout.ms

这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为 30000(ms)。

metadata.max.age.ms

这个参数用来配置元数据的过期时间,默认值为300000(ms),即 5 分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的 broker 加入。

isolation.level

这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为“read_uncommitted”和“read_committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,即只能消费到 LSO(LastStableOffset)的位置,默认情况下为“read_uncommitted”,即可以消费到 HW(High Watermark)处的位置。

分区

主题和分区是Kafka 的两个核心概念,主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。分区的划分不仅为 Kafka 提供了可伸缩性、水平扩展的功能,还通过多副本机制来为 Kafka 提供数据冗余以提高数据可靠性。从 Kafka 的底层实现来说,主题和分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等

如果 broker 端配置参数 auto.create.topics.enable 设置为true(默认值就是true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为 num.partitions (默认值为1)、副本因子为 default.replication.factor(默认值为1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数 num.partitionsdefault.replication.factor 的值来创建一个相应的主题。很多时候,这种自动创建主题的行为都是非预期的。除非有特殊应用需求,否则不建议将 auto.create.topics.enable 参数设置为 true,这个参数会增加主题的管理与维护的难度。

更加推荐也更加通用的方式是通过 kafka-topics.sh 脚本来创建主题。

在执行完脚本之后,Kafka会在 log.dir 或 log.dirs 参数所配置的目录下创建相应的主题分区,默认情况下这个目录为 /tmp/kafka-logs/

主题、分区、副本和 Log(日志)的关系如下图所示,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切地说是 Log 层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 中,这样才能提供有效的数据冗余。对于示例中的分区数为4、副本因子为2、broker数为3的情况下,按照2、3、3的分区副本个数分配给各个 broker 是最优的选择。再比如在分区数为 3、副本因子为 3,并且 broker 数同样为3的情况下,分配 3、3、3 的分区副本个数给各个 broker 是最优的选择,也就是每个 broker 中都拥有所有分区的一个副本。

kafka 分区-1

分区管理

优先副本的选举

分区使用多副本机制来提升可靠性,但只有 leader 副本对外提供读写服务,而 follower 副本只负责在内部进行消息的同步。如果一个分区的 leader 副本不可用,那么就意味着整个分区变得不可用,此时就需要 Kafka 从剩余的 follower 副本中挑选一个新的 leader 副本来继续对外提供服务。

虽然不够严谨,但从某种程度上说,broker 节点中 leader 副本个数的多少决定了这个节点负载的高低。在创建主题的时候,该主题的分区及副本会尽可能均匀地分布到 Kafka 集群的各个 broker 节点上,对应的 leader 副本的分配也比较均匀。

在创建主题的时候,该主题的分区及副本会尽可能均匀地分布到 Kafka 集群的各个broker节点上,对应的 leader 副本的分配也比较均匀。比如使用 kafka-topics.sh 脚本创建一个分区数为 3、副本因子为 3 的主题 topic-partitions,创建之后的分布信息如下:

kafka 分区-2

可以看到 leader 副本均匀分布在 brokerId 为 0、1、2 的 broker 节点之中。针对同一个分区而言,同一个 broker 节点中不可能出现它的多个副本,即 Kafka 集群的一个 broker 中最多只能有它的一个副本,可以将 leader 副本所在的 broker 节点叫作分区的 leader 节点,而 follower 副本所在的 broker 节点叫作分区的 follower 节点。随着时间的更替,Kafka 集群的 broker 节点不可避免地会遇到宕机或崩溃的问题,当分区的 leader 节点发生故障时,其中一个 follower 节点就会成为新的 leader 节点,这样就会导致集群的负载不均衡,从而影响整体的健壮性和稳定性。当原来的 leader 节点恢复之后重新加入集群时,它只能成为一个新的 follower 节点而不再对外提供服务。比如将 brokerId 为 2 的节点重启,那么主题 topic-partitions 新的分布信息如下:

kafka分区-3

可以看到原本分区 1 的 leader 节点为 2,现在变成了 0,如此一来原本均衡的负载变成了失衡:节点 0 的负载最高,而节点1的负载最低。为了能够有效地治理负载失衡的情况,Kafka 引入了优先副本(preferredreplica)的概念。所谓的优先副本是指在 AR 集合列表中的第一个副本。比如上面主题 topic-partitions 中分区 0 的AR集合列表(Replicas)为[1,2,0],那么分区 0 的优先副本即为 1。

理想情况下,优先副本就是该分区的 leader 副本,所以也可以称之为preferred leader。Kafka 要确保所有主题的优先副本在 Kafka 集群中均匀分布,这样就保证了所有分区的 leader 均衡分布。如果 leader 分布过于集中,就会造成集群负载不均衡。所谓的优先副本的选举是指通过一定的方式促使优先副本选举为 leader 副本,以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。

需要注意的是,分区平衡并不意味着 Kafka 集群的负载均衡,因为还要考虑集群中的分区分配是否均衡。更进一步,每个分区的 leader 副本的负载也是各不相同的,有些 leader 副本的负载很高,比如需要承载 TPS 为 30000 的负荷,而有些 leader 副本只需承载个位数的负荷。也就是说,就算集群中的分区分配均衡、leader 分配均衡,也并不能确保整个集群的负载就是均衡的,还需要其他一些硬性的指标来做进一步的衡量。

在 Kafka 中可以提供分区自动平衡的功能,与此对应的 broker 端参数是 auto.leader.rebalance.enable,此参数的默认值为 true,即默认情况下此功能是开启的。如果开启分区自动平衡的功能,则 Kafka 的控制器会启动一个定时任务,这个定时任务会轮询所有的 broker 节点,计算每个 broker 节点的分区不平衡率(broker 中的不平衡率=非优先副本的 leader 个数/分区总数)是否超过 leader.imbalance.per.broker.percentage 参数配置的比值,默认值为 10%,如果超过设定的比值则会自动执行优先副本的选举动作以求分区平衡。执行周期由参数 leader.imbalance.check.interval.seconds 控制,默认值为 300 秒,即 5 分钟。

不过在生产环境中不建议将 auto.leader.rebalance.enable 设置为默认的true,因为这可能引起负面的性能问题,也有可能引起客户端一定时间的阻塞。因为执行的时间无法自主掌控,如果在关键时期(比如电商大促波峰期)执行关键任务的关卡上执行优先副本的自动选举操作,势必会有业务阻塞、频繁超时之类的风险。前面也分析过,分区及副本的均衡也不能完全确保集群整体的均衡,并且集群中一定程度上的不均衡也是可以忍受的,为防止出现关键时期“掉链子”的行为,建议还是将掌控权把控在自己的手中,可以针对此类相关的埋点指标设置相应的告警,在合适的时机执行合适的操作,而这个“合适的操作”就是指手动执行分区平衡。

手动执行分区平衡

Kafka中 kafka-perferred-replica-election.sh 脚本提供了对分区 leader 副本进行重新平衡的功能。优先副本的选举过程是一个安全的过程,Kafka 客户端可以自动感知分区 leader 副本的变更。脚本中还提供了 path-to-json-file 参数来小批量地对部分分区执行优先副本的选举操作。通过 path-to-json-file 参数来指定一个 JSON 文件,这个 JSON 文件里保存需要执行优先副本选举的分区清单。

在实际生产环境中,一般使用 path-to-json-file 参数来分批、手动地执行优先副本的选举操作。尤其是在应对大规模的 Kafka 集群时,理应杜绝采用非 path-to-json-file 参数的选举操作方式。同时,优先副本的选举操作也要注意避开业务高峰期,以免带来性能方面的负面影响。

日志存储

文件目录布局

Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区,分区的数量可以在主题创建的时候指定,也可以在之后修改。每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号,也就是通常所说的偏移量(offset)。

如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现水平扩展。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大, Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上, Log 和 LogSegnient 也不是纯粹物理意义上的概念, Log 在物理上只以文件夹的形式存储, 而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以 . txnindex 为后缀的事务索引文件)。下图描绘了主题、分区与副本之间,Log 和 LogSegment 的关系。

kafka日志存储结构

向 Log 中追加消息时是顺序写入的,只有最后一个 LogSegment 才能执行写入操作, 在此之前所有的 LogSegment 都不能写入数据。为了方便描述, 将最后一个 LogSegment 称为 activeSegment,即表示当前活跃的日志分段。随着消息的不断写入,当 activeSegment 满足一定的条件时,就需要创建新的 activeSegment,之后追加的消息将写入新的 activeSegment。 为了便于消息的检索, 每个 LogSegment 中的日志文件 (以 .log 为文件后缀)都有对应的两个索引文件:偏移量索引文件(以 .index 为文件后缀)和时间戳索引文件(以 . timeindex 为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment 中第一条消息的offset。 偏移量是一个 64 位的长整数, 日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为 20 位数字,没有达到的位数则用0填充。 比如第一个 LogSegment 的基准偏移量为 0, 对应的日志文件为 00000000000000000000.log。

注意每个 LogSegment 中不只包含 .log,.index,.timeindex 这 3 种文件,还可能包含 .deleted,.cleaned,.swap 等临时文件,以及可能的 .snapshot,.txnindex,leader-epoch-checkpoint 等文件。

日志格式的演变

Kafka 的消息格式经历了 3 个版本:v0 版本,v1 版本和 v2 版本。

V0 版本

Kafka 消息格式的第一个版本通常称为 v0 版本, 在 Kafka 0.10.0 之前都采用的这个消息格式。这里只讨论消息未压缩的情形。

kafka v0版本日志格式

如上图所示,左边的 RECORD 部分就是 v0 版本的消息格式, 大多数人会把图中左边的整体(即包括 offset 和 message size 字段)都看作消息,因为每个 RECORD(v0 和 v1 版)必定对应一个 offset 和 message size。每条消息都有一个 offset 用来标志它在分区中的偏移量, 这个 offset 是逻辑值, 而非实际物理偏移值, message size 表示消息的大小,这两者在一起被称为日志头部(LOG_OVERHEAD),固定为 12B。与消息对应的还有消息集的概念,消息集中包含一条或多条消息,消息集不仅是存储于磁盘及在网络上传输(Produce & Fetch)的基本形式,而且是 Kafka 中压缩的基本单元(上图右边)。

消息格式中的各个字段解释如下:

  • crc32 (4B) : crc32 校验值,校验范围为 magic 至 value 之间。

  • magic (1B) : 消息格式版本号,此版本的 magic 值为 0。

  • attributes(1B) :消息的属性,总共占 1 个字节,低 3 位表示压缩类型:0 表示 NONE,1 表示 GZIP,2 表示 SNAPPY,3 表示 LZ4(LZ4自Kafka 0.9.x引入), 其余位保留。

  • key length(4B) :表示消息的 key 的长度。 如果为-1, 则表示没有设置 key, 即 key= null。

  • key: 可选, 如果没有 key 则无此字段。

  • value length (4B) : 实际消息体的长度。 如果为-1, 则表示消息为空。

  • value: 消息体。可以为空,比如墓碑(tombstone)消息 。

v0 版本中一个消息的最小长度(RECORD_OVERHEAD_ V0)为 crc32 + magic + attributes + key length + value length = 4B + 1B + 1B + 4B + 4B =14B。 也就是说, v0 版本中一条消息的最小长度度为14B, 如果小于这个值, 那么这就是一条破损的消息而不被接收。

V1 版本

v1 版本比 v0 版本多了一个 timestamp 字段,表示消息的时间戳信息:

kafka v1版本日志格式

v1 版本的 magic 字段的值为1。 v1 版本的 attributes 字段中的低 3 位和 v0 版本的一样,还是表示压缩类型。而第4位(bit)也被利用了起来:0 表示 timestamp 类型为 CreateTime,而 1 表示 timestamp 类型为 LogAppendTime,其他位保留。 timestamp 类型由 broker 端参数 log.message.timestamp.type 来配置, 默认值为 CreateTime,即采用生产者创建消息时的时间戳。如果在创建 ProducerRecord 时没有显式指定消息的时间戳,那么 KafkaProducer 也会在发送这条消息前自动添加上。

v1 版本的消息的最小长度(RECORD_OVERHEAD_V1)要比 v0 版本的大 8 个字节,即 22B。如果像 v0 版本一样发送一条 key = “key”、value= “value” 的消息,那么此条消息在 v1 版本中会占用42B。

消息压缩

常见的压缩算法是数据量越大压缩效果越好,一条消息通常不会太大,这就导致压缩效果并不是太好。而 Kafka 实现的压缩方式是将多条消息一起进行压缩,这样可以保证较好的压缩效果。

在一般情况下,生产者发送的压缩数据在 broker 中也是保待压缩状态进行存储的,消费者从服务端获取的也是压缩的消息,消费者在处理消息之前才会解压消息,这样保待了端到端的压缩。

Kafka 日志中使用哪种压缩方式是通过参数 compression.type 来配置的,默认值为 “producer”,表示保留生产者使用的压缩方式。这个参数还可以配置为”gzip” “snappy” “lz4”, 分别对应 GZIP、 SNAPPY、 LZ4 这 3 种压缩算法。如果参数 compression.type 配置为 “uncompressed” , 则表示不压缩。

以上都是针对消息未压缩的情况, 而当消息压缩时是将整个消息集进行压缩作为内层消息 (inner message) , 内层消息整体作为外层 (wrapper message) 的 value, 其结构如下图:

kafka v1版本日志格式-消息压缩-1

其实每个从生产者发出的消息集中的消息 offset 都是从 0 开始的,这个 offset 不能直接存储在日志文件中,对 offset 的转换是在服务端进行的,客户端不需要做这个工作。

外层消息保存了内层消息中最后一条消息的绝对位移(absolute offset) , 绝对位移是相对于整个分区而言的:

kafka v1版本日志格式-消息压缩-2

对于未压缩的情形, 图右内层消息中最后一条的 offset 理应是 1030, 但被压缩之后就变成了 5, 而这个 1030 被赋予给了外层的 offset 。当消费者消费这个消息集的时候,首先解压缩整个消息集,然后找到内层消息中最后一条消息的 inner offset,再找到内层消息中最后一条消息前面的消息的 absolute offset。

V2 版本

v2 版本中消息集称为Record Batch,而不是先前的Message Set,其内部也包含了一条或多条消息,消息的格式如下:

kafka v2版本日志格式

在消息压缩的情形下,Record Batch Header部分(从 first offset 到 records count 字段)是不被压缩的,而被压缩的是 records 字段中的所有内容。生产者客户端中的 ProducerBatch 对应这里的 RecordBatch,而 ProducerRecord 对应这里的Record。

观察消息格式 Record 的关键字段,可以看到内部字段大量采用了 Varints,这样 Kafka 可以根据具体的值来确定需要几个字节来保存。v2 版本的消息格式去掉了 crc 字段,另外增加了 length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和 headers 信息,并且 attributes 字段被弃用了。

  • length:消息总长度。

  • attributes:弃用,但还是在消息格式中占据1B的大小,以备未来的格式扩展。

  • timestamp delta:时间戳增量。通常一个timestamp需要占用8个字节,如果像这里一样保存与 RecordBatch 的起始时间戳的差值,则可以进一步节省占用的字节数。

  • offset delta:位移增量。保存与 RecordBatch 起始位移的差值,可以节省占用的字节数。

  • headers:这个字段用来支持应用级别的扩展,而不需要像 v0 和 v1 版本一样不得不将一些应用级别的属性值嵌入消息体。

  • first offset:表示当前 RecordBatch 的起始位移。

  • length:计算从 partition leader epoch 字段开始到末尾的长度。

  • partition leader epoch:分区 leader 纪元,可以看作分区 leader 的版本号或更新次数。

  • magic:消息格式的版本号,对 v2 版本而言,magic 等于2。

  • attributes:消息属性,注意这里占用了两个字节。低3位表示压缩格式,可以参考 v0 和 v1;第4位表示时间戳类型;第5位表示此 RecordBatch 是否处于事务中,0 表示非事务,1 表示事务。第 6 位表示是否是控制消息(ControlBatch),0表示非控制消息,而1表示是控制消息,控制消息用来支持事务功能。

  • last offset delta:RecordBatch 中最后一个 Record 的 offset 与 first offset 的差值。主要被 broker 用来确保 RecordBatch中Record 组装的正确性。

  • first timestamp:RecordBatch 中第一条 Record 的时间戳。

变长字段

Kafka 从 0.11.0 版本开始所使用的消息格式版本为v2,这个版本的消息相比v0和v1的版本而言改动很大,同时还参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。

Varints是使用一个或多个字节来序列化整数的一种方法。数值越小,其占用的字节数就越少。Varints中的每个字节都有一个位于最高位的msb位(most significantbit),除最后一个字节外,其余msb位都设置为1,最后一个字节的msb位为0。这个msb位表示其后的字节是否和当前字节一起来表示同一个整数。除msb位外,剩余的7位用于存储数据本身,这种表示类型又称为Base 128。通常而言,一个字节8位可以表示256个值,所以称为Base 256,而这里只能用7位表示,2的7次方即128。Varints中采用的是小端字节序,即最小的字节放在最前面。举个例子,比如数字1,它只占一个字节,所以msb位为0:

0000 0001

再举一个复杂点的例子,比如数字300:

1010 1100 0000 0010

怎么知道这是300呢? 首先将每个字节的 msb 去掉, 这个仅仅是告诉我们是否已经读到数字的结尾(可以看到, 第一个字节被设置了,因为在 varint 中不止一个字节):

// 去掉最高位的 1 bit
1010 1100 0000 0010
→ 010 1100  000 0010

把两个 7bit 的组翻转过来, 然后可以将他们连接起来,去掉前面的0就得到最后的值:

000 0010  010 1100
→  000 0010 ++ 010 1100
→  100101100
→  256 + 32 + 8 + 4 = 300

Varints 可以用来表示 int32、int64、uint32、uint64、sint32、sint64、bool、enum 等类型。在实际使用过程中,如果当前字段可以表示为负数,那么对int32/int64和sint32/sint64而言,它们在进行编码时存在较大的区别。比如使用int64表示一个负数,那么哪怕是-1,其编码后的长度始终为10个字节(可以通过下面的代码来测试长度),就如同对待一个很大的无符号长整型数一样。为了使编码更加高效,Varints使用了ZigZag的编码方式

ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭正负整数,将带符号整数映射为无符号整数,这样可以使绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1,1编码为2,-2编码为3。

原始有符号整型 编码结果
0 0
-1 1
1 2
2 3
2147483647 4294967294
-2147483648 4294967295

换句话说, 对于 sint32, 每个值 n 被编码为:

(n << 1) ^ (n >> 31)

或者 64 位版本:

(n << 1) ^ (n >> 63)

注意第二个移动 (n >> 31) 部分,是一个算数位移运算。 因此,移动的结果要么是0(如果n是正数) 要么是1(如果n是负数)。

以-1为例, 其二进制表现形式为 1111 1111 1111 1111 11111111 1111 1111 (补码)。

 (n<<1) = 1111 1111 1111 1111 1111 1111 1111 1110 
 (n>>31) = 1111 1111 1111 1111 1111 1111 1111 1111 
 (n << 1) ^ (n >>31) = 1

最终-1的Varints编码为0000 0001,这样原本用4个字节表示的-1现在可以用1个字节来表示了。

1 就显得非常简单了,其二进制表现形式为0000 0000 0000 00000000 0000 0000 0001。

(n<<1) = 0000 0000 0000 0000 0000 0000 0000 0010
(n>>31) = 0000 0000 0000 0000 0000 0000 0000 0000
(n << 1) A (n>>31) = 2 

最终1的Varints编码为0000 0010,也只占用1个字节。前面说过Varints中的一个字节中只有7位是有效数值位,即只能表示128个数值,转变成绝对值之后其实质上只能表示64个数值。比如对消息体长度而言,其值肯定是大于等于0的正整数,那么一个字节长度的Varints最大只能表示64。65的二进制数表示为:0100 0001

经过ZigZag处理后为: 1000 0010 ^ 0000 0000 = 1000 0010

每个字节的低7位是有效数值位, 所以1000 0010进一步转变为: 000 0001 000 0010

而Varints使用小端字节序, 所以需要翻转一下位置: 000 0010 000 0001

设置非最后一个字节的msb 位为1, 最后一个字节的msb 位为0, 最终有: 1000 0010 0000 0001

所以最终65表示为1000 0010 0000 0001, 而64却表示为0100 0000。

回顾Kafka v0和v1版本的消息格式,如果消息本身没有key,那么key length字段为-1,int类型的需要4个字节来保存,而如果采用Varints来编码则只需要1个字节。根据Varints的规则可以推导出0~63之间的数字占1个字节,64~8191之间的数字占2个字节,8192~1048575之间的数字占3个字节。而Kafka broker端配置 message.max.bytes 的默认大小为1000012 (Varints编码占3个字节),如果消息格式中与长度有关的字段采用Varints的编码,那么绝大多数情况下都会节省空间,而v2版本的消息格式也正是这样做的。不过需要注意的是,Varints 并非一直会节省空间,一个 int32 最长会占用5 个字节(大于默认的4个字节),一个int64最长会占用10个字节(大于默认的8个字节)。

日志索引

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为4096,即4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小 log.index.interval.bytes的值,对应地可以增加或缩小索引项的密度。

稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。

当日志分段文件达到一定的条件时需要进行切分,那么其对应的索引文件也需要进行切分。对非当前活跃的日志分段而言,其对应的索引文件内容已经固定而不需要再写入索引项,所以会被设定为只读。而对当前活跃的日志分段(activeSegment)而言,索引文件还会追加更多的索引项,所以被设定为可读写。在索引文件切分的时候,Kafka 会关闭当前正在写入的索引文件并置为只读模式,同时以可读写的模式创建新的索引文件,索引文件的大小由broker端参数 log.index.size.max.bytes 配置。Kafka 在创建索引文件的时候会为其预分配 log.index.size.max.bytes 大小的空间,注意这一点与日志分段文件不同,只有当索引文件进行切分的时候,Kafka 才会把该索引文件裁剪到实际的数据大小。也就是说,与当前活跃的日志分段对应的索引文件的大小固定为 log.index.size.max.bytes,而其余日志分段对应的索引文件的大小为实际的占用空间。

偏移量索引

偏移量索引每个索引项占用8个字节,分为两个部分。

  • relativeOffset:相对偏移量,表示消息相对于baseOffset 的偏移量,占用4个字节,当前索引文件的文件名即为baseOffset的值。
  • position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节。

偏移量索引的查找过程如下:

假设现在只有 00000000000000000000.index和 00000000000000000000.log:

kafka日志偏移量索引-1

如果要查找偏移量为 23 的消息,首先通过二分法在偏移量索引文件中找到不大于23的最大索引项,即[22,656],然后从日志分段文件中的物理位置656开始顺序查找偏移量为23的消息。

当日志分段文件不止一份的时候,则需要先定位具体的日志分段文件

如下图,假设要查找偏移量为268的消息,那么应该怎么办呢?首先肯定是定位到 baseOffset 为 251 的日志分段,然后计算相对偏移量relativeOffset=268-251=17,之后再在该日志分段对应的索引文件中找到不大于17的索引项,最后根据索引项中的position定位到具体的日志分段文件位置开始查找目标消息。

那么又是如何查找baseOffset 为251的日志分段的呢?这里并不是顺序查找,而是用了跳跃表的结构。Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key,这样可以根据指定偏移量来快速定位到消息所在的日志分段。

kafka日志偏移量索引-2

时间戳索引

时间戳索引项的格式如下图所示:

kafka日志时间戳索引

每个索引项占用12个字节,分为两个部分:

  • timestamp:当前日志分段最大的时间戳。

  • relativeOffset:时间戳所对应的消息的相对偏移量。

时间戳索引文件中包含若干时间戳索引项,每个追加的时间戳索引项中的 timestamp 必须大于之前追加的索引项的 timestamp,否则不予追加。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTime,那么消息的时间戳必定能够保持单调递增;相反,如果是 CreateTime 类型则无法保证。

生产者可以使用类似 ProducerRecord(String topic,Integer partition,Longtimestamp,K key,V value)的方法来指定时间戳的值。即使生产者客户端采用自动插入的时间戳也无法保证时间戳能够单调递增,如果两个不同时钟的生产者同时往一个分区中插入消息,那么也会造成当前分区的时间戳乱序。

与偏移量索引文件相似,时间戳索引文件大小必须是索引项大小(12B)的整数倍,如果不满足条件也会进行裁剪。同样假设 broker 端参数 log.index.size.max.bytes 配置为67,那么对应于时间戳索引文件,Kafka 在内部会将其转换为60。每当写入一定量的消息时,就会在偏移量索引文件和时间戳索引文件中分别增加一个偏移量索引项和时间戳索引项。两个文件增加索引项的操作是同时进行的,但并不意味着偏移量索引中的 relativeOffset 和时间戳索引项中的 relativeOffset 是同一个值。

假设现在有以下日志分段文件:

kafka日志时间戳索引-2

如果要查找指定时间戳targetTimeStamp=1526384718288开始的消息,首先是找到不小于指定时间戳的日志分段。这里就无法使用跳跃表来快速定位到相应的日志分段了,需要分以下几个步骤来完成。

  1. 将targetTimeStamp和每个日志分段中的最大时间戳largestTimeStamp逐一对比,直到找到不小于 targetTimeStamp 的 largestTimeStamp 所对应的日志分段。日志分段中的largestTimeStamp的计算是先查询该日志分段所对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则取该日志分段的最近修改时间。
  2. 找到相应的日志分段之后,在时间戳索引文件中使用二分查找算法查找到不大于targetTimeStamp的最大索引项,即[1526384718283,28],如此便找到了一个相对偏移量28。
  3. 在偏移量索引文件中使用二分算法查找到不大于28的最大索引项,即[26,838]。
  4. 从步骤1中找到日志分段文件中的838的物理位置开始查找不小于targetTimeStamp的消息。

磁盘存储

Kafka 依赖于文件系统(更底层地来说就是磁盘)来存储和缓存消息。对于各个存储介质的速度,大体同下图所示,层级越高代表速度越快:

各种存储介质

而实际上,磁盘的速度快慢完全取决于我们如何使用它。有关测试结果表明,一个由6块7200r/min的RAID-5阵列组成的磁盘簇的线性(顺序)写入速度可以达到600MB/s,而随机写入速度只有100KB/s,两者性能相差6000倍。操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存)和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。顺序写盘的速度不仅比随机写盘的速度快,而且也比随机写内存的速度快

磁盘/内存写入速度

Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算 Kafka使用磁盘作为存储介质,它所能承载的吞吐量也不容小觑。这是让Kafka在性能上具备足够竞争力的一大因素,除此顺序写盘之外,kafka 还利用了操作系统的页缓存、零拷贝,进一步提高了性能

页缓存

页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。为了弥补性能上的差异,现代操作系统越来越“激进地”将内存作为磁盘缓存,甚至会非常乐意将所有可用的内存用作磁盘缓存,这样当内存回收时也几乎没有性能损失,所有对于磁盘的读写也将经由统一的缓存。

当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的 I/O 操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。

对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用Direct I/O的方式,否则页缓存很难被禁止。此外,使用 Java 开发的程序会有两个问题:

  1. 对象的内存开销非常大,通常会是真实数据大小的几倍甚至更多,空间使用率低下;

  2. Java的垃圾回收会随着堆内数据的增多而变得越来越慢。

基于这些因素,使用文件系统并依赖于页缓存的做法明显要优于维护一个进程内缓存或其他结构,至少可以省去了一份进程内部的缓存消耗,同时还可以通过结构紧凑的字节码来替代使用对象的方式以节省更多的空间。如此,可以在 32GB 的机器上使用28GB至30GB的内存而不用担心GC所带来的性能问题。

此外,即使Kafka服务重启,页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。

Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在 Kafka 中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过log.flush.interval.messageslog.flush.interval.ms 等参数来控制。

同步刷盘可以提高消息的可靠性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。

Linux 系统会使用磁盘的一部分作为swap分区,这样可以进行进程的调度:把当前非活跃的进程调入 swap 分区,以此把内存空出来让给活跃的进程。对大量使用系统页缓存的 Kafka而言,应当尽量避免这种内存的交换,否则会对它各方面的性能产生很大的负面影响。可以通过修改 vm.swappiness 参数(Linux系统参数)来进行调节。vm.swappiness 参数的上限为 100,它表示积极地使用 swap 分区,并把内存上的数据及时地搬运到 swap 分区中;vm.swappiness 参数的下限为0,表示在任何情况下都不要发生交换,这样一来,当内存耗尽时会根据一定的规则突然中止某些进程。建议将这个参数的值设置为 1,这样保留了 swap 的机制而又最大限度地限制了它对 Kafka 性能的影响。

磁盘 IO 流程

从编程角度而言,一般磁盘I/O的场景有以下四种:

  1. 用户调用标准C库进行I/O操作,数据流为:应用程序buffer→C库标准IObuffer→文件系统页缓存→通过具体文件系统到磁盘。

  2. 用户调用文件 I/O,数据流为:应用程序 buffer→文件系统页缓存→通过具体文件系统到磁盘。

  3. 用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘。

  4. 用户使用类似dd工具,并使用direct参数,绕过系统cache与文件系统直接写磁盘。

发起I/O请求的步骤可以表述为如下的内容(以最长链路为例)。

  • 写操作:用户调用fwrite把数据写入C库标准IObuffer后就返回,即写操作通常是异步操作;数据写入C库标准IObuffer后,不会立即刷新到磁盘,会将多次小数据量相邻写操作先缓存起来合并,最终调用write函数一次性写入(或者将大块数据分解多次write 调用)页缓存;数据到达页缓存后也不会立即刷新到磁盘,内核有pdflush 线程在不停地检测脏页,判断是否要写回到磁盘,如果是则发起磁盘I/O请求。
  • 读操作:用户调用fread到C库标准IObuffer中读取数据,如果成功则返回,否则继续;到页缓存中读取数据,如果成功则返回,否则继续;发起 I/O 请求,读取数据后缓存buffer和C库标准IObuffer并返回。可以看出,读操作是同步请求。
  • I/O请求处理:通用块层根据I/O请求构造一个或多个bio结构并提交给调度层;调度器将 bio 结构进行排序和合并组织成队列且确保读写操作尽可能理想:将一个或多个进程的读操作合并到一起读,将一个或多个进程的写操作合并到一起写,尽可能变随机为顺序(因为随机读写比顺序读写要慢),读必须优先满足,而写也不能等太久。

磁盘IO流程如下:

磁盘IO流程

针对不同的应用场景,I/O调度策略也会影响I/O的读写性能,目前Linux系统中的I/O调度策略有4种,分别为 NOOPCFQDEADLINEANTICIPATORY,默认为 CFQ。

NOOP

NOOP 算法的全写为 No Operation。该算法实现了最简单的FIFO队列,所有I/O请求大致按照先来后到的顺序进行操作。之所以说“大致”,原因是NOOP在FIFO的基础上还做了相邻I/O请求的合并,并不是完全按照先进先出的规则满足I/O请求。

假设有如下的I/O请求序列:

100,500,101,10,56,1000

NOOP将会按照如下顺序满足I/O请求:

100(101),500,10,56,1000

CFQ

CFQ 算法的全写为Completely Fair Queuing。该算法的特点是按照I/O请求的地址进行排序,而不是按照先来后到的顺序进行响应。假设有如下的I/O请求序列:

100,500,101,10,56,1000

CFQ将会按照如下顺序满足:

100,101,500,1000,10,56

CFQ是默认的磁盘调度算法,对于通用服务器来说是最好的选择。它试图均匀地分布对/IO带宽的访问。CFQ为每个进程单独创建一个队列来管理该进程所产生的请求,也就是说,每个进程一个队列,各队列之间的调度使用时间片进行调度,以此来保证每个进程都能被很好地分配到I/O带宽。I/O调度器每次执行一个进程的4次请求。在传统的SAS盘上,磁盘寻道花去了绝大多数的I/O响应时间。CFQ的出发点是对I/O地址进行排序,以尽量少的磁盘旋转次数来满足尽可能多的I/O请求。在CFQ算法下,SAS盘的吞吐量大大提高了。相比于NOOP的缺点是,先来的I/O请求并不一定能被满足,可能会出现“饿死”的情况。

DEADLINE

DEADLINE在CFQ的基础上,解决了I/O请求“饿死”的极端情况。除了CFQ本身具有的I/O排序队列,DEADLINE额外分别为读I/O和写I/O提供了FIFO队列。读FIFO队列的最大等待时间为500ms,写FIFO队列的最大等待时间为5s。FIFO队列内的I/O请求优先级要比CFQ队列中的高,而读FIFO队列的优先级又比写FIFO队列的优先级高。优先级可以表示如下:

FIFO(Read) > FIFO(Write) > CFQ

ANTICIPATORY

CFQ和DEADLINE考虑的焦点在于满足零散I/O请求上。对于连续的I/O请求,比如顺序读,并没有做优化。为了满足随机I/O和顺序I/O混合的场景,Linux还支持ANTICIPATORY调度算法。

ANTICIPATORY在DEADLINE的基础上,为每个读I/O都设置了6ms的等待时间窗口。如果在6ms内OS收到了相邻位置的读I/O请求,就可以立即满足。ANTICIPATORY算法通过增加等待时间来获得更高的性能,假设一个块设备只有一个物理查找磁头(例如一个单独的SATA硬盘),将多个随机的小写入流合并成一个大写入流(相当于将随机读写变顺序读写),通过这个原理来使用读取/写入的延时换取最大的读取/写入吞吐量。适用于大多数环境,特别是读取/写入较多的环境。

不同的磁盘调度算法(以及相应的I/O优化手段)对Kafka 这类依赖磁盘运转的应用的影响很大,建议根据不同的业务需求来测试并选择合适的磁盘调度算法。

从文件系统层面分析,Kafka 操作的都是普通文件,并没有依赖于特定的文件系统,但是依然推荐使用EXT4或XFS。尤其是对XFS而言,它通常有更好的性能,这种性能的提升主要影响的是Kafka的写入性能。

零拷贝

除了消息顺序追加、页缓存等技术,Kafka还使用零拷贝(Zero-Copy)技术来进一步提升性能。所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对 Linux操作系统而言,零拷贝技术依赖于底层的 sendfile()方法实现。对应于 Java 语言,FileChannal.transferTo()方法的底层实现就是sendfile()方法。

服务端

时间轮

Kafka中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)。

JDK中Timer和DelayQueue的插入和删除操作的平均时间复杂度为 O(nlogn)并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)

Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

kafka时间轮算法-1

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs×wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。

若时间轮的tickMs为1ms且wheelSize等于20,那么可以计算得出总体时间跨度interval为20ms。初始情况下表盘指针currentTime指向时间格0,此时有一个定时为2ms的任务插进来会存放到时间格为2的TimerTaskList中。随着时间的不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2对应的TimeTaskList中的任务进行相应的到期操作。此时若又有一个定时为 8ms 的任务插进来,则会存放到时间格 10 中,currentTime再过8ms后会指向时间格10。如果插入8ms任务的同时,还有一个定时为19ms的任务插进来,新来的TimerTaskEntry会复用原来的TimerTaskList,所以它会插入原本已经到期的时间格1。总之,整个时间轮的总体跨度是不变的,随着指针currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在currentTime和currentTime+interval之间。

如果此时有一个定时为350ms的任务该如何处理?直接扩充wheelSize的大小?Kafka中不乏几万甚至几十万毫秒的定时任务,这个wheelSize的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如100万毫秒,那么这个wheelSize为100万毫秒的时间轮不仅占用很大的内存空间,而且也会拉低效率。Kafka 为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。

如下图,第一层的时间轮tickMs=1ms、wheelSize=20、interval=20ms。第二层的时间轮的tickMs为第一层时间轮的interval,即20ms。每一层时间轮的wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms。以此类推,这个400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms。

kafka时间轮算法-2

对于350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入第二层时间轮中时间格17所对应的TimerTaskList。如果此时又有一个定时为450ms的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入第三层时间轮中时间格1的TimerTaskList。注意到在到期时间为[400ms,800ms)区间内的多个任务(比如446ms、455ms和473ms的定时任务)都会被放入第三层时间轮的时间格1,时间格1对应的TimerTaskList的超时时间为400ms。随着时间的流逝,当此TimerTaskList到期之时,原本定时为450ms的任务还剩下50ms的时间,还不能执行这个任务的到期操作。

这里就有一个时间轮降级的操作,会将这个剩余时间为50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms,60ms)的时间格中。再经历40ms之后,此时这个任务又被“察觉”,不过还剩余10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[10ms,11ms)的时间格中,之后再经历 10ms 后,此任务真正到期,最终执行相应的到期操作。

设计源于生活。常见的钟表就是一种具有三层结构的时间轮,第一层时间轮tickMs=1ms、wheelSize=60、interval=1min,此为秒钟;第二层 tickMs=1min、wheelSize=60、interval=1hour,此为分钟;第三层tickMs=1hour、wheelSize=12、interval=12hours,此为时钟。

在 Kafka 中,第一层时间轮的参数同上面的案例一样:tickMs=1ms、wheelSize=20、interval=20ms,各个层级的wheelSize也固定为20,所以各个层级的tickMs和interval也可以相应地推算出来。

Kafka在具体实现时间轮TimingWheel时还有一些小细节:

  • TimingWheel 在创建的时候以当前系统时间为第一层时间轮的起始时间(startMs),这里的当前系统时间并没有简单地调用 System.currentTimeMillis(),而是调用了Time.SYSTEM.hiResClockMs,这是因为 currentTimeMillis()方法的时间精度依赖于操作系统的具体实现,有些操作系统下并不能达到毫秒级的精度,而Time.SYSTEM.hiResClockMs实质上采用了System.nanoTime()/1_000_000来将精度调整到毫秒。
  • TimingWheel中的每个双向环形链表TimerTaskList都会有一个哨兵节点(sentinel),引入哨兵节点可以简化边界条件。哨兵节点也称为哑元节点(dummy node),它是一个附加的链表节点,该节点作为第一个节点,它的值域中并不存储任何东西,只是为了操作的方便而引入的。如果一个链表有哨兵节点,那么线性表的第一个元素应该是链表的第二个节点。
  • 除了第一层时间轮,其余高层时间轮的起始时间(startMs)都设置为创建此层时间轮时前面第一轮的currentTime。每一层的currentTime都必须是tickMs的整数倍,如果不满足则会将currentTime修剪为tickMs的整数倍,以此与时间轮中的时间格的到期时间范围对应起来。修剪方法为:currentTime=startMs-(startMs%tickMs)。currentTime会随着时间推移而推进,但不会改变为tickMs的整数倍的既定事实。若某一时刻的时间为timeMs,那么此时时间轮的currentTime=timeMs-(timeMs%tickMs),时间每推进一次,每个层级的时间轮的currentTime都会依据此公式执行推进。
  • Kafka 中的定时器只需持有 TimingWheel 的第一层时间轮的引用,并不会直接持有其他高层的时间轮,但每一层时间轮都会有一个引用(overflowWheel)指向更高一层的应用,以此层级调用可以实现定时器间接持有各个层级时间轮的引用。

Kafka中的定时器借了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都加入DelayQueue,“每个用到的TimerTaskList”特指非哨兵节点的定时任务项TimerTaskEntry对应的TimerTaskList。DelayQueue会根据TimerTaskList对应的超时时间expiration来排序,最短expiration的TimerTaskList会被排在DelayQueue的队头。Kafka中会有一个线程来获取DelayQueue 中到期的任务列表,有意思的是这个线程所对应的名称叫作“ExpiredOperationReaper”,可以直译为“过期操作收割机”。当“收割机”线程获取 DelayQueue中超时的任务列表 TimerTaskList之后,既可以根据 TimerTaskList 的 expiration来推进时间轮的时间,也可以就获取的TimerTaskList执行相应的操作,对里面的TimerTaskEntry该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。

前面说过,DelayQueue不适合Kafka这种高性能要求的定时任务,为何这里还要引入DelayQueue呢?注意对定时任务项TimerTaskEntry的插入和删除操作而言,TimingWheel时间复杂度为O(1),性能高出DelayQueue很多,如果直接将TimerTaskEntry插入DelayQueue,那么性能显然难以支撑。就算根据一定的规则将若干TimerTaskEntry划分到TimerTaskList这个组中,然后将TimerTaskList插入DelayQueue,如果在TimerTaskList中又要多添加一个TimerTaskEntry时该如何处理呢?对DelayQueue而言,这类操作显然变得力不从心。

但是,Kafka 中的 TimingWheel 专门用来执行插入和删除TimerTaskEntry的操作,而 DelayQueue 专门负责时间推进的任务。试想一下,DelayQueue 中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度(获取之后DelayQueue内部才会再次切换出新的队头)。如果采用每秒定时推进,那么获取第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取第二个超时任务时又需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,两者相辅相成

客户端

消息传输保障

一般而言,消息中间件的消息传输保障有3个层级,分别如下:

  • at most once:至多一次。消息可能会丢失,但绝对不会重复传输。

  • at least once:最少一次。消息绝不会丢失,但可能会重复传输。

  • exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。

Kafka 的消息传输保障机制非常直观。当生产者向 Kafka 发送消息时,一旦消息被成功提交到日志文件,由于多副本机制的存在,这条消息就不会丢失。如果生产者发送消息到 Kafka之后,遇到了网络问题而造成通信中断,那么生产者就无法判断该消息是否已经提交。虽然Kafka无法确定网络故障期间发生了什么,但生产者可以进行多次重试来确保消息已经写入 Kafka,这个重试的过程中有可能会造成消息的重复写入,所以对生产者来说, Kafka 提供的消息传输保障为 at least once

对消费者而言,消费者处理消息和提交消费位移的顺序在很大程度上决定了消费者提供哪一种消息传输保障。如果消费者在拉取完消息之后,应用逻辑先处理消息后提交消费位移,那么在消息处理之后且在位移提交之前消费者宕机了,待它重新上线之后,会从上一次位移提交的位置拉取,这样就出现了重复消费,因为有部分消息已经处理过了只是还没来得及提交消费位移,此时就对应at least once。如果消费者在拉完消息之后,应用逻辑先提交消费位移后进行消息处理,那么在位移提交之后且在消息处理完成之前消费者宕机了,待它重新上线之后,会从已经提交的位移处开始重新消费,但之前尚有部分消息未进行消费,如此就会发生消息丢失,此时就对应at most once。

Kafka 从 0.11.0.0 版本开始引入了幂等和事务这两个特性,以此来实现 EOS(exactly once semantics,精确一次处理语义)。

幂等

所谓的幂等,简单地说就是对接口的多次调用所产生的结果和调用一次是一致的。

生产者在进行重试的时候有可能会重复写入消息,而使用 Kafka 的幂等性功能之后就可以避免这种情况。开启幂等性功能的方式很简单,只需要显式地将生产者客户端参数 enable.idempotence 设置为 true 即可(这个参数的默认值为false)。

不过如果要确保幂等性功能正常,还需要确保生产者客户端的 retriesacksmax.in.flight.requests.per.connection 这几个参数不被配置错。实际上在使用幂等性功能的时候,用户完全可以不用配置(也不建议配置)这几个参数。

  • 如果用户显式地指定了 retries 参数,那么这个参数的值必须大于 0,否则会报出ConfigException;

  • 如果用户没有显式地指定 retries 参数,那么 KafkaProducer 会将它置为Integer.MAX_VALUE。同时还需要保证 max.in.flight.requests.per.connection 参数的值不能大于5(这个参数的值默认为5),否则也会报出ConfigException;

  • 如果用户还显式地指定了 acks 参数,那么还需要保证这个参数的值为-1(all),如果不为-1(这个参数的值默认为1),那么也会报出ConfigException。如果用户没有显式地指定这个参数,那么KafkaProducer会将它置为-1。

开启幂等性功能之后,生产者就可以如同未开启幂等时一样发送消息了。为了实现生产者的幂等性,Kafka为此引入了producer id(以下简称PID)和序列号(sequence number)这两个概念,分别对应 v2 版的日志格式中RecordBatch的producer id和first seqence这两个字段。每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的。对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将<PID,分区>对应的序列号的值加1

broker端会在内存中为每一对<PID,分区>维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比broker端中维护的对应的序列号的值(SN_old)大1(即SN_new=SN_old+1)时,broker才会接收它。如果SN_new<SN_old+1,那么说明消息被重复写入,broker可以直接将其丢弃。如果SN_new>SN_old+1,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出OutOfOrderSequenceException,这个异常是一个严重的异常,后续的诸如 send()、beginTransaction()、commitTransaction()等方法的调用都会抛出IllegalStateException的异常。

引入序列号来实现幂等也只是针对每一对<PID,分区>而言的,也就是说,Kafka的幂等只能保证单个生产者会话(session)中单分区的幂等。但是,发送了两条相同的消息,不过这仅仅是指消息内容相同,但对Kafka 而言是两条不同的消息,因为会为这两条消息分配不同的序列号。Kafka 并不会保证消息内容的幂等。

事务

幂等性并不能跨多个分区运作,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。

对流式应用(Stream Processing Applications)而言,一个典型的应用模式为“consume-transform-produce”。在这种模式下消费和生产并存:应用程序从某个主题中消费消息,然后经过一系列转换后写入另一个主题,消费者可能在提交消费位移的过程中出现问题而导致重复消费,也有可能生产者重复生产消息。

Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区。

为了实现事务,应用程序必须提供唯一的 transactionalId,这个 transactionalId通过客户端参数transactional.id来显式设置。事务要求生产者开启幂等特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要将 enable.idempotence 设置为 true (如果未显式设置,则KafkaProducer默认会将它的值设置为true),如果用户显式地将menable.idempotence设置为false,则会报出ConfigException。

transactionalId与PID一一对应,两者之间所不同的是transactionalId由用户显式设置,而PID是由Kafka内部分配的。另外,为了保证新的生产者启动后,具有相同transactionalId的旧生产者能够立即失效,每个生产者通过transactionalId获取PID的同时,还会获取一个单调递增的producer epoch。如果使用同一个transactionalId开启两个生产者,那么前一个开启的生产者会报出错误。

producer epoch 对应 v2 版的日志格式中RecordBatch的producer epoch字段。从生产者的角度分析,通过事务,Kafka 可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。前者表示具有相同 transactionalId 的新生产者实例被创建且工作的时候,旧的且拥有相同transactionalId的生产者实例将不再工作。后者指当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作。

而从消费者的角度分析,事务能保证的语义相对偏弱。出于以下原因,Kafka 并不能保证已提交的事务中的所有消息都能够被消费:

  • 对采用日志压缩策略的主题而言,事务中的某些消息有可能被清理(相同key的消息,后写入的消息会覆盖前面写入的消息)。
  • 事务中消息可能分布在同一个分区的多个日志分段(LogSegment)中,当老的日志分段被删除时,对应的消息可能会丢失。
  • 消费者可以通过seek()方法访问任意offset的消息,从而可能遗漏事务中的部分消息。
  • 消费者在消费时可能没有分配到事务内的所有分区,如此它也就不能读取事务中的所有消息。

在消费端有一个参数isolation.level,与事务有着莫大的关联,这个参数的默认值为“read_uncommitted”,意思是说消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。这个参数还可以设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。举个例子,如果生产者开启事务并向某个分区值发送3条消息msg1、msg2和msg3,在执行commitTransaction()或abortTransaction()方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在KafkaConsumer内部会缓存这些消息,直到生产者执行 commitTransaction()方法之后它才能将这些消息推送给消费端应用。反之,如果生产者执行了 abortTransaction()方法,那么 KafkaConsumer 会将这些缓存的消息丢弃而不推送给消费端应用。

日志文件中除了普通的消息,还有一种消息专门用来标志一个事务的结束,它就是控制消息(ControlBatch)。控制消息一共有两种类型:COMMIT和ABORT,分别用来表征事务已经成功提交或已经被成功中止。KafkaConsumer 可以通过这个控制消息来判断对应的事务是被提交了还是被中止了,然后结合参数isolation.level配置的隔离级别来决定是否将相应的消息返回给消费端应用,如下图所示:

kafka事务

为了实现事务的功能,Kafka还引入了事务协调器(TransactionCoordinator)来负责处理事务,这一点可以类比一下组协调器(GroupCoordinator)。每一个生产者都会被指派一个特定的TransactionCoordinator,所有的事务逻辑包括分派PID 等都是由 TransactionCoordinator 来负责实施的。TransactionCoordinator会将事务状态持久化到内部主题__transaction_state 中。

可靠性实现

副本机制

Kafka从0.8版本开始为分区引入了多副本机制,通过增加副本数量来提升数据容灾能力。同时,Kafka通过多副本机制实现故障自动转移,在Kafka集群中某个broker节点失效的情况下仍然保证服务可用。

这里简要地提一下相关的概念:

  • 副本是相对于分区而言的,即副本是特定分区的副本。

  • 一个分区中包含一个或多个副本,其中一个为leader副本,其余为follower副本,各个副本位于不同的broker节点中。只有leader副本对外提供服务,follower副本只负责数据同步。

  • 分区中的所有副本统称为 AR,而 ISR 是指与leader 副本保持同步状态的副本集合,当然leader副本本身也是这个集合中的一员。

  • LEO标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的LEO,ISR中最小的LEO即为HW,俗称高水位,消费者只能拉取到HW之前的消息。

从生产者发出的一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有 follower 副本都同步完之后才能被认为已经提交,之后才会更新分区的 HW,进而消费者可以消费到这条消息

失效副本

正常情况下,分区的所有副本都处于ISR集合中,但是难免会有异常情况发生,从而某些副本被剥离出ISR集合中。在ISR集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区,即under-replicated分区。

失效副本不仅是指处于功能失效状态的副本,处于同步失效状态的副本也可以看作失效副本。怎么判定一个分区是否有副本处于同步失效的状态呢?Kafka从0.9.x版本开始就通过唯一的broker端参数 replica.lag.time.max.ms(默认10000)来抉择,当ISR集合中的一个follower副本滞后leader副本的时间超过此参数指定的值时则判定为同步失败,需要将此follower副本剔除出ISR集合,具体参考下图:

kafka多副本机制-1

当follower副本将leader副本LEO(LogEndOffset)之前的日志全部同步时,则认为该 follower 副本已经追赶上leader 副本,此时更新该副本的 lastCaughtUpTimeMs 标识。

Kafka 的副本管理器会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的 lastCaughtUpTimeMs 差值是否大于参数 replica.lag.time.max.ms 指定的值。

不要错误地认为 follower 副本只要拉取 leader副本的数据就会更新lastCaughtUpTimeMs。当 leader 副本中消息的流入速度大于follower副本中拉取的速度时,就算 follower 副本一直不断地拉取 leader 副本的消息也不能与leader副本同步。如果还将此follower副本置于ISR集合中,那么当leader副本下线而选取此follower副本为新的leader副本时就会造成消息的严重丢失。

Kafka源码注释中说明了一般有两种情况会导致副本失效:

  • follower副本进程卡住,在一段时间内根本没有向leader副本发起同步请求,比如频繁的Full GC。

  • follower副本进程同步过慢,在一段时间内都无法追赶上leader副本,比如I/O开销过大。

在这里再补充一点,如果通过工具增加了副本因子,那么新增加的副本在赶上leader副本之前也都是处于失效状态的。如果一个follower副本由于某些原因(比如宕机)而下线,之后又上线,在追赶上leader副本之前也处于失效状态。

ISR 伸缩

ISR 缩减

Kafka 在启动的时候会开启两个与 ISR 相关的定时任务,名称分别为“isr-expiration”和“isr-change-propagation”。

isr-expiration 任务会周期性地检测每个分区是否需要缩减其ISR集合。这个周期和replica.lag.time.max.ms参数有关,大小是这个参数值的一半,默认值为5000ms。当检测到ISR集合中有失效副本时,就会收缩ISR集合。如果某个分区的ISR集合发生变更,则会将变更后的数据记录到 ZooKeeper 对应的 /brokers/topics/<topic>/partition/<parititon>/state 节点中。节点中的数据示例如下:

{"controller_epoch":26,"leader":0,"version":1,"leader_epoch":2,"isr":[0,1]}

其中controller_epoch表示当前Kafka控制器的epoch,leader表示当前分区的leader副本所在的broker的id编号,version表示版本号(当前版本固定为1),leader_epoch表示当前分区的leader纪元,isr表示变更后的ISR列表。

除此之外,当 ISR 集合发生变更时还会将变更后的记录缓存到 isrChangeSet 中,isr-change-propagation 任务会周期性(固定值为 2500ms)地检查 isrChangeSet,如果发现 isrChangeSet 中有ISR集合的变更记录,那么它会在ZooKeeper的 /isr_change_notification 路径下创建一个以 isr_change_ 开头的持久顺序节点(比如/isr_change_notification/isr_change_0000000000),并将isrChangeSet中的信息保存到这个节点中。

Kafka控制器为/isr_change_notification添加了一个Watcher,当这个节点中有子节点发生变化时会触发Watcher的动作,以此通知控制器更新相关元数据信息并向它管理的broker节点发送更新元数据的请求,最后删除/isr_change_notification路径下已经处理过的节点。

频繁地触发Watcher会影响Kafka控制器、ZooKeeper甚至其他broker节点的性能。为了避免这种情况,Kafka添加了限定条件,当检测到分区的ISR集合发生变化时,还需要检查以下两个条件:

  1. 上一次ISR集合发生变化距离现在已经超过5s
  2. 上一次写入ZooKeeper的时间距离现在已经超过60s

满足以上两个条件之一才可以将ISR集合的变化写入目标节点。

ISR 扩充

随着follower副本不断与leader副本进行消息同步,follower副本的LEO也会逐渐后移,并最终追赶上leader副本,此时该follower副本就有资格进入ISR集合。

追赶上leader副本的判定准则是此副本的LEO是否不小于leader副本的HW,注意这里并不是和leader副本的LEO相比。ISR扩充之后同样会更新ZooKeeper中的/brokers/topics/<topic>/partition/<parititon>/state节点和isrChangeSet,之后的步骤就和ISR收缩时的相同。当ISR集合发生增减时,或者ISR集合中任一副本的LEO发生变化时,都可能会影响整个分区的HW。

如下图所示:

kafka多副本机制-2

leader副本的LEO为9,follower1副本的LEO为7,而follower2副本的LEO为6,如果判定这3个副本都处于ISR集合中,那么这个分区的HW为6;如果follower2已经被判定为失效副本被剥离出ISR集合,那么此时分区的HW为leader副本和follower1副本中LEO的最小值,即为7。

LEO 和 HW

对于副本而言,还有两个概念:本地副本(Local Replica)和远程副本(RemoteReplica),本地副本是指对应的Log分配在当前的broker节点上,远程副本是指对应的Log分配在其他的broker节点上。在Kafka中,同一个分区的信息会存在多个broker节点上,并被其上的副本管理器所管理,这样在逻辑层面每个broker节点上的分区就有了多个副本,但是只有本地副本才有对应的日志。

如下图所示:

kafka多副本机制-3

某个分区有3个副本分别位于broker0、broker1和broker2节点中,其中带阴影的方框表示本地副本。假设broker0上的副本1为当前分区的leader副本,那么副本2和副本3就是follower副本,整个消息追加的过程可以概括如下:

  1. 生产者客户端发送消息至leader副本(副本1)中
  2. 消息被追加到leader副本的本地日志,并且会更新日志的偏移量。
  3. follower副本(副本2和副本3)向leader副本请求同步数据。
  4. leader副本所在的服务器读取本地日志,并更新对应拉取的follower副本的信息。
  5. leader副本所在的服务器将拉取结果返回给follower副本。
  6. follower副本收到leader副本返回的拉取结果,将消息追加到本地日志中,并更新日志的偏移量信息。

再来分析在这个过程中各个副本LEO和HW的变化情况。

假设生产者一直在往leader副本(带阴影的方框)中写入消息,某一时刻,leader副本的LEO增加至5,并且所有副本的HW还都为0。

kafka多副本机制-4

之后follower副本(不带阴影的方框)向leader副本拉取消息,在拉取的请求中会带有自身的LEO信息,这个LEO信息对应的是FetchRequest请求中的fetch_offset。leader副本返回给follower副本相应的消息,并且还带有自身的HW信息,如下图所示,这个HW信息对应的是FetchResponse中的high_watermark。

kafka多副本机制-5

此时两个follower副本各自拉取到了消息,并更新各自的LEO为3和4。

与此同时,follower副本还会更新自己的HW,更新HW的算法是比较当前LEO和leader副本中传送过来的HW的值,取较小值作为自己的HW值。当前两个follower副本的HW都等于0(min(0,0)=0)。接下来follower副本再次请求拉取leader副本中的消息,如下图:

kafka多副本机制-6

此时leader副本收到来自follower副本的FetchRequest请求,其中带有LEO的相关信息,选取其中的最小值作为新的HW,即min(15,3,4)=3。然后连同消息和HW一起返回FetchResponse给follower副本,如下图:

kafka多副本机制-7

注意leader副本的HW是一个很重要的东西,因为它直接影响了分区数据对消费者的可见性。

两个follower副本在收到新的消息之后更新LEO并且更新自己的HW为3(min(LEO,3)=3)。

在一个分区中,leader副本所在的节点会记录所有副本的LEO,而follower副本所在的节点只会记录自身的LEO,而不会记录其他副本的LEO。对HW而言,各个副本所在的节点都只记录它自身的HW。leader副本中带有其他 follower 副本的 LEO,那么它们是什么时候更新的呢?leader 副本收到 follower副本的FetchRequest请求之后,它首先会从自己的日志文件中读取数据,然后在返回给follower副本数据前先更新follower副本的LEO。

Leader Epoch

在正常情况下的leader副本与follower副本之间的同步过程,如果leader副本发生切换,那么同步过程又该如何处理呢?在0.11.0.0版本之前,Kafka使用的是基于HW的同步机制,但这样有可能出现数据丢失或leader副本和follower副本数据不一致的问题。

首先来看一下数据丢失的问题,如下图所示:

kafka leadr epoch-1

Replica B是当前的leader副本(用L标记),Replica A是follower副本。在某一时刻,B中有2条消息m1和m2,A从B中同步了这两条消息,此时A和B的LEO都为2,同时HW都为1;之后A再向B中发送请求以拉取消息,FetchRequest请求中带上了A的LEO信息,B在收到请求之后更新了自己的HW为2;B中虽然没有更多的消息,但还是在延时一段时间之后返回FetchResponse,并在其中包含了HW信息;最后A根据FetchResponse中的HW信息更新自己的HW为2。

可以看到整个过程中两者之间的HW同步有一个间隙,在A写入消息m2之后(LEO更新为2)需要再一轮的FetchRequest/FetchResponse才能更新自身的HW为2。如果在这个时候A宕机了,那么在A重启之后会根据之前HW位置(这个值会存入本地的复制点文件replication-offset-checkpoint)进行日志截断,这样便会将m2这条消息删除,此时A只剩下m1这一条消息,之后A再向B发送FetchRequest请求拉取消息。

kafka leadr epoch-2

此时若 B 再宕机,那么 A 就会被选举为新的leader。B 恢复之后会成为follower,由于follower副本HW不能比leader副本的HW高,所以还会做一次日志截断,以此将HW调整为1。这样一来m2这条消息就丢失了(就算B不能恢复,这条消息也同样丢失)。(两个轮流挂,有点离谱。。。。)

对于这种情况,也有一些解决方法,比如等待所有follower副本都更新完自身的HW之后再更新leader副本的HW,这样会增加多一轮的FetchRequest/FetchResponse延迟,自然不够妥当。还有一种方法就是follower副本恢复之后,在收到leader副本的FetchResponse前不要截断follower副本(follower副本恢复之后会做两件事情:截断自身和向leader发送FetchRequest请求),不过这样也避免不了数据不一致的问题。

如下图所示:

kafka leadr epoch-3

当前leader副本为A,follower副本为B,A中有2条消息m1和m2,并且HW和LEO都为2,B中有1条消息m1,并且HW和LEO都为1(此时还没收到 leader 的 hw 更新返回)。假设A和B同时“挂掉”,然后B第一个恢复过来并成为leader,如图所示:

kafka leadr epoch-4

之后B写入消息m3,并将LEO和HW更新至2(假设所有场景中的min.insync.replicas参数配置为1)。此时A也恢复过来了,根据前面数据丢失场景中的介绍可知它会被赋予follower的角色,并且需要根据HW截断日志及发送FetchRequest至B,不过此时A的HW正好也为2,那么就可以不做任何调整了,如下图所示:

kafka leadr epoch-5

如此一来A中保留了m2而B中没有,B中新增了m3而A也同步不到,这样A和B就出现了数据不一致的情形。

为了解决上述两种问题,Kafka从0.11.0.0开始引入了leader epoch的概念,在需要截断数据的时候使用leader epoch作为参考依据而不是原本的HW。leader epoch代表leader的纪元信息(epoch),初始值为0。每当leader变更一次,leaderepoch的值就会加1,相当于为leader增设了一个版本号。与此同时,每个副本中还会增设一个矢量<LeaderEpoch=>StartOffset>,其中StartOffset表示当前LeaderEpoch下写入的第一条消息的偏移量。每个副本的Log下都有一个leader-epoch-checkpoint文件,在发生leader epoch变更时,会将对应的矢量对追加到这个文件中,v2版本的消息格式时就提到了消息集中的partition leader epoch字段,而这个字段正对应这里讲述的leader epoch。

再来看一下引入 leader epoch 之后如何应付前面所说的数据丢失和数据不一致的场景。首先讲述应对数据丢失的问题,如下图所示:

kafka leadr epoch-6

同样 A 发生重启,之后 A 不是先忙着截断日志而是先发送OffsetsForLeaderEpochRequest请求给 B(OffsetsForLeaderEpochRequest 请求体结构如下图所示,其中包含 A 当前的LeaderEpoch值)

kafka leadr epoch-7

B作为目前的leader在收到请求之后会返回当前的LEO(LogEndOffset,注意图中LE0和LEO的不同),与请求对应的响应为OffsetsForLeaderEpochResponse,对应的响应体结构可以参考图下图:

kafka leadr epoch-8

如果A中的LeaderEpoch(假设为LE_A)和B中的不相同,那么B此时会查找LeaderEpoch为 LE_A+1 对应的 StartOffset 并返回给 A,也就是 LE_A 对应的LEO,所以可以将OffsetsForLeaderEpochRequest的请求看作用来查找follower副本当前LeaderEpoch的LEO。A在收到2之后发现和目前的LEO相同,也就不需要截断日志了。之后跟前面一样,B发生了宕机,A成为新的leader,那么对应的LE=0也变成了LE=1,对应的消息m2此时就得到了保留,这是原本所不能保留的。之后不管B有没有恢复,后续的消息都可以以LE1为LeaderEpoch陆续追加到A中。

再来看一下leader epoch如何应对数据不一致的场景。如下图所示,当前A为leader,B为follower,A中有2条消息m1和m2,而B中有1条消息m1。假设A和B同时“挂掉”,然后B第一个恢复过来并成为新的leader。

kafka leadr epoch-9

kafka leadr epoch-10

之后B写入消息m3,并将LEO和HW更新至2。注意此时的LeaderEpoch已经从LE0增至LE1了。

紧接着A也恢复过来成为follower并向B发送OffsetsForLeaderEpochRequest请求,此时A的LeaderEpoch为LE0。B根据LE0查询到对应的offset为1并返回给A,A就截断日志并删除了消息m2。之后A发送FetchRequest至B请求来同步数据,最终A和B中都有两条消息m1和m3,HW和LEO都为2,并且LeaderEpoch都为LE1,如此便解决了数据不一致的问题。

为什么不支持读写分离

在Kafka中,生产者写入消息、消费者读取消息的操作都是与leader副本进行交互的,从而实现的是一种主写主读的生产消费模型。数据库、Redis等都具备主写主读的功能,与此同时还支持主写从读的功能,主写从读也就是读写分离,为了与主写主读对应,这里就以主写从读来称呼。Kafka并不支持主写从读,这是为什么呢?

从代码层面上来说,虽然增加了代码复杂度,但在Kafka中这种功能完全可以支持。

对于这个问题,可以从“收益点”这个角度来做具体分析。主写从读可以让从节点去分担主节点的负载压力,预防主节点负载过重而从节点却空闲的情况发生。但是主写从读也有2个很明显的缺点:

  1. 数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中A数据的值都为X,之后将主节点中A的值修改为Y,那么在这个变更通知到从节点之前,应用读取从节点中的A数据的值并不为最新的Y,由此便产生了数据不一致的问题。
  2. 延时问题。类似Redis这种组件,数据从写入主节点到同步至从节点中的过程需要经历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在Kafka中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

现实情况下,很多应用既可以忍受一定程度上的延时,也可以忍受一段时间内的数据不一致的情况,那么对于这种情况,Kafka是否有必要支持主写从读的功能呢?主读从写可以均摊一定的负载却不能做到完全的负载均衡,比如对于数据写压力很大而读压力很小的情况,从节点只能分摊很少的负载压力,而绝大多数压力还是在主节点上。而在Kafka中却可以达到很大程度上的负载均衡,而且这种均衡是在主写主读的架构上实现的。看一下Kafka的生产消费模型,如图所示:

kafka 生产消费模型

在Kafka集群中有3个分区,每个分区有3个副本,正好均匀地分布在3个broker上,灰色阴影的代表leader副本,非灰色阴影的代表follower副本,虚线表示follower副本从leader副本上拉取消息。当生产者写入消息的时候都写入leader副本,对于上图中的情形,每个broker都有消息从生产者流入;当消费者读取消息的时候也是从leader副本中读取的,对于图中情形,每个broker都有消息流出到消费者。很明显地可以看出,每个broker上的读写负载都是一样的,这就说明Kafka可以通过主写主读实现主写从读实现不了的负载均衡。上图展示是一种理想的部署情况,有以下几种情况(包含但不仅限于)会造成一定程度上的负载不均衡:

  1. broker端的分区分配不均。当创建主题的时候可能会出现某些broker分配到的分区数多而其他broker分配到的分区数少,那么自然而然地分配到的leader副本也就不均。
  2. 生产者写入消息不均。生产者可能只对某些broker中的leader副本进行大量的写入操作,而对其他broker中的leader副本不闻不问。
  3. 消费者消费消息不均。消费者可能只对某些broker中的leader副本进行大量的拉取操作,而对其他broker中的leader副本不闻不问。
  4. leader副本的切换不均。在实际应用中可能会由于broker宕机而造成主从副本的切换,或者分区副本的重分配等,这些动作都有可能造成各个broker中leader副本的分配不均。

对此,可以做一些防范措施。针对第一种情况,在主题创建的时候尽可能使分区分配得均衡,好在Kafka 中相应的分配算法也是在极力地追求这一目标,如果是开发人员自定义的分配,则需要注意这方面的内容。对于第二和第三种情况,主写从读也无法解决。对于第四种情况,Kafka 提供了优先副本的选举来达到 leader副本的均衡,与此同时,也可以配合相应的监控、告警和运维平台来实现均衡的优化。

在实际应用中,配合监控、告警、运维相结合的生态平台,在绝大多数情况下Kafka都能做到很大程度上的负载均衡。总的来说,Kafka 只支持主写主读有几个优点:可以简化代码的实现逻辑,减少出错的可能;将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;没有延时的影响;在副本稳定的情况下,不会出现数据不一致的情况。为此,Kafka 又何必再去实现对它而言毫无收益的主写从读的功能呢?这一切都得益于 Kafka 优秀的架构设计,从某种意义上来说,主写从读是由于设计上的缺陷而形成的权宜之计。

日志同步机制

在分布式系统中,日志同步机制既要保证数据的一致性,也要保证数据的顺序性。虽然有许多方式可以实现这些功能,但最简单高效的方式还是从集群中选出一个leader来负责处理数据写入的顺序性。只要leader还处于存活状态,那么follower只需按照leader中的写入顺序来进行同步即可。

通常情况下,只要leader不宕机我们就不需要关心follower的同步问题。不过当leader宕机时,我们就要从follower中选举出一个新的leader。follower的同步状态可能落后leader很多,甚至还可能处于宕机状态,所以必须确保选择具有最新日志消息的follower作为新的leader。日志同步机制的一个基本原则就是:如果告知客户端已经成功提交了某条消息,那么即使 leader宕机,也要保证新选举出来的leader中能够包含这条消息。这里就有一个需要权衡(tradeoff)的地方,如果leader在消息被提交前需要等待更多的follower确认,那么在它宕机之后就可以有更多的follower替代它,不过这也会造成性能的下降。

对于这种权衡,一种常见的做法是“少数服从多数”,它可以用来负责提交决策和选举决策。虽然Kafka不采用这种方式,但可以拿来探讨和理解权衡的艺术。在这种方式下,如果我们有2f+1个副本,那么在提交之前必须保证有f+1个副本同步完消息。同时为了保证能正确选举出新的leader,至少要保证有f+1个副本节点完成日志同步并从同步完成的副本中选举出新的leader节点。并且在不超过f个副本节点失败的情况下,新的leader需要保证不会丢失已经提交过的全部消息。这样在任意组合的 f+1 个副本中,理论上可以确保至少有一个副本能够包含已提交的全部消息,这个副本的日志拥有最全的消息,因此会有资格被选举为新的 leader来对外提供服务。

“少数服从多数”的方式有一个很大的优势,系统的延迟取决于最快的几个节点,比如副本数为3,那么延迟就取决于最快的那个follower而不是最慢的那个(除了leader,只需要另一个follower确认即可)。不过它也有一些劣势,为了保证leader选举的正常进行,它所能容忍的失败follower数比较少,如果要容忍1个follower失败,那么至少要有3个副本,如果要容忍2个follower失败,必须要有5个副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。这也就是“少数服从多数”的这种Quorum模型常被用作共享集群配置(比如ZooKeeper),而很少用于主流的数据存储中的原因。

与“少数服从多数”相关的一致性协议有很多,比如Zab、Raft和ViewstampedReplication等。而Kafka使用的更像是微软的PacificA算法。

在Kafka中动态维护着一个ISR集合,处于ISR集合内的节点保持与leader相同的高水位(HW),只有位列其中的副本(unclean.leader.election.enable配置为false)才有资格被选为新的 leader。写入消息时只有等到所有 ISR 集合中的副本都确认收到之后才能被认为已经提交。位于 ISR 中的任何副本节点都有资格成为leader,选举过程简单、开销低,这也是Kafka选用此模型的重要因素。Kafka中包含大量的分区,leader副本的均衡保障了整体负载的均衡,所以这一因素也极大地影响Kafka的性能指标。

在采用ISR模型和(f+1)个副本数的配置下,一个Kafka分区能够容忍最大f个节点失败,相比于“少数服从多数”的方式所需的节点数大幅减少。实际上,为了能够容忍f个节点失败,“少数服从多数”的方式和ISR的方式都需要相同数量副本的确认信息才能提交消息。比如,为了容忍1个节点失败,“少数服从多数”需要3个副本和1个follower的确认信息,采用ISR的方式需要2个副本和1个follower的确认信息。在需要相同确认信息数的情况下,采用ISR的方式所需要的副本总数变少,复制带来的集群开销也就更低,“少数服从多数”的优势在于它可以绕开最慢副本的确认信息,降低提交的延迟,而对Kafka而言,这种能力可以交由客户端自己去选择。

另外,一般的同步策略依赖于稳定的存储系统来做数据恢复,也就是说,在数据恢复时日志文件不可丢失且不能有数据上的冲突。不过它们忽视了两个问题:首先,磁盘故障是会经常发生的,在持久化数据的过程中并不能完全保证数据的完整性;其次,即使不存在硬件级别的故障,我们也不希望在每次写入数据时执行同步刷盘(fsync)的动作来保证数据的完整性,这样会极大地影响性能。而 Kafka 不需要宕机节点必须从本地数据日志中进行恢复,Kafka 的同步方式允许宕机副本重新加入ISR集合,但在进入ISR之前必须保证自己能够重新同步完leader中的所有数据。

参考文档

《深入理解 Kafka:核心设计与实践原理》——朱忠华