TransportLayer
TransportLayer herda de ScatteringByteChannel e GatheringByteChannel, o que permite ler e escrever vários buffers em uma única chamada. Isso é útil porque, em protocolos de rede, cabeçalho e corpo podem ficar em buffers distintos.
As duas implementações são PlaintextTransportLayer, para tráfego sem criptografia, e SslTransportLayer, para tráfego criptografado. A versão em texto claro apenas delega as operações de I/O para o SocketChannel subjacente.
public class PlaintextTransportLayer implements TransportLayer {
private final SelectionKey selectionKey;
private final SocketChannel socketChannel;
private final Principal principal = KafkaPrincipal.ANONYMOUS;
public PlaintextTransportLayer(SelectionKey key) throws IOException {
this.selectionKey = key;
this.socketChannel = (SocketChannel) key.channel();
}
@Override
public int read(ByteBuffer destination) throws IOException {
return socketChannel.read(destination);
}
@Override
public long write(ByteBuffer[] sources) throws IOException {
return socketChannel.write(sources);
}
@Override
public boolean finishConnect() throws IOException {
boolean connected = socketChannel.finishConnect();
if (connected) {
int ops = selectionKey.interestOps();
selectionKey.interestOps(ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
}
return connected;
}
}
Como o canal é não bloqueante, finishConnect verifica se a conexão foi estabelecida e, em caso positivo, desativa o interesse por OP_CONNECT e ativa OP_READ. A camada SSL é mais complexa porque precisa criptografar os dados antes de enviá-los; por isso mantém um buffer interno netWriteBuffer e hasPendingWrites pode retornar true mesmo quando o SocketChannel ainda não enviou tudo. Na camada plaintext esse método sempre retorna false.
NetworkReceive
O TCP entrega um fluxo contínuo de bytes, sem noção de "mensagem". Por isso, o Kafka usa um cabeçalho de 4 bytes que indica o tamanho do corpo. Cada mensagem tem, portanto, o formato [tamanho: 4 bytes][corpo: N bytes]. NetworkReceive é responsável por ler primeiro o cabeçalho, alocar o buffer do corpo e, depois, consumir o corpo.
public class NetworkReceive implements Receive {
private final String source;
private final ByteBuffer sizeHeader;
private final int maxSize;
private final MemoryPool memoryPool;
private int expectedSize = -1;
private ByteBuffer payload;
public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
this.maxSize = maxSize;
this.source = source;
this.memoryPool = memoryPool;
this.sizeHeader = ByteBuffer.allocate(4);
}
public long readFrom(ScatteringByteChannel channel) throws IOException {
long totalRead = 0;
if (sizeHeader.hasRemaining()) {
int read = channel.read(sizeHeader);
if (read < 0) throw new EOFException();
totalRead += read;
if (!sizeHeader.hasRemaining()) {
sizeHeader.rewind();
expectedSize = sizeHeader.getInt();
if (expectedSize < 0)
throw new InvalidReceiveException("Invalid size: " + expectedSize);
if (maxSize != UNLIMITED && expectedSize > maxSize)
throw new InvalidReceiveException("Size " + expectedSize + " exceeds max " + maxSize);
if (expectedSize == 0) {
payload = ByteBuffer.allocate(0);
}
}
}
if (payload == null && expectedSize != -1) {
payload = memoryPool.tryAllocate(expectedSize);
}
if (payload != null) {
int read = channel.read(payload);
if (read < 0) throw new EOFException();
totalRead += read;
}
return totalRead;
}
public boolean complete() {
return !sizeHeader.hasRemaining() && payload != null && !payload.hasRemaining();
}
public ByteBuffer payload() {
return payload;
}
}
Por causa do problema de fragmentação do TCP, podem ser necessárias várias chamadas a readFrom até que complete() indique que toda a mensagem foi recebida. O método payload() então devolve o buffer contendo o corpo.
NetworkSend
O envio é o inverso da recepção: a mensagem é precedida por um cabeçalho de 4 bytes com o tamanho do corpo. NetworkSend estende ByteBufferSend, que empacota um ou mais buffers e os escreve no canal.
public class ByteBufferSend implements Send {
private final String destination;
private final ByteBuffer[] buffers;
private long remaining;
private boolean pending;
public ByteBufferSend(String destination, ByteBuffer... buffers) {
this.destination = destination;
this.buffers = buffers;
for (ByteBuffer b : buffers) {
remaining += b.remaining();
}
}
public long writeTo(GatheringByteChannel channel) throws IOException {
long written = channel.write(buffers);
if (written < 0) throw new EOFException("Negative write");
remaining -= written;
pending = TransportLayers.hasPendingWrites(channel);
return written;
}
public boolean completed() {
return remaining <= 0 && !pending;
}
}
public class NetworkSend extends ByteBufferSend {
public NetworkSend(String destination, ByteBuffer message) {
super(destination, sizeHeader(message.remaining()), message);
}
private static ByteBuffer sizeHeader(int size) {
ByteBuffer header = ByteBuffer.allocate(4);
header.putInt(size);
header.rewind();
return header;
}
}
Apenas a camada SSL mantém dados pendentes após a escrita no SocketChannel, porque a criptografia produz um buffer intermediário. Por isso, completed() verifica tanto o número de bytes restantes quanto a existência de escrita pendente.
KafkaChannel
KafkaChannel representa uma conexão lógica entre cliente e servidor. Ele junta TransportLayer, NetworkReceive e NetworkSend, além de gerenciar autanticação, estado da conexão e silenciamento temporário.
public class KafkaChannel implements AutoCloseable {
private final String id;
private final TransportLayer transportLayer;
private SocketAddress remoteAddress;
private NetworkReceive receive;
private Send send;
private final int maxReceiveSize;
private final MemoryPool memoryPool;
private final Authenticator authenticator;
private ChannelState state;
private boolean disconnected;
private ChannelMuteState muteState;
private boolean midWrite;
}
O ciclo de vida começa com finishConnect, que finaliza a conexão não bloqueante e ajusta o estado do canal:
public boolean finishConnect() throws IOException {
SocketChannel sc = transportLayer.socketChannel();
if (sc != null) remoteAddress = sc.getRemoteAddress();
boolean connected = transportLayer.finishConnect();
if (connected) {
if (ready()) {
state = ChannelState.READY;
} else if (remoteAddress != null) {
state = new ChannelState(ChannelState.State.AUTHENTICATE, remoteAddress.toString());
} else {
state = ChannelState.AUTHENTICATE;
}
}
return connected;
}
Após a conexão, prepare executa o handshake SSL e a autenticação SASL. Em seguida, o canal pode ser usado para leitura e escrita. O envio começa em setSend, que registra o interesse por OP_WRITE, e a escrita propriamente dita ocorre em write:
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Send already in progress: " + id);
this.send = send;
transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
public long write() throws IOException {
if (send == null) return 0;
midWrite = true;
return send.writeTo(transportLayer);
}
A leitura cria uma nova NetworkReceive quando necessário e tenta preenchê-la a partir do TransportLayer. Se o pool de memória não puder alocar o buffer do corpo, o canal é silenciado para evitar pressão adicinoal:
public long read() throws IOException {
if (receive == null) {
receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
}
long bytes = receive.readFrom(transportLayer);
if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
mute();
}
return bytes;
}
void mute() {
if (muteState == ChannelMuteState.NOT_MUTED) {
if (!disconnected) transportLayer.removeInterestOps(SelectionKey.OP_READ);
muteState = ChannelMuteState.MUTED;
}
}
Como o canal é não bloqueante, leituras e escritas podem precisar de várias iterações. Por isso, maybeCompleteSend e maybeCompleteReceive devolvem o objeto pronto apenas quando a operação terminou:
public Send maybeCompleteSend() {
if (send != null && send.completed()) {
midWrite = false;
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
Send result = send;
send = null;
return result;
}
return null;
}
public NetworkReceive maybeCompleteReceive() {
if (receive != null && receive.complete()) {
receive.payload().rewind();
NetworkReceive result = receive;
receive = null;
return result;
}
return null;
}
O KSelector, que coordena todos esses canais por meio do java.nio.channels.Selector, será abordado na sequência.