1. consumer的offset存在哪里?
以前是放在Kafka里, 虽然Kafka持久化是没问题的, 但频繁读写不是太合适, 所以Kafka后来将offset 保存在了一个topic: __consumer_offsets.
2. Kafka文件存储机制
topic在某个partition中以log形式存储, 如果只有一个大log文件, 会遇到以下问题
- 清理过期数据不方便
- 查找定位, 寻址不方便 所以topic采取segment方式, 分片存储数据. Segment分为.log/.index, 分别为数据/索引文件.
3. 如何找到offset=3的Message?
根据.index文件名, 找到offset=3的Message位于哪一个Segment, 再根据index文件内容找到该Message的位置.
4. message存在哪个partition?分为以下情况:
- 没有指明partition但有key的情况下, 将key的hash值与topic的partition数进行取余得到partition值.
- 没有partition及key的情况下, 第一次调用时会生成一个随机数, 以后每次调用递增, 将这个值与topic可用的partition总数取余得到partition值, 即轮询round-robin算法.
5. 什么时候发送ack?
全部的follower同步完成, 才可以发送ack. 原因如下:
-
- 虽然延迟高, 但一般集群内通信速度很快
-
- 选举新的leader时, 容忍n台节点的故障, 只需要n+1个副本
6. 什么是ISR
ISR是in-sync replica set, 即和leader保持同步的follower集合. 当ISR中的follower完成数据的同步后, leader就会给follower发送ack. 如果follower没有向leader同步数据, 则follower将被踢出ISR, 该时间阈值由replica.lag.time.max.ms参数设定. 当leader挂掉后, 将从ISR中选举新的leader.
7. acks的级别
为了在可靠性和延迟中取得平衡, acks提供了3中配置:
- 0: 不等待broker的ack, 当broker故障时可能会丢失数据.
- 1: producer等待broker的ack, partition的leader落盘成功后返回ack, 如果follower同步成功之前leader故障, 则会丢失数据.
- -1: 等待所有的follower同步完成返回ack. 如果在follower同步成功之后, acks发送之前leader发生故障, 则会发送重复数据.
8. 故障处理细节: HW 和 LEO
HW: High Watermark LEO: Log End Offset follower在同步leader时, 有快有慢, 进度不同, 所以不同follower的LEO可能不同. HW则是所有follower中的进度最慢的位置, consumer只能看到HW之前的数据.
当follower故障时, 会被踢出ISR, 等到该follower恢复后, follower会读取本地磁盘记录的HW, 并将log文件高于HW的部分截取掉, 从HW开始向leader进行同步.
当leader故障时, 为了保证不同follower之间数据一致, 其余follower会将各自log高于HW的部分截掉.
这只能保证数据一致性, 不能保证数据丢失或重复.
9. Exactly Once语义
0.11版本后, Kafka引入了幂等性机制(idempotent), 配合acks=-1时at least once语义, 实现了Exactly once语义.
使用时, 只需要enable.idempotence属性设置为true.
10. 分区分配策略
将1-9分给3个consumer:
- roundrobin 1,4,7 2,5,8 3,6,9
- range 1,2,3 4,5,6 7,8,9
11. Kafka高效读写数据
- 顺序读写磁盘: 顺序读写速度达到600M/s, 而随机读取只有100k/s
- 零复制技术: 复制文件时不涉及user space中的Application Cache, 直接copy kernel space中的Page cache
12. Zookeeper在Kafka中的作用
Kafka集群中会有一个broker被选举为Controller, 负责管理集群broker的上下线, 所以topic的分区副本分配和leader选举等工作. Controller的管理工作都是依赖于Zookeeper.
Kafka JavaApi
KafkaProducer发送消息流程
KafkaProducer包括
- main线程
- sender线程: Interceptors, Serializer, Partitioner
- RecordAccumulator 相关参数:
- batch.size: 数据累积到batch.size后, sender才会发送
- linger.ms: 如果数据没达到batch.size, sender等待linger.ms后就会发送数据