Biblioteca de Rede muduo 5

Ciclo de Evento em Grupo de Threads


class CicloDeEventoGrupoThread : noncopyable {
public:
    using CallbackInicializacaoThread = std::function<void>;

    CicloDeEventoGrupoThread(CicloDeEvento* cicloBase, const std::string &nomeArg);
    ~CicloDeEventoGrupoThread();

    void definirNumeroThreads(int numThreads) {
        numThreads_ = numThreads;
    }

    void iniciar(const CallbackInicializacaoThread& cb = CallbackInicializacaoThread());

    // Se operando em multi-threading, o ciclo base distribui canais para subciclos por rodízio
    CicloDeEvento* obterProximoCiclo();

    std::vector<ciclodeevento> obterTodosCiclos();

    bool iniciado() const {
        return iniciado_;
    }
    
    std::string nome() const {
        return nome_;
    }
    
private:
    CicloDeEvento* cicloBase_; // ciclo principal
    std::string nome_;
    bool iniciado_;
    int numThreads_;
    int proximo_;
    std::vector<:unique_ptr>> threads_;
    std::vector<ciclodeevento> ciclos_; // subciclos
};
</ciclodeevento></:unique_ptr></ciclodeevento></void>

Inicialização do Construtor


CicloDeEventoGrupoThread::CicloDeEventoGrupoThread(CicloDeEvento* cicloBase, const std::string &nomeArg)
    :cicloBase_(cicloBase)
    ,nome_(nomeArg)
    ,iniciado_(false)
    ,numThreads_(0)
    ,proximo_(0)
{
}

Destruição

O destrutor não precisa fazer nada especial, pois os CicloDeEvento são criados na pilha e liberados automaticmaente quando saem do escopo.


CicloDeEventoGrupoThread::~CicloDeEventoGrupoThread() {
    
}

Eniciando as Threads

Aqui realmente criamos as threads subjacentes, vinculando cada uma a um CicloDeEvento. O método ciclos_.push_back(t->iniciarCiclo()) retorna o endereço desse ciclo. Se numThreads_ for 0, significa que só existe o ciclo principal (uma única thread).


void CicloDeEventoGrupoThread::iniciar(const CallbackInicializacaoThread& cb) {
    iniciado_ = true;
    for (int i = 0; i < numThreads_; ++i) {
        char buf[nome_.size() + 32];
        snprintf(buf, sizeof buf, "%s%d", nome_.c_str(), i);
        CicloDeEventoThread *t = new CicloDeEventoThread(cb, buf);
        threads_.push_back(std::unique_ptr<ciclodeeventothread>(t));
        ciclos_.push_back(t->iniciarCiclo()); // Cria a thread subjacente, vincula um novo CicloDeEvento e retorna seu endereço
    }

    // Caso numThreads_ == 0, só existe o cicloBase como thread única
    if (numThreads_ == 0 && cb) {
        cb(cicloBase_);
    }
}
</ciclodeeventothread>

Obtendo o Próximo Ciclo por Rodízio

Inicialmente, só existe o ciclo principal. Em seguida, começamos a obter ciclos por rodízio, voltando ao início quando chegamos ao final.


CicloDeEvento* CicloDeEventoGrupoThread::obterProximoCiclo() {
    CicloDeEvento* ciclo = cicloBase_;
    // Rodízio
    if (!ciclos_.empty()) {    // Obtém o próximo ciclo para processamento de eventos por rodízio
        ciclo = ciclos_[proximo_];
        ++proximo_;
        if (proximo_ >= ciclos_.size()) {
            proximo_ = 0;
        }
    }
    return ciclo;
}

Retornando Todos os Ciclos Criados


std::vector<ciclodeevento> CicloDeEventoGrupoThread::obterTodosCiclos() {
    if (ciclos_.empty()) {
        return std::vector<ciclodeevento>(1, cicloBase_);
    } else {
        return ciclos_;
    }
}
</ciclodeevento></ciclodeevento>

Encapsulamento de Socket de Baixxo Nível


class Soquete : noncopyable {
public:
    explicit Soquete(int descritor)
    :descritor_(descritor) {
    }

    ~Soquete();

    int descritor() const {
        return descritor_;
    }

    void vincularEndereco(EnderecoInet& enderecoLocal);
    void escutar();
    int aceitar(EnderecoInet* enderecoPar);

    void desativarEscrita();
    void setSemAtrasoTcp(bool ativar);
    void setReutilizarEndereco(bool ativar);
    void setReutilizarPorta(bool ativar);
    void setManterAtivo(bool ativar);

private:
    const int descritor_;
};

Vinculando IP e Porta

Chama a função de sistema bind para vincular as informações de IP e porta passadas.


void Soquete::vincularEndereco(EnderecoInet& enderecoLocal) {
    if (0 != ::bind(descritor_, (sockaddr*)enderecoLocal.getEnderecoSock(), sizeof(sockaddr_in))) {
        LOG_FATAL("falha ao bind do descritor:%d\n", descritor_);
    }
}

Escutando o descritor de socket


void Soquete::escutar() {
    if (0 != ::listen(descritor_, 1024)) {
        LOG_FATAL("falha ao listen do descritor:%d\n", descritor_);
    }
}

Aceitando novas conexões

Armazena as informações da nova conexão através da função accept no addr, então retorna o novo descritor de conexão. Como obtemos as informações do cliente através da chamada de sistema accept (armazenadas na estrutura sockaddr_in addr), mas essas informações são armazenadas em uma variável local na pilha que se torna inválida após o retorno da função, precisamos salvar essas informações no objeto InetAddress passado pelo chamador para que possam ser usadas após o retorno da função.


int Soquete::aceitar(EnderecoInet* enderecoPar) {
    sockaddr_in addr;
    socklen_t len;
    bzero(&addr, sizeof addr);
    int descritorConexao = ::accept(descritor_, (sockaddr*)&addr, &len);
    if (descritorConexao > 0) {
        enderecoPar->setEnderecoSock(addr);
    }
    return descritorConexao;
}

Desativando o canal de escrita


void Soquete::desativarEscrita() {
    if (::shutdown(descritor_, SHUT_WR) < 0) {
        LOG_ERROR("erro ao desativar escrita");
    }
}

Destrutor


Soquete::~Soquete() {
    close(descritor_);
}

Tags: rede muduo EventLoop socket threading

Publicado em 6-11 06:26 por Thomas