kafka如何保证消息不丢失

Posted by Clear Blog on May 20, 2018

高可用的实现

kafka的高可用机制即每个topic会配置多个partition,每个partition负责存储topic的部分数据, 而每个partition又会有多副本做冗余,其中有一个副本为leader副本,其他为follower副本。

副本间的数据同步

如果有一个客户端往一个Partition写入数据,此时一般就是写入这个Partition的Leader副本。 然后Leader副本接收到数据之后,Follower副本会不停的给他发送请求尝试去拉取最新的数据,拉取到自己本地后,写入磁盘中。

ISR

我们查看topic信息的时候都会打印出一个ISR的集合, ISR全称是“In-Sync Replicas”,也就是保持同步的副本,他的含义就是,跟Leader始终保持同步的Follower有哪些。

acks

往kafka写数据的时候,就可以来设置这个acks参数。然后这个参数实际上有三种常见的值可以设置,分别是:0、1 和 all。

  1. acks = 0

意思就是我的KafkaProducer在客户端,只要把消息发送出去, 不管那条数据有没有在哪怕Partition Leader上落到磁盘,我就不管他了,直接就认为这个消息发送成功了。 如果你采用这种设置的话,那么你必须注意的一点是,可能你发送出去的消息还在半路。 结果呢,Partition Leader所在Broker就直接挂了,然后结果你的客户端还认为消息发送成功了,此时就会导致这条消息就丢失了。

  1. acks = 1

kafka的默认配置就是acks=1,意思就是说只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了, 不管他其他的Follower有没有同步过去这条消息了。这里有一个问题,万一Partition Leader刚刚接收到消息, Follower还没来得及同步过去,结果Leader所在的broker宕机了,此时也会导致这条消息丢失,因为人家客户端已经认为发送成功了。

  1. acks = all

意思就是说,Partition Leader接收到消息之后, 还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。 如果说Partition Leader刚接收到了消息,但是结果Follower没有收到消息, 此时Leader宕机了,那么客户端会感知到这个消息没发送成功,他会重试再次发送消息过去。 此时可能Partition 2的Follower变成Leader了, 此时ISR列表里只有最新的这个Follower转变成的Leader了,那么只要这个新的Leader接收消息就算成功了。

__consumer_offsets的高可用配置

修改server.properties中的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# num.partitions=1 默认为1
num.partitions=3


############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.建议修改为3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

数据刷新策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
############################# Log Flush Policy #############################
 
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# 消息直接被写入文件系统,但是默认情况下我们仅仅调用fsync()以延迟的同步系统缓存
# There are a few important trade-offs here:
# 这些有一些重要的权衡
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
#    1. 持久性:如果不使用复制,未刷新的数据可能会丢失。
#    2. 延迟:非常大的刷新间隔可能会在刷新时导致延迟,因为将会有大量数据刷新。
#    3. 吞吐量:刷新通常是最昂贵的操作,而一个小的刷新间隔可能会导致过多的搜索。
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# 下面的设置允许你去配置刷新策略,每隔一段时间刷新或者一次N个消息(或者两个都配置)。这可以在全局范围内完成,并在每个主题的基础上重写。
 
# The number of messages to accept before forcing a flush of data to disk
# 在强制刷新数据到磁盘之前允许接收消息的数量
#log.flush.interval.messages=10000
 
# The maximum amount of time a message can sit in a log before we force a flush
# 在强制刷新之前,消息可以在日志中停留的最长时间
#log.flush.interval.ms=1000

日志持久化策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
############################# Log Retention Policy #############################
 
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# 以下的配置控制了日志段的处理。策略可以配置为每隔一段时间删除片段或者到达一定大小之后。
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# 当满足这些条件时,将会删除一个片段。删除总是发生在日志的末尾。
 
# The minimum age of a log file to be eligible for deletion
# 一个日志的最小存活时间,可以被删除
log.retention.hours=168
 
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
# 一个基于大小的日志保留策略。段将被从日志中删除只要剩下的部分段不低于log.retention.bytes。
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 每一个日志段大小的最大值。当到达这个大小时,会生成一个新的片段。
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# 检查日志段的时间间隔,看是否可以根据保留策略删除它们
log.retention.check.interval.ms=300000