Estratégias de Tratamento de Rejeição para Thread Pools Java

Por Que Customizar Thread Pools em Aplicações Java?

A gestão eficiente de threads é fundamental para o desempenho e a estabilidade de aplicações Java. As diretrizes de desenvolvimento, como as estabelecidas pelo manual da Alibaba para Java, frequentemente enfatizam a criação manual de thread pools em detrimento dos métodos de fábrica fornecidos pela classe Executors do JDK.

Os métodos estáticos de Executors, como newFixedThreadPool, newSingleThreadExecutor, newCachedThreadPool e newScheduledThreadPool, embora convenientes, apresentam riscos significativos de exaustão de recursos:

  • FixedThreadPool e SingleThreadExecutor utilizam filas de trabalho com capacidade Integer.MAX_VALUE. Isso pode levar ao acúmulo ilimitado de tarefas na fila, resultando em erros de falta de memória (OOM).
  • CachedThreadPool e ScheduledThreadPool permitem a criação de um número ilimitado de threads (até Integer.MAX_VALUE). Em situações de alta carga, isso pode gerar um volume excessivo de threads, consumindo memória e CPU e potencialmente causando OOM.

Por essas razões, é amplamente recomendado que os desenvolvedores criem suas instâncias de thread pool usando a classe ThreadPoolExecutor diretamente, oferecendo maior controle e prevenindo problemas de estabilidade.

Definindo um Thread Pool Customizado com ThreadPoolExecutor

A classe ThreadPoolExecutor oferece construtores que permitem uma configuração detalhada. O construtor mais completo ilustra os parâmetros essenciais para uma configuração robusta:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

Cada parâmetro desempenha um papel crucial na operação do pool:

  • corePoolSize: O número de threads que o pool mantém ativas mesmo quando ociosas. São as threads centrais.
  • maximumPoolSize: O número máximo de threads que o pool pode expandir. Quando a fila de trabalho está cheia e corePoolSize é atingido, novas threads são criadas até este limite.
  • keepAliveTime: O tempo máximo que threads acima de corePoolSize podem permanecer ociosas antes de serem terminadas.
  • unit: A unidade de tempo para o parâmetro keepAliveTime (ex: TimeUnit.SECONDS).
  • workQueue: A fila usada para armazenar tarefas antes de serem executadas pelas threads. Se todas as threads do pool estiverem ocupadas, as tarefas são enfileiradas aqui.
  • threadFactory: Uma fábrica para criar novas threads. Permite personalizar a criação de threads, como nomeá-las.
  • RejectedExecutionHandler: A estratégia a ser aplicada quando o pool não pode aceitar uma nova tarefa (por exemplo, quando o maximumPoolSize é atingido e a workQueue está cheia).

A seguir, exploramos em detalhe as diversas estratégias de rejeição.

Compreendendo as Estratégias de Rejeição de Tarefas

Quando um ThreadPoolExecutor está esgotado — ou seja, todas as threads estão ocupadas e a fila de trabalho está cheia — ele precisa decidir o que fazer com uma nova tarefa que chega. Essa decisão é delegada a uma instância da interface RejectedExecutionHandler.

A Interface RejectedExecutionHandler

Todas as políticas de rejeição implementam esta interface simples:

public interface RejectedExecutionHandler {
    /**
     * Método invocado quando uma tarefa não pode ser executada por um ThreadPoolExecutor.
     * @param task A tarefa Runnable que foi solicitada para execução.
     * @param executor O ThreadPoolExecutor que tentou executar a tarefa.
     * @throws RejectedExecutionException se nenhuma solução for encontrada.
     */
    void rejectedExecution(Runnable task, ThreadPoolExecutor executor);
}

O método rejectedExecution recebe a tarefa que foi rejeitada (task) e a instância do ThreadPoolExecutor (executor) que a rejeitou. Dependendo da implementação, a tarefa pode ser descartada, executada pelo chamador, ou causar uma exceção.

Políticas de Rejeição Padrão do JDK

O Java oferece quatro implementações padrão de RejectedExecutionHandler, aninhadas dentro da classe ThreadPoolExecutor:

1. AbortPolicy

Esta é a política padrão. Ela simplesmente lança uma RejectedExecutionException. Isso interrompe o fluxo do chamador, exigindo tratamento de exceção. Geralmente, não é recomendada a menos que haja um requisito explícito para falha imediata e visível.

public static class AbortPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejeitada de " + e.toString());
    }
}

2. CallerRunsPolicy

Com esta política, a tarefa rejeitada é executada diretamente no thread do chamador (aquele que tentou submeter a tarefa ao pool). Isso efetivamente atrasa o chamador até que a tarefa seja concluída. Se o thread pool estiver encerrado, a tarefa é descartada silenciosamente. Esta política ajuda a reduzir a taxa de submissão de tarefas, aplicando "backpressure" ao produtor.

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run(); // Executa a tarefa no thread do chamador
        }
    }
}

É importante notar que r.run() executa o método run da tarefa diretamente no thread atual, não em uma nova thread ou em uma thread do pool.

3. DiscardOldestPolicy

Esta política remove a tarefa mais antiga da fila de trabalho do pool e então tenta submeter a nova tarefa novamente. Se a re-submissão falhar (por exemplo, a fila ainda está cheia ou a tarefa é novamente rejeitada por outra razão), a nova tarefa também pode ser descartada ou tratada por outra política. Esta abordagem é útil quando se priorizam tarefas mais recentes.

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll(); // Remove a tarefa mais antiga da fila
            e.execute(r);       // Tenta executar a nova tarefa novamente
        }
    }
}

O método poll() em uma BlockingQueue remove o elemento da cabeça da fila, que é a tarefa mais antiga devido à natureza FIFO das filas.

4. DiscardPolicy

A política mais simples: ela descarta silenciosamente a tarefa rejeitada. Nenhuma exceção é lançada, e a tarefa não é executada. É útil em cenários onde a perda de dados é aceitável, e a aplicação pode continuar sem processar todas as tarefas.

public static class DiscardPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // A tarefa é simplesmente descartada, sem qualquer ação
    }
}

Implementando Políticas de Rejeição Customizadas

Os desenvolvedores podem criar suas próprias lógicas de tratamento de rejeição implementando a interface RejectedExecutionHandler. Isso oferece flexibilidade para atender a requisitos específicos de negócios ou operacionais.

Exemplo 1: NewThreadRunsPolicy (Inspirado no Netty)

Este é um exemplo de uma política customizada que, em vez de rejeitar a tarefa ou executá-la no chamador, tenta iniciá-la em um novo thread temporário. Isso pode ser útil para garantir que tarefas críticas sejam processadas, mesmo sob sobrecarga, mas deve ser usado com cautela para evitar a explosão de threads.

public static class NewThreadRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        try {
            // Tenta iniciar a tarefa em um novo thread temporário
            final Thread newWorkerThread = new Thread(task, "ExecutorTemporario-Task");
            newWorkerThread.start();
        } catch (Throwable ex) {
            throw new RejectedExecutionException("Falha ao iniciar novo thread para a tarefa", ex);
        }
    }
}

Exemplo 2: AbortPolicyWithReport (Inspirado no Dubbo)

Este exemplo estende a AbortPolicy padrão, adicionando funcionalidades de log detalhado e a capacidade de gerar um dump da pilha de execução (JStack) quando uma tarefa é rejeitada. Isso é inestimável para depuração em ambientes de produção, fornecendo informações contextuais sobre o estado do pool no momento da rejeição.

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    private final String poolName; // Nome do pool para identificação
    // Logger e URL simulados para o exemplo
    private static final java.util.logging.Logger logger = java.util.logging.Logger.getLogger(AbortPolicyWithReport.class.getName());

    public AbortPolicyWithReport(String name) {
        this.poolName = name;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String diagnosticMsg = String.format("Pool de Threads EXAURIDO! " +
                        "Nome do Pool: %s, Tamanho do Pool: %d (ativos: %d, core: %d, max: %d, maior: %d), " +
                        "Tarefas: %d (concluídas: %d), Status do Executor:(shutdown:%s, terminado:%s, terminando:%s)",
                poolName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
        
        logger.warning(diagnosticMsg);
        // dumpJStack(); // Chamada para um método que geraria um dump de pilha real
        throw new RejectedExecutionException(diagnosticMsg);
    }

    // Método dummy para representar a geração de JStack
    private void dumpJStack() {
        logger.info("Gerando JStack... (funcionalidade simulada)");
    }
}

Este tipo de política customizada é extremamente útil para monitorar e diagnosticar problemas de desempenho em sistemas distribuídos, fornecendo visibilidade iemdiata sobre a sobrecarga do pool de threads.

Tags: java Concorrência ThreadPoolExecutor EstratégiasRejeição Multithreading

Publicado em 6-1 16:06 por Thomas