Fundamentos da API DataStream no Apache Flink

Componentes Básicos de um Programa Flink

Um aplicativo Flink processa transformações em fluxos de dados (DataStreams). Sua estrutura essencial inclui:

Ambiente de Execução

Criação do Ambiente

import org.apache.flink.streaming.api.scala._

val ambiente = StreamExecutionEnvironment.getExecutionEnvironment
  • getExecutionEnvironment: Retorna ambiente local (execução independente) ou cluster (submissão via JAR)
  • createLocalEnvironment: Ambiente local com paralelismo customizável
  • createRemoteEnvironment: Ambiente de cluster com especificação de JobManager

Modos de Execução

ambiente.setRuntimeMode(RuntimeExecutionMode.BATCH)
  • STREAMING: Padrão para fluxos contínuos
  • BATCH: Processamento similar a MapReduce
  • AUTOMATCI: Seleção automática baseada nos dados de antrada

Configuração recomendada via linha de comando: bin/flink run -Dexecution.runtime-mode=BATCH

Fontes de Dados

Coleções

case class Evento(usuario: String, url: String, timestamp: Long)
val eventos = List(Evento("Ana", "/home", 1000L), Evento("Carlos", "/carrinho", 2000L))
val fluxo = ambiente.fromCollection(eventos)

Arquivos

val texto = ambiente.readTextFile("dados.txt")
texto.flatMap(_.split(" ")).print()

Suporte para HDFS requer dependência Hadoop:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>3.3.5</version>
</dependency>

Fonte Personalizada

class FonteEventos extends SourceFunction[Evento] {
  private var ativo = true
  override def run(ctx: SourceFunction.SourceContext[Evento]): Unit = {
    while(ativo) {
      ctx.collect(Evento(usuarios.aleatorio, urls.aleatorio, System.currentTimeMillis))
      Thread.sleep(500)
    }
  }
  override def cancel(): Unit = ativo = false
}

Transformações de Dados

Operações Básicas

  • Mapeamento: fluxo.map(_.toUpperCase)
  • Filtragem: fluxo.filter(_.contains("error"))
  • FlatMap: fluxo.flatMap(_.split(","))

Agrupamento e Agregação

fluxo.keyBy(_.usuario)
  .reduce((e1, e2) => Evento(e1.usuario, e1.url, e1.timestamp + e2.timestamp))
  .print()

Funções de agregação:

  • sum: Soma de campos numéricos
  • max/min: Valores extremos
  • reduce: Operação personalizada

Particionamento

dados.shuffle()      // Aleatório
dados.rebalance()    // Round-robin
dados.broadcast()    // Todos os nós
dados.rescale()      // Escalonamento local

Particionamento custoimzado:

dados.partitionCustom((chave, particoes) => chave % 3, _)

Saída de Dados

StreamingFileSink

val sinkArquivo = StreamingFileSink.forRowFormat(
  new Path("/saida"),
  new SimpleStringEncoder[String]()
).build()
fluxoTexto.addSink(sinkArquivo)

Tags: Apache Flink DataStream API Processamento de Fluxo Scala Transformações de Dados

Publicado em 6-13 08:22 por Thomas