kafka的架构及常见面试题

一、介绍

Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级子项目 。

Kafka是一个多分区、多副本且基于zookeeper协调的分布式消息系统。也是一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用 。

二、架构

1)生产、消费

image-20230819164024482

首先得了解这个,比较简单的一个集群图

  • 生产者(Producer):生产消息,发送消息的服务

  • 消费者(Comsumer):消费消息,处理消息的服务

2)每一个kafka实例中有什么

image-20230819165134280

如上图,只画了其中一个,具体看看里面是什么

  • broker:一个kafka进程就是一个broker,也就可以这样理解,集群中每一台kafka服务就是broker

  • 主题(topic):在发布订阅的模式下,我们需要对消息进行一个区分,同一个功能的消息,我们发往同一个主题下

  • 分区(Partition):可以看到每一个主题topic下,有多个分区。消息会推送到这些分区里面,可以增加生产者,消费者的对消息生产处理的吞吐量

在没有画出来了另外两个kafka中,我们会推选出领导Leader,以及追随者Follower,这个我们后面再聊

3)简单的消费

image-20230819170659700

如上图,对于消费者如何消费分片中的消息的,其中有下面几点的解释

  1. 一个Partition只能由一个Consumer来消费,一个Consumer可以消费多个不同的Partition。所以,我们应该保证每个主题的Partition的数量大于Consumer的数量

  2. Consumer越多,则吞吐量越高,消费得越快。当然,要结合第一点

  3. Consumer增加或减少时,PartitionConsumer的消费关系会自动调整

4)带group的消费

在上一节看到了简单的消费,那只不过是同一个group下,接下来引入group这个概念

image-20230819172057856

上面第三节说的都不错,在这里就是要加一个前提条件,一个Partition的消息同一个group中的一个Consumer来消费,

交给了同组的某个Consumer,就不能交给同组的其他Consumer

每一个group都可以完整消费主题中的所有消息

5)消费partition里面的消息

image-20230819174659549

如上图,有以下几点特性

  • 一个Partition内部的消息是有序的,越新的消息offset越大。不同partition的消息根据offset无法比较新l旧

  • Consumer顺序地消费partition里的每一条消息,可以每读一条就向kafka上报(commit),当前读到了哪个位置(offset),也可以间隔性上报

    • 可以读几条再上报offset,比如说每读5条,上报更新一下offset
    • 也可以时间间隔的方式上报offset,比如说每隔5s上报更新
  • Consumer重启时kafka根据该group上一次提交的最大offset来决定从哪个地方开始消费。

    • 这里就会出现重复消费的问题,而解决这个重复消费的问题,是面试中的高频问题。
  • 不同group之间,记录的offset是不同的,这也是上一节每个group独立消费topic的消息的原因

6)生产消息,写入Partition

image-20230819194314285

关于生产者生产消息至Partition,有三种情况,按照优先级这样排序

  1. 生产者可以指定Partition进行写入

  2. 通过消息携带的key,再通过hash分发器计算得到结果,来决定去哪个Partition

  3. 按照时间片轮动选择Partition。比如说当前5分钟,往Partition 0中写入;下一个5分钟,往Partition 1中写入

7)生产消息,写入Partition应答ack

在上面一节,我们确定了partition存储是哪个,接下来还有一个问题,就是如果是kafka集群架构的话,我们会出现同个Partition,有一个Leader,多个Follower

  1. 在上面确定partition后,我们要去寻找它的Leader

  2. Leader partition将消息写入本地磁盘

    1. 当写入完成后,向Producer进行应答响应
  3. Follower partition会将消息从Leader那拉回来,写入自己的本地磁盘

    1. 当写入完成后,向Leader进行应答响应
    2. leader收到所有的Follower应答后,再向Producer应答

那么在此刻,生产消息的应答ack有三种策略

  1. 完全不管ack应答

  2. Producer只需要Leader Partition应答即可,不用管Follower Partition是否写入成功

  3. partition需要保证所有的Follower才进行应答

8)Partition备份机制

kafka集群中,我们有Partition的备份机制,如下

image-20230819195525972

同一个主题下,集群中的每个broker,都会维护自己的Partition

  1. 其中,他们会选出LeaderFollower,生产者的数据优先推送给Leader

  2. 每一个Partition都有自己的Leader

  3. 同一个Topic下的,不同Partition尽量分布在不同的broker

当有leaderbroker宕机后,kafka集群会重新竞选那台broker上原本是leaderPartition,和下面ISR队列有关。

9)消息的磁盘存储文件结构

  • 分区Partition,一个Topic中有多个Partition,可以有效地避免了消息的堆积

  • 分段segment,消息在Partition里面,消息是分段来进行存储的,每次操作的消息读写都是针对segmengt一个segment包括一个log文件,两个index文件,三个文件成套出现。前面数字的文件名代表着offset偏移量开始索引位置

    • 000000101.log:存储具体消息的数据文件
    • 000000101.index:存储Consumeroffset便宜量的索引文件
    • 000000101.timeindex:存储消息时间戳的索引文件
  • 索引indexkafka分段后的数据建立的索引文件

如下图

image-20230820142002237

可以看到上面有两个索引文件,

  • index文件是记录offset消息和log文件中消息位置的映射关系的文件

  • timeindex文件时记录时间戳和offset关系的文件

请注意,这边的索引并不会记录每一条消息的索引,而是采取稀疏索引,也就是隔一段消息才会记录消息的索引。

这个消息索引的稠密程度,影响kafka存储读取的速度

索引越稠密,则读取的速度越快

索引越稀疏,则文件存储的空间越大

由于上面存储文件都是采用offset偏移量来命名,所以kafka会采取二分查找方法,可以大大提交检索效率。

三、面试题

1)如何避免kafka消息丢失

1.1)出现消息丢失的原因

从上面架构上来看,kafka丢失消息的原因主要可以分为下面几个场景

  1. Producer在把消息发送给kafka集群时,中间网络出现问题,导致消息无法到达

    1. 网络抖动原因
    2. Producer消息超出大小限制,broker收到以后没法进行存储
  2. kafka集群接收到消息后,保存消息至本地磁盘出现异常

    1. 集群接收到数据后会将数据进行持久化存储到磁盘,消息都是先写入到页缓存,然后由操作系统负责具体的刷盘任务或者使用fsync强制刷盘。如果此时Broker宕机,且选举了一个落后leader副本限多的follower副本成为新的leader副本,那么落后的消息数据就会丢失。
  3. Consumer在消费消息时发生异常,导致Consumer端消费失败

    1. 消费者配置了offset自动提交参数,enable.auto.commit=true。消费者接受到了消息,进行了自动提交。但其实消费者并没有处理完成,就宕机了,此时kafka认为Consumer已经消费了这条消息了,后续便不再分配,造成了消息的丢失

1.2)解决方法——Producer消息发送消息失败

关于上面Producer消息发送消息失败的解决方法,总结归纳出五种,可以结合使用

  1. 生产者调用异步回调消息。伪代码如下:producer.send(msg, callback)

  2. 生产者增加消息确认机制,设置生产者参数:acks=allpartitionleader接收到消息,等待所有的follower副本都同步到了消息之后,才认为本次生产者发送消息成功了。

  3. 生产者设置重试次数。比如:retrie>=3,增加重试次数以保证消息的不丢失

  4. 定义本地消息日志表,定时任务扫描这个表自动补偿,做好监控告警。

  5. 后台提供一个补偿消息的工具,可以手工补偿。

1.3)解决方法——broker写入磁盘失败

  1. 同步刷盘(不太建议)。同步刷盘可以提高消息的可靠性,防止由于机器没有及时写入磁盘的消息丢失。但是会严重影响性能

  2. 利用Partition的多副本机制(建议)。使用下面的这段配置,

    1. unclean.leader.election.enable=false:表示不允许非ISR中的副本被选举为leader,以免数据丢失
    2. replication.factor>=3:消息分区的副本个数,建议设置大于等于3
    3. min.insync.replicas>1:这个值大于1,要求leader至少能和一个Follower副本保证联系

1.4)解决方法——Consumer消费异常

  1. 消费者需要关闭自动提交,采用手动提交offsetenable.auto.commit=false,并在代码中写入

    1
    2
    3
    4
    // 同步提交
    consumer.commitSync();
    // 异步提交
    consumer.commitAsync();

2)如何避免重复消费消息

这实际上是一个消息的幂等性问题

幂等性是指一个操作可以被重复执行,但结果不会改变的特性。在消息队列中,幂等性是指在消息消费过程中,保证消息的唯一性,不会出现重复消费的情况 。

我们有以下几个方案可以解决

  1. 对于一些业务相关的消息,我们通常有需要处理的消息业务主键。比如说,发送短信的发送流水号,支付业务的订单流水号等。

    1. 当消费者接受到消息后,使用这个消息主键建立获取分布式锁,同时将消息业务主键写入库。
    2. 如果第一步成功,消费者进行消费
    3. 当消费者处理完成后,释放分布式锁
    4. 如果有一条重复的消息进入,那么在第一步中就会失败,要么是分布式锁,要么是数据库主键冲突
  2. 针对没有业务的消息,可以再生产消息的时候给予一个分布式全局ID,后面的处理方法与第一条类似

  3. 在有状态流转的业务当中,一个消费者只消费一种业务状态,当这个消息的业务状态已经更新、已经处理。那么直接丢弃掉此次消息即可

  4. 乐观锁,消息在生产的时候携带业务上一次查询出的版本号,在消费时携带版本号去更新数据库。如果乐观锁原因导致失败,那么不需要进行后续处理

  5. insert ... on duplicate key update,消费插入数据时,数据已存在则进行更新

3)kafka的零拷贝是什么原理

  1. 第一次:将磁盘文件,读取到操作系统内核缓冲区;

  2. 第二次:将内核缓冲区的数据,copy 到 application 应用程序的 buffer;

  3. 第三步:将 application 应用程序 buffer 中的数据,copy 到 socket 网络发送缓冲区(属于操作系统内核的缓冲区);

  4. 第四次:将 socket buffer 的数据,copy 到网卡,由网卡进行网络传输。

如下图

image-20230820130332350

取消掉两次CPU的拷贝,从而减小CPU的消耗。

零拷贝是操作系统提供的,如Linux上的sendfile命令,是将读到内核空间的数据,转到 socket buffer,进行网络发送

还有Java NIO中的transferTo()方法

4)kafka如何在分布式的情况下保证顺序消费

kafkabroker中,主题下可以设置多个不同的partition,而kafka只能保证Partition中的消息时有序的,但没法保证不同Partition的消息顺序性。

比如说,有一个主题Topic A,里面有两个Partition,但消费端只有一个Consumer。根据上面的架构可以知道,这个Consumer会消费两个Partition中的消息,这样就肯定会出现消费乱序的情况。

那么针对上面这种乱序的情况,我们可以这样进行设置

  1. 一个主题只建立一个Partition,这样所有的消息也就只会发送到一个Partition中,也就保证了消息的顺序性。

    1. Producer也可以指定往一个partition中发送消息。具体可以查看第二章第6节
  2. 可以保证一个Partition只能被一个Consumer消费,也可以保证消息的有序性消费。但也要避免Rebalance,原本一对一好好的,Consumer宕机或者下线导致Rebalance就会导致消费的乱序。

5)kafka为什么这么快

主要原因有下面几个

  1. 磁盘写入采用了顺序读写,保证了消息的堆积

    • 顺序读写,磁盘会预读,预读即在读取的起始地址连续读取多个页面,主要时间花费在了传输时间,而这个时间两种读写可以认为是一样的。
    • 随机读写,因为数据没有在一起,将预读浪费掉了。需要多次寻道和旋转延迟。而这个时间可能是传输时间的许多倍。
  2. 零拷贝:第3节提到过,避免了两次CPU拷贝,减少了CPU的消耗

  3. 分区、分段、索引,再配合二分查找检索,提高消息的检索效率

    • 分区Partition,有效避免了消息的堆积
    • 分段segment,消息在Partition里面,消息是分段来进行存储的,每次操作的消息读写都是针对segmengt
    • 索引indexkafka分段后的数据建立的索引文件,就是第二章第9节的文件存储结构
  4. 批量压缩读写

    • 多条数据一起压缩,存储,读取
    • kafka是直接操作的page cache,而不是堆内对象,读写速度更高。且进程重启后,缓存也不会丢失

6)什么是ISR,它有什么用

kafka中,除了有ISR,还有OSRAR,功能如下

  • ISR(InSyncRepli):在kafka中,当一个broker宕机挂掉的时候,原本在其brokerLeader Partition会重新进行竞选。这个竞选基本从ISR队列中选举。那么现在可以这样说,ISR是一个维护了Follower Partition的队列,其中的Partition都与Leader Partition消息保持一致。

  • OSR(OutSyncRepli):没在ISR队列中的其他Follower Partition组成的队列

  • AR(AllRepli):全部分区的Follower Partition,也就是ISROSRPartition总和

7)kafka中的Rebalance是什么,什么时候会触发

Rebalance是指PartitionConsumer之间的关系需要重新调整分配,这个重新调整分配的动作称为Rebalance

那么当出现下面几种情况的时候,会触发Rebalance

  1. 当一个Group中的Consumer新增后

  2. 当一个Group中的Consumer离开后,比如说宕机

  3. Topic下的Partition数量发生变化后

总之,两边的关系数量发生变化的话,都会触发Rebalance

8)当kafka出现消息积压时,该怎么办

当出现上面这种情况的时候,要么就是Consumer挂掉了或者消费水平太低,要么就是Producer消息太多,间接导致Consumer消费不及时。

针对上面这种情况,我们可以有以下的解决方案,可以结合使用

  1. 提高Consumer的数量,可以通过增加消费者组中的Consumer数量或者增加Consumer实例来实现。这样每个Consumer可以并行处理消息,提高整体消费能力。

  2. 增加Partition分区数量,在kafka中,可以设置主题下的Partition,将消息分散至更多的Partition中,配合第一点方案提高整体的消费能力

  3. 提高Consumer的消费能力,优化消费者的处理能力,确保Consumer能够快速处理每条消息。将Consumer处理消息的速度优化至高于Producer生产消息的速度。在不破坏代码业务逻辑的情况下,也可以使用异步处理来消费消息。

在面试过程中,第三点方案是至关重要的,很多企业由于硬件资源的原因,没有增加Consumer的数量,没有增加Partition数量的空间。故此,Consumer优秀的消费能力,就成了他们考察的目标了。

四、最后

我是半月,你我一同共勉!!!