Encapsulamento do Java NIO na camada de rede do Apache Kafka

TransportLayer

TransportLayer herda de ScatteringByteChannel e GatheringByteChannel, o que permite ler e escrever vários buffers em uma única chamada. Isso é útil porque, em protocolos de rede, cabeçalho e corpo podem ficar em buffers distintos.

As duas implementações são PlaintextTransportLayer, para tráfego sem criptografia, e SslTransportLayer, para tráfego criptografado. A versão em texto claro apenas delega as operações de I/O para o SocketChannel subjacente.

public class PlaintextTransportLayer implements TransportLayer {
    private final SelectionKey selectionKey;
    private final SocketChannel socketChannel;
    private final Principal principal = KafkaPrincipal.ANONYMOUS;

    public PlaintextTransportLayer(SelectionKey key) throws IOException {
        this.selectionKey = key;
        this.socketChannel = (SocketChannel) key.channel();
    }

    @Override
    public int read(ByteBuffer destination) throws IOException {
        return socketChannel.read(destination);
    }

    @Override
    public long write(ByteBuffer[] sources) throws IOException {
        return socketChannel.write(sources);
    }

    @Override
    public boolean finishConnect() throws IOException {
        boolean connected = socketChannel.finishConnect();
        if (connected) {
            int ops = selectionKey.interestOps();
            selectionKey.interestOps(ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        }
        return connected;
    }
}

Como o canal é não bloqueante, finishConnect verifica se a conexão foi estabelecida e, em caso positivo, desativa o interesse por OP_CONNECT e ativa OP_READ. A camada SSL é mais complexa porque precisa criptografar os dados antes de enviá-los; por isso mantém um buffer interno netWriteBuffer e hasPendingWrites pode retornar true mesmo quando o SocketChannel ainda não enviou tudo. Na camada plaintext esse método sempre retorna false.

NetworkReceive

O TCP entrega um fluxo contínuo de bytes, sem noção de "mensagem". Por isso, o Kafka usa um cabeçalho de 4 bytes que indica o tamanho do corpo. Cada mensagem tem, portanto, o formato [tamanho: 4 bytes][corpo: N bytes]. NetworkReceive é responsável por ler primeiro o cabeçalho, alocar o buffer do corpo e, depois, consumir o corpo.

public class NetworkReceive implements Receive {
    private final String source;
    private final ByteBuffer sizeHeader;
    private final int maxSize;
    private final MemoryPool memoryPool;
    private int expectedSize = -1;
    private ByteBuffer payload;

    public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
        this.maxSize = maxSize;
        this.source = source;
        this.memoryPool = memoryPool;
        this.sizeHeader = ByteBuffer.allocate(4);
    }

    public long readFrom(ScatteringByteChannel channel) throws IOException {
        long totalRead = 0;

        if (sizeHeader.hasRemaining()) {
            int read = channel.read(sizeHeader);
            if (read < 0) throw new EOFException();
            totalRead += read;

            if (!sizeHeader.hasRemaining()) {
                sizeHeader.rewind();
                expectedSize = sizeHeader.getInt();
                if (expectedSize < 0)
                    throw new InvalidReceiveException("Invalid size: " + expectedSize);
                if (maxSize != UNLIMITED && expectedSize > maxSize)
                    throw new InvalidReceiveException("Size " + expectedSize + " exceeds max " + maxSize);
                if (expectedSize == 0) {
                    payload = ByteBuffer.allocate(0);
                }
            }
        }

        if (payload == null && expectedSize != -1) {
            payload = memoryPool.tryAllocate(expectedSize);
        }

        if (payload != null) {
            int read = channel.read(payload);
            if (read < 0) throw new EOFException();
            totalRead += read;
        }

        return totalRead;
    }

    public boolean complete() {
        return !sizeHeader.hasRemaining() && payload != null && !payload.hasRemaining();
    }

    public ByteBuffer payload() {
        return payload;
    }
}

Por causa do problema de fragmentação do TCP, podem ser necessárias várias chamadas a readFrom até que complete() indique que toda a mensagem foi recebida. O método payload() então devolve o buffer contendo o corpo.

NetworkSend

O envio é o inverso da recepção: a mensagem é precedida por um cabeçalho de 4 bytes com o tamanho do corpo. NetworkSend estende ByteBufferSend, que empacota um ou mais buffers e os escreve no canal.

public class ByteBufferSend implements Send {
    private final String destination;
    private final ByteBuffer[] buffers;
    private long remaining;
    private boolean pending;

    public ByteBufferSend(String destination, ByteBuffer... buffers) {
        this.destination = destination;
        this.buffers = buffers;
        for (ByteBuffer b : buffers) {
            remaining += b.remaining();
        }
    }

    public long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0) throw new EOFException("Negative write");
        remaining -= written;
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }

    public boolean completed() {
        return remaining <= 0 && !pending;
    }
}

public class NetworkSend extends ByteBufferSend {
    public NetworkSend(String destination, ByteBuffer message) {
        super(destination, sizeHeader(message.remaining()), message);
    }

    private static ByteBuffer sizeHeader(int size) {
        ByteBuffer header = ByteBuffer.allocate(4);
        header.putInt(size);
        header.rewind();
        return header;
    }
}

Apenas a camada SSL mantém dados pendentes após a escrita no SocketChannel, porque a criptografia produz um buffer intermediário. Por isso, completed() verifica tanto o número de bytes restantes quanto a existência de escrita pendente.

KafkaChannel

KafkaChannel representa uma conexão lógica entre cliente e servidor. Ele junta TransportLayer, NetworkReceive e NetworkSend, além de gerenciar autanticação, estado da conexão e silenciamento temporário.

public class KafkaChannel implements AutoCloseable {
    private final String id;
    private final TransportLayer transportLayer;
    private SocketAddress remoteAddress;
    private NetworkReceive receive;
    private Send send;
    private final int maxReceiveSize;
    private final MemoryPool memoryPool;
    private final Authenticator authenticator;
    private ChannelState state;
    private boolean disconnected;
    private ChannelMuteState muteState;
    private boolean midWrite;
}

O ciclo de vida começa com finishConnect, que finaliza a conexão não bloqueante e ajusta o estado do canal:

public boolean finishConnect() throws IOException {
    SocketChannel sc = transportLayer.socketChannel();
    if (sc != null) remoteAddress = sc.getRemoteAddress();
    boolean connected = transportLayer.finishConnect();
    if (connected) {
        if (ready()) {
            state = ChannelState.READY;
        } else if (remoteAddress != null) {
            state = new ChannelState(ChannelState.State.AUTHENTICATE, remoteAddress.toString());
        } else {
            state = ChannelState.AUTHENTICATE;
        }
    }
    return connected;
}

Após a conexão, prepare executa o handshake SSL e a autenticação SASL. Em seguida, o canal pode ser usado para leitura e escrita. O envio começa em setSend, que registra o interesse por OP_WRITE, e a escrita propriamente dita ocorre em write:

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Send already in progress: " + id);
    this.send = send;
    transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

public long write() throws IOException {
    if (send == null) return 0;
    midWrite = true;
    return send.writeTo(transportLayer);
}

A leitura cria uma nova NetworkReceive quando necessário e tenta preenchê-la a partir do TransportLayer. Se o pool de memória não puder alocar o buffer do corpo, o canal é silenciado para evitar pressão adicinoal:

public long read() throws IOException {
    if (receive == null) {
        receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
    }
    long bytes = receive.readFrom(transportLayer);
    if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
        mute();
    }
    return bytes;
}

void mute() {
    if (muteState == ChannelMuteState.NOT_MUTED) {
        if (!disconnected) transportLayer.removeInterestOps(SelectionKey.OP_READ);
        muteState = ChannelMuteState.MUTED;
    }
}

Como o canal é não bloqueante, leituras e escritas podem precisar de várias iterações. Por isso, maybeCompleteSend e maybeCompleteReceive devolvem o objeto pronto apenas quando a operação terminou:

public Send maybeCompleteSend() {
    if (send != null && send.completed()) {
        midWrite = false;
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        Send result = send;
        send = null;
        return result;
    }
    return null;
}

public NetworkReceive maybeCompleteReceive() {
    if (receive != null && receive.complete()) {
        receive.payload().rewind();
        NetworkReceive result = receive;
        receive = null;
        return result;
    }
    return null;
}

O KSelector, que coordena todos esses canais por meio do java.nio.channels.Selector, será abordado na sequência.

Tags: Apache Kafka Java NIO TransportLayer NetworkReceive NetworkSend

Publicado em 6-27 06:54