Implementação de uma Pool de Threads em C++11 e C++17 com Análise do Dimensionamento

Pool de Threads com Fila Única de Tarefas

Utilizando as bibliotecas padrão modernas do C++ (como threads, mutexes e variáveis de condição), é possível construir uma pool de threads com uma única fila de tarefas de maneira eficiente. O conceito básico envolve inicializar um conjunto de threads na construção da pool e finalizá-los na destruição, expondo uma interface para submissão de tarefas.

Design da Interface

A interface principal inclui um construtor que aceita o número de threads e um método de enqueue para adicionar tarefas. O método de enqueue é um template que retorna um future, permitindo a obtenção do resultado assíncrono.

explicit ThreadPool(size_t threads = std::thread::hardware_concurrency());

template<typename F, typename... Args>
auto enqueue(F &&f, Args &&...args);

O parâmetro de entrada aceita um objeto chamável e seus argumentos, aproveitando templates de parâmetros variáveis do C++11 para flexibilidade.

Implementação Básica (C++11)

A classe contém um vetor de threads, uma fila de tarefas, um mutex para sincronização, uma variável de condição para notificação e uma flag para parada. A abordagem RAII garante que a pool seja destruída automaticamente.

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    explicit ThreadPool(size_t num_threads);
    template<class F, class... Args>
    auto submit(F&& func, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();

private:
    std::vector<std::thread> thread_pool;
    std::queue<std::function<void()>> task_queue;
    std::mutex queue_mutex;
    std::condition_variable task_available;
    bool terminated;
};

inline ThreadPool::ThreadPool(size_t num_threads) : terminated(false) {
    for (size_t i = 0; i < num_threads; ++i) {
        thread_pool.emplace_back([this] {
            while (true) {
                std::function<void()> current_task;
                {
                    std::unique_lock<std::mutex> lock(queue_mutex);
                    task_available.wait(lock, [this] {
                        return terminated || !task_queue.empty();
                    });
                    if (terminated && task_queue.empty()) return;
                    current_task = std::move(task_queue.front());
                    task_queue.pop();
                }
                current_task();
            }
        });
    }
}

template<class F, class... Args>
auto ThreadPool::submit(F&& func, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type> {
    using result_type = typename std::result_of<F(Args...)>::type;
    auto packaged = std::make_shared<std::packaged_task<result_type()>>(
        std::bind(std::forward<F>(func), std::forward<Args>(args)...)
    );
    std::future<result_type> future_result = packaged->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        if (terminated) throw std::runtime_error("Pool já encerrada");
        task_queue.emplace([packaged]() { (*packaged)(); });
    }
    task_available.notify_one();
    return future_result;
}

inline ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        terminated = true;
    }
    task_available.notify_all();
    for (auto& worker : thread_pool) worker.join();
}

#endif

Atualização para C++17

No C++17, std::result_of foi depreciado, substituído por std::invoke_result_t. Além disso, a captura inicializadora em lambdas e std::invoke simplificam o código.

#ifndef THREAD_POOL_HPP
#define THREAD_POOL_HPP

#include <functional>
#include <future>
#include <queue>

class ThreadPool {
public:
    explicit ThreadPool(size_t num_threads);
    template<class F, class... Args>
    decltype(auto) submit(F&& func, Args&&... args);
    ~ThreadPool();

private:
    std::vector<std::thread> worker_threads;
    std::queue<std::packaged_task<void()>> pending_tasks;
    std::mutex sync_mutex;
    std::condition_variable worker_cond;
    std::condition_variable drain_cond;
    bool stop_flag;
};

inline ThreadPool::ThreadPool(size_t num_threads) : stop_flag(false) {
    for (size_t i = 0; i < num_threads; ++i) {
        worker_threads.emplace_back([this] {
            while (true) {
                std::packaged_task<void()> task;
                {
                    std::unique_lock<std::mutex> lock(sync_mutex);
                    worker_cond.wait(lock, [this] {
                        return stop_flag || !pending_tasks.empty();
                    });
                    if (stop_flag && pending_tasks.empty()) return;
                    task = std::move(pending_tasks.front());
                    pending_tasks.pop();
                    if (pending_tasks.empty()) drain_cond.notify_one();
                }
                task();
            }
        });
    }
}

template<class F, class... Args>
decltype(auto) ThreadPool::submit(F&& func, Args&&... args) {
    using return_t = std::invoke_result_t<F, Args...>;
    std::packaged_task<return_t()> task(
        std::bind(std::forward<F>(func), std::forward<Args>(args)...)
    );
    auto future = task.get_future();
    {
        std::unique_lock<std::mutex> lock(sync_mutex);
        if (stop_flag) throw std::runtime_error("Envio em pool parada");
        pending_tasks.emplace(std::move(task));
    }
    worker_cond.notify_one();
    return future;
}

inline ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(sync_mutex);
        drain_cond.wait(lock, [this] { return pending_tasks.empty(); });
        stop_flag = true;
    }
    worker_cond.notify_all();
    for (auto& w : worker_threads) w.join();
}

#endif

Exemplo de Uso

ThreadPool pool(4);
auto result = pool.submit([](int val) { return val * 2; }, 21);
std::cout << result.get() << std::endl; // Saída: 42

Dimensionamento da Pool de Threads

O número ideal de threads depende de fatores como configuração do servidor, recursos disponíveis e natureza das tarefas. Consideram-se aspectos como quantidade de CPUs, memória, IOPS de I/O, e se as tarefas são intensivas em CPU, I/O ou mistas.

Tarefas Intensivas em CPU

Para tarefas com alta utilização de CPU e pouco I/O, o excesso de threads causa troca de contexto desnecessária. Recomenda-se definir o número de threads como número de CPUs + 1.

Tarefas Intensivas em I/O

Tarefas que dependem de operações de I/O (como consultas a banco de dados) deixam a CPU ociosa durante as esperas. Para maximizar o uso da CPU, aumente o número de threads, tipicamente 2 × número de CPUs + 1, equilibrando a sobrecarga de troca de contexto.

Tarefas Mistas

Se possível, divida tarefas mistas em componentes de CPU e I/O para otimização. Se os tempos de execução forem desequilibrados, o ganho pode ser mínimo.

Fórmula de Impedância

Uma abordagem empírica para dimensionamento baseia-se na proporção de tempo de CPU versus I/O:

N = Número de CPUs disponíveis
P = Fração de tempo de CPU ocupado (0 < P ≤ 1)
T = Número recomendado de threads
T = N / P

Essa fórmula ajuda a ajustar a pool para impedância ideal entre CPU e I/O.

Tags: C++11 C++17 ThreadPool Multithreading Concurrency

Publicado em 6-10 22:38 por Thomas