# Kafka 概念与实战

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)
相关源码参考: KAFKA (opens new window)


# 快速认知:分布式流处理平台

  • Kafka 简介

    • 最初由 LinkedIn 开发,2010 年贡献给 Apache 基金会,成为顶级开源项目。
    • 一个开源的 分布式事件流平台,由 Scala 和 Java 编写。
    • 核心用途:处理用户行为等大数据流式数据(如网页浏览、搜索),广泛应用于 大数据实时处理领域
  • Kafka 官网与文档

  • 核心角色初识

    • Broker:Kafka 服务端程序,即一个 MQ 节点,负责存储 Topic 数据。
    • Producer:消息生产者,将消息发送到指定 Topic。
    • Consumer:消息消费者,从 Topic 中消费消息。

# 核心概念详解

  • Broker

    • Kafka 的服务端程序,负责存储 Topic 数据。一个节点即一个 Broker。
  • Producer(生产者)

    • 创建消息并发布到 Kafka 的 Topic 中。
  • Consumer(消费者)

    • 从 Kafka 的队列中消费消息。
  • Consumer Group(消费者组)

    • 多个消费者组成一个组,同一组内每条消息只被一个消费者处理;不同组间为广播模式。
  • Topic

    • 消息的分类标识,Producer 按 Topic 发布,Consumer 按 Topic 订阅。
  • Partition(分区)

    • Kafka 存储的基本单元。一个 Topic 被划分为多个 Partition,可分布在不同节点。
    • 每个 Partition 是有序的,Consumer 数量应 ≤ Partition 数量。
  • Replication(副本)

    • Partition 的备份,提供容灾能力。默认每个 Topic 副本数为 1(节省资源)。
    • 例如:3 个 Broker 的集群中,副本数最大为 3,超过会报错。
  • Leader / Follower(副本角色)

    • 每个 Partition 的副本中,只有一个 Leader 负责与 Producer/Consumer 交互,Follower 从 Leader 同步。
  • ReplicationManager

    • 管理 Broker 中所有分区副本的信息和副本状态切换。
  • Offset(偏移量)

    • Consumer 记录其消费到 Partition 的位置,Kafka 将 offset 保存在消费者组中。

# 特性总结

  • 多订阅者:一个 Topic 可被多个消费者组订阅。
  • 高吞吐、低延迟:每秒处理数十万条消息。
  • 高并发:支持成千上万客户端并发读写。
  • 容错性强:多副本、分区容灾机制,N 副本容忍 N-1 节点失败。
  • 扩展性好:支持集群热扩容。

# 消费模型说明

  • 队列模型:所有消费者在同一组,消息仅由一个 Consumer 处理。
  • 发布订阅模型:不同组的消费者各自接收同一条消息,实现广播。

以该图为例, 有2个kafka节点(Broker), 以topic1为例, 有4个partition(leader), 4个leader的Replication (follower), 因为是备份, 同一个topic的partition的备份不需要放在同一个Broker上。zookeeper会在kafka集群中选一个controller, 该controller会负责分区分配以及分区的选举。partition可以认为是队列, kafka高吞吐相当于在topic内部做了分表

# 云服务器部署

  1. jdk-11

    • 下载安装自行查看

    • 配置全局变量(vim /etc/profile)

      JAVA_HOME=/www/server/java/jdk-11.0.19
      CLASSPATH=$JAVA_HOME/lib/
      PATH=$PATH:$JAVA_HOME/bin
      export PATH JAVA_HOME CLASSPATH
      
      1
      2
      3
      4
    • 环境变量生效(source /etc/profile)

  2. zookeeper安装

    • 下载apache-zookeeper-3.7.0-bin.tar.gz并解压
    • 复制/zookeeper/conf/zoo_sample.cfg 命名为 zoo.cfg
    • 启动(bin/zkServer.sh start)
    • 记得开放安全组(2181 默认端口)
  3. kafka安装

    • 下载kafka_2.13-2.8.0.tgz并解压

    • 修改/kafka/config/server.properties

      #修改下面两个配置 ( listeners 配置的ip和advertised.listeners相同时启动kafka会报错)
      listeners(内网Ip)
      advertised.listeners(公网ip)
      
      #修改zk地址,默认地址 如果zk和kafka在一台机器上
      zookeeper.connection=localhost:2181
      
      1
      2
      3
      4
      5
      6
    • 守护进程方式启动

      ./kafka-server-start.sh -daemon ../config/server.properties &
      
      1
    • 记得开放安全组(9092默认端口)

# 点对点-发布订阅模型和写入存储流程

# 命令行生产者发送消息和消费者消费消息

简介: Kafka命令行生产者发送消息和消费者消费消息实战

  • 创建topic

    ./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 2 --topic xdclass-topic
    
    1
  • 查看topic

    ./kafka-topics.sh --list --zookeeper 112.74.55.160:2181
    
    1
  • 生产者发送消息

    ./kafka-console-producer.sh --broker-list 112.74.55.160:9092  --topic version1-topic
    
    1
  • 消费者消费消息 ( --from-beginning:会把主题中以往所有的数据都读取出来, 重启后会有这个重复消费)

    ./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1
    
    1
  • 删除topic

    ./kafka-topics.sh --zookeeper 112.74.55.160:2181 --delete --topic t1
    
    1
  • 查看broker节点topic状态信息

    ./kafka-topics.sh --describe --zookeeper 112.74.55.160:2181  --topic xdclass-topic
    
    1

# 点对点模型和发布订阅模型

简介: Kafka点对点模型和发布订阅模型讲解

  • JMS规范目前支持两种消息模型
    • 点对点(point to point)
      • 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息
      • 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费
    • 发布/订阅(publish/subscribe)
      • 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
      • 和点对点方式不同,发布到topic的消息会被所有订阅者消费。

# Kafka消费者组配置实现点对点消费模型

简介: Kafka消费者组配置实现点对点消费模型

  • 编辑消费者配置(确保同个名称group.id一样)
    • 编辑 config/consumer.properties
  • 创建topic, 1个分区
./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 2 --topic t1
1
  • 指定配置文件启动 两个消费者
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer.properties
1
  • 现象

    • 只有一个消费者可以消费到数据,一个分区只能被同个消费者组下的某个消费者进行消费

# Kafka消费者组配置实现发布订阅消费模型

简介: Kafka消费者组配置实现发布订阅消费模型

  • 编辑消费者配置(确保group.id 不一样)
    • 编辑 config/consumer-1.properties
    • 编辑 config/consumer-2.properties
  • 创建topic, 2个分区
./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 2 --topic t2
1
  • 指定配置文件启动 两个消费者
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-1.properties

./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-2.properties
1
2
3
  • 现象
    • 两个不同消费者组的节点,都可以消费到消息,实现发布订阅模型

# 数据存储流程和原理概述和LEO+HW

简介: Kafka数据存储流程、原理、LEO+HW讲解

  • Partition

    • topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列

    • 是以文件夹的形式存储在具体Broker本机上

  • LEO(LogEndOffset)

    • 表示每个partition的log最后一条Message的位置。如上图, partition 最后的虚线框部分。
  • HW(HighWater mark)

    • 表示partition各个replicas数据间同步且一致的offset位置,即表示allreplicas已经commit的位置
    • HW之前的数据才是Commit后的,对消费者才可见
    • ISR集合里面最小leo

  • offset

    • 每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中
    • partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息
    • 可以认为offset是partition中Message的id
  • Segment:每个partition又由多个segment file组成;

    • segment file 由2部分组成,分别为index file和data file(log file),
    • 两个文件是一一对应的,后缀”.index”和”.log”分别表示索引文件和数据文件
    • 命名规则:partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset+1
  • Kafka高效文件存储设计特点:(结合concurrentHashmap 1.7 1.8思考)

    • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
    • 通过索引信息可以快速定位message
    • producer生产数据,要写入到log文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明。同样的磁盘,顺序写能到600M/S,而随机写只有100K/S

# Producer原理与核心API实战

# Producer整体结构原理讲解

  • Producer 发送消息流程:

    1. 构造消息(Record)
      • 包括 topic、key(可选)、value
    2. 序列化
      • key 和 value 序列化为字节数组(通过配置的 Serializer 类完成)
    3. 分区器(Partitioner)选择 Partition
      • 如果指定了 key,默认 hash 取模分区
      • 如果没指定 key,采用轮询 Round-Robin 算法
    4. 发送消息到 RecordAccumulator
      • Kafka 会把消息先放入缓冲池,异步批量发送,提升吞吐
    5. Sender线程发送请求
      • Sender 是 Kafka 内部专门的线程,用于将消息打包成 batch 发送给对应的 Broker
    6. 网络 I/O
      • 通过 Kafka 客户端的 NIO 机制发送数据
    7. Broker 接收写入消息,返回 ack
  • 生产者到broker发送流程

    • Kafka的客户端发送数据到服务器,不是来一条就发一条,会经过内存缓冲区(默认是16KB),通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集到的Batch里面,再一次性发送到Broker上去的,这样性能才可能题高

  • 生产者常见配置

    • 官方文档 http://kafka.apache.org/documentation/#producerconfigs
#kafka地址,即broker地址
bootstrap.servers  

#当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
acks

#请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
retries

#每个分区未发送消息总字节大小,单位:字节,超过设置的值就会提交数据到服务端,默认值是16KB
batch.size

# 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满,如果想减少请求的数量,可以设置 linger.ms 大于#0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
# 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送 减少请求
#如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
linger.ms

# buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
# 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器
# 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
# buffer.memory要大于batch.size,否则会报申请内存不足的错误,不要超过物理内存,根据实际情况调整
buffer.memory

# key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使
#消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将#key序列化成字节数组。
key.serializer
value.serializer  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 核心API模块-producer API

  • 封装配置属性
public static Properties getProperties(){
    Properties props = new Properties();

    props.put("bootstrap.servers", "自己kafka的ip:9092");
    //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

    // 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
    // 0: 不确认  1: leader确认  all/-1 所有副本确认
    props.put("acks", "all");
    //props.put(ProducerConfig.ACKS_CONFIG, "all");

    // 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
    props.put("retries", 0);
    //props.put(ProducerConfig.RETRIES_CONFIG, 0);


    // 生产者缓存每个分区未发送的消息,缓存的大小是通过 batch.size 配置指定的,默认值是16KB
    props.put("batch.size", 16384);


    /**
     * 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满
     * 如果想减少请求的数量,可以设置 linger.ms 大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
     * 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送减少请求
     * 如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
     */
    props.put("linger.ms", 5);

    /**
     * buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
     * 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器
     * 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
     * buffer.memory要大于batch.size,否则会报申请内存不#足的错误,不要超过物理内存,根据实际情况调整
     * 需要结合实际业务情况压测进行配置
     */
    props.put("buffer.memory", 33554432);


    /**
     * key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,
     * 即使消息中没有指定key,序列化器必须是一个实
     org.apache.kafka.common.serialization.Serializer接口的类,
     * 将key序列化成字节数组。
     */
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

    return props;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
  • 生产者投递消息API实战(同步发送)
/**
     * send()方法是异步的,添加消息到缓冲区等待发送,并立即返回
     * 生产者将单个的消息批量在一起发送来提高效率,即 batch.size和linger.ms结合
     *
     * 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回 ack
     * 发送消息后返回的一个 Future 对象,调用get即可
     *
     * 消息发送主要是两个线程:一个是Main用户主线程,一个是Sender线程
     *  1)main线程发送消息到RecordAccumulator即返回
     *  2)sender线程从RecordAccumulator拉取信息发送到broker
     *  3) batch.size和linger.ms两个参数可以影响 sender 线程发送次数
     *
     *
     */
    @Test
    public void testSend(){

        Properties properties = getProperties();
        Producer<String,String> producer = new KafkaProducer<>(properties);
        for(int i=0;i<3 ;i++){
            // ProducerRecord 即消息, 指定key 分区时会对key进行hash, 不指定则轮询将消息放到不同的分区
            // 未指定分区数, 默认为1 只有1个分区 消息自然全部进入同一个分区, 但是保证了消息的有序性
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,"yym-key-noautocommit"+i, "yym-value"+i));

            try {
                //不关心结果则不用写这些内容
                RecordMetadata recordMetadata =  future.get();
                // topic - 分区编号@offset
                System.out.println("发送状态:"+recordMetadata.toString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# ProducerRecord 和 key 的作用

  • ProducerRecord(PR)

    ProducerRecord 是发送给 Kafka Broker 的消息对象,包含以下信息:

    • Topic(消息所属的主题)
    • PartitionID(可选,指定消息的分区)
    • Key(可选,用于消息分配)
    • Value(消息内容)

    key 的作用

    • 默认情况下,key 为 null,Kafka 会采用 RoundRobin 算法将消息均匀分配到各个分区。
    • 如果设置了 key,Kafka 会对其进行哈希计算,决定消息写入哪个分区。拥有相同 key 的消息会被写到同一分区,从而确保顺序消费。

# 核心API模块-producerAPI回调函数

  • 生产者发送消息是异步调用,怎么知道是否有异常?
    • 发送消息配置回调函数即可, 该回调方法会在 Producer 收到 ack 时被调用,为异步调用
    • 回调函数有两个参数 RecordMetadata 和 Exception,如果 Exception 是 null,则消息发送成功,否则失败
  • 异步发送配置回调函数
/**
 * 发送消息携带回调函数
 */
@Test
public void testSendWithCallback(){

    Properties properties = getProperties();
    Producer<String,String> producer = new KafkaProducer<>(properties);

    for(int i=0;i<3 ;i++) {
        producer.send(new ProducerRecord<>(TOPIC_NAME, "yym-key" + i, "yym-value" + i), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // 回调函数异常为null即消息成功
                if(exception == null){
                    System.err.println("发送状态:"+metadata.toString());

                } else {
                    exception.printStackTrace();
                }
            }
        });
    }
    producer.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# producer生产者发送指定分区

  • 创建topic,配置5个分区,1个副本
  • 发送代码
/**
 * 发送消息携带回调函数,指定某个分区
 * 实现顺序消息
 */
@Test
public void testSendWithCallbackAndPartition(){

    Properties properties = getProperties();
    Producer<String,String> producer = new KafkaProducer<>(properties);

    // 指定分区发送消息 就可以实现顺序消费消息
    for(int i=0;i<10 ;i++) {
        producer.send(new ProducerRecord<>("yym-v1-sp-topic-test", 4,"yym-key" + i, "yym-value" + i), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception == null){
                    System.err.println("发送状态:"+metadata.toString());
                } else {
                    exception.printStackTrace();
                }
            }
        });

    }
    producer.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 生产者自定义partition分区规则

  • 源码解读默认分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
1
  • 自定义分区规则

    • 创建类,实现Partitioner接口,重写方法
    • 配置 partitioner.class 指定类即可
    public class YymPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            throw new IllegalArgumentException("key 参数不能为空");
        }
    
        if("yym".equals(key)){
            return 0;
        }
    
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
    
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;    }
    
    @Override
    public void close() {
    
    }
    
    @Override
    public void configure(Map<String, ?> map) {
    
    }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    /**
     *
     * 自定义分区策略
     */
    @Test
    public void testSendWithPartitionStrategy(){
        Properties properties = getProperties();
        properties.put("partitioner.class", "com.yym.kafka.config.YymPartitioner");
        Producer<String,String> producer = new KafkaProducer<>(properties);
    
        for(int i=0;i<10 ;i++) {
            producer.send(new ProducerRecord<>("yym-v1-sp-topic-test", "yym", "yym-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception == null){
                        System.err.println("发送状态:"+metadata.toString());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25

# 核心 API 消费者模块

# Consumer 消费者机制和分区策略

  1. 消费者如何从 Broker 获取数据?

    • Pull 模式: Kafka 消费者采用 pull 拉取数据的方式,而不是 Broker 主动推送。
      • 优点:消费者可以根据自己的消费能力调整拉取频率,避免处理不过来。
      • 缺点:如果没有数据,消费者会根据配置的 timeout 阻塞等待一段时间。
    • 为什么不是 Push 模式? 如果是 Broker 主动推送消息,虽然可以快速处理,但容易导致消费者消费能力不足,消息堆积,可能造成延迟。
  2. 消费者从哪个分区进行消费?

    • 一个 Topic 可以有多个 Partition,而一个消费者组(Consumer Group)可以包含多个消费者。

    • 每个 Partition 由一个消费者消费,但每个消费者组中的消费者不会重复消费同一个分区。

  3. 消费者分配策略

    • 顶层接口

      org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
      
      1
    • RoundRobinAssignor(轮询分配)

      将所有 PartitionConsumer 按照消费者组进行轮流分配。

      • 弊端

        1. 如果同一个消费者组的消费者监听同一个topic

          • 当partition没法被所有的消费者均摊时, 会造成消费者分配不均
        2. 如果同一个消费者组的消费者监听不同的topic

          • 以下图为例, c1 订阅 topic1, c2 订阅 topic1 topic2, c3 全部订阅会导致分配不均

    • RangeAssignor(范围分配)

      按照每个 TopicPartition 数量,依次将 Partition 分配给消费者。

      • 按照主题的partition数量进行分配, 举例
        • topic1(p0 p1) topic2(p0 p1 p2), 消费者组c1 c2 都订阅了
        • c1(t1.p0) c2(t1.p1) c1(t2.p0) c2(t2.p1) c1(t2.p2)
      • 弊端
        • 只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
        • 如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic越多则消费的分区也越多,则性能有所下降

# Consumer 重新分配策略和 Offset 维护机制

  1. Rebalance 操作

    • Rebalance(重新平衡)是指 Kafka 在消费者组内消费者数量发生变化时,重新分配 Partition,确保每个消费者尽可能均衡地消费数据,避免过度负载。

    • Rebalance 触发条件

      • 消费者组内消费者数量变化(例如添加或移除消费者)。
      • TopicPartition 数量发生变化。
  2. 消费者宕机后的恢复

    • Offset 记录: Kafka 会记录消费者的消费进度(offset),确保消费者恢复后能从上次消费的位置继续消费。
      • offset 默认保存在 Kafka 内部的 __consumer_offsets 主题中,而不是 ZooKeeper 或本地。
      • __consumer_offsets 主题有多个分区,Kafka 通过消费者组 ID 和分区哈希确定每个消费者的消费位置。
      • 每个消费位点由 group.id + topic + partition 唯一标识,记录消费的 offsetmetadata(如时间戳)
  3. 面试

    • 例如70个分区,10个消费者,但是先启动一个消费者,后续再启动一个消费者,这个会怎么分配?

      • Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作 ,下面都会发生rebalance操作
        • 当消费者组内的消费者数量发生变化(增加或者减少),就会产生重新分配patition
        • 分区数量发生变化时(即 topic 的分区数量发生变化时)
    • 当消费者在消费过程突然宕机了,重新恢复后是从哪里消费,会有什么问题?

      • 消费者会记录offset,故障恢复后从这里继续消费,这个offset记录在哪里?
      • 记录在zk里面和本地,新版默认将offset保证在kafka的内置topic中,名称是 __consumer_offsets
        • 该Topic默认有50个Partition,每个Partition有3个副本,分区数量由参数offset.topic.num.partition配置
        • 通过groupId的哈希值和该参数取模的方式来确定某个消费者组已消费的offset保存到__consumer_offsets主题的哪个分区中
        • 由 消费者组名+主题+分区,确定唯一的offset的key,从而获取对应的值
        • __consumer_offsets 的 key:group.id+topic+分区号,而 value 就是 offset + metadata(时间戳)
        • 不同的消费者组会消费同一个分区, 所以需要加groupId

# 消费者Consumer消费消息配置

  • 配置
public static Properties getProperties() {

    Properties props = new Properties();
    //broker地址
    props.put("bootstrap.servers", "自己kafka的ip:9092");
    //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
    props.put("group.id", "yym-g1");
    //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效
    props.put("auto.offset.reset","earliest");
    //开启自动提交offset
    //开启自动提交时,Kafka 会在指定时间间隔自动提交 offset,只要调用了 poll() 方法,
    // 即使消费者还未处理或处理失败,Kafka 也会将 offset 向前提交,这可能导致未处理的消息被标记为已消费,从而造成消息丢失风险。
    //props.put("enable.auto.commit", "true");
    props.put("enable.auto.commit", "false");
    //自动提交offset延迟时间
    //props.put("auto.commit.interval.ms", "1000");
    //反序列化
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    return props;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  • 消费订阅
@Test
public void simpleConsumerTest(){

    Properties properties = getProperties();
    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
    //订阅主题
    kafkaConsumer.subscribe(Arrays.asList(_02KafkaProducerTest.TOPIC_NAME));

    while (true){
        //消费者向kafka拉取消息消费,响应的阻塞超时时间, 有消息及时返回, 没有则阻塞100ms
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        for(ConsumerRecord record : records){
            System.err.printf("topic=%s, patition=%d, offset=%d,key=%s,value=%s %n",record.topic(),record.partition(), record.offset(),record.key(),record.value());
        }

        //同步阻塞提交offset
        //kafkaConsumer.commitSync();

        if(!records.isEmpty()){
            //异步提交offset
            kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {

                    if(exception == null){
                        System.err.println("手工提交offset成功:"+offsets.toString());
                    }else {
                        System.err.println("手工提交offset失败:"+offsets.toString());
                    }
                }
            });
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# Consumer从头消费配置和手工提交offset配置

  • 如果需要从头消费partition消息,怎操作?

    • auto.offset.reset 配置策略即可
    • 默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费
    //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("auto.offset.reset","earliest");
    
    1
  • 自动提交offset问题

    • 没法控制消息是否正常被消费
    • 适合非严谨的场景,比如日志收集发送
  • 手工提交offset配置和测试

    • 初次启动消费者会请求broker获取当前消费的offset值
  • 手工提交offset

    • 同步 commitSync 阻塞当前线程 (自动失败重试)
    • 异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)