1. 检查点(Checkpoint)机制

1.1 检查点的作用

​ 检查点机制是Flink保证故障恢复和状态一致性的重要部分。通过定期生成检查点,Flink将作业的状态和偏移量保存到外部持久化存储中。在发生故障时,可以从最近一次成功的检查点恢复流处理任务,确保程序状态与输入数据的一致性。

1.2 检查点的运行原理

Flink使用**分布式快照算法(Chandy-Lamport 算法)**来实现检查点,整个过程如下:

  1. 触发检查点

    ​ JobManager 定期(或手动触发)向 Source 任务发送 CheckpointBarrier(检查点屏障)。

    ​ 每个 Source Task 接收到检查点信号后,开始生成检查点快照,并向下游算子传播检查点屏障。

  2. 屏障传播

    ​ CheckpointBarrier会随着数据流传播,确保所有任务(算子)的状态在同一个屏障之前被保存。

    ​ 当屏障到达算子时,算子会先保存当前状态,然后继续处理后续的数据。

  3. 保存状态

    ​ 每个任务(Task)将其状态保存到 State Backend中(如内存、RocksDB、文件系统)。

    ​ 任务会将检查点状态的元信息报告给 JobManager。

  4. 完成检查点

    ​ 当所有任务都完成了检查点保存,JobManager 会标记检查点成功。

    ​ 失败的检查点会被丢弃,不影响程序运行。

1.3 核心源码分析

关键类:CheckpointCoordinator

CheckpointCoordinator 是Flink检查点机制的核心类,负责协调检查点的生成、屏障传播和状态存储。

核心方法:

public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
      CheckpointProperties props,
      String externalSavepointLocation,
      boolean isPeriodic,
      boolean advanceToEndOfTime);

作用:负责触发新的检查点。

逻辑

  1. JobManager向Source Task下发 CheckpointBarrier
  2. 收集每个算子的状态元数据。
  3. 将状态元数据存储到持久化存储(如 HDFS)。

屏障传播:BarrierBuffer

BarrierBuffer 是屏障传播的核心类,负责管理检查点屏障。

核心逻辑:

​ 当屏障到达时,阻塞处理流数据,保存当前状态。

​ 屏障通过后,继续处理数据流。

1.4 检查点的对齐与非对齐

  1. 对齐检查点(Aligned Checkpoint)

    ​ 默认情况下,检查点会对齐所有输入流。即:算子只有在所有输入流的屏障到达后,才会保存状态。

    ​ 优点:保证了数据的一致性。

    ​ 缺点:可能导致延迟(需要等待较慢的输入流)。

  2. 非对齐检查点(Unaligned Checkpoint)

    ​ 算子在接收到任意输入流的屏障后立即保存状态,不需要等待所有输入流。

    ​ 优点:适合存在数据倾斜的场景,减少延迟。

    ​ 缺点:会增加状态存储的开销。

非对齐检查点源码

  • 通过 CheckpointOptions 指定是否启用非对齐检查点:

    public static CheckpointOptions forUnalignedCheckpoint() {
        return new CheckpointOptions(
                CheckpointType.CHECKPOINT, true, TARGET_DEFAULT);
    }
    

2. 两阶段提交协议(Two-phase Commit)

两阶段提交协议用于保证Flink的Sink算子(如Kafka Sink、数据库Sink)能够实现exactly-once。

2.1 两阶段提交的原理

两阶段提交分为两个阶段:

  1. 第一阶段(预提交,Pre-commit)

    ​ Sink 算子将数据写入外部存储的事务中,但事务未提交。

    ​ 事务处于“准备提交”的状态。

  2. 第二阶段(提交或回滚,Commit or Rollback)

    ​ 检查点完成后,Flink向Sink算子发送确认信号。

    ​ Sink算子收到信号后,提交事务。

    ​ 如果检查点失败,Flink回滚状态,未提交的事务也会被回滚。


2.2 核心源码分析

事务接口:TwoPhaseCommitSinkFunction

Flink 提供了 TwoPhaseCommitSinkFunction 抽象类,方便开发者实现两阶段提交。

常用方法:

public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN> {
    protected abstract TXN beginTransaction() throws Exception;  // 开启事务
    protected abstract void preCommit(TXN transaction) throws Exception;  // 预提交事务
    protected abstract void commit(TXN transaction) throws Exception;  // 提交事务
    protected abstract void abort(TXN transaction) throws Exception;  // 回滚事务
}

事务的生命周期:

  1. 事务开始

    ​ Flink 为每个 Sink 算子分配一个唯一的事务。

  2. 预提交阶段

    ​ 将数据写入事务缓冲区。

    ​ 事务处于未提交状态。

  3. 检查点完成后

    commit 方法被调用,事务数据被正式提交到目标系统。

  4. 故障处理

    ​ 如果检查点失败,调用 abort 方法回滚事务。

2.3 以Kafka Sink为例

Kafka Sink实现了两阶段提交协议,通过Kafka的事务支持实现exactly-once

实现类:FlinkKafkaProducer

FlinkKafkaProducer 使用 Kafka 的事务 API 实现两阶段提交。

​ 核心逻辑:

​ 每个 Sink 算子开启一个 Kafka 事务。

​ 在检查点完成后,提交事务。

​ 如果检查点失败,回滚 Kafka 事务。

public void preCommit(TXN transaction) throws Exception {
    // Kafka 事务预提交逻辑
}

@Override
public void commit(TXN transaction) throws Exception {
    // 提交 Kafka 事务
}

@Override
public void abort(TXN transaction) throws Exception {
    // 回滚 Kafka 事务
}

3. 检查点与两阶段提交的结合

检查点和两阶段提交的结合是实现全流程 exactly-once 的关键:

  1. 检查点保存 Flink 的内部状态。
  2. 两阶段提交保证 Sink 算子的输出数据一致性。
  3. 检查点完成后,事务会被提交,确保状态和输出数据一致。

4. 总结

  1. 检查点机制

    ​ Flink 使用分布式快照算法生成检查点,保存任务的状态和偏移量。

    ​ 对齐检查点保证了一致性,非对齐检查点提高了性能。

  2. 两阶段提交协议

    ​ 通过预提交和提交两个阶段,确保 Sink 输出的最终一致性。

    ​ Flink 提供了 TwoPhaseCommitSinkFunction,便于实现事务支持。

  3. 源码关键点

    ​ 检查点:CheckpointCoordinatorBarrierBuffer

    ​ 两阶段提交:TwoPhaseCommitSinkFunction