Instalação do ZooKeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
Instalação do Kafka
docker run -d --name kafka_server \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
--env KAFKA_HEAP_OPTS=-Xmx512M \
--env KAFKA_HEAP_OPTS=-Xms256M \
-e KAFKA_ZOOKEEPER_CONNECT=[ip_interno]:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[ip_externo]:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:2.13-2.7.0
Utilização do SLF4J com Lombok para Logs
Requisitos do projeto:
- Exibir logs de acesso no console para depuração
- Enviar dados de negócio para o Kafka
- Framewroks populares: Log4j, Logback, SLF4J
Relação entre Log4j, Logback e SLF4J
O SLF4J (Simple Logging Facade for Java) utiliza o padrão de projeto Facade:
- Fornece uma interface abstrata para diferentes implementações de logging
- Implementações disponíveis: Log4j, Logback
- O Logback foi criado pelo mesmo autor do Log4j e oferece recursos superiores
- O Logback é a implementação nativa do SLF4J
- Log4j e Logback podem ser usados isoladamente ou com SLF4J
Recomendação: Utilize sempre SLF4J para desacoplar o código das implementações de logging.
Dependências Maven
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Configuração do application.properties
spring.kafka.bootstrap-servers=[ip_externo]:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1
spring.kafka.producer.retries=3
Configuração do Logback
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_PATH" value="./application/logs/messages" />
<appender name="console_output" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="file_output" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/application.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/application-%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %level [%thread] %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<logger name="com.example.messaging.service.MessageProducer" level="DEBUG" additivity="false">
<appender-ref ref="file_output" />
<appender-ref ref="console_output" />
</logger>
<root level="INFO">
<appender-ref ref="console_output" />
</root>
</configuration>
Implementação do Serviço de Mensagens
@Service
@Slf4j
public class MessageProducerService {
private static final String EVENT_TOPIC = "application_events_topic";
@Autowired
private KafkaTemplate<String, String> messageBroker;
public void publishEvent(HttpServletRequest request, String identifier, Long userId) {
String clientIp = RequestUtils.extractClientIp(request);
Map<String, String> metadata = new HashMap<>();
metadata.put("userId", userId.toString());
metadata.put("sessionId", request.getSession(false) != null ?
request.getSession().getId() : "none");
metadata.put("userAgent", request.getHeader("User-Agent"));
EventPayload payload = EventPayload.builder()
.eventType("USER_ACCESS_EVENT")
.eventData(metadata)
.clientIp(clientIp)
.timestamp(System.currentTimeMillis())
.correlationId(identifier)
.build();
String serializedPayload = JsonUtils.toJson(payload);
log.debug("Enviando evento para o Kafka: {}", serializedPayload);
ListenableFuture<SendResult<String, String>> future =
messageBroker.send(EVENT_TOPIC, identifier, serializedPayload);
future.addCallback(result -> {
log.info("Evento enviado com sucesso - Partição: {}, Offset: {}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}, error -> {
log.error("Falha ao enviar evento para o Kafka", error);
});
}
}
Comandos do Kafka
# Criar tópico
./kafka-topics.sh --create --zookeeper 172.17.0.1:2181 --replication-factor 1 --partitions 1 --topic application_events_topic
# Listar tópicos
./kafka-topics.sh --list --zookeeper 172.17.0.1:2181
# Excluir tópico
./kafka-topics.sh --zookeeper 172.17.0.1:2181 --delete --topic application_events_topic
# Consumir mensagens
./kafka-console-consumer.sh --bootstrap-server 192.168.75.146:9092 --from-beginning --topic application_events_topic
# Produzir mensagens
./kafka-console-producer.sh --broker-list 192.168.75.146:9092 --topic application_events_topic
Exemplo de Controller
@RestController
@Slf4j
public class AccessController {
@Autowired
private LinkService linkManagement;
@Autowired
private MessageProducerService eventPublisher;
@GetMapping("/access/{code}")
public ResponseEntity<?> handleAccess(
@PathVariable String code,
HttpServletRequest request) {
log.info("Requisição recebida para o código: {}", code);
eventPublisher.publishEvent(request, code, 12345L);
return ResponseEntity.ok().build();
}
}