Componentes Básicos de um Programa Flink
Um aplicativo Flink processa transformações em fluxos de dados (DataStreams). Sua estrutura essencial inclui:
Ambiente de Execução
Criação do Ambiente
import org.apache.flink.streaming.api.scala._
val ambiente = StreamExecutionEnvironment.getExecutionEnvironment
getExecutionEnvironment: Retorna ambiente local (execução independente) ou cluster (submissão via JAR)createLocalEnvironment: Ambiente local com paralelismo customizávelcreateRemoteEnvironment: Ambiente de cluster com especificação de JobManager
Modos de Execução
ambiente.setRuntimeMode(RuntimeExecutionMode.BATCH)
- STREAMING: Padrão para fluxos contínuos
- BATCH: Processamento similar a MapReduce
- AUTOMATCI: Seleção automática baseada nos dados de antrada
Configuração recomendada via linha de comando: bin/flink run -Dexecution.runtime-mode=BATCH
Fontes de Dados
Coleções
case class Evento(usuario: String, url: String, timestamp: Long)
val eventos = List(Evento("Ana", "/home", 1000L), Evento("Carlos", "/carrinho", 2000L))
val fluxo = ambiente.fromCollection(eventos)
Arquivos
val texto = ambiente.readTextFile("dados.txt")
texto.flatMap(_.split(" ")).print()
Suporte para HDFS requer dependência Hadoop:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.5</version>
</dependency>
Fonte Personalizada
class FonteEventos extends SourceFunction[Evento] {
private var ativo = true
override def run(ctx: SourceFunction.SourceContext[Evento]): Unit = {
while(ativo) {
ctx.collect(Evento(usuarios.aleatorio, urls.aleatorio, System.currentTimeMillis))
Thread.sleep(500)
}
}
override def cancel(): Unit = ativo = false
}
Transformações de Dados
Operações Básicas
- Mapeamento:
fluxo.map(_.toUpperCase) - Filtragem:
fluxo.filter(_.contains("error")) - FlatMap:
fluxo.flatMap(_.split(","))
Agrupamento e Agregação
fluxo.keyBy(_.usuario)
.reduce((e1, e2) => Evento(e1.usuario, e1.url, e1.timestamp + e2.timestamp))
.print()
Funções de agregação:
sum: Soma de campos numéricosmax/min: Valores extremosreduce: Operação personalizada
Particionamento
dados.shuffle() // Aleatório
dados.rebalance() // Round-robin
dados.broadcast() // Todos os nós
dados.rescale() // Escalonamento local
Particionamento custoimzado:
dados.partitionCustom((chave, particoes) => chave % 3, _)
Saída de Dados
StreamingFileSink
val sinkArquivo = StreamingFileSink.forRowFormat(
new Path("/saida"),
new SimpleStringEncoder[String]()
).build()
fluxoTexto.addSink(sinkArquivo)