# 数据文件存储-可靠性保证-高可用

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)
相关源码参考: KAFKA (opens new window)


# 数据存储流程和日志机制

Kafka 使用 Topic → Partition → Segment 的分级结构:

  • 每个分区(Partition)对应多个段(Segment)
  • 每个段包含两个文件:.log(数据)和 .index(稀疏索引)
  • index 文件不会为每条消息建索引,而是每隔固定字节建立索引,节省内存,查询时做少量顺序扫描。
index文件中并没有为每一条message建立索引,采用了稀疏存储的方式
每隔一定字节的数据建立一条索引,避免了索引文件占用过多的空间和资源,从而可以将索引文件保留到内存中
缺点是没有建立索引的数据在查询的过程中需要小范围内的顺序扫描操作。
1
2
3

  • 配置文件 server.properties
# The maximum size of a log segment file. When this size is reached a new log segment will be created. 默认是1G,当log数据文件大于1g后,会创建一个新的log文件(即segment,包括index和log)可以调整segment大小防止频繁创建segment
log.segment.bytes=1073741824
1
2
  • 例子(当segment超过最大容量时)
#分段一
00000000000000000000.index  00000000000000000000.log
#分段二 数字 1234指的是当前文件的最小偏移量offset,即上个文件的最后一个消息的offset+1
00000000000000001234.index  00000000000000001234.log
#分段三
00000000000000088888.index  00000000000000088888.log
1
2
3
4
5
6

# 分布式系统的 CAP 理论

CAP定理: 指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可同时获得

  • C 一致性:所有节点都可以访问到最新的数据;锁定其他节点,不一致之前不可读
  • A 可用性:每个请求都是可以得到响应的,不管请求是成功还是失败;被节点锁定后 无法响应
  • P 分区容错:除了全部整体网络故障,其他故障都不能导致整个系统不可用,;节点间通信可能失败,无法避免

现实中,网络故障不可避免 → P必须保证,因此只能在C 和 A 中取舍

CA: 如果不要求P(不允许分区),则C(强一致性)和A(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,没办法部署子节点,这是违背分布式系统设计的初衷的

CP: 如果不要求A(可用),每个请求都需要在服务器之间保持强一致,而P(分区)会导致同步时间无限延长(也就是等待数据同步完才能正常访问服务),一旦发生网络故障或者消息丢失等情况,就要牺牲用户的体验,等待所有数据全部一致了之后再让用户访问系统

AP:要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。
1
2
3
4
5
  • 结论:
    • 分布式系统中P,肯定要满足,所以只能在CA中二选一
    • 没有最好的选择,最好的选择是根据业务场景来进行架构设计
    • CP : 适合支付、交易类,要求数据强一致性,宁可业务不可用,也不能出现脏数据
    • AP: 互联网业务,比如信息流架构,不要求数据强一致,更想要服务可用

# 副本机制 Replica + ACK

Kafka 通过 副本机制(Replica)ACK 应答机制 来提升数据可靠性。

  1. Kafka 的副本机制(Replica)

    • 副本数量:每个 Topic 的 Partition 可以配置多个副本(replica),一般小于 broker 数量。

      副本角色

      • 每个 Partition 中有 1 个 Leader 副本,负责接收和处理读写请求。
      • 剩余的是 Follower 副本,从 Leader 同步数据,不对外提供服务。

      故障转移:如果 Leader 宕机,会从同步良好的 Follower 中选出新的 Leader。

  2. Producer 发送数据的 ACK 应答机制

    当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别

    Producer 发送数据后,根据 acks 参数确认数据是否成功写入:

    acks 设置 含义 优点 缺点
    0 不等待任何确认 吞吐量最高 数据易丢失
    1(默认) Leader 写入磁盘即返回成功 性能较好 Leader 宕机前未同步可能丢数据
    all(或 -1 所有 ISR 副本成功写入后才确认 最可靠 吞吐量降低、可能重复发送
    • 不同情况可能产生的问题

      • ack=0

        • 发送出去的消息还在半路,或者还没写入磁盘, Partition Leader所在Broker就直接挂了,客户端认为消息发送成功了,此时就会导致这条消息就丢失
      • ack=1(默认)

        • 万一Partition Leader刚刚接收到消息,Follower还没来得及同步过去,结果Leader所在的broker宕机了
      • ack= all(即-1)

        • leader会维持一个与其保持同步的replica集合,该集合就是ISR,leader副本也在isr里面

        • 问题一:如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复

          • 数据发送到leader后 ,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复
        • 问题二:acks=all 就可以代表数据一定不会丢失了吗

          • Partition只有一个副本,也就是一个Leader,任何Follower都没有
          • 接收完消息后宕机,也会导致数据丢失,acks=all,必须跟ISR列表里至少有2个以上的副本配合使用
          • 在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数设定 ISR中的最小副本数是多少,默认值为1,改为 >=2,如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常

# in-sync replica set机制

什么是 ISR?

  • ISR(In-Sync Replica Set,同步副本集合)是 Kafka 为了保证数据可靠性设计的重要机制。

    • 每个分区的 Leader 副本 会维护一个 ISR 列表,表示哪些 Follower 副本当前与 Leader 保持数据同步

    • ISR 会随着副本同步状态的变化而动态调整。

ISR 的工作机制:

  • 同步机制
    • 当 Producer 写入数据到 Leader 后,Leader 会同步数据给 ISR 中的所有 Follower。
    • Follower 同步成功后会向 Leader 发送 ACK。
  • 踢出机制
    • 如果某个 Follower 长时间未同步数据(超过 replica.lag.time.max.ms 时间),就会被踢出 ISR,变成不同步副本。
  • 选举机制
    • 如果 Leader 宕机,Kafka 会从 ISR 中选出新的 Leader,确保数据一致性。
    • 只有 ISR 中的副本才有资格成为新 Leader。

相关概念对比

缩写 全称 含义
AR Assigned Replicas 某个分区的所有副本,包括 Leader 和所有 Follower
ISR In-Sync Replicas 当前与 Leader 保持同步的副本子集
OSR Out-of-Sync Replicas 与 Leader 同步滞后的副本(未在 ISR 中)

# High Watermark(HW)机制详解

  1. 为什么需要 High Watermark?

    • ACK 机制 保证了生产者消息发送的可靠性;
    • 多副本机制 保证了消息存储的可靠性。

    但还有一个问题:消费者如何确保读取到的数据是可靠的、不丢失也不错误的? 这时,就需要 High Watermark(简称 HW) 来保证消费者读取的是 “所有副本都确认” 的消息。

  2. 什么是 HW?

    HW:所有 ISR 副本中 LEO(Log End Offset,日志末尾偏移量)的最小值。

    也就是说,只有当消息被 ISR 中所有副本都同步成功,才会更新 HW。 消费者最多只能消费到 HW 之前的数据,HW 之后的消息即使存在于 Leader 中,消费者也不可见,以防数据丢失。

  3. HW 的作用

    保证消费者数据一致

    举例说明:

    • 消费者从 Leader 消费到了 offset=15,准备消费 offset=16。
    • 恰好此时 Leader 宕机,新的 Leader 由 ISR 中某个 follower 提升。
    • 如果新的 Leader 没有 offset=16,而消费者已经消费了,就会发生数据“回退”或“丢失”。

    有了 HW:

    • 消费者只会消费到 HW(即所有副本都确认的最小位置)。
    • 即使 Leader 换了,只要副本都能回到 HW,就能保证数据一致,避免消费错误或缺失。
  4. HW 与副本故障的处理

    • # Follower 宕机

      • Follower 被临时踢出 ISR。
      • 恢复后,从本地磁盘读取上次记录的 HW。
      • 截断超过 HW 的数据,从 HW 开始重新向 Leader 同步。
      • 当其追上 Leader(LEO ≥ HW)后,重新加入 ISR。
    • # Leader 宕机

      • 从 ISR 中选出一个新的 Leader。
      • 新 Leader 保留数据不变。
      • 其他 Follower 会截断超过 HW 的部分,确保和新 Leader 一致。
      • 然后重新同步数据,重建 ISR。

# 日志数据清理

  • Kafka将数据持久化到了硬盘上,为了控制磁盘容量,需要对过去的消息进行清理

  • 问题:如果让你去设计这个日志删除策略,你会怎么设计?【原理思想】很重要的体现,下面是kafka答案

    • 内部有个定时任务检测删除日志,默认是5分钟 log.retention.check.interval.ms
    • 支持配置策略对数据清理
    • 根据segment单位进行定期清理
  • 启用cleaner

    • log.cleaner.enable=true
    • log.cleaner.threads = 2 (清理线程数配置)
  • 日志删除

    • log.cleanup.policy=delete
    #清理超过指定时间的消息,默认是168小时,7天,
    #还有log.retention.ms, log.retention.minutes, log.retention.hours,优先级高到低
    log.retention.hours=168
    
    #超过指定大小后,删除旧的消息,下面是1G的字节数,-1就是没限制
    log.retention.bytes=1073741824
    
    还有基于日志起始位移(log start offset),未来社区还有更多
    
    1
    2
    3
    4
    5
    6
    7
    8
    • 基于【时间删除】 日志说明
    配置了7天后删除,那7天如何确定呢?
    
    每个日志段文件都维护一个最大时间戳字段,每次日志段写入新的消息时,都会更新该字段
    
    一个日志段segment写满了被切分之后,就不再接收任何新的消息,最大时间戳字段的值也将保持不变
    
    kafka通过将当前时间与该最大时间戳字段进行比较,从而来判定是否过期
    
    1
    2
    3
    4
    5
    6
    7
    • 基于【大小超过阈值】 删除日志 说明
    假设日志段大小是500MB,当前分区共有4个日志段文件,大小分别是500MB,500MB,500MB和10MB
    
    10MB那个文件就是active日志段。
    
    此时该分区总的日志大小是3*500MB+10MB=1500MB+10MB
    
    如果阈值设置为1500MB,那么超出阈值的部分就是10MB,小于日志段大小500MB,故Kafka不会执行任何删除操作,即使总大小已经超过了阈值;
    
    如果阈值设置为1000MB,那么超过阈值的部分就是500MB+10MB > 500MB,此时Kafka会删除最老的那个日志段文件
    
    注意:超过阈值的部分必须要大于一个日志段的大小
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    • log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除
  • 日志压缩

    • log.cleanup.policy=compact 启用压缩策略
    • 按照消息key进行整理,有相同key不同value值,只保留最后一个

# 高性能奥秘 —— 零拷贝(ZeroCopy)原理解析

  1. Kafka 为什么性能高?

    Kafka 能处理高吞吐的关键原因之一,就是使用了 零拷贝(ZeroCopy) 技术,大大减少了数据在内存中的复制次数,从而降低了 CPU 开销和上下文切换。

  2. 传统文件读取流程(4 次拷贝)

    当操作系统读取文件并通过网络发送时,如果不使用零拷贝,数据会经历以下过程:

    1. 文件从磁盘读取到 内核态缓冲区(Kernel Buffer)
    2. 内核态缓冲区数据复制到 用户态缓冲区(User Buffer)
    3. 用户程序调用 write 方法,数据从用户态复制回 内核 Socket Buffer
    4. 最终 Socket Buffer 数据传输到 网卡(NIC)

    ✅ 总共涉及了 4 次数据拷贝 和多次用户态 ↔ 内核态的上下文切换,效率较低。

  3. ZeroCopy 技术原理

    ZeroCopy(零拷贝) 的目标是:跳过用户态,让内核直接将数据从磁盘传输到网卡,避免无意义的复制。

    Kafka 使用的是操作系统提供的 sendfile() 系统调用实现 ZeroCopy。

    # 过程变为:

    1. 数据从磁盘直接送入内核;
    2. 内核直接将数据送入 Socket Buffer;
    3. 数据直接传送到网卡 —— 跳过用户态,拷贝次数减少为 2 次

    ✅ 避免了用户态和内核态之间的两次拷贝,大幅提升了传输效率

# 高性能原理分析归纳总结

  • kafka高性能

    • 存储模型,topic多分区,每个分区多segment段

    • index索引文件查找,利用分段和稀疏索引

    • 磁盘顺序写入

    • 异步操作少阻塞sender和main线程,批量操作(batch)

    • 页缓存Page cache,没利用JVM内存,因为容易GC影响性能

    • 零拷贝ZeroCopy(SendFile)

# 事务消息实战

  1. POM

    server:
      port: 8080
    
    logging:
      config: classpath:logback.xml
    
    spring:
      kafka:
        bootstrap-servers: xxx:9092
    
        producer:
          # # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
          retries: 1
          #一个批次可以使用的内存大小
          batch-size: 16384
          # 设置生产者内存缓冲区的大小。
          buffer-memory: 33554432
          # 键的序列化方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 值的序列化方式
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
          acks: all
          #事务id
          transaction-id-prefix: yym-tran-
    
        # 如果不需要事务,可非事务模式 以创建另一个非事务 KafkaProducer
        producer-non-transactional:
          retries: 1
          batch-size: 16384
          buffer-memory: 33554432
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          acks: all
    
        consumer:
          # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
          auto-commit-interval: 1S
    
          # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
          auto-offset-reset: earliest
    
          # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
          enable-auto-commit: false
    
          # 键的反序列化方式
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # 值的反序列化方式
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
        listener:
          #手工ack,调用ack后立刻提交offset
          ack-mode: manual_immediate
          #容器运行的线程数
          concurrency: 4
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
  2. kafka配置

    @Configuration
    public class KafkaConfig {
    
        @Value("${spring.kafka.producer.transaction-id-prefix}")
        private String transactionPrefix;
    
        @Value("${spring.kafka.producer-non-transactional.transaction-id-prefix:}")
        private String nonTransactionalPrefix;
    
        // 事务 KafkaTemplate
        @Bean(name = "transactionalKafkaTemplate")
        public KafkaTemplate<String, Object> transactionalKafkaTemplate() {
            Map<String, Object> props = new HashMap<>();
            // 配置事务属性
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "自身kafka ip:9092");
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionPrefix);
    
            ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
            return new KafkaTemplate<>(producerFactory);
        }
    
        // 非事务 KafkaTemplate
        @Bean(name = "nonTransactionalKafkaTemplate")
        public KafkaTemplate<String, Object> nonTransactionalKafkaTemplate() {
            Map<String, Object> props = new HashMap<>();
            // 配置非事务属性
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "自身kafka ip:9092");
            // 注意这里没有设置 transaction-id-prefix
    
            ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
            return new KafkaTemplate<>(producerFactory);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
  3. 生产者

    @RestController
    public class UserController {
    
        private static final String TOPIC_NAME = "yym-topic-test";
        private static final String TOPIC1_NAME = "yym-topic-test1";
    
        @Autowired
        @Qualifier("nonTransactionalKafkaTemplate")
        private KafkaTemplate<String,Object> nonTransactionalKafkaTemplate;
    
        @Autowired
        @Qualifier("transactionalKafkaTemplate")
        private KafkaTemplate<String,Object> transactionalKafkaTemplate;
    
        @GetMapping("/api/v1/{num}")
        public void sendMessage1(@PathVariable("num") String num){
    
            nonTransactionalKafkaTemplate.send(TOPIC_NAME,"这是一个消息,num="+num).addCallback(success->{
    
                String topic = success.getRecordMetadata().topic();
                int partition = success.getRecordMetadata().partition();
                long offset = success.getRecordMetadata().offset();
                System.out.println("发送成功:topic="+topic+", partition="+partition+",offset ="+offset);
            },failure->{
                System.out.println("发送消息失败:"+failure.getMessage());
            });
        }
    
        /**
         * 注解方式的事务
         * @param i
         */
        @GetMapping("/kafka/transaction1")
        @Transactional(rollbackFor = RuntimeException.class)
        public void sendMessage1(int i) {
    
            transactionalKafkaTemplate.send(TOPIC1_NAME, "这个是事务里面的消息:1  i="+i);
            if (i == 0) {
                throw new RuntimeException("fail");
            }
            transactionalKafkaTemplate.send(TOPIC1_NAME, "这个是事务里面的消息:2  i="+i);
    
        }
    
    
        /**
         * 声明式事务
         * @param num
         */
        @GetMapping("/api/v1/tran2")
        public void sendMessage3( int num){
    
            transactionalKafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
                @Override
                public Object doInOperations(KafkaOperations<String, Object> kafkaOperations) {
    
                    // 1. 如果有数据库操作, 比如订单服务, 可以把订单状态变更操作和发送扣库存信息用事务绑定
                    kafkaOperations.send(TOPIC1_NAME,"这个是事务消息 1 i="+num);
                    if(num == 0){
                        throw new RuntimeException();
                    }
                    kafkaOperations.send(TOPIC1_NAME,"这个是事务消息 2 i="+num);
                    return true;
                }
            });
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
  4. 消费者

    @Component
    public class MqListener {
    
        @Autowired
        @Qualifier("nonTransactionalKafkaTemplate")
        private KafkaTemplate<String, Object> nonTransactionalKafkaTemplate;
    
    
        @KafkaListener(topics = {"yym-topic-test"}, groupId = "yym-g1")
        public void onMessage1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
            // 打印出消息内容
            System.out.println("消费:" + record.topic() + "-" + record.partition() + "-" + record.value());
            ack.acknowledge();
        }
    
        @KafkaListener(topics = {"yym-topic-test1"}, groupId = "yym-g2")
        public void onMessage2(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    
            // 订单 ID
            String orderId = (String) record.key();
    
            // 1. 检查消费记录表, 判断该消息是否被消费过
            if (isOrderProcessed(orderId)) {
                // 消息已处理,直接返回
                System.out.println("该订单已被消费");
                return;
            }
            try {
                // 2.在库存库插入消费记录
                createConsumeRecord(orderId);
                // 3. 扣除库存操作, 扣除库存表数据
                System.out.println("扣除库存成功");
                // 4. 标记消费记录表已消费
                System.out.println("消费成功:" + record.topic() + "-" + record.partition() + "-" + record.value());
            } catch (Exception e) {
                // 5. 重试扣除库存表
                // 6. 仍旧失败, 根据订单id, 在消费记录表中记录异常信息, 触发钉钉推送, 研发人员处理
                logErrors(orderId, e);
    
            }
            // 打印出消息内容
            System.out.println("消费:" + record.topic() + "-" + record.partition() + "-" + record.value());
            ack.acknowledge();
        }
    
        private void createConsumeRecord(String orderId) {
            // 创建消费记录
        }
    
    
        private void logErrors(String orderId, Exception e) {
            // 记录消费失败操作
            System.out.println("记录消息消费失败日志");
        }
    
        private boolean isOrderProcessed(String orderId) {
            // 数据库建立一个订单消费者表, 当订单领域改变订单状态时, 向库存领域消费者表中添加一条订单id记录, 标记为未消费
            // 消费者根据订单id查询消费记录, 已消费则返回true
            Boolean result = true;
            // 未消费, 则fasle
            result = false;
            return result;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64

    针对事务的使用给了一个具体的案例, 大部分的项目不会引入分布式事务, 而是采用这种基于消息的最终一致性方案