kafka
Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力
- 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
- 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展。
概念
主题(Topic)与分区(Partition)
在 Kafka 中,消息以**主题(Topic)**来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数 目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。
我们使用一个生活中的例子来说明:现在 A 城市生产的某商品需要运输到 B 城市,走的是公路,那么单通道的高速公路不论是在「A 城市商品增多」还是「现在 C 城市也要往 B 城市运输东西」这样的情况下都会出现「吞吐量不足」的问题。所以我们现在引入**分区(Partition)**的概念,类似“允许多修几条道”的方式对我们的主题完成了水平扩展。
Broker 和集群(Cluster)
一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。
若干个 Broker 组成一个集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。下图是一个样例:
Kafka 的一个关键性质是日志保留(retention),我们可以配置主题的消息保留策略,譬如只保留一段时间的日志或者只保留特定大小的日志。当超过这些 限制时,老的消息会被删除。我们也可以针对某个主题单独设置消息过期策略,这样对于不同应用可以实现个性化。
多集群
随着业务发展,我们往往需要多集群,通常处于下面几个原因:
- 基于数据的隔离;
- 基于安全的隔离;
- 多数据中心(容灾)
当构建多个数据中心时,往往需要实现消息互通。举个例子,假如用户修改了个人资料,那么后续的请求无论被哪个数据中心处理,这个更新需要反映出来。又或者,多个数据中心的数据需要汇总到一个总控中心来做数据分析。
上面说的分区复制冗余机制只适用于同一个 Kafka 集群内部,对于多个 Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。本质上来说,MirrorMaker 只是一个 Kafka 消费者和生产者,并使用一个队列连接起来而已。它从一个集群中消费消息,然后往另一个集群生产消息。
术语总结
- 消息记录 (record):由一个 key,一个 value 和一个时间戳构成,消息最终存储在主题下的分区中
- 记录在生产者中称为生产者记录 (ProducerRecord),在消费者中称为消费者记录 (ConsumerRecord)
- 在一个可配置的时间段,Kafka 集群保持所有的消息,直到它们过期, 无论消息是否被消费了
- 比如,如果消息的保存策略被设置为 2 天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka 的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。
- 生产者 (producer):生产者用于发布 (send) 消息
- 消费者 (consumer):消费者用于订阅 (subscribe) 消息
- 消费者组 (consumer group):相同的 group.id 的消费者将视为同一个消费者组,每个消费者都需要设置一个组 id,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费
- 主题 (topic):消息的一种逻辑分组,用于对消息分门别类,每一类消息称之为一个主题,相同主题的消息放在一个队列中
- 分区 (partition):消息的一种物理分组, 一个主题被拆成多个分区,每一个分区就是一个顺序的、不可变的消息队列,并且可以持续添加
- 分区中的每个消息都被分配了一个唯一的 id,称之为偏移量 (offset) 在每个分区中偏移量都是唯一的。
- 每个分区对应一个逻辑 log,有多个 segment 组成。
- 偏移量 (offset):分区中的每个消息都一个一个唯一 id,称之为偏移量,它代表已经消费的位置。
- 可以自动或者手动提交偏移量(即自动或者手动控制一条消息是否已经被成功消费)
- 代理 (broker):一台 kafka 服务器称之为一个 broker
- 副本(replica):副本只是一个分区(partition)的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。
- 领导者(leader):Leader 是负责给定分区的所有读取和写入的节点。 每个分区都有一个服务器充当 Leader, producer 和 consumer 只跟 leader 交互
- 追随者 (follower):跟随领导者指令的节点被称为 Follower。 如果领导失败,一个追随者将自动成为新的领导者。
- 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。它是 replica 中的一个角色,从 leader 中复 制数据。
- zookeeper:Kafka 代理是无状态的,所以他们使用 ZooKeeper 来维护它们的集群状态。ZooKeeper 用于管理和协调 Kafka 代理
基础架构
-
Producer:生产者,消息的产生者,是消息的入口。
-
kafka cluster:
- Broker:Broker 是 kafka实例,每个服务器上有一个或多个 kafka 的实例,我们姑且认为每个 broker 对应一台服务器。
- 每个 kafka 集群内的 broker 都有一个不重复的编号,如图中的 broker-0、broker-1 等……
- Topic:消息的主题,可以理解为消息的分类,kafka 的数据就保存在 topic。在每个 broker 上都可以创建多个 topic。
- Partition:Topic 的分区,每个 topic 可以有多个分区,分区的作用是做负载,提高 kafka 的吞吐量。
- 同一个 topic 在不同的分区的数据是不重复的,partition 的表现形式就是一个一个的文件夹!
- Replication:每一个分区都有多个副本,副本的作用是做备胎。
- 当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。
- 在 kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,
- follower 和 leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包 括自己)。
- Broker:Broker 是 kafka实例,每个服务器上有一个或多个 kafka 的实例,我们姑且认为每个 broker 对应一台服务器。
Message:每一条发送的消息主体。
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费组组成一个消费者组,在 kafka 的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 topic 的不同分区的数据,这也是为了提高 kafka 的吞吐量!
Zookeeper:kafka 集群依赖 zookeeper 来保存集群的的元信息,来保证系统的可用性。
Kafka 的一些设计与实现
Kafka 存储在文件系统上
是的,您首先应该知道 Kafka 的消息是存在于文件系统之上的。Kafka 高度依赖文件系统来存储和缓存消息,一般的人认为 “磁盘是缓慢的”,所以对这样的设计持有怀疑态度。实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以与网络速度一样快。
现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。比如,预读会提前将一个比较大的磁盘快读入内存。后写会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。并且,操作系统还会将主内存剩余的所 有空闲内存空间都用作磁盘缓存,所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接 I/O 会绕过磁盘缓存)。综合这几点优化特点,如果是针对磁盘的顺序访问,某些情况下它可能比随机的内存访问都要快,甚至可以和网络的速度相差无几。
上述的 Topic 其实是逻辑上的概念,面相消费者和生产者,物理上存储的其实是 Partition,每一个 Partition 最终对应一个目录,里面存储所有的消息和索引文件。默认情况下,每一个 Topic 在创建时如果不指定 Partition 数量时只会创建 1 个 Partition。比如,我创建了一个 Topic 名字为 test ,没有指定 Partition 的数量,那么会默认创建一个 test-0 的文件夹,这里的命名规则是:<topic_name>-<partition_id>
。
任何发布到 Partition 的消息都会被追加到 Partition 数据文件的尾部,这样的顺序写磁盘操作让 Kafka 的效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证)。
每一条消息被发送到 Broker 中,会根据 Partition 规则选择被存储到哪一个 Partition。如果 Partition 规则设置的合理,所有消息可以均匀分布到不同的 Partition 中。
底层存储设计
假设我们现在 Kafka 集群只有一个 Broker,我们创建 2 个 Topic 名称分别为:「topic1」和「topic2」,Partition 数量分别为 1、2,那么我们的根目录下就会创建如下三个文件夹:
| --topic1-0
| --topic2-0
| --topic2-1
在 Kafka 的文件存储中,同一个 Topic 下有多个不同的 Partition,每个 Partition 都为一个目录,而每一个目录又被平均分配成多个大小相等的 Segment File 中,Segment File 又由 index file 和 data file 组成,他们总是成对出现,后缀 ".index" 和 ".log" 分表表示 Segment 索引文件和数据文件。
现在假设我们设置每个 Segment 大小为 500 MB,并启动生产者向 topic1 中写入大量数据,topic1-0 文件夹中就会产生类似如下的一些文件:
| --topic1-0
| --00000000000000000000.index
| --00000000000000000000.log
| --00000000000000368769.index
| --00000000000000368769.log
| --00000000000000737337.index
| --00000000000000737337.log
| --00000000000001105814.index
| --00000000000001105814.log
| --topic2-0
| --topic2-1
**Segment 是 Kafka 文件存储的最小单位。**Segment 文件命名规则:Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。如 00000000000000368769.index 和 00000000000000368769.log。
以上面的一对 Segment File 为例,说明一下索引文件和数据文件对应关系:
其中以索引文件中元数据 <3, 497>
为例,依次在数据文件中表示第 3 个 message(在全局 Partition 表示第 368769 + 3 = 368772 个 message)以及该消息的物理偏移地址为 497。
注意该 index 文件并不是从 0 开始,也不是每次递增 1 的,这是因为 Kafka 采取稀疏索引存储的方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,使得能够把 index 映射到内存,降低了查询时的磁盘 IO 开销,同时也并没有给查询带来太多的时间消耗。
因为其文件名为上一个 Segment 最后一条消息的 offset ,所以当需要查找一个指定 offset 的 message 时,通过在所有 segment 的文件名中进行二分查找就能找到它归属的 segment ,再在其 index 文件中找到其对应到文件上的物理位置,就能拿出该 message 。
由于消息在 Partition 的 Segment 数据文件中是顺序读写的,且消息消费后不会删除(删除策略是针对过期的 Segment 文件),这种顺序磁盘 IO 存储设计师 Kafka 高性能很重要的原因。
Kafka 是如何准确的知道 message 的偏移的呢?这是因为在 Kafka 定义了标准的数据存储结构 ,在 Partition 中的每一条 message 都包含了以下三个属性:
- offset:表示 message 在当前 Partition 中的偏移量,是一个逻辑上的值,唯一确定了 Partition 中的一条 message,可以简单的认为是一个 id;
- MessageSize:表示 message 内容 data 的大小;
- data:message 的具体内容
生产者设计概要
当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?
举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到 Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过 Kafka 返回。对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点。
再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定。
不同的业务需要使用不同的写入方式和配置。具体的方式我们在这里不做讨论,现在先看下生产者写消息的基本流程:
流程如下:
- 首先,我们需要创建一个 ProducerRecord,这个对象需要包含消息的主题(topic)和值(value),可以选择性指定一个键值(key)或者分区(partition)。
- 发送消息时,生产者会对键值和值序列化成字节数组,然后发送到分配器(partitioner)。
- 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。
- 选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的 Kafka broker。
- 当 broker 接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的 RecordMetadata 对象,否则返回异常。
- 生产者接收到结果后,对于异常可能会进行重试。
消费者设计概要
消费者与消费组
假设这么个场景:我们从 Kafka 中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展。
Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个 T1 主题,该主题有 4 个分区;同时我们有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将 会收到这 4 个分区的消息,如下所示:
如果我们增加新的消费者 C2 到消费组 G1,那么每个消费者将会分别收到两个分区的消息,如下所示:
如果增加到 4 个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:
但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:
总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。
另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。
**Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。**换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么会是这样的:
在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用。
最后,总结起来就是:
- 如果应用需要读取全量消息,那么请为该应用设置一个消费组;
- 如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。
消费组与分区重平衡
可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。
重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。**不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。**而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。
消费者通过定期发送心跳(heartbeat)到一个作为组协调者(group coordinator)的 broker 来保持在消费组内存活。这个 broker 不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。
如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者 会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。
在 0.10.1 版本,Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。
Partition 与消费模型
上面提到,Kafka 中一个 topic 中的消息是被打散分配在多个 Partition 中存储的, Consumer Group 在消费时需要从不同的 Partition 获取消息,那最终如何重建出 Topic 中消息的顺序呢?
答案是:没有办法。Kafka 只会保证在 Partition 内消息是有序的,而不管全局的情况。
下一个问题是:Partition 中的消息可以被(不同的 Consumer Group)多次消费,那 Partition 中被消费的消息是何时删除的? Partition 又是如何知道一个 Consumer Group 当前消费的位置呢?
无论消息是否被消费,除非消息到期 Partition 从不删除消息。例如设置保留时间为 2 天,则消息发布 2 天内任何 Group 都可以消费,2 天后,消息自动被删除。 Partition 会为每个 Consumer Group 保存一个偏移量,记录 Group 消费到的位置。 如下图:
为什么 Kafka 是 pull 模型
消费者应该向 Broker 要数据(pull)还是 Broker 向消费者推送数据(push)?作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息。一些 logging-centric system,比如 Facebook 的 Scribe 和 Cloudera 的 Flume,采用 push 模式。事实上,push 模式和 pull 模式各有优劣。
**push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。**push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。
**对于 Kafka 而言,pull 模式更合适。**pull 模式可简化 broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。