Empacotamento de Mensagens TLV e Resolução de Pacotes Colados em TCP com Go

Aprimorando a Estrutura de Requisições

Inicialmente, o servidor processava os dados brutos recebidos armazenando-os diretamente em um slice de bytes dentro da estrutura de requisição. Essa abordagem é limitada, pois não permite identificar o tipo ou o tamanho da mensagem de forma nativa. Para evoluir a arquitetura, abstraímos o campo de dados para um tipo de mensagem dediccado, encapsulando as informações de controle e o payload.

Definindo a Interface e a Estrutura da Mensagem

Criamos uma interface para padronizar o comportamento dos pacotes de rede, permitindo a obtenção e alteração do identificador, tamanho e conteúdo.

package ziface

// IPacket define o contrato para mensagens de rede
type IPacket interface {
	GetPayloadSize() uint32 // Retorna o tamanho do payload
	GetPacketID() uint32    // Retorna o identificador do pacote
	GetPayload() []byte     // Retorna o conteúdo da mensagem

	SetPacketID(uint32)    // Define o ID do pacote
	SetPayload([]byte)     // Define o conteúdo
	SetPayloadSize(uint32) // Define o tamanho do payload
}

Em seguida, implementamos essa interface em uma estrutura concreta no pacote de rede.

package znet

import "zinx/ziface"

// NetworkPacket representa a mensagem transmitida na rede
type NetworkPacket struct {
	ID          uint32 // Identificador da mensagem
	PayloadSize uint32 // Comprimento dos dados
	Payload     []byte // Dados efetivos
}

// NewPacket instancia um novo pacote de rede
func NewPacket(id uint32, payload []byte) *NetworkPacket {
	return &NetworkPacket{
		ID:          id,
		PayloadSize: uint32(len(payload)),
		Payload:     payload,
	}
}

// GetPayloadSize obtém o tamanho do payload
func (p *NetworkPacket) GetPayloadSize() uint32 {
	return p.PayloadSize
}

// GetPacketID obtém o ID do pacote
func (p *NetworkPacket) GetPacketID() uint32 {
	return p.ID
}

// GetPayload obtém os dados da mensagem
func (p *NetworkPacket) GetPayload() []byte {
	return p.Payload
}

// SetPayloadSize define o tamanho do payload
func (p *NetworkPacket) SetPayloadSize(size uint32) {
	p.PayloadSize = size
}

// SetPacketID define o ID do pacote
func (p *NetworkPacket) SetPacketID(id uint32) {
	p.ID = id
}

// SetPayload define os dados da mensagem
func (p *NetworkPacket) SetPayload(payload []byte) {
	p.Payload = payload
}

Codificação e Decodificação TLV

Como o TCP é um protocolo orientado a fluxo, ele não preserva as fronteiras das mensagens. Isso pode resultar na leitura de múltiplos pacotes de uma só vez ou na fragmentação de um único pacote. Para mitigar esse problema de agregação de pacotes (sticky packets), utilizamos o formato TLV (Tamanho-Tipo-Valor, adaptado aqui para Comprimento-ID-Dados).

A estratégia consiste em prefixar cada mensagem com um cabeçalho de tamanho fixo contendo o comprimento e o identificador. O receptor primeiro lê o cabeçalho para descobrir o tamanho do corpo da mensagem e, em seguida, lê a quantidade exata de bytes restante, garantindo a delimitação correta.

Interface do Codec

Definimos uma interface para o mecanismo de empacotamento e desempacotamento.

package ziface

// ICodec estabelece as regras para serialização e desserialização de pacotes
type ICodec interface {
	GetHeaderLength() uint32                 // Retorna o tamanho do cabeçalho
	Encode(pkt IPacket) ([]byte, error)      // Serializa o pacote
	Decode([]byte) (IPacket, error)          // Desserializa o cabeçalho
}

Implementação do Codec TLV

A implementação concreta utiliza a biblioteca encoding/binary para converter os dados em fluxos de bytes com endianness definida.

package znet

import (
	"bytes"
	"encoding/binary"
	"errors"
	"zinx/ziface"
	"zinx/settings"
)

// TLVCodec lida com a formatação dos pacotes
type TLVCodec struct{}

// NewTLVCodec cria uma instância do codec
func NewTLVCodec() *TLVCodec {
	return &TLVCodec{}
}

// GetHeaderLength retorna o tamanho fixo do cabeçalho (4 bytes para tamanho + 4 bytes para ID)
func (c *TLVCodec) GetHeaderLength() uint32 {
	return 8
}

// Encode transforma um IPacket em um slice de bytes
func (c *TLVCodec) Encode(pkt ziface.IPacket) ([]byte, error) {
	buffer := bytes.NewBuffer([]byte{})

	// Escreve o tamanho do payload
	if err := binary.Write(buffer, binary.LittleEndian, pkt.GetPayloadSize()); err != nil {
		return nil, err
	}

	// Escreve o ID do pacote
	if err := binary.Write(buffer, binary.LittleEndian, pkt.GetPacketID()); err != nil {
		return nil, err
	}

	// Escreve o payload
	if err := binary.Write(buffer, binary.LittleEndian, pkt.GetPayload()); err != nil {
		return nil, err
	}

	return buffer.Bytes(), nil
}

// Decode extrai as informações do cabeçalho a partir dos bytes recebidos
func (c *TLVCodec) Decode(rawData []byte) (ziface.IPacket, error) {
	reader := bytes.NewReader(rawData)
	pkt := &NetworkPacket{}

	// Lê o tamanho do payload
	if err := binary.Read(reader, binary.LittleEndian, &pkt.PayloadSize); err != nil {
		return nil, err
	}

	// Lê o ID do pacote
	if err := binary.Read(reader, binary.LittleEndian, &pkt.ID); err != nil {
		return nil, err
	}

	// Validação de segurança para evitar alocações excessivas
	if settings.Conf.MaxPacketSize > 0 && pkt.PayloadSize > settings.Conf.MaxPacketSize {
		return nil, errors.New("payload excede o tamanho máximo permitido")
	}

	return pkt, nil
}

Integração ao Núcleo do Framewrok

Com a lógica de serialização pronta, precisamos integrá-la ao ciclo de vida das conexões e requisições.

Atualizando a Requisição

A interface de requisição agora expõe o identificador da mensagem, e a estrutura interna armazena a instância de IPacket.

// IRequest encapsula os dados da requisição do cliente
type IRequest interface {
	GetConnection() IConnection
	GetData() []byte
	GetMsgID() uint32
}

// Request implementa IRequest
type Request struct {
	conn ziface.IConnection
	pkt  ziface.IPacket
}

func (r *Request) GetConnection() ziface.IConnection {
	return r.conn
}

func (r *Request) GetData() []byte {
	return r.pkt.GetPayload()
}

func (r *Request) GetMsgID() uint32 {
	return r.pkt.GetPacketID()
}

Processamento de Leitura na Conexão

O método responsável por ler dados do socket TCP deve ser ajustado para primeiro ler o cabeçalho, decodificá-lo e, subsequentemente, ler o corpo da mensagem.

func (c *Connection) StartReader() {
	fmt.Println("Goroutine de leitura iniciada")
	defer fmt.Println(c.RemoteAddr().String(), " leitor encerrado")
	defer c.Stop()

	codec := NewTLVCodec()

	for {
		// Lê o cabeçalho de tamanho fixo
		headerBytes := make([]byte, codec.GetHeaderLength())
		if _, err := io.ReadFull(c.GetTCPConnection(), headerBytes); err != nil {
			fmt.Println("Erro ao ler cabeçalho:", err)
			c.ExitBuffChan <- true
			break
		}

		// Decodifica o cabeçalho para obter metadados
		pkt, err := codec.Decode(headerBytes)
		if err != nil {
			fmt.Println("Erro na decodificação:", err)
			c.ExitBuffChan <- true
			break
		}

		// Lê o corpo da mensagem com base no tamanho extraído
		if pkt.GetPayloadSize() > 0 {
			payloadBytes := make([]byte, pkt.GetPayloadSize())
			if _, err := io.ReadFull(c.GetTCPConnection(), payloadBytes); err != nil {
				fmt.Println("Erro ao ler payload:", err)
				c.ExitBuffChan <- true
				break
			}
			pkt.SetPayload(payloadBytes)
		}

		// Encaminha a requisição para o roteador
		req := &Request{
			conn: c,
			pkt:  pkt,
		}

		go func(r ziface.IRequest) {
			c.Router.PreHandle(r)
			c.Router.Handle(r)
			c.Router.PostHandle(r)
		}(req)
	}
}

Envio de Mensagens

Para responder aos clientes, a conexão precisa de um método que serialize a resposta no formato TLV antes de escrevê-la no socket.

type IConnection interface {
	// ... outros métodos
	SendPacket(pktID uint32, payload []byte) error
}

func (c *Connection) SendPacket(pktID uint32, payload []byte) error {
	if c.isClosed {
		return errors.New("conexão fechada, impossível enviar dados")
	}

	codec := NewTLVCodec()
	rawData, err := codec.Encode(NewPacket(pktID, payload))
	if err != nil {
		return errors.New("falha na serialização do pacote")
	}

	if _, err := c.Conn.Write(rawData); err != nil {
		c.ExitBuffChan <- true
		return errors.New("falha ao escrever no socket")
	}
	
	return nil
}

Validação com Aplicação de Teste

Para validar a implementação, construímos um servidor simples que ecoa mensagens e um cliente que envia requisições contínuas.

Implementação do Servidor

package main

import (
	"fmt"
	"zinx/ziface"
	"zinx/znet"
)

// EchoRouter processa as requisições de teste
type EchoRouter struct {
	znet.BaseRouter
}

func (r *EchoRouter) Handle(req ziface.IRequest) {
	fmt.Printf("Mensagem recebida -> ID: %d, Conteúdo: %s\n", req.GetMsgID(), string(req.GetData()))
	
	// Responde ao cliente
	err := req.GetConnection().SendPacket(100, []byte("Recebido com sucesso!"))
	if err != nil {
		fmt.Println("Erro ao enviar resposta:", err)
	}
}

func main() {
	server := znet.NewServer()
	server.AddRouter(&EchoRouter{})
	server.Serve()
}

Impleemntação do Cliente

package main

import (
	"fmt"
	"io"
	"net"
	"time"
	"zinx/znet"
)

func main() {
	fmt.Println("Iniciando cliente de teste...")
	time.Sleep(2 * time.Second)

	conn, err := net.Dial("tcp", "127.0.0.1:8999")
	if err != nil {
		fmt.Println("Falha ao conectar:", err)
		return
	}
	defer conn.Close()

	codec := znet.NewTLVCodec()

	for i := 0; i < 3; i++ {
		// Envia mensagem
		payload := []byte(fmt.Sprintf("Mensagem de teste %d", i))
		rawData, _ := codec.Encode(znet.NewPacket(1, payload))
		conn.Write(rawData)

		// Lê resposta
		header := make([]byte, codec.GetHeaderLength())
		if _, err := io.ReadFull(conn, header); err != nil {
			fmt.Println("Erro ao ler cabeçalho da resposta")
			break
		}

		respPkt, _ := codec.Decode(header)
		if respPkt.GetPayloadSize() > 0 {
			respPayload := make([]byte, respPkt.GetPayloadSize())
			io.ReadFull(conn, respPayload)
			fmt.Printf("Resposta do servidor -> ID: %d, Dados: %s\n", respPkt.GetPacketID(), string(respPayload))
		}

		time.Sleep(1 * time.Second)
	}
}

Tags: go TCP tlv network-programming zinx

Publicado em 6-24 04:09