Ciclo de Evento em Grupo de Threads
class CicloDeEventoGrupoThread : noncopyable {
public:
using CallbackInicializacaoThread = std::function<void>;
CicloDeEventoGrupoThread(CicloDeEvento* cicloBase, const std::string &nomeArg);
~CicloDeEventoGrupoThread();
void definirNumeroThreads(int numThreads) {
numThreads_ = numThreads;
}
void iniciar(const CallbackInicializacaoThread& cb = CallbackInicializacaoThread());
// Se operando em multi-threading, o ciclo base distribui canais para subciclos por rodízio
CicloDeEvento* obterProximoCiclo();
std::vector<ciclodeevento> obterTodosCiclos();
bool iniciado() const {
return iniciado_;
}
std::string nome() const {
return nome_;
}
private:
CicloDeEvento* cicloBase_; // ciclo principal
std::string nome_;
bool iniciado_;
int numThreads_;
int proximo_;
std::vector<:unique_ptr>> threads_;
std::vector<ciclodeevento> ciclos_; // subciclos
};
</ciclodeevento></:unique_ptr></ciclodeevento></void>
Inicialização do Construtor
CicloDeEventoGrupoThread::CicloDeEventoGrupoThread(CicloDeEvento* cicloBase, const std::string &nomeArg)
:cicloBase_(cicloBase)
,nome_(nomeArg)
,iniciado_(false)
,numThreads_(0)
,proximo_(0)
{
}
Destruição
O destrutor não precisa fazer nada especial, pois os CicloDeEvento são criados na pilha e liberados automaticmaente quando saem do escopo.
CicloDeEventoGrupoThread::~CicloDeEventoGrupoThread() {
}
Eniciando as Threads
Aqui realmente criamos as threads subjacentes, vinculando cada uma a um CicloDeEvento. O método ciclos_.push_back(t->iniciarCiclo()) retorna o endereço desse ciclo. Se numThreads_ for 0, significa que só existe o ciclo principal (uma única thread).
void CicloDeEventoGrupoThread::iniciar(const CallbackInicializacaoThread& cb) {
iniciado_ = true;
for (int i = 0; i < numThreads_; ++i) {
char buf[nome_.size() + 32];
snprintf(buf, sizeof buf, "%s%d", nome_.c_str(), i);
CicloDeEventoThread *t = new CicloDeEventoThread(cb, buf);
threads_.push_back(std::unique_ptr<ciclodeeventothread>(t));
ciclos_.push_back(t->iniciarCiclo()); // Cria a thread subjacente, vincula um novo CicloDeEvento e retorna seu endereço
}
// Caso numThreads_ == 0, só existe o cicloBase como thread única
if (numThreads_ == 0 && cb) {
cb(cicloBase_);
}
}
</ciclodeeventothread>
Obtendo o Próximo Ciclo por Rodízio
Inicialmente, só existe o ciclo principal. Em seguida, começamos a obter ciclos por rodízio, voltando ao início quando chegamos ao final.
CicloDeEvento* CicloDeEventoGrupoThread::obterProximoCiclo() {
CicloDeEvento* ciclo = cicloBase_;
// Rodízio
if (!ciclos_.empty()) { // Obtém o próximo ciclo para processamento de eventos por rodízio
ciclo = ciclos_[proximo_];
++proximo_;
if (proximo_ >= ciclos_.size()) {
proximo_ = 0;
}
}
return ciclo;
}
Retornando Todos os Ciclos Criados
std::vector<ciclodeevento> CicloDeEventoGrupoThread::obterTodosCiclos() {
if (ciclos_.empty()) {
return std::vector<ciclodeevento>(1, cicloBase_);
} else {
return ciclos_;
}
}
</ciclodeevento></ciclodeevento>
Encapsulamento de Socket de Baixxo Nível
class Soquete : noncopyable {
public:
explicit Soquete(int descritor)
:descritor_(descritor) {
}
~Soquete();
int descritor() const {
return descritor_;
}
void vincularEndereco(EnderecoInet& enderecoLocal);
void escutar();
int aceitar(EnderecoInet* enderecoPar);
void desativarEscrita();
void setSemAtrasoTcp(bool ativar);
void setReutilizarEndereco(bool ativar);
void setReutilizarPorta(bool ativar);
void setManterAtivo(bool ativar);
private:
const int descritor_;
};
Vinculando IP e Porta
Chama a função de sistema bind para vincular as informações de IP e porta passadas.
void Soquete::vincularEndereco(EnderecoInet& enderecoLocal) {
if (0 != ::bind(descritor_, (sockaddr*)enderecoLocal.getEnderecoSock(), sizeof(sockaddr_in))) {
LOG_FATAL("falha ao bind do descritor:%d\n", descritor_);
}
}
Escutando o descritor de socket
void Soquete::escutar() {
if (0 != ::listen(descritor_, 1024)) {
LOG_FATAL("falha ao listen do descritor:%d\n", descritor_);
}
}
Aceitando novas conexões
Armazena as informações da nova conexão através da função accept no addr, então retorna o novo descritor de conexão. Como obtemos as informações do cliente através da chamada de sistema accept (armazenadas na estrutura sockaddr_in addr), mas essas informações são armazenadas em uma variável local na pilha que se torna inválida após o retorno da função, precisamos salvar essas informações no objeto InetAddress passado pelo chamador para que possam ser usadas após o retorno da função.
int Soquete::aceitar(EnderecoInet* enderecoPar) {
sockaddr_in addr;
socklen_t len;
bzero(&addr, sizeof addr);
int descritorConexao = ::accept(descritor_, (sockaddr*)&addr, &len);
if (descritorConexao > 0) {
enderecoPar->setEnderecoSock(addr);
}
return descritorConexao;
}
Desativando o canal de escrita
void Soquete::desativarEscrita() {
if (::shutdown(descritor_, SHUT_WR) < 0) {
LOG_ERROR("erro ao desativar escrita");
}
}
Destrutor
Soquete::~Soquete() {
close(descritor_);
}