消息队列
1. 为什么要使用消息队列
- 通过异步处理提高系统性能(削峰、减少响应所需的时间)
- 降低系统的耦合性
- 消息总线
通过异步处理提高系统性能(削峰、减少响应所需时间)
如上图,在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧增,使得响应速度变慢。但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数据库有更好的伸缩性),因此响应速度得到大幅改善。
通过以上分析可以得出消息队列具有很好的削峰作用的功能——即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。
举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。
因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。
降低系统耦合性
消息队列使利用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。
消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。
另外为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。
消息总线
引入消息队列的缺点:
- 系统可用性降低:系统可用性在某种程度上降低。在加入MQ之前,不用考虑消息丢失或者说MQ挂掉等等的情况,但是,引入MQ之后就需要去考虑了
- 系统复杂性提高:加入MQ之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
- 一致性问题:消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!
2. 消息队列实现机制/ AMPQ 协议
AMPQ,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品、不同开发语言等条件的限制。
AMQP 其实和Http一样都是一种协议, 只不过 Http 是针对网络传输的, 而 AMQP 是基于消息队列的
AMQP 协议中的基本概念:
- Broker: 接收和分发消息的应用,我们在介绍消息中间件的时候所说的消息系统就是 Message Broker。
- Virtual host: 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
- Connection: publisher/consumer 和 broker 之间的 TCP 连接。断开连接的操作只会在 client 端进行,Broker 不会断开连接,除非出现网络故障或 broker 服务出现问题。
- Channel: 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
- Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
- Queue: 消息最终被送到这里等待 consumer 取走。一个 message 可以被同时拷贝到多个 queue 中。
- Binding: exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
AMPQ 提供了五种消息模型:
- direct exchange
- fnout exchange
- topic change
- headers exchange
- system exchange
3. RabbitMQ交换机exchange的类型有哪些?
RabbitMQ常用的Exchange类型有fanout
,direct
,topic
,headers
。
fanout
fanout
类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,不需要做任何判断操作,所以fanout
类型是所有的交换机类型里面速度最快的。fanout
类型常用来广播消息。
direct
direct
类型的Exchange路由规则也很简单,它会把消息路由到那些Bindingkey
与RoutingKey
完全匹配的Queue
中。
以上图为例,如果发送消息的时候设置路由键为warning
,那么消息会路由到 Queue1 和 Queue2。如果在发送消息的时候设置路由键为Info
或者debug
,消息只会路由到Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。
direct
类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
topic
direct
类型的交换器路由规则是完全匹配BindingKey
和RoutingKey
,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic
类型的交换器在匹配规则上进行了扩展,它与direct
类型的交换器相似,也是将消息路由到BindingKey
和RoutingKey
相匹配的队列中,但这里的匹配规则有些不同,它约定:
RoutingKey
为一个点号.
分隔的字符串(被点号.
分隔开的每一段独立的字符串称为一个单词),如com.rabbitmq.client
、java.util.concurrent
、com.hidden.client
;BindingKey
和RoutingKey
一样也是点号.
分隔的字符串;BindingKey
中可以存在两种特殊字符串*
和#
,用于做模糊匹配,其中*
用于匹配一个单词,#
用于匹配多个单词(可以是零个)。
headers
headers
类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers
属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的 headers
(也是一个键值对的形式)对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers
类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
4. RabbitMQ 如何保证消息可靠性传输?/RabbitMQ 消息确认机制(事务+Confirm)
在 Rabbitmq 中可以通过持久化来解决因为服务器异常而导致的数据丢失问题。但是还存在另外一个问题:生产者将消息发送出去之后,消息是否真正到达Rabbitmq服务器是无法确定的(即Rabbitmq不会反馈任何消息给生产者),默认情况下是不知道消息有没有正确到达的。
导致的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本没有到达Rabbitmq服务器。
Rabbitmq提供了两种解决方式:
- 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案。
- 通过将channel设置成confirm模式来实现。
事务机制
RabbitMQ中与事务机制有关的方法有三个:txSelect(),txCommit()以及txRollback()。txSelect()用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect()开启事务之后,便可以发布消息给broker代理服务器了,如果txCommit()提交成功了,则表明消息到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候便可以捕获异常通过txRollback回滚事务了。
缺点:采用这种模式比较耗时,降低了Rabbitmq的吞吐量
Confirm模式
Confirm模式是相对比较高效的结局方案,分为三种情况:普通Confirm模式,批量Confirm模式以及异步Confirm模式。
producer端confirm模式的实现原理
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 deliver-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于它是异步的,一旦发布了一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
普通confirm模式
每发送一条消息后,调用 waitForConfirms
方法,等待服务器端confirm,实际上是一种串行confirm了。
该技术实现相对比较简单,但是有一个主要缺点:由于消息确认会阻止所有后续消息的发布,因此它会大大减慢发布速度。这种方法不会提供每秒超过数百条已发布消息的吞吐量。
批量confirm模式
每发送一批消息后,调用 waitForConfirms
方法,等待服务器端confirm。批量 confirm 模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来 publish 消息,然后等待服务器端 confirm, 相比普通 confirm 模式,批量极大提升 confirm 效率,但是问题在于一旦出现 confirm 返回 false 或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量 confirm 性能应该是不升反降的。
异步confirm模式
提供了一个回调方法,服务端confirm了一条或者多条消息之后Client端会回调这个方法。
Channel 对象提供的 ConfirmListener() 回调方法只包含 deliveryTag(当前Channel发出的消息序号),我们需要自己为每一个 Channel 维护一个 unconfirm 的消息序号集合,每 publish 一条数据,集合中元素加1,每回调一次 handleAck 方法,unconofirm 集合删掉相应的一条(multiple=false)或者多条(multiple=true)记录。从程序运行效率上看,这个 unconfirm 集合最好采用有序集合SortedSet 存储结构。实际上,SDK中的 waitForConfirms() 方法也是通过SortedSet维护这个消息序号的。
5. 几种消息队列对比
ActiveMQ:的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
RabbitMQ:在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做erlang源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
RocketMQ:阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ 挺好的。
kafka:特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。
6. RabbitMQ如何保证消息队列的高可用?
RabbitMQ 有三种模式:单机模式,普通集群模式,镜像集群模式。
对于 RabbitMQ,要保证高可用,就需要使用集群模式。
普通集群模式
就是在多台机器上启动多个 rabbitmq 实例,每个机器启动一个。但是你创建的 queue,只会放在一个rabbtimq 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。当你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。
这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。
而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 rabbitmq 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。
这方案主要是提高吞吐量的,并没有什么高可用可言,就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式
这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
如何开启这个镜像集群模式呢?其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
这样的话,好处在于,任何一个机器宕机了,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销太大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,增加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展 queue。
7. RabbitMQ 如何保证消息队列消费的顺序性?
在 mysql 里增删改一条数据,对应出来了增删改 3 条 binlog 日志,接着这三条 binlog 发送到 MQ 里面,再消费出来依次执行,起码得保证是按照顺序来的吧?不然本来是:增加、修改、删除;换了顺序给执行成删除、修改、增加,就全错了。
本来这个数据同步过来,应该最后这个数据被删除了;结果搞错了这个顺序,最后这个数据保留下来了,数据同步就出错了。
先看看顺序会错乱的俩场景:
RabbitMQ:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1、data2、data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者 2 先执行完操作,把 data2 存入数据库,然后是 data1、data3。这不明显乱了。
解决方案是拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
8. Kafka 应用场景,和其他消息队列对比的优势
Kafka 被定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。Kafka 在现代的系统中主要承担三大角色:
- 消息系统:Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
- 存储系统:Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于Kafka 的消息持久化功能和多副本机制,我们可以把Kafka作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
- 流式处理平台:Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
和其他消息队列相比,Kafka的优势在哪里?
现在经常提到 Kafka 的时候就已经默认它是一个非常优秀的消息队列了,也会经常拿它跟 RocketMQ、RabbitMQ 对比。Kafka 相比其他消息队列主要的优势如下:
- 极致的性能 :基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。
- 生态系统兼容性无可匹敌 :Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域。
实际上在早期的时候 Kafka 并不是一个合格的消息队列,早期的 Kafka 在消息队列领域就像是一个衣衫褴褛的孩子一样,功能不完备并且有一些小问题比如丢失消息、不保证消息可靠性等等。当然,这也和 LinkedIn 最早开发 Kafka 用于处理海量的日志有很大关系,哈哈哈,人家本来最开始就不是为了作为消息队列滴,谁知道后面误打误撞在消息队列领域占据了一席之地。
随着后续的发展,这些短板都被 Kafka 逐步修复完善。所以,Kafka 作为消息队列不可靠这个说法已经过时!
9. Kafka 的消息模型(Producer、Consumer、Broker、Topic、Partition的概念)
kafka 采用发布-订阅的消息模型,使用主题(Topic) 作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后,才订阅的用户则是收不到该条消息的。
在发布 - 订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。所以说,发布 - 订阅模型在功能层面上是可以兼容队列模型的。
RocketMQ 的消息模型和 Kafka 基本是完全一样的。唯一的区别是 Kafka 中没有队列这个概念,与之对应的是 Partition(分区)。
一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群。其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。
Kafka 将生产者发布的消息发送到 Topic(主题) 中,需要这些消息的消费者可以订阅这些 Topic(主题),如下图所示
Producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其投递到Kafka中。
Consumer:消费者,也就是接收消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。
Consumer Group:消费者组,与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。每一个分区只能被一个消费组中的一个消费者所消费。
Broker:服务代理节点。对于Kafka而言,Broker可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个Kafka实例。一个或多个 Broker 组成了一个 Kafka 集群。一般而言,习惯使用首字母小写的 broker 来表示服务代理节点。
在 Kafka 中还有两个特别重要的概念—主题(Topic)与分区(Partition)。
Topoc: Kafka 中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。
Partition: Partition 属于 Topic 的一部分。主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。
Kafka 中的 Partition(分区) 实际上可以对应成为消息队列中的队列。
10. Kafka 的多副本机制
Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。
副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用。
生产者和消费者只与 leader 副本交互。可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。
多分区+多副本机制带来的好处
- Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
- Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。
11. Kafka 如何保证消息的顺序消费
在 kafka 中,同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
每次添加消息到 Partition(分区) 的时候都会采用尾加法。 Kafka 只能为我们保证 Partition(分区) 中的消息有序。
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。
所以,有一种很简单的保证消息消费顺序的方法:1 个 Topic 只对应一个 Partition。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。
Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个可以采用表/对象的 id 来作为 key 。
对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:
- 1 个 Topic 只对应一个 Partition。
- (推荐)发送消息的时候指定 key/Partition。
12. 使用 Kafka 如何保证消息不丢失(消息可靠性传输)
生产者丢失消息的情况
生产者(Producer) 调用send
方法发送消息之后,消息可能因为网络问题并没有发送过去。
所以,不能默认在调用send
方法发送消息之后消息发送成功了。为了确定消息是发送成功,要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 send
方法发送消息实际上是异步的操作,可以通过 get()
方法获取调用结果,但这样相当于变成了同步操作。Kafka 中提供了回调方法,可以通过回调的方式来实现。
另外推荐为 Producer 的retries
(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次3次一下子就重试完了。
消费者丢失消息的情况
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。
解决办法也比较粗暴,手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。 但是,这样会带来消息被重新消费的问题。比如刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
kafka 副本发生主从切换导致的消息丢失
Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个 Leader,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。
假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
这里有几种解决方式:
设置 acks = all
acks 是 Kafka 生产者(Producer) 很重要的一个参数。acks 的默认值即为1,代表消息被leader副本接收之后就算被成功发送。当配置 acks = all
代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。
设置 replication.factor >= 3
为了保证 leader 副本能有 follower 副本能同步消息,一般会为 topic 设置 replication.factor >= 3
。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
设置 min.insync.replicas > 1
一般情况下还需要设置 min.insync.replicas> 1
,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas
的默认值为 1 ,在实际生产中应尽量避免默认值 1。
但是,为了保证整个 Kafka 服务的高可用性,需要确保 replication.factor > min.insync.replicas
。
假如两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性,一般推荐设置成 replication.factor = min.insync.replicas + 1
。
设置 unclean.leader.election.enable = false
Kafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true 改为false
发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当配置了 unclean.leader.election.enable = false
的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
13. Kafka 中如何保证消息不被重复消费(消息的幂等性)
首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由开发来保证的。
Kafka 中出现重复消费的情况有两种:
- 当ack=-1时,如果在follower同步完成后,broker发送ack之前,leader发生故障,导致没有返回ack给Producer,由于失败重试机制,又会给新选举出来的leader发送数据,造成数据重复。
- (手动管理offset时,先消费后提交offset)消费者消费后没有commit offset(程序崩溃/强行kill/消费耗时/自动提交偏移情况下unscrible)
Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,默认情况下, consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示”我已经消费过了,下次要是重启,就继续从上次消费到的 offset 来继续消费”。
但是凡事总有意外,比如有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。
举个例子:
有这么个场景。数据 1,2,3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152,153,154。消费者从 kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。那么此时消费过的数据 1,2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 kafka ,接着把上次消费到的那个地方后面的数据继续给传递过来。由于之前的 offset 没有提交成功,那么数据 1,2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。
如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,可能就把数据 1,2 在数据库里插入了 2 次,那么数据就错啦。
举个例子。假设有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。
一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。
幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
所以第二个问题来了,怎么保证消息队列消费的幂等性?
- 当把数据插入数据库时,先根据主键查一下,如果这数据都有了,你就别插入了,update 一下就好。
- 如果是写 Redis,那没问题了,每次都是 set,天然幂等性。
- 如果不是上面两个场景,那做的稍微复杂一点,需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那就别处理了,保证别重复处理相同的消息即可。
比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
总的来说,kafka出现消息重复消费的原因有两种:
- 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
- Kafka 侧由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
- 消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键等天然的幂等功能。这种方法最有效。
- 将
enable.auto.commit
参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。这里会有个问题:什么时候提交offset合适?- 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
- 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
14. Zookeeper 在 kafka 中的作用
ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。其主要可以归类为一下几个:
Broker 注册
Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:/brokers/ids
每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids
下创建属于自己的节点。
Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。
Topic 注册
在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:/borkers/topics
Kafka中每个Topic都会以/brokers/topics/[topic]
的形式被记录,如/brokers/topics/login
和/brokers/topics/search
等。Broker 服务器启动后,会到对应Topic节点(/brokers/topics
)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2
,这个节点表示Broker ID为3的一个Broker服务器,对于”login”这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。
消费者注册
消费者服务器在初始化启动时加入消费者分组的步骤如下
- 注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如
/consumers/[group_id]/ids/[consumer_id]
,完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。 - 对消费者分组 中的 消费者 的变化注册监听。每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对
/consumers/[group_id]/ids
节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。 - 对Broker服务器变化注册监听。消费者需要对
/broker/ids/[0-N]
中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。 - 进行消费者负载均衡。为了让同一个Topic下不同分区的消息尽量均衡地被多个消费者消费而进行消费者与消息分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡。
生产者负载均衡
与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
记录分区与消费者的关系
consumer group 下有多个 Consumer(消费者)。对于每个消费者组 (Consumer Group),Kafka 都会为其分配一个全局唯一的 Group ID,Group 内部的所有消费者共享该 ID。订阅的 topic 下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他 group)。
同时,Kafka为每个消费者分配一个Consumer ID,通常采用”Hostname:UUID”形式表示。
在Kafka中,规定了每个消息分区只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录消息分区与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]
就是一个消息分区的标识,节点内容就是该消息分区上消费者的Consumer ID。
消息消费进度 offer 记录
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度 Offset 记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset 在 Zookeeper 中由一个专门节点进行记录,其节点路径为:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
,节点内容就是Offset的值。
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题
__consumer_offsets
中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。
副本中 ISR 的伸缩
当 kafka 检测到 ISR 集合中有失效副本时,就会收缩ISR集合。如果某个分区的ISR集合发生变更,则会将变更后的数据记录到 ZooKeeper 对应的 /brokers/topics/<topic>/partition/<parititon>/state
节点中。
ISR扩充之后同样会更新ZooKeeper中的/brokers/topics/<topic>/partition/<parititon>/state
节点。
早期版本的 kafka 用 zk 做 meta 信息存储,consumer 的消费状态,group 的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了 zookeeper 的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖。
15. Kafka ISR 机制
Kafka 分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成 ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。消息会先发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步,同步期间内 follower 副本相对于 leader 副本而言会有一定程度的滞后。前面所说的“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。
与 leader 副本同步滞后过多的副本(不包括leader副本)组成 OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合为空。
leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中有 follower 副本“追上”了leader副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。
默认情况下,当 leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。
ISR 与 HW 和 LEO 也有紧密的关系。HW 是 High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。
如下图所示,它代表一个日志文件,这个日志文件中有 9 条消息,第一条消息的 offset(LogStartOffset)为 0,最后一条消息的 offset 为 8,offset 为 9 的消息用虚线框表示,代表下一条待写入的消息。日志文件的 HW 为 6,表示消费者只能拉取到 offset 在 0 至 5 之间的消息,而 offset 为 6 的消息对消费者而言是不可见的。
LEO 是 Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的 offset,图中 offset 为 9 的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加1。分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。
如下图所示,假设某个分区的 ISR 集合中有 3 个副本,即一个 leader 副本和 2 个 follower 副本,此时分区的 LEO 和 HW 都为 3:
消息 3 和消息 4 从生产者发出之后会被先存入 leader 副本:
在消息写入 leader 副本之后,follower 副本会发送拉取请求来拉取消息 3 和消息 4 以进行消息同步。
在同步过程中,不同的 follower 副本的同步效率也不尽相同。如下图所示,在某一时刻 follower1 完全跟上了 leader 副本而 follower2 只同步了消息 3,如此 leader 副本的 LEO 为5,follower1 的 LEO 为5,follower2 的 LEO 为 4,那么当前分区的 HW 取最小值4,此时消费者可以消费到offset为0至3之间的消息。
写入消息(情形4)如下所示,所有的副本都成功写入了消息3和消息4,整个分区的HW和LEO都变为5,因此消费者可以消费到offset为4的消息了。
Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 副本都复制完,这条消息才会被确认为已成功提交,这种复制方式极大地影响了性能。而在异步复制方式下,follower 副本异步地从 leader 副本中复制数据,数据只要被 leader 副本写入就被认为已经成功提交。在这种情况下,如果 follower 副本都还没有复制完而落后于 leader 副本,突然 leader 副本宕机,则会造成数据丢失。Kafka 使用的这种 ISR 的方式则有效地权衡了数据可靠性和性能之间的关系。
16. 什么是失效副本
正常情况下,分区的所有副本都处于ISR集合中,但是难免会有异常情况发生,从而某些副本被剥离出ISR集合中。在ISR集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区,即under-replicated分区。
失效副本不仅是指处于功能失效状态的副本,处于同步失效状态的副本也可以看作失效副本。
怎么判定一个分区是否有副本处于同步失效的状态呢?Kafka从0.9.x版本开始就通过唯一的broker端参数 replica.lag.time.max.ms
(默认10000)来抉择,当ISR集合中的一个follower副本滞后leader副本的时间超过此参数指定的值时则判定为同步失败,需要将此follower副本剔除出ISR集合,具体参考下图:
当follower副本将leader副本LEO(LogEndOffset)之前的日志全部同步时,则认为该 follower 副本已经追赶上leader 副本,此时更新该副本的 lastCaughtUpTimeMs 标识。
Kafka 的副本管理器会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的 lastCaughtUpTimeMs 差值是否大于参数 replica.lag.time.max.ms 指定的值。
不要错误地认为 follower 副本只要拉取 leader副本的数据就会更新lastCaughtUpTimeMs。当 leader 副本中消息的流入速度大于follower副本中拉取的速度时,就算 follower 副本一直不断地拉取 leader 副本的消息也不能与leader副本同步。如果还将此follower副本置于ISR集合中,那么当leader副本下线而选取此follower副本为新的leader副本时就会造成消息的严重丢失。
Kafka源码注释中说明了一般有两种情况会导致副本失效:
follower副本进程卡住,在一段时间内根本没有向leader副本发起同步请求,比如频繁的Full GC。
follower副本进程同步过慢,在一段时间内都无法追赶上leader副本,比如I/O开销过大。
在这里再补充一点,如果通过工具增加了副本因子,那么新增加的副本在赶上leader副本之前也都是处于失效状态的。如果一个follower副本由于某些原因(比如宕机)而下线,之后又上线,在追赶上leader副本之前也处于失效状态。
17. Kafka 中高可靠性的实现
多副本机制
Kafka从0.8版本开始为分区引入了多副本机制,通过增加副本数量来提升数据容灾能力。同时,Kafka通过多副本机制实现故障自动转移,在Kafka集群中某个broker节点失效的情况下仍然保证服务可用。
从生产者发出的一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有 follower 副本都同步完之后才能被认为已经提交,之后才会更新分区的 HW,进而消费者可以消费到这条消息。
ISR 的伸缩
ISR 缩减
Kafka 在启动的时候会开启两个与 ISR 相关的定时任务,名称分别为“isr-expiration”和“isr-change-propagation”。
isr-expiration 任务会周期性地检测每个分区是否需要缩减其ISR集合。这个周期和 replica.lag.time.max.ms
参数有关,大小是这个参数值的一半,默认值为5000ms。当检测到ISR集合中有失效副本时,就会收缩ISR集合。
如果某个分区的ISR集合发生变更,则会将变更后的数据记录到 ZooKeeper 对应的 /brokers/topics/<topic>/partition/<parititon>/state
节点中。节点中的数据示例如下:
{"controller_epoch":26,"leader":0,"version":1,"leader_epoch":2,"isr":[0,1]}
其中 controller_epoch 表示当前Kafka控制器的 epoch,leader 表示当前分区的leader副本所在的broker的id编号,version表示版本号(当前版本固定为1),leader_epoch 表示当前分区的 leader 纪元,isr表示变更后的ISR列表。
除此之外,当 ISR 集合发生变更时还会将变更后的记录缓存到 isrChangeSet 中,isr-change-propagation 任务会周期性(固定值为 2500ms)地检查 isrChangeSet,如果发现 isrChangeSet 中有ISR集合的变更记录,那么它会在ZooKeeper的 /isr_change_notification
路径下创建一个以 isr_change_
开头的持久顺序节点(比如/isr_change_notification/isr_change_0000000000
),并将isrChangeSet中的信息保存到这个节点中。
Kafka控制器为/isr_change_notification
添加了一个Watcher,当这个节点中有子节点发生变化时会触发Watcher的动作,以此通知控制器更新相关元数据信息并向它管理的broker节点发送更新元数据的请求,最后删除/isr_change_notification
路径下已经处理过的节点。
频繁地触发Watcher会影响Kafka控制器、ZooKeeper甚至其他broker节点的性能。为了避免这种情况,Kafka添加了限定条件,当检测到分区的 ISR 集合发生变化时,还需要检查以下两个条件:
- 上一次ISR集合发生变化距离现在已经超过5s
- 上一次写入 ZooKeeper 的时间距离现在已经超过60s
满足以上两个条件之一才可以将ISR集合的变化写入目标节点。
ISR 扩充
随着follower副本不断与leader副本进行消息同步,follower副本的LEO也会逐渐后移,并最终追赶上leader副本,此时该follower副本就有资格进入ISR集合。
追赶上leader副本的判定准则是此副本的 LEO 是否不小于 leader 副本的 HW,注意这里并不是和leader副本的LEO相比。
ISR扩充之后同样会更新ZooKeeper中的/brokers/topics/<topic>/partition/<parititon>/state
节点和isrChangeSet,之后的步骤就和ISR收缩时的相同。当ISR集合发生增减时,或者ISR集合中任一副本的LEO发生变化时,都可能会影响整个分区的HW。
如下图所示:
leader副本的LEO为9,follower1副本的LEO为7,而follower2副本的LEO为6,如果判定这3个副本都处于ISR集合中,那么这个分区的HW为6;如果follower2已经被判定为失效副本被剥离出ISR集合,那么此时分区的HW为leader副本和follower1副本中LEO的最小值,即为7。
LEO 和 HW
对于副本而言,还有两个概念:本地副本(Local Replica)和远程副本(RemoteReplica),本地副本是指对应的 Log 分配在当前的 broker 节点上,远程副本是指对应的 Log 分配在其他的broker节点上。
在Kafka中,同一个分区的信息会存在多个broker节点上,并被其上的副本管理器所管理,这样在逻辑层面每个broker节点上的分区就有了多个副本,但是只有本地副本才有对应的日志。
如下图所示:
某个分区有3个副本分别位于broker0、broker1和broker2节点中,其中带阴影的方框表示本地副本。假设broker0上的副本1为当前分区的leader副本,那么副本2和副本3就是follower副本,整个消息追加的过程可以概括如下:
- 生产者客户端发送消息至leader副本(副本1)中
- 消息被追加到leader副本的本地日志,并且会更新日志的偏移量。
- follower副本(副本2和副本3)向leader副本请求同步数据。
- leader副本所在的服务器读取本地日志,并更新对应拉取的follower副本的信息。
- leader副本所在的服务器将拉取结果返回给follower副本。
- follower副本收到leader副本返回的拉取结果,将消息追加到本地日志中,并更新日志的偏移量信息。
再来分析在这个过程中各个副本LEO和HW的变化情况。
假设生产者一直在往leader副本(带阴影的方框)中写入消息,某一时刻,leader副本的LEO增加至5,并且所有副本的HW还都为0。
之后follower副本(不带阴影的方框)向leader副本拉取消息,在拉取的请求中会带有自身的LEO信息,这个LEO信息对应的是FetchRequest请求中的fetch_offset。leader副本返回给follower副本相应的消息,并且还带有自身的HW信息,如下图所示,这个HW信息对应的是FetchResponse中的high_watermark。
此时两个follower副本各自拉取到了消息,并更新各自的LEO为3和4。
与此同时,follower 副本还会更新自己的HW,更新HW的算法是比较当前LEO和leader副本中传送过来的HW的值,取较小值作为自己的HW值。当前两个follower副本的HW都等于0(min(0,0)=0)。
接下来follower副本再次请求拉取leader副本中的消息,如下图:
此时leader副本收到来自follower副本的FetchRequest请求,其中带有LEO的相关信息,选取其中的最小值作为新的HW,即min(15,3,4)=3。然后连同消息和HW一起返回FetchResponse给follower副本,如下图:
注意leader副本的HW是一个很重要的东西,因为它直接影响了分区数据对消费者的可见性。
两个follower副本在收到新的消息之后更新LEO并且更新自己的HW为3(min(LEO,3)=3)。
在一个分区中,leader副本所在的节点会记录所有副本的LEO,而follower副本所在的节点只会记录自身的LEO,而不会记录其他副本的LEO。对HW而言,各个副本所在的节点都只记录它自身的HW。leader副本中带有其他 follower 副本的 LEO,那么它们是什么时候更新的呢?leader 副本收到 follower副本的FetchRequest请求之后,它首先会从自己的日志文件中读取数据,然后在返回给follower副本数据前先更新follower副本的LEO。
引入 Leader Epoch 解决数据丢失或leader副本和follower副本数据不一致的问题
在正常情况下的leader副本与follower副本之间的同步过程,如果leader副本发生切换,那么同步过程又该如何处理呢?
在0.11.0.0版本之前,Kafka使用的是基于HW的同步机制,但这样有可能出现数据丢失或leader副本和follower副本数据不一致的问题。
如下图所示:
Replica B是当前的leader副本(用L标记),Replica A是follower副本。在某一时刻,B中有2条消息m1和m2,A从B中同步了这两条消息,此时A和B的LEO都为2,同时HW都为1;之后A再向B中发送请求以拉取消息,FetchRequest请求中带上了A的LEO信息,B在收到请求之后更新了自己的HW为2;B中虽然没有更多的消息,但还是在延时一段时间之后返回FetchResponse,并在其中包含了HW信息;最后A根据FetchResponse中的HW信息更新自己的HW为2。
可以看到整个过程中两者之间的HW同步有一个间隙,在A写入消息m2之后(LEO更新为2)需要再一轮的FetchRequest/FetchResponse才能更新自身的HW为2。如果在这个时候A宕机了,那么在A重启之后会根据之前HW位置(这个值会存入本地的复制点文件replication-offset-checkpoint)进行日志截断,这样便会将m2这条消息删除,此时A只剩下m1这一条消息,之后A再向B发送FetchRequest请求拉取消息。
此时若 B 再宕机,那么 A 就会被选举为新的leader。B 恢复之后会成为follower,由于follower副本HW不能比leader副本的HW高,所以还会做一次日志截断,以此将HW调整为1。这样一来m2这条消息就丢失了(就算B不能恢复,这条消息也同样丢失)。(两个轮流挂,有点离谱。。。。)
对于这种情况,也有一些解决方法,比如等待所有follower副本都更新完自身的HW之后再更新leader副本的HW,这样会增加多一轮的FetchRequest/FetchResponse延迟,自然不够妥当。还有一种方法就是follower副本恢复之后,在收到leader副本的FetchResponse前不要截断follower副本(follower副本恢复之后会做两件事情:截断自身和向leader发送FetchRequest请求),不过这样也避免不了数据不一致的问题。
如下图所示:
当前leader副本为A,follower副本为B,A中有2条消息m1和m2,并且HW和LEO都为2,B中有1条消息m1,并且HW和LEO都为1(此时还没收到 leader 的 hw 更新返回)。假设A和B同时“挂掉”,然后B第一个恢复过来并成为leader,如图所示:
之后B写入消息m3,并将LEO和HW更新至2(假设所有场景中的min.insync.replicas参数配置为1)。此时A也恢复过来了,根据前面数据丢失场景中的介绍可知它会被赋予follower的角色,并且需要根据HW截断日志及发送FetchRequest至B,不过此时A的HW正好也为2,那么就可以不做任何调整了,如下图所示:
如此一来A中保留了m2而B中没有,B中新增了m3而A也同步不到,这样A和B就出现了数据不一致的情形。
为了解决上述两种问题,Kafka从0.11.0.0开始引入了leader epoch的概念,在需要截断数据的时候使用leader epoch作为参考依据而不是原本的HW。
leader epoch代表leader的纪元信息(epoch),初始值为0。每当leader变更一次,leaderepoch的值就会加1,相当于为leader增设了一个版本号。与此同时,每个副本中还会增设一个矢量<LeaderEpoch=>StartOffset>
,其中StartOffset表示当前LeaderEpoch下写入的第一条消息的偏移量。每个副本的Log下都有一个leader-epoch-checkpoint文件,在发生leader epoch变更时,会将对应的矢量对追加到这个文件中,v2版本的消息格式时就提到了消息集中的partition leader epoch
字段,而这个字段正对应这里讲述的leader epoch。
再来看一下引入 leader epoch 之后如何应付前面所说的数据丢失和数据不一致的场景。首先讲述应对数据丢失的问题,如下图所示:
同样 A 发生重启,之后 A 不是先忙着截断日志而是先发送OffsetsForLeaderEpochRequest请求给 B(OffsetsForLeaderEpochRequest 请求体结构如下图所示,其中包含 A 当前的LeaderEpoch值)
B作为目前的leader,在收到请求之后会返回当前的LEO(LogEndOffset,注意图中LE0和LEO的不同),与请求对应的响应为OffsetsForLeaderEpochResponse,对应的响应体结构可以参考图下图:
如果A中的LeaderEpoch(假设为LE_A)和B中的不相同,那么B此时会查找LeaderEpoch为 LE_A+1 对应的 StartOffset 并返回给 A,也就是 LE_A 对应的LEO,所以可以将OffsetsForLeaderEpochRequest的请求看作用来查找follower副本当前LeaderEpoch的LEO。A在收到2之后发现和目前的LEO相同,也就不需要截断日志了。
之后跟前面一样,B发生了宕机,A成为新的leader,那么对应的LE=0也变成了LE=1,对应的消息m2此时就得到了保留,这是原本所不能保留的。之后不管B有没有恢复,后续的消息都可以以LE1为LeaderEpoch陆续追加到A中。
再来看一下leader epoch如何应对数据不一致的场景。如下图所示,当前A为leader,B为follower,A中有2条消息m1和m2,而B中有1条消息m1。假设A和B同时“挂掉”,然后B第一个恢复过来并成为新的leader。
之后B写入消息m3,并将LEO和HW更新至2。注意此时的LeaderEpoch已经从LE0增至LE1了。
紧接着A也恢复过来成为follower并向B发送OffsetsForLeaderEpochRequest请求,此时A的LeaderEpoch为LE0。B根据LE0查询到对应的offset为1并返回给A,A就截断日志并删除了消息m2。之后A发送FetchRequest至B请求来同步数据,最终A和B中都有两条消息m1和m3,HW和LEO都为2,并且LeaderEpoch都为LE1,如此便解决了数据不一致的问题。
客户端消息传输保障
一般而言,消息中间件的消息传输保障有3个层级,分别如下:
at most once:至多一次。消息可能会丢失,但绝对不会重复传输。
at least once:最少一次。消息绝不会丢失,但可能会重复传输。
exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。
Kafka 的消息传输保障机制非常直观。当生产者向 Kafka 发送消息时,一旦消息被成功提交到日志文件,由于多副本机制的存在,这条消息就不会丢失。如果生产者发送消息到 Kafka之后,遇到了网络问题而造成通信中断,那么生产者就无法判断该消息是否已经提交。虽然Kafka无法确定网络故障期间发生了什么,但生产者可以进行多次重试来确保消息已经写入 Kafka,这个重试的过程中有可能会造成消息的重复写入,所以对生产者来说, Kafka 提供的消息传输保障为 at least once。
对消费者而言,消费者处理消息和提交消费位移的顺序在很大程度上决定了消费者提供哪一种消息传输保障。
如果消费者在拉取完消息之后,应用逻辑先处理消息后提交消费位移,那么在消息处理之后且在位移提交之前消费者宕机了,待它重新上线之后,会从上一次位移提交的位置拉取,这样就出现了重复消费,因为有部分消息已经处理过了只是还没来得及提交消费位移,此时就对应at least once。
如果消费者在拉完消息之后,应用逻辑先提交消费位移后进行消息处理,那么在位移提交之后且在消息处理完成之前消费者宕机了,待它重新上线之后,会从已经提交的位移处开始重新消费,但之前尚有部分消息未进行消费,如此就会发生消息丢失,此时就对应at most once。
Kafka 从 0.11.0.0 版本开始引入了幂等和事务这两个特性,以此来实现 EOS(exactly once semantics,精确一次处理语义)。
幂等
所谓的幂等,简单地说就是对接口的多次调用所产生的结果和调用一次是一致的。
生产者在进行重试的时候有可能会重复写入消息,而使用 Kafka 的幂等性功能之后就可以避免这种情况。开启幂等性功能的方式很简单,只需要显式地将生产者客户端参数 enable.idempotence
设置为 true 即可(这个参数的默认值为false)。
不过如果要确保幂等性功能正常,还需要确保生产者客户端的 retries
、acks
、max.in.flight.requests.per.connection
这几个参数不被配置错。实际上在使用幂等性功能的时候,用户完全可以不用配置(也不建议配置)这几个参数。
如果用户显式地指定了 retries 参数,那么这个参数的值必须大于 0,否则会报出ConfigException;
如果用户没有显式地指定 retries 参数,那么 KafkaProducer 会将它置为Integer.MAX_VALUE。同时还需要保证
max.in.flight.requests.per.connection
参数的值不能大于5(这个参数的值默认为5),否则也会报出ConfigException;如果用户还显式地指定了 acks 参数,那么还需要保证这个参数的值为-1(all),如果不为-1(这个参数的值默认为1),那么也会报出ConfigException。如果用户没有显式地指定这个参数,那么KafkaProducer会将它置为-1。
开启幂等性功能之后,生产者就可以如同未开启幂等时一样发送消息了。为了实现生产者的幂等性,Kafka为此引入了producer id(以下简称PID)和序列号(sequence number)这两个概念,分别对应 v2 版的日志格式中RecordBatch的producer id和first seqence这两个字段。每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的。对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将<PID,分区>
对应的序列号的值加1。
broker端会在内存中为每一对<PID,分区>
维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比broker端中维护的对应的序列号的值(SN_old)大1(即SN_new=SN_old+1)时,broker才会接收它。如果SN_new<SN_old+1
,那么说明消息被重复写入,broker可以直接将其丢弃。如果SN_new>SN_old+1
,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出OutOfOrderSequenceException,这个异常是一个严重的异常,后续的诸如 send()、beginTransaction()、commitTransaction()等方法的调用都会抛出IllegalStateException的异常。
引入序列号来实现幂等也只是针对每一对<PID,分区>
而言的,也就是说,Kafka的幂等只能保证单个生产者会话(session)中单分区的幂等。但是,发送了两条相同的消息,不过这仅仅是指消息内容相同,但对Kafka 而言是两条不同的消息,因为会为这两条消息分配不同的序列号。Kafka 并不会保证消息内容的幂等。
事务
幂等性并不能跨多个分区运作,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。
对流式应用(Stream Processing Applications)而言,一个典型的应用模式为“consume-transform-produce”。在这种模式下消费和生产并存:应用程序从某个主题中消费消息,然后经过一系列转换后写入另一个主题,消费者可能在提交消费位移的过程中出现问题而导致重复消费,也有可能生产者重复生产消息。
Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区。
为了实现事务,应用程序必须提供唯一的 transactionalId,这个 transactionalId通过客户端参数transactional.id来显式设置。事务要求生产者开启幂等特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要将 enable.idempotence
设置为 true (如果未显式设置,则KafkaProducer默认会将它的值设置为true),如果用户显式地将menable.idempotence
设置为false,则会报出ConfigException。
transactionalId与PID一一对应,两者之间所不同的是transactionalId由用户显式设置,而PID是由Kafka内部分配的。另外,为了保证新的生产者启动后,具有相同transactionalId的旧生产者能够立即失效,每个生产者通过transactionalId获取PID的同时,还会获取一个单调递增的producer epoch。如果使用同一个transactionalId开启两个生产者,那么前一个开启的生产者会报出错误。
producer epoch 对应 v2 版的日志格式中RecordBatch的producer epoch字段。从生产者的角度分析,通过事务,Kafka 可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。前者表示具有相同 transactionalId 的新生产者实例被创建且工作的时候,旧的且拥有相同transactionalId的生产者实例将不再工作。后者指当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作。
而从消费者的角度分析,事务能保证的语义相对偏弱。出于以下原因,Kafka 并不能保证已提交的事务中的所有消息都能够被消费:
- 对采用日志压缩策略的主题而言,事务中的某些消息有可能被清理(相同key的消息,后写入的消息会覆盖前面写入的消息)。
- 事务中消息可能分布在同一个分区的多个日志分段(LogSegment)中,当老的日志分段被删除时,对应的消息可能会丢失。
- 消费者可以通过seek()方法访问任意offset的消息,从而可能遗漏事务中的部分消息。
- 消费者在消费时可能没有分配到事务内的所有分区,如此它也就不能读取事务中的所有消息。
在消费端有一个参数isolation.level,与事务有着莫大的关联,这个参数的默认值为“read_uncommitted”,意思是说消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。这个参数还可以设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。
举个例子,如果生产者开启事务并向某个分区值发送3条消息msg1、msg2和msg3,在执行commitTransaction()或abortTransaction()方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在KafkaConsumer内部会缓存这些消息,直到生产者执行 commitTransaction()方法之后它才能将这些消息推送给消费端应用。反之,如果生产者执行了 abortTransaction()方法,那么 KafkaConsumer 会将这些缓存的消息丢弃而不推送给消费端应用。
日志文件中除了普通的消息,还有一种消息专门用来标志一个事务的结束,它就是控制消息(ControlBatch)。控制消息一共有两种类型:COMMIT和ABORT,分别用来表征事务已经成功提交或已经被成功中止。KafkaConsumer 可以通过这个控制消息来判断对应的事务是被提交了还是被中止了,然后结合参数isolation.level配置的隔离级别来决定是否将相应的消息返回给消费端应用,如下图所示:
为了实现事务的功能,Kafka还引入了事务协调器(TransactionCoordinator)来负责处理事务,这一点可以类比一下组协调器(GroupCoordinator)。每一个生产者都会被指派一个特定的TransactionCoordinator,所有的事务逻辑包括分派PID 等都是由 TransactionCoordinator 来负责实施的。TransactionCoordinator会将事务状态持久化到内部主题__transaction_state 中。
18. Kafka 分区分配策略
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
Kafka有两种分配策略,一是 roundrobin,一是range,最新还有一个StickyAssignor策略。
将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)。当以下事件发生时,Kafka 将会进行一次分区分配:
- 同一个 Consumer Group 内新增消费者
- 消费者离开当前所属的Consumer Group,包括 shuts down 或 crashes
- 订阅的主题新增分区
目前还不能自定义分区分配策略,只能通过partition.assignment.strategy
参数选择 range 或 roundrobin。**partition.assignment.strategy
参数默认的值是range**。
Kafka 提供了消费者客户端参数 partition.assignment.strategy
用来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为:org.apache.kafka.clients.consumer.RangeAssignor
,即采用 RangeAssignor 分配策略。除此之外,Kafka 中还提供了另外两种分配策略: RoundRobinAssignor
和 StickyAssignor
。消费者客户端参数partition.asssignment.strategy
可以配置多个分配策略,彼此之间以逗号分隔。
Range(范围) 分配策略
Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。
假如有10个分区,3个消费者线程,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者线程为C1-0,C2-0,C2-1,那么用partition数除以消费者线程的总数来决定每个消费者线程消费几个partition,如果除不尽,前面几个消费者将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程,10/3 = 3,而且除除不尽,那么消费者线程C1-0将会多消费一个分区,所以最后分区分配的结果看起来是这样的:
C1-0:0,1,2,3
C2-0:4,5,6
C2-1:7,8,9
如果有11个分区将会是:
C1-0:0,1,2,3
C2-0:4,5,6,7
C2-1:8,9,10
假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:
C1-0:T1(0,1,2,3) T2(0,1,2,3)
C2-0:T1(4,5,6) T2(4,5,6)
C2-1:T1(7,8,9) T2(7,8,9)
可以看出, C1-0消费者线程比其他消费者线程多消费了2个分区
如上,只是针对 1 个 topic 而言,C1-0消费者多消费1个分区影响不是很大。如果有 N 多个 topic,那么针对每个 topic,消费者 C1-0 都将多消费 1 个分区,topic越多,C1-0 消费的分区会比其他消费者明显多消费 N 个分区。这就是 Range 范围分区的一个很明显的弊端了。
RoundRobin(循环) 分配策略
RoundRobinAssignor 策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。
RoundRobinAssignor策略对应的 partition.assignment.strategy
参数值为:org.apache.kafka.clients.consumer.RoundRobinAssignor
。
使用RoundRobin策略有两个前提条件必须满足:
- 同一个消费者组里面的所有消费者的 num.streams(消费者消费线程数)必须相等;
- 每个消费者订阅的主题必须相同。
所以这里假设前面提到的 2 个消费者的 num.streams = 2。
RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,最后按照round-robin风格将分区分别分配给不同的消费者线程。
以前面的案例,加入按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区;
RoundRobin的两种情况
如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor策略的分区分配会是均匀的。
举例,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:
消费者C0:t0p0、t0p2、t1p1 消费者C1:t0p1、t1p0、t1p2
如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个topic,那么在分配分区的时候此消费者将分配不到这个topic的任何分区。
举例,假设消费组内有3个消费者C0、C1和C2,它们共订阅了3个主题:t0、t1、t2,这3个主题分别有1、2、3个分区,即整个消费组订阅了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2,那么最终的分配结果为:
消费者C0:t0p0 消费者C1:t1p0 消费者C2:t1p1、t2p0、t2p1、t2p2
可以看到RoundRobinAssignor策略也不是十分完美,这样分配其实并不是最优解,因为完全可以将分区t1p1分配给消费者C1。
StickyAssignor(粘性)分配策略
我们再来看一下StickyAssignor策略,“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:
- 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个;
- 分区的分配尽可能的与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。举例来看一下StickyAssignor策略的实际效果。
假设消费组内有3个消费者:C0、C1和C2,它们都订阅了4个主题:t0、t1、t2、t3,并且每个主题有2个分区,也就是说整个消费组订阅了t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1这8个分区。最终的分配结果如下:
消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1
这样初看上去似乎与采用RoundRobinAssignor策略所分配的结果相同,但事实是否真的如此呢?
此时假设消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor策略,那么此时的分配结果如下:
消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1
如分配结果所示,RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。而如果此时使用的是StickyAssignor策略,那么分配结果为:
消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t1p0、t2p1、t0p1、t3p1
可以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。
如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。
到目前为止所分析的都是消费者的订阅信息都是相同的情况,来看一下订阅信息不同的情况下的处理。
举例,同样消费组内有3个消费者:C0、C1和C2,集群中有3个主题:t0、t1和t2,这3个主题分别有1、2、3个分区,也就是说集群中有t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。消费者C0订阅了主题t0,消费者C1订阅了主题t0和t1,消费者C2订阅了主题t0、t1和t2。
如果此时采用RoundRobinAssignor策略,那么最终的分配结果如下所示(和讲述RoundRobinAssignor策略时的一样,这样不妨赘述一下):
消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2
如果此时采用的是StickyAssignor策略,那么最终的分配结果为:
消费者C0:t0p0
消费者C1:t1p0、t1p1
消费者C2:t2p0、t2p1、t2p2
可以看到这是一个最优解(消费者C0没有订阅主题t1和t2,所以不能分配主题t1和t2中的任何分区给它,对于消费者C1也可同理推断)。
假如此时消费者C0脱离了消费组,那么RoundRobinAssignor策略的分配结果为:
消费者C1:t0p0、t1p1
消费者C2:t1p0、t2p0、t2p1、t2p2
可以看到RoundRobinAssignor策略保留了消费者C1和C2中原有的3个分区的分配:t2p0、t2p1和t2p2(针对结果集1)。而如果采用的是StickyAssignor策略,那么分配结果为:
消费者C1:t1p0、t1p1、t0p0
消费者C2:t2p0、t2p1、t2p2
可以看到StickyAssignor策略保留了消费者C1和C2中原有的5个分区的分配:t1p0、t1p1、t2p0、t2p1、t2p2。
从结果上看StickyAssignor策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂。
19. 如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
大量消息在 mq 里积压了几个小时了还没解决
几千万条数据在 MQ 里积压了七八个小时,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。
但这种方式显示不能紧急恢复问题,一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。
一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:
- 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
- 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
- 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
- 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
mq 中的消息过期失效了
假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。
这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。可以采取一个方案,就是批量重导,就是大量积压的时候,当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。
假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq 都快写满了
如果消息积压在 mq 里,很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让第一个方案执行的太慢了,临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据。
20. Kafka如何实现每秒上百万的超高并发写入?
Kafka 是高吞吐低延迟的高并发、高性能的消息中间件,在大数据领域有极为广泛的运用。配置良好的 Kafka 集群甚至可以做到每秒几十万、上百万的超高并发写入。
那么 Kafka 到底是如何做到这么高的吞吐量和性能的呢?
页缓存
操作系统本身有一层缓存,叫做 Page Cache,是在内存里的缓存,也可以称之为 OS Cache,意思就是操作系统自己管理的缓存,用来减少对磁盘 I/O 的操作。
具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。为了弥补性能上的差异,现代操作系统越来越“激进地”将内存作为磁盘缓存,甚至会非常乐意将所有可用的内存用作磁盘缓存,这样当内存回收时也几乎没有性能损失,所有对于磁盘的读写也将经由统一的缓存。
在写入磁盘文件的时候,可以直接写入这个 OS Cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 OS Cache 里的数据真的刷入磁盘文件中。
当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的 I/O 操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。
对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用Direct I/O的方式,否则页缓存很难被禁止。此外,使用 Java 开发的程序会有两个问题:
对象的内存开销非常大,通常会是真实数据大小的几倍甚至更多,空间使用率低下;
Java的垃圾回收会随着堆内数据的增多而变得越来越慢。
基于这些因素,使用文件系统并依赖于页缓存的做法明显要优于维护一个进程内缓存或其他结构,至少可以省去了一份进程内部的缓存消耗,同时还可以通过结构紧凑的字节码来替代使用对象的方式以节省更多的空间。如此,可以在 32GB 的机器上使用28GB至30GB的内存而不用担心GC所带来的性能问题。
此外,即使Kafka服务重启,页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。
Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在 Kafka 中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过log.flush.interval.messages
、log.flush.interval.ms
等参数来控制。
同步刷盘可以提高消息的可靠性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。
Linux 系统会使用磁盘的一部分作为swap分区,这样可以进行进程的调度:把当前非活跃的进程调入 swap 分区,以此把内存空出来让给活跃的进程。对大量使用系统页缓存的 Kafka而言,应当尽量避免这种内存的交换,否则会对它各方面的性能产生很大的负面影响。可以通过修改 vm.swappiness
参数(Linux系统参数)来进行调节。vm.swappiness
参数的上限为 100,它表示积极地使用 swap 分区,并把内存上的数据及时地搬运到 swap 分区中;vm.swappiness
参数的下限为0,表示在任何情况下都不要发生交换,这样一来,当内存耗尽时会根据一定的规则突然中止某些进程。建议将这个参数的值设置为 1,这样保留了 swap 的机制而又最大限度地限制了它对 Kafka 性能的影响。
磁盘顺序写
Kafka 依赖于文件系统(更底层地来说就是磁盘)来存储和缓存消息。对于各个存储介质的速度,大体同下图所示,层级越高代表速度越快:
而实际上,磁盘的速度快慢完全取决于我们如何使用它。有关测试结果表明,一个由6块7200r/min的RAID-5阵列组成的磁盘簇的线性(顺序)写入速度可以达到600MB/s,而随机写入速度只有100KB/s,两者性能相差6000倍。操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存)和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。顺序写盘的速度不仅比随机写盘的速度快,而且也比随机写内存的速度快。
Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,即使数据刷入磁盘的时候,性能也是极高的,也跟写内存是差不多的。所以就算 Kafka使用磁盘作为存储介质,它所能承载的吞吐量也不容小觑。这是让Kafka在性能上具备足够竞争力的一大因素。
零拷贝
除了消息顺序追加、页缓存等技术,Kafka还使用零拷贝(Zero-Copy)技术来进一步提升性能。所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。
从 Kafka 里经常要消费数据,那么消费的时候实际上就是要从 Kafka 的磁盘文件里读取某条数据然后发送给下游的消费者,假设要是 Kafka 什么优化都不做,就是很简单的从磁盘读数据发送给下游的消费者,那么大概过程如下所示:
- 先看看要读的数据在不在 OS Cache 里,如果不在的话就从磁盘文件里读取数据后放入 OS Cache。
- 接着从操作系统的 OS Cache 里拷贝数据到应用程序进程的缓存里,再从应用程序进程的缓存里拷贝数据到操作系统层面的 Socket 缓存里。
- 从 Socket 缓存里提取数据后发送到网卡,发送出去给下游消费。
很明显可以看到上面的过程有两次没必要的拷贝,一次是从操作系统的 Cache 里拷贝到应用进程的缓存里,接着又从应用程序缓存里拷贝回操作系统的 Socket 缓存里。
而且为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行,所以这种方式来读取数据是比较消耗性能的。
Kafka 为了解决这个问题,在读数据的时候是引入零拷贝技术。也就是说,直接让操作系统的 Cache 中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket 缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到 Socket 缓存。
零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对 Linux操作系统而言,零拷贝技术依赖于底层的 sendfile()方法实现。对应于 Java 语言,FileChannal.transferTo()方法的底层实现就是sendfile()方法。
21. Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
- 分区器:根据键值确定消息应该处于哪个分区中,默认情况下使用轮询分区,可以自行实现分区器接口自定义分区逻辑
- 序列化器:键序列化器和值序列化器,将键和值都转为二进制流,还有反序列化器将二进制流转为指定类型数据
- 拦截器: 拦截器拦截两个方法, doSend()方法会在序列化之前完成,onAcknowledgement()方法在消息确认或失败时调用可以添加多个拦截器按顺序执行
调用顺序: 拦截器doSend() -> 序列化器 -> 分区器
22. Kafka 中的分区数是否可以改变?
kafka 中的分区数可以通过命令脚本进行增加,但是不支持删减,原因在于被删除的分区数据难以处理。
23. Kafka的日志目录结构
Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区,分区的数量可以在主题创建的时候指定,也可以在之后修改。每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号,也就是通常所说的偏移量(offset)。
如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现水平扩展。不考虑多副本的情况,一个分区对应一个日志(Log)。
为了防止 Log 过大, Kafka 又引入了**日志分段(LogSegment)**的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上, Log 和 LogSegnient 也不是纯粹物理意义上的概念, Log 在物理上只以文件夹的形式存储, 而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以 . txnindex
为后缀的事务索引文件)。下图描绘了主题、分区与副本之间,Log 和 LogSegment 的关系。
向 Log 中追加消息时是顺序写入的,只有最后一个 LogSegment 才能执行写入操作, 在此之前所有的 LogSegment 都不能写入数据。为了方便描述, 将最后一个 LogSegment 称为 activeSegment
,即表示当前活跃的日志分段。随着消息的不断写入,当 activeSegment 满足一定的条件时,就需要创建新的 activeSegment,之后追加的消息将写入新的 activeSegment。
为了便于消息的检索, 每个 LogSegment 中的日志文件 (以 .log
为文件后缀)都有对应的两个索引文件:偏移量索引文件(以 .index
为文件后缀)和时间戳索引文件(以 . timeindex
为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment 中第一条消息的offset。 偏移量是一个 64 位的长整数, 日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为 20 位数字,没有达到的位数则用0填充。 比如第一个 LogSegment 的基准偏移量为 0, 对应的日志文件为 00000000000000000000.log。
注意每个 LogSegment 中不只包含 .log
,.index
,.timeindex
这 3 种文件,还可能包含 .deleted
,.cleaned
,.swap
等临时文件,以及可能的 .snapshot
,.txnindex
,leader-epoch-checkpoint
等文件。
24. 指定 offset 的查找
每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。
偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;
时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。
Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes
指定,默认值为4096,即4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小 log.index.interval.bytes
的值,对应地可以增加或缩小索引项的密度。
稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。
时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。
当日志分段文件达到一定的条件时需要进行切分,那么其对应的索引文件也需要进行切分。对非当前活跃的日志分段而言,其对应的索引文件内容已经固定而不需要再写入索引项,所以会被设定为只读。而对当前活跃的日志分段(activeSegment)而言,索引文件还会追加更多的索引项,所以被设定为可读写。
在索引文件切分的时候,Kafka 会关闭当前正在写入的索引文件并置为只读模式,同时以可读写的模式创建新的索引文件。
偏移量索引
偏移量索引每个索引项占用8个字节,分为两个部分。
- relativeOffset:相对偏移量,表示消息相对于baseOffset 的偏移量,占用4个字节,当前索引文件的文件名即为baseOffset的值。
- position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节。
偏移量索引的查找过程如下:
假设现在只有 00000000000000000000.index和 00000000000000000000.log:
如果要查找偏移量为 23 的消息,首先通过二分法在偏移量索引文件中找到不大于23的最大索引项,即[22,656],然后从日志分段文件中的物理位置656开始顺序查找偏移量为23的消息。
当日志分段文件不止一份的时候,则需要先定位具体的日志分段文件:
如下图,假设要查找偏移量为268的消息,那么应该怎么办呢?
首先肯定是定位到 baseOffset 为 251 的日志分段,然后计算相对偏移量relativeOffset=268-251=17,之后再在该日志分段对应的索引文件中找到不大于17的索引项,最后根据索引项中的position定位到具体的日志分段文件位置开始查找目标消息。
那么又是如何查找baseOffset 为251的日志分段的呢?这里并不是顺序查找,而是用了跳跃表的结构。Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key,这样可以根据指定偏移量来快速定位到消息所在的日志分段。
时间戳索引
时间戳索引项的格式如下图所示:
每个索引项占用12个字节,分为两个部分:
timestamp:当前日志分段最大的时间戳。
relativeOffset:时间戳所对应的消息的相对偏移量。
时间戳索引文件中包含若干时间戳索引项,每个追加的时间戳索引项中的 timestamp 必须大于之前追加的索引项的 timestamp,否则不予追加。如果 broker 端参数 log.message.timestamp.type
设置为 LogAppendTime,那么消息的时间戳必定能够保持单调递增;相反,如果是 CreateTime 类型则无法保证。
生产者可以使用类似 ProducerRecord(String topic,Integer partition,Longtimestamp,K key,V value)的方法来指定时间戳的值。即使生产者客户端采用自动插入的时间戳也无法保证时间戳能够单调递增,如果两个不同时钟的生产者同时往一个分区中插入消息,那么也会造成当前分区的时间戳乱序。
与偏移量索引文件相似,时间戳索引文件大小必须是索引项大小(12B)的整数倍,如果不满足条件也会进行裁剪。同样假设 broker 端参数 log.index.size.max.bytes
配置为67,那么对应于时间戳索引文件,Kafka 在内部会将其转换为60。
每当写入一定量的消息时,就会在偏移量索引文件和时间戳索引文件中分别增加一个偏移量索引项和时间戳索引项。两个文件增加索引项的操作是同时进行的,但并不意味着偏移量索引中的 relativeOffset 和时间戳索引项中的 relativeOffset 是同一个值。
假设现在有以下日志分段文件:
如果要查找指定时间戳targetTimeStamp=1526384718288开始的消息,首先是找到不小于指定时间戳的日志分段。这里就无法使用跳跃表来快速定位到相应的日志分段了,需要分以下几个步骤来完成。
- 将targetTimeStamp和每个日志分段中的最大时间戳largestTimeStamp逐一对比,直到找到不小于 targetTimeStamp 的 largestTimeStamp 所对应的日志分段。日志分段中的largestTimeStamp的计算是先查询该日志分段所对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则取该日志分段的最近修改时间。
- 找到相应的日志分段之后,在时间戳索引文件中使用二分查找算法查找到不大于targetTimeStamp的最大索引项,即[1526384718283,28],如此便找到了一个相对偏移量28。
- 在偏移量索引文件中使用二分算法查找到不大于28的最大索引项,即[26,838]。
- 从步骤1中找到日志分段文件中的838的物理位置开始查找不小于targetTimeStamp的消息。
25. Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
整个生产者客户端由两个线程协调运行,这两个线程分别为:
- 主线程:在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。
- Sender 线程:负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory
配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms
的配置,此参数的默认值为 60000,即 60 秒。
主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque<ProducerBatch>
。消息写入缓存时,追加到双端队列的尾部;
Sender 读取消息时,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而 ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory
参数适当调大以增加整体的吞吐量。
消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在 Kafka 生产者客户端中,通过 java.io.ByteBuffer
实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在 RecordAccumulator 的内部还有一个 BufferPool,它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用。不过 BufferPool 只针对特定大小的 ByteBuffer 进行管理,而其他大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小由 batch.size
参数来指定,默认值为16384B,即16KB。可以适当地调大 batch.size
参数以便多缓存一些消息。
ProducerBatch 的大小和 batch.size
参数也有着密切的关系。当一条消息(ProducerRecord)流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch。
在新建 ProducerBatch 时评估这条消息的大小是否超过 batch.size
参数的大小,如果不超过,那么就以 batch.size
参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建 ProducerBatch,这段内存区域不会被复用。
Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本 <分区,Deque<ProducerBatch>>
的保存形式转变成 <Node,List<ProducerBatch>
的形式,其中 Node 表示 Kafka 集群的 broker 节点。对于网络连接来说,生产者客户端是与具体的 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成 <Node,List<ProducerBatch>>
的形式之后,Sender 还会进一步封装成 <Node,Request>
的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest。
请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map<NodeId,Deque<Request>>
,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.fight.requests.per.connection
,默认值为 5,即每个连接最多只能缓存 5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较 Deque<Request>
的 size 与这个参数的大小来判断对应的 Node 中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
26. Kafka 中 broker (Controller) 的作用
负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。
当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
27. Kafka 中服务端的延时设计(时间轮)
Kafka中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)。
JDK中Timer和DelayQueue的插入和删除操作的平均时间复杂度为 O(nlogn)
并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)
。
Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。
时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs×wheelSize计算得出。
时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。
若时间轮的tickMs为1ms且wheelSize等于20,那么可以计算得出总体时间跨度interval为20ms。初始情况下表盘指针currentTime指向时间格0,此时有一个定时为2ms的任务插进来会存放到时间格为2的TimerTaskList中。
随着时间的不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2对应的TimeTaskList中的任务进行相应的到期操作。此时若又有一个定时为 8ms 的任务插进来,则会存放到时间格 10 中,currentTime再过8ms后会指向时间格10。如果插入8ms任务的同时,还有一个定时为19ms的任务插进来,新来的TimerTaskEntry会复用原来的TimerTaskList,所以它会插入原本已经到期的时间格1。总之,整个时间轮的总体跨度是不变的,随着指针currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在currentTime和currentTime+interval之间。
如果此时有一个定时为350ms的任务该如何处理?直接扩充wheelSize的大小?Kafka中不乏几万甚至几十万毫秒的定时任务,这个wheelSize的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如100万毫秒,那么这个wheelSize为100万毫秒的时间轮不仅占用很大的内存空间,而且也会拉低效率。Kafka 为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。
如下图,第一层的时间轮tickMs=1ms、wheelSize=20、interval=20ms。第二层的时间轮的tickMs为第一层时间轮的interval,即20ms。每一层时间轮的wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms。以此类推,这个400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms。
对于350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入第二层时间轮中时间格17所对应的TimerTaskList。如果此时又有一个定时为450ms的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入第三层时间轮中时间格1的TimerTaskList。注意到在到期时间为[400ms,800ms)区间内的多个任务(比如446ms、455ms和473ms的定时任务)都会被放入第三层时间轮的时间格1,时间格1对应的TimerTaskList的超时时间为400ms。随着时间的流逝,当此TimerTaskList到期之时,原本定时为450ms的任务还剩下50ms的时间,还不能执行这个任务的到期操作。
这里就有一个时间轮降级的操作,会将这个剩余时间为50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms,60ms)的时间格中。再经历40ms之后,此时这个任务又被“察觉”,不过还剩余10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[10ms,11ms)的时间格中,之后再经历 10ms 后,此任务真正到期,最终执行相应的到期操作。
设计源于生活。常见的钟表就是一种具有三层结构的时间轮,第一层时间轮tickMs=1ms、wheelSize=60、interval=1min,此为秒钟;第二层 tickMs=1min、wheelSize=60、interval=1hour,此为分钟;第三层tickMs=1hour、wheelSize=12、interval=12hours,此为时钟。
在 Kafka 中,第一层时间轮的参数同上面的案例一样:tickMs=1ms、wheelSize=20、interval=20ms,各个层级的wheelSize也固定为20,所以各个层级的tickMs和interval也可以相应地推算出来。
Kafka在具体实现时间轮TimingWheel时还有一些小细节:
- TimingWheel 在创建的时候以当前系统时间为第一层时间轮的起始时间(startMs),这里的当前系统时间并没有简单地调用 System.currentTimeMillis(),而是调用了Time.SYSTEM.hiResClockMs,这是因为 currentTimeMillis()方法的时间精度依赖于操作系统的具体实现,有些操作系统下并不能达到毫秒级的精度,而Time.SYSTEM.hiResClockMs实质上采用了System.nanoTime()/1_000_000来将精度调整到毫秒。
- TimingWheel 中的每个双向环形链表TimerTaskList都会有一个哨兵节点(sentinel),引入哨兵节点可以简化边界条件。哨兵节点也称为哑元节点(dummy node),它是一个附加的链表节点,该节点作为第一个节点,它的值域中并不存储任何东西,只是为了操作的方便而引入的。如果一个链表有哨兵节点,那么线性表的第一个元素应该是链表的第二个节点。
- 除了第一层时间轮,其余高层时间轮的起始时间(startMs)都设置为创建此层时间轮时前面第一轮的currentTime。每一层的currentTime都必须是tickMs的整数倍,如果不满足则会将currentTime修剪为tickMs的整数倍,以此与时间轮中的时间格的到期时间范围对应起来。修剪方法为:currentTime=startMs-(startMs%tickMs)。currentTime会随着时间推移而推进,但不会改变为tickMs的整数倍的既定事实。若某一时刻的时间为timeMs,那么此时时间轮的currentTime=timeMs-(timeMs%tickMs),时间每推进一次,每个层级的时间轮的currentTime都会依据此公式执行推进。
- Kafka 中的定时器只需持有 TimingWheel 的第一层时间轮的引用,并不会直接持有其他高层的时间轮,但每一层时间轮都会有一个引用(overflowWheel)指向更高一层的应用,以此层级调用可以实现定时器间接持有各个层级时间轮的引用。
Kafka中的定时器借了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都加入DelayQueue,“每个用到的TimerTaskList”特指非哨兵节点的定时任务项TimerTaskEntry对应的TimerTaskList。DelayQueue会根据TimerTaskList对应的超时时间expiration来排序,最短expiration的TimerTaskList会被排在DelayQueue的队头。
Kafka中会有一个线程来获取DelayQueue 中到期的任务列表,有意思的是这个线程所对应的名称叫作“ExpiredOperationReaper”,可以直译为“过期操作收割机”。当“收割机”线程获取 DelayQueue中超时的任务列表 TimerTaskList之后,既可以根据 TimerTaskList 的 expiration来推进时间轮的时间,也可以就获取的TimerTaskList执行相应的操作,对里面的TimerTaskEntry该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。
前面说过,DelayQueue不适合Kafka这种高性能要求的定时任务,为何这里还要引入DelayQueue呢?注意对定时任务项TimerTaskEntry的插入和删除操作而言,TimingWheel时间复杂度为O(1),性能高出DelayQueue很多,如果直接将TimerTaskEntry插入DelayQueue,那么性能显然难以支撑。就算根据一定的规则将若干TimerTaskEntry划分到TimerTaskList这个组中,然后将TimerTaskList插入DelayQueue,如果在TimerTaskList中又要多添加一个TimerTaskEntry时该如何处理呢?对DelayQueue而言,这类操作显然变得力不从心。
但是,Kafka 中的 TimingWheel 专门用来执行插入和删除TimerTaskEntry的操作,而 DelayQueue 专门负责时间推进的任务。试想一下,DelayQueue 中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度(获取之后DelayQueue内部才会再次切换出新的队头)。如果采用每秒定时推进,那么获取第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取第二个超时任务时又需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,两者相辅相成。
参考内容
主要参考以来两篇博客以及相关博客推荐,因找的博客比较多,没注意记录,最后好多忘了在哪2333,如果有侵权,请及时联系我,非常抱歉。
https://github.com/Snailclimb/JavaGuidehttps://github.com/CyC2018/CS-Notes
《深入理解 Kafka:核心设计与实践原理》——朱忠华