kafka 实现原理以及知识点小结
kafka 实现原理以及知识点小结
Kafka的特性:
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
Kafka的使用场景:
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm
- 事件源
一般来说,(1)一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率。(2)同一个Partition的Replica尽量分散到不同的机器,高可用。 当add a new partition的时候,partition里面的message不会重新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会重新参与所有partition的load balance
(replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本)这样如果某个broker宕机,其实整个kafka内数据依然是完整的。但是,replica副本数越高,系统虽然越稳定,但是回来带资源和性能上的下降;replica副本少的话,也会造成系统丢数据的风险。
消息投递可靠性
一个消息如何算投递成功,Kafka提供了三种模式:- 第一种是啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker;
- 第二种是Master-Slave模型,只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能;
- 第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模型
消息在broker上的可靠性,因为消息会持久化到磁盘上,所以如果正常stop一个broker,其上的数据不会丢失;但是如果不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这可以通过配置flush页面缓存的周期、阈值缓解,但是同样会频繁的写磁盘会影响性能,又是一个选择题,根据实际情况配置。
消息消费的可靠性,Kafka提供的是“At least once”模型,因为消息的读取进度由offset提供,offset可以由消费者自己维护也可以维护在zookeeper里,但是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的情况,这种情况同样可以通过调整commit offset周期、阈值缓解,甚至消费者自己把消费和commit offset做成一个事务解决,但是如果你的应用不在乎重复消费,那就干脆不要解决,以换取最大的性能。
- Partition ack:当ack=1,表示producer写partition leader成功后,broker就返回成功,无论其他的partition follower是否写成功。当ack=2,表示producer写partition leader和其他一个follower成功的时候,broker就返回成功,无论其他的partition follower是否写成功。当ack=-1[parition的数量]的时候,表示只有producer全部写成功的时候,才算成功,kafka broker才返回成功信息。这里需要注意的是,如果ack=1的时候,一旦有个broker宕机导致partition的follower和leader切换,会导致丢数据。
kafka源码分析
核心组件
1。SocketServer + KafkaApis 前者接收所有网络请求,后者处理请求2。KafkaController 负责Controller选举
3。ConsumerCoordinator 前面在分析consumer的时候已经讲过,用于consumer group的负载均衡
4。ReplicaManager 机器的管理
5。KafkaSchedule
kafka controller 选举的机制
整个选举过程是通过zk上的一个临时节点来实现的:/controller节点,其data结构为:核心信息就是记录当前的controller的brokerId。 "version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp 1 当controller挂了,其它所有broker监听到此临时节点消失,然后争相创建此临时节点,谁创建成功,谁就成为新的Controller。 除了/controller节点,还有一个辅助的/controller_epoch,记录当前Controller的轮值数。 (1)KafkaController和ZookeeperLeaderElector内部各有1个Listener,一个监听session重连,1个监听/controller节点变化。 (2)当session重连,或者/controller节点被删除,则调用elect()函数,发起重新选举。在重新选举之前,先判断自己是否旧的Controller,如果是,则先调用onResignation退位。Failover(上任)与Resignation(退位)
关键点:接管所有对broker/partition节点的监听关键点:放弃对所有broker/partition的监听
kafka ReplicaManager
ReplicaManager:负责管理当前broker所有分区和副本的信息,会处理KafkaController发起的一些请求,副本状态的切换,添加/读取消息等 接收controller命令1)接受LeaderAndISRCommand命令 2)接受StopReplicaCommand命令 3)开启定时线程maybeShrinkIsr
1)LeaderAndISRCommand命令 接受到LeaderAndIsrRequest指令时,会调用ReplicaManager的becomeLeaderOrFollower函数 主要是筛选出分配给该broker的partition的副本,并且根据lead是否为该brokerId区分为leader和follower,然后分别进入不同的流程
2)接受StopReplicaCommand命令 当broker stop或用户删除某replica时,KafkaServer会接受到StopReplicaRequest指令,此时会调用ReplicaManager的stopReplicas函数:
3)开启定时线程maybeShrinkIsr
partition的leader定时副本过期检查:
通过ReplicaManager启动时,定期调用maybeShrinkIsr函数来进行处理, 当follower的副本向leader的副本进行数据同步操作时,如果副本已经读取到leader的log的最后的offset部分时,表示这个副本同步达到最新的副本状态,会更新每一个副本的心跳时间,这个函数定期检查这个心跳时间是否超过了配置的时间,如果超过了,就会移出这个副本在isr上的选择。Coordinator
1.Broker端的Coordinator:
Kafka的group management protocol包括以下的动作序列:Group Registration:Group的成员需要向cooridnator注册自己,并且提供关于成员自身的元数据(比如,这个消费成员想要消费的topic)
Group/Leader Selection:cooridnator确定这个group包括哪些成员,并且选择其中的一个作为leader。
State Assignment: leader收集所有成员的metadata,并且给它们分配状态(state,可以理解为资源,或者任务)。
Group Stabilization: 每个成员收到leader分配的状态,并且开始处理。
这里边有三个角色:coordinator, group memeber, group leader.
有这么几个情况:
所有的consumer线程要先向coordinator注册,由coordinator选出leader, 然后由leader来分配state。 从group memeber里选出来一个做为leader,由leader来执行性能开销大的协调任务, 这样把负载分配到client端,可以减轻broker的压力,支持更多数量的消费组。
所有group member(指的是consumer线程)都需要发心跳给coordinator,这样coordinator才能确定group的成员。
对于Kafka consumer,它的实际上必须跟coordinator保持连接,因为它还需要提交offset给coordinator。所以coordinator实际上负责commit offset,那么,即使leader来确定状态的分配,但是每个partition的消费起始点,还需要coordinator来确定。
2.Consumer消费者的工作过程:
在consumer启动时或者coordinator节点故障转移时,consumer发送ConsumerMetadataRequest给任意一个brokers。在ConsumerMetadataResponse中,它接收对应的Consumer Group所属的Coordinator的位置信息。 Consumer连接Coordinator节点,并发送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration错误码,说明协调节点已经在初始化平衡。消费者就会停止抓取数据,提交offsets,发送JoinGroupRequest给协调节点。在JoinGroupResponse,它接收消费者应该拥有的topic-partitions列表以及当前Consumer Group的新的generation编号。这个时候Consumer Group管理已经完成,Consumer就可以开始fetch数据,并为它拥有的partitions提交offsets。 如果HeartbeatResponse没有错误返回,Consumer会从它上次拥有的partitions列表继续抓取数据,这个过程是不会被中断的。3.Coordinator存储的信息有什么
对于每个Consumer Group,Coordinator会存储以下信息:-
对每个存在的topic,可以有多个消费组group订阅同一个topic(对应消息系统中的广播)
-
对每个Consumer Group,元数据如下:
订阅的topics列表
Consumer Group配置信息,包括session timeout等
组中每个Consumer的元数据。包括主机名,consumer id
每个正在消费的topic partition的当前offsets
Partition的ownership元数据,包括consumer消费的partitions映射关系
kafka执行流程
producer生产消息流程
1、构建一个KafkaProducer对象,初始化一些用到的组件,比如缓存区,Sender线程等2、如果配置了拦截器,可用对发送的消息进行可定制化的拦截或更改
3、对Key,value进行序列化
4、根据传入的参数,为消息选择合适的分区,具体怎么选,后面分析
5、将消息按照分区发送到RecordAccmulator暂存,消息按照每个分区进行汇总
6、后台Sender线程被触发后从RecordAccmulator里面获取消息然后构建成ClientRequest,怎么构建后面分析
7、将ClientRequest封装成NetWorkClient准备发送
8、NetWorkClient将请求放入KafkaChannel准备发送,然后执行网络IO,最后发送到kafka server
特殊情况处理流程
1.若有broker宕机 集群会如何处理
BrokerChangeListener监听类,监听/brokers/ids下得brokerid kafka集群具备高可用特性,下线broker上的leader分区自动切换到新的broker节点,客户端链接随之切换至新的节点继续提供服务,还有相关信息未补全,待补全.......