Kafka架构详解及常用参数

友情链接


Kafka介绍

Kafka 和传统的消息系统不同在于:

  • Kafka是一个分布式系统,易于向外扩展;
  • 它同时为发布和订阅提供高吞吐量;
  • 它支持多订阅者,当失败时能自动平衡消费者;
  • 消息的持久化;


kafka架构图

image.png

上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。

如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。

broker以消息到达的顺序进行存储,每个主题可以配置分区;

分区内部是有顺序的,分区之间不保证顺序;

Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障;

这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic;

Topic 和 Partition

在 Kafka 中的每一条消息都有一个 Topic。一般来说在我们应用中产生不同类型的数据,都可以设置不同的主题。

一个主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生产者写入的新消息。

image.png

kafka的topic和消费者之间是多对多的关系,假设一个topic分为0,1,2,3个分区, 一个消费者组里面假设有3个消费者,每个分区只能由一个消费者组里的一个消费者消费,每个分区的数据是不一样的,不同分区可能由同一个消费者组的不同消费者消费。

Kafka可以将主题划分为多个分区(Partition),会根据分区规则选择把消息存储到哪个分区中,只要如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。另外,多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力;

分区策略参考:kafka 分区策略

增加分区可以提供kafka集群的吞吐量。然而,也应该意识到集群的总分区数或是单台服务器上的分区数过多,会增加不可用及延迟的风险;建议分区数=executor数量

在 Kafka 的机器上,每个 Partition 其实都会对应一个日志目录,在目录下面会对应多个日志分段(LogSegment)。

LogSegment 文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为 Segment 索引文件和数据文件。

任何发布到此 Partition 的消息都会被追加到 Log 文件的尾部,在分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,也就是我们的 Offset。Offset 是一个 Long 型的数字。

我们通过这个 Offset 可以确定一条在该 Partition 下的唯一消息。在 Partition 下面是保证了有序性,但是在 Topic 下面没有保证有序性。

生产者会根据offset值轮循写入partition。

image.png

消费模型

消息由生产者发送到 Kafka 集群后,会被消费者消费。

Kafka 采取拉取模型(Poll),由自己控制消费速度,以及消费的进度,消费者可以按照任意的偏移量进行消费。

比如消费者可以消费已经消费过的消息进行重新处理,或者消费最近的消息等等。

Kafka实际环境有可能会出现Consumer全部宕机,虽然基于Kafka的高可用特性,消费者群组中的消费者可以实现再均衡,所有Consumer不处理数据的情况很少,但是还是有可能会出现,此时就要求Consumer重启的时候能够读取在宕机期间Producer发送的数据。

通过消费者命令行可以实现,只要在命令行中加上–from-beginning即可 (从最小值消费)

image.png

高可靠分布式存储模型

在 Kafka 中保证高可靠模型依靠的是副本机制,有了副本机制之后,就算机器宕机也不会发生数据丢失。

高性能的日志存储

Kafka 一个 Topic 下面的所有消息都是以 Partition 的方式分布式的存储在多个节点上。

但是 Kafka 有个问题,如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机 I/O 这个时候对性能影响很大。所以一般来说 Kafka 不能有太多的 Partition。

未开安全集群内的kafka consumer和broker信息都是存储在zk上的:

image.png

开启安全之后信息是存储在broker上的__consumer_offset 这个topic

查询offset以及group信息可以通过命令:

./kafka-console-consumer.sh --bootstrap-server 172.22.17.1:9092 --topic __consumer_offsets --consumer.config /root/TDH-Client/kafka/config/consumer.properties --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

image.png

副本机制

Kafka 的副本机制是多个服务端节点对其他节点的主题分区的日志进行复制。

当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫 Reblance)。

Kafka 每个主题的每个分区都有一个主副本以及 0 个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。

在 Kafka 中并不是所有的副本都能被拿来替代主副本,所以在 Kafka 的 Leader 节点中维护着一个 ISR(In Sync Replicas)集合。

翻译过来也叫正在同步中集合,在这个集合中的需要满足两个条件:

  • 节点必须和 ZK 保持连接;
  • 在同步的过程中这个副本不能落后主副本太多;
  • 另外还有个 AR(Assigned Replicas)用来标识副本的全集,OSR 用来表示由于落后被剔除的副本集合;
  • 所以公式如下:ISR = Leader + 没有落后太多的副本;AR = OSR+ ISR

exactly-once

刚好一次,即使 Producer 重试发送消息,消息也会保证最多一次地传递给 Consumer。该语义是最理想的,也是最难实现的。

在 0.10 之前并不能保证 exactly-once,0.11.0 使用事务保证了。

如何实现 exactly-once

要实现 exactly-once 在 Kafka 0.11.0 中有两个官方策略:

单 Producer 单 Topic

每个 Producer 在初始化的时候都会被分配一个唯一的 PID,对于每个唯一的 PID,Producer 向指定的 Topic 中某个特定的 Partition 发送的消息都会携带一个从 0 单调递增的 Sequence Number。

在我们的 Broker 端也会维护一个维度,每次提交一次消息的时候都会对齐进行校验:

  • 如果消息序号比 Broker 维护的序号大一以上,说明中间有数据尚未写入,也即乱序,此时 Broker 拒绝该消息,Producer 抛出 InvalidSequenceNumber;
  • 如果消息序号小于等于 Broker 维护的序号,说明该消息已被保存,即为重复消息,Broker 直接丢弃该消息,Producer 抛出 DuplicateSequenceNumber;

如果消息序号刚好大一,就证明是合法的;

上面所说的解决了两个问题:

  • 当 Prouducer 发送了一条消息之后失败,Broker 并没有保存,但是第二条消息却发送成功,造成了数据的乱序;
  • 当 Producer 发送了一条消息之后,Broker 保存成功,Ack 回传失败,Producer 再次投递重复的消息;

上面所说的都是在同一个 PID 下面,意味着必须保证在单个 Producer 中的同一个 Session 内,如果 Producer 挂了,被分配了新的 PID,这样就无法保证了,所以 Kafka 中又有事务机制去保证。


Kafka常用参数

listeners=SASL_PLAINTEXT://ts-mjx-22:9092 监听端口配置(hostname每个节点配置的都不同)
advertised.listeners=SASL_PLAINTEXT://ts-mjx-22:9092 监听端口配置(hostname每个节点配置的都不同)
num.network.threads=2 接收网络请求的线程数
num.io.threads=2 处理磁盘IO的线程数
socket.send.buffer.bytes=1048576 发送缓冲区大小
socket.receive.buffer.bytes=1048576 接收缓冲区的大小
socket.request.max.bytes=104857600 每个请求最大的字节数 为了防止内存溢出,message.max.bytes必然要小于该值
og.dirs=/vdir/mnt/disk1/hadoop/kmq,/vdir/mnt/disk2
/hadoop/kmq,/vdir/mnt/disk3/hadoop/kmq
收到的消息存储在这
num.partitions=2 创建topic不指定分区数的时候,默认2个分区
num.recovery.threads.per.data.dir=1 恢复磁盘时,并发的线程数。RAID阵列建议提高并发
log.flush.interval.messages=10000 当达到上面的消息数量时,会将数据flush到日志文件中。默认10000
log.flush.interval.ms=1000 当达到上面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
log.flush.scheduler.interval.ms=3000 检查是否需要将日志flush的时间间隔
log.retention.hours=168 日志7天会被删除
log.retention.bytes=-1 达到多少字节会被删除
log.segment.bytes=536870912 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
log.retention.check.interval.ms=300000 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)
log.roll.hours=168 当达到这个时间,会强制新建一个segment
log.index.size.max.bytes=10485760 对于segment日志的索引文件大小限制
log.index.interval.bytes=4096 索引计算的一个缓冲区,一般不需要设置。

zookeeper方面的配置:

zookeeper.connect=ts-mjx-21,ts-mjx-22,ts-mjx-23

zookeeper.connection.timeout.ms=6000 连接zk的超时时间
zookeeper.session.timeout.ms=6000 ZK客户端与服务器的连接超时时间
zookeeper.sync.time.ms=2000 ZooKeeper集群中leader和follower之间的同步时间
replica.high.watermark.checkpoint.interval.ms=5000 每个replica将最高水位进行flush的时间间隔
partition.assignment.strategy=roundrobin 分区策略
controlled.shutdown.max.retries=3 控制器关闭的尝试次数
offsets.retention.minutes=10080 offset保留时间
replica.fetch.max.bytes=1000000 replicas每次获取数据的最大字节数
log.segment.bytes.per.topic=100000 单topic日志segment文件的大小
queued.max.requests=500 等待IO线程处理的请求队列最大数
fetch.purgatory.purge.interval.requests=10000 读取请求清除的间隔
replica.socket.receive.buffer.bytes=65536 leader复制的socket缓存大小
replica.fetch.wait.max.ms=500 replicas 同leader之间通信的最大等待时间,失败了会重试
replica.fetch.min.bytes=500 每一个fetch操作的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会等待直到数据达到这个大小
log.roll.hours.per.topic=1 单topic强制新建一个segment的时间
controller.message.queue.size=10 controller-to-broker-channels 消息队列的尺寸大小
replica.socket.timeout.ms=30000 leader与replicas的socket超时时间
num.replica.fetchers=1 leader中进行复制的线程数,增大这个数值会增加relipca的IO
replica.lag.time.max.ms=10000 如果replicas落后太多,将会认为此partition relicas已经失效。而一般情况下,因为网络延迟等原因,总会导致replicas中消息同步滞后。如果消息严重滞后,leader将认为此replicas网络延迟较大或者消息吞吐能力有限。在broker数量较少,或者网络不足的环境中,建议提高此值.
producer.purgatory.purge.interval.requests=10000 生产者请求清理的清理间隔。
message.max.bytes=1000000 消息体的最大大小,单位是字节
controller.socket.timeout.ms=30000 partition management controller 与replicas之间通讯的超时时间
controlled.shutdown.retry.backoff.ms=5000 每次关闭尝试的时间间隔
default.replication.factor=1 一个topic ,默认分区的replication个数 ,不能大于集群中broker的个数
auto.create.topics.enable=true 是否允许自动创建topic ,若是false,就需要通过命令创建topic
controlled.shutdown.enable=false 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
auto.commit.enable = true true时,Consumer会在消费消息后将offset同步到zookeeper,这样当Consumer失败后,新的consumer就能从zookeeper获取最新的offset消息的确认模式
request.required.acks = 0 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
1:发送消息,并会等待leader 收到确认后,一定的可靠性
-1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
request.timeout.ms = 10000 消息发送的最长等待时间


评论
登录后可评论
发布者
星小环分享号
官方
文章
193
问答
252
关注者
27
banner
关注星环科技
获取最新活动资讯

加入TDH社区版技术交流群

获取更多技术支持 ->

扫描二维码,立即加入