Configuração Comum de Checkpoints no Flink
// Configuração de checkpoints
ExecutionEnvironment ambienteExecucao = ...;
ambienteExecucao.enableCheckpointing(180000); // Habilita checkpoint a cada 180000ms
CheckpointConfig configuracaoCheckpoint = ambienteExecucao.getCheckpointConfig();
configuracaoCheckpoint.setMinPauseBetweenCheckpoints(50000); // Define pausa mínima entre checkpoints em 50000 ms
configuracaoCheckpoint.setCheckpointTimeout(600000); // Tempo limite para conclusão do checkpoint: 600000 ms
configuracaoCheckpoint.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // Modo de checkpoint: exatamente-uma-vez
configuracaoCheckpoint.setMaxConcurrentCheckpoints(1); // Número máximo de checkpoints concorrentes: 1
configuracaoCheckpoint.setCheckpointStorage("hdfs:///flink-checkpoints/exemplo/"); // Armazenamento em sistema distribuído HDFS
configuracaoCheckpoint.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // Manter checkpoints após cancelamento do job
configuracaoCheckpoint.enableUnalignedCheckpoints(); // Habilitar checkpoints desalinhados para melhor desempenho
O Estado (State) representa os dados principais que o Checkpoint persiste como backup, enquanto o Checkpoint é uma operação global iniciada na fonte e propagada por todos os nós downstream.
Para recuperação de tarefas no Flink, é possível restaurar o estado a partir de Checkpoints ou Savepoints, garantindo a continuidade do processamento em tempo real.
Algoritmo de Implementação do Checkpoint
- Utiliza o algoritmo Chandy-Lamport para criar snapshots distribuídos de forma eficiente
- Separa a persistência do checkpoint do fluxo de dados, evitando a necessidade de pausar toda a aplicação
- Barreiras de Checkpoint (Checkpoint Barrier):
- O algoritmo emprega uma estrutura especial chamada barreira para dividir dados em diferentes checkpoints ao longo do fluxo
- Barreiras são disparadas periodicamente nas tarefas de fonte (Source Task) e transmitidas como eventos especiais para operadores downstream
- Alterações de estado causadas por dados anteriores à barreira são incluídas no checkpoint atual
- Dados subsequentes à barriera têm suas modificações registradas em checkpoints futuros
- Em operadores com múltiplas entradas, ocorre o alinhamento de barreiras, visível na interface web do Flink
- O checkpoint é concluído somente quando o operador Sink recebe a barreira e confirma o término
Exemplo de Inicialização a partir de um Checkpoint
Para iniciar uma aplicação Flink a partir de um checkpoitn persistido, utilize o comando abaixo com parâmetros reescritos:
./bin/flink run -s hdfs://ip:8020/user/xx/checkpoint-35 -c classe_principal ./aplicacao.jar
</div>Para recuperação usando savepoints, consulte a documentação sobre como criar e carregar savepoints de forma eficaz.