Pool de Threads com Fila Única de Tarefas
Utilizando as bibliotecas padrão modernas do C++ (como threads, mutexes e variáveis de condição), é possível construir uma pool de threads com uma única fila de tarefas de maneira eficiente. O conceito básico envolve inicializar um conjunto de threads na construção da pool e finalizá-los na destruição, expondo uma interface para submissão de tarefas.
Design da Interface
A interface principal inclui um construtor que aceita o número de threads e um método de enqueue para adicionar tarefas. O método de enqueue é um template que retorna um future, permitindo a obtenção do resultado assíncrono.
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency());
template<typename F, typename... Args>
auto enqueue(F &&f, Args &&...args);
O parâmetro de entrada aceita um objeto chamável e seus argumentos, aproveitando templates de parâmetros variáveis do C++11 para flexibilidade.
Implementação Básica (C++11)
A classe contém um vetor de threads, uma fila de tarefas, um mutex para sincronização, uma variável de condição para notificação e uma flag para parada. A abordagem RAII garante que a pool seja destruída automaticamente.
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
explicit ThreadPool(size_t num_threads);
template<class F, class... Args>
auto submit(F&& func, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
std::vector<std::thread> thread_pool;
std::queue<std::function<void()>> task_queue;
std::mutex queue_mutex;
std::condition_variable task_available;
bool terminated;
};
inline ThreadPool::ThreadPool(size_t num_threads) : terminated(false) {
for (size_t i = 0; i < num_threads; ++i) {
thread_pool.emplace_back([this] {
while (true) {
std::function<void()> current_task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
task_available.wait(lock, [this] {
return terminated || !task_queue.empty();
});
if (terminated && task_queue.empty()) return;
current_task = std::move(task_queue.front());
task_queue.pop();
}
current_task();
}
});
}
}
template<class F, class... Args>
auto ThreadPool::submit(F&& func, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using result_type = typename std::result_of<F(Args...)>::type;
auto packaged = std::make_shared<std::packaged_task<result_type()>>(
std::bind(std::forward<F>(func), std::forward<Args>(args)...)
);
std::future<result_type> future_result = packaged->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (terminated) throw std::runtime_error("Pool já encerrada");
task_queue.emplace([packaged]() { (*packaged)(); });
}
task_available.notify_one();
return future_result;
}
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
terminated = true;
}
task_available.notify_all();
for (auto& worker : thread_pool) worker.join();
}
#endif
Atualização para C++17
No C++17, std::result_of foi depreciado, substituído por std::invoke_result_t. Além disso, a captura inicializadora em lambdas e std::invoke simplificam o código.
#ifndef THREAD_POOL_HPP
#define THREAD_POOL_HPP
#include <functional>
#include <future>
#include <queue>
class ThreadPool {
public:
explicit ThreadPool(size_t num_threads);
template<class F, class... Args>
decltype(auto) submit(F&& func, Args&&... args);
~ThreadPool();
private:
std::vector<std::thread> worker_threads;
std::queue<std::packaged_task<void()>> pending_tasks;
std::mutex sync_mutex;
std::condition_variable worker_cond;
std::condition_variable drain_cond;
bool stop_flag;
};
inline ThreadPool::ThreadPool(size_t num_threads) : stop_flag(false) {
for (size_t i = 0; i < num_threads; ++i) {
worker_threads.emplace_back([this] {
while (true) {
std::packaged_task<void()> task;
{
std::unique_lock<std::mutex> lock(sync_mutex);
worker_cond.wait(lock, [this] {
return stop_flag || !pending_tasks.empty();
});
if (stop_flag && pending_tasks.empty()) return;
task = std::move(pending_tasks.front());
pending_tasks.pop();
if (pending_tasks.empty()) drain_cond.notify_one();
}
task();
}
});
}
}
template<class F, class... Args>
decltype(auto) ThreadPool::submit(F&& func, Args&&... args) {
using return_t = std::invoke_result_t<F, Args...>;
std::packaged_task<return_t()> task(
std::bind(std::forward<F>(func), std::forward<Args>(args)...)
);
auto future = task.get_future();
{
std::unique_lock<std::mutex> lock(sync_mutex);
if (stop_flag) throw std::runtime_error("Envio em pool parada");
pending_tasks.emplace(std::move(task));
}
worker_cond.notify_one();
return future;
}
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(sync_mutex);
drain_cond.wait(lock, [this] { return pending_tasks.empty(); });
stop_flag = true;
}
worker_cond.notify_all();
for (auto& w : worker_threads) w.join();
}
#endif
Exemplo de Uso
ThreadPool pool(4);
auto result = pool.submit([](int val) { return val * 2; }, 21);
std::cout << result.get() << std::endl; // Saída: 42
Dimensionamento da Pool de Threads
O número ideal de threads depende de fatores como configuração do servidor, recursos disponíveis e natureza das tarefas. Consideram-se aspectos como quantidade de CPUs, memória, IOPS de I/O, e se as tarefas são intensivas em CPU, I/O ou mistas.
Tarefas Intensivas em CPU
Para tarefas com alta utilização de CPU e pouco I/O, o excesso de threads causa troca de contexto desnecessária. Recomenda-se definir o número de threads como número de CPUs + 1.
Tarefas Intensivas em I/O
Tarefas que dependem de operações de I/O (como consultas a banco de dados) deixam a CPU ociosa durante as esperas. Para maximizar o uso da CPU, aumente o número de threads, tipicamente 2 × número de CPUs + 1, equilibrando a sobrecarga de troca de contexto.
Tarefas Mistas
Se possível, divida tarefas mistas em componentes de CPU e I/O para otimização. Se os tempos de execução forem desequilibrados, o ganho pode ser mínimo.
Fórmula de Impedância
Uma abordagem empírica para dimensionamento baseia-se na proporção de tempo de CPU versus I/O:
N = Número de CPUs disponíveis
P = Fração de tempo de CPU ocupado (0 < P ≤ 1)
T = Número recomendado de threads
T = N / P
Essa fórmula ajuda a ajustar a pool para impedância ideal entre CPU e I/O.