Configuração do Kafka+ZooKeeper e Implementação de Coleta de Logs em Java

Instalação do ZooKeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

Instalação do Kafka

docker run -d --name kafka_server \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
--env KAFKA_HEAP_OPTS=-Xmx512M \
--env KAFKA_HEAP_OPTS=-Xms256M \
-e KAFKA_ZOOKEEPER_CONNECT=[ip_interno]:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[ip_externo]:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:2.13-2.7.0

Utilização do SLF4J com Lombok para Logs

Requisitos do projeto:

  • Exibir logs de acesso no console para depuração
  • Enviar dados de negócio para o Kafka
  • Framewroks populares: Log4j, Logback, SLF4J

Relação entre Log4j, Logback e SLF4J

O SLF4J (Simple Logging Facade for Java) utiliza o padrão de projeto Facade:

  • Fornece uma interface abstrata para diferentes implementações de logging
  • Implementações disponíveis: Log4j, Logback
  • O Logback foi criado pelo mesmo autor do Log4j e oferece recursos superiores
  • O Logback é a implementação nativa do SLF4J
  • Log4j e Logback podem ser usados isoladamente ou com SLF4J

Recomendação: Utilize sempre SLF4J para desacoplar o código das implementações de logging.

Dependências Maven

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configuração do application.properties

spring.kafka.bootstrap-servers=[ip_externo]:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1
spring.kafka.producer.retries=3

Configuração do Logback

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_PATH" value="./application/logs/messages" />

    <appender name="console_output" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="file_output" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/application.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/application-%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss} %level [%thread] %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="com.example.messaging.service.MessageProducer" level="DEBUG" additivity="false">
        <appender-ref ref="file_output" />
        <appender-ref ref="console_output" />
    </logger>

    <root level="INFO">
        <appender-ref ref="console_output" />
    </root>
</configuration>

Implementação do Serviço de Mensagens

@Service
@Slf4j
public class MessageProducerService {

    private static final String EVENT_TOPIC = "application_events_topic";

    @Autowired
    private KafkaTemplate<String, String> messageBroker;

    public void publishEvent(HttpServletRequest request, String identifier, Long userId) {
        String clientIp = RequestUtils.extractClientIp(request);
        Map<String, String> metadata = new HashMap<>();
        
        metadata.put("userId", userId.toString());
        metadata.put("sessionId", request.getSession(false) != null ? 
            request.getSession().getId() : "none");
        metadata.put("userAgent", request.getHeader("User-Agent"));

        EventPayload payload = EventPayload.builder()
                .eventType("USER_ACCESS_EVENT")
                .eventData(metadata)
                .clientIp(clientIp)
                .timestamp(System.currentTimeMillis())
                .correlationId(identifier)
                .build();

        String serializedPayload = JsonUtils.toJson(payload);

        log.debug("Enviando evento para o Kafka: {}", serializedPayload);
        
        ListenableFuture<SendResult<String, String>> future = 
            messageBroker.send(EVENT_TOPIC, identifier, serializedPayload);
        
        future.addCallback(result -> {
            log.info("Evento enviado com sucesso - Partição: {}, Offset: {}", 
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset());
        }, error -> {
            log.error("Falha ao enviar evento para o Kafka", error);
        });
    }
}

Comandos do Kafka

# Criar tópico
./kafka-topics.sh --create --zookeeper 172.17.0.1:2181 --replication-factor 1 --partitions 1 --topic application_events_topic

# Listar tópicos
./kafka-topics.sh --list --zookeeper 172.17.0.1:2181

# Excluir tópico
./kafka-topics.sh --zookeeper 172.17.0.1:2181 --delete --topic application_events_topic

# Consumir mensagens
./kafka-console-consumer.sh --bootstrap-server 192.168.75.146:9092 --from-beginning --topic application_events_topic

# Produzir mensagens
./kafka-console-producer.sh --broker-list 192.168.75.146:9092 --topic application_events_topic

Exemplo de Controller

@RestController
@Slf4j
public class AccessController {

    @Autowired
    private LinkService linkManagement;

    @Autowired
    private MessageProducerService eventPublisher;

    @GetMapping("/access/{code}")
    public ResponseEntity<?> handleAccess(
            @PathVariable String code,
            HttpServletRequest request) {

        log.info("Requisição recebida para o código: {}", code);
        
        eventPublisher.publishEvent(request, code, 12345L);
        
        return ResponseEntity.ok().build();
    }
}

Tags: Docker kafka zookeeper logback slf4j

Publicado em 6-16 23:53