Envio de Mensagens de Transação com RocketMQ e SpringBoot

Ambiente

  • JDK: 8u22
  • RocketMQ: rocketmq-all-4.5.2-bin-release
  • SpringBoot: 2.1.6.RELEASE
  • rocketmq-spring-boot-starter: 2.0.3

Procseso de Envio de Mensagens de Transação

O envio de mensagens de transação no RocketMQ segue um padrão de duas fases. Inicialmente, uma mensagem prepaartória é enviada ao broker, onde é armazenada temporariamente no tópico RMQ_SYS_TRANS_HALF_TOPIC. Após a conclusão da transação local, uma segunda confirmação é realizada para direcionar a mensagem ao tópico de destino original.

  1. O produtor envia uma mensagem preparatória (semi-mensagem) para o broker do MQ. O broker registra a mensagem localmente e retorna um status de confirmação ao produtor.
  2. Após o envio da mensagem preparatória, se o status indicar sucesso, a lógica de negócios local é executada. Com base no resultado, o produtor retorna manualmente um estado (como RocketMQLocalTransactionState.COMMIT ou RocketMQLocalTransactionState.ROLLBACK) ao MQ.
  3. Se o estado retornado for COMMIT, a mensagem preparatória é marcada como confirmável e enviada ao consumidor final. Se for ROLLBACK, a mensagem preparatória é descartada.
  4. Caso o MQ não receba o estado dentro de um tempo limite, uma tarefa agendada verifica periodicamente o status da transação local.
  5. A responsabilidade sobre as transações do produtor e consumidor recai sobre o desenvolvedor. O MQ garante a transação com base nos parâmetros configurados.

Produtor

O produtor é responsável por iniciar o envio de mensagens transacionais. O exemplo abaixo demonstra a implementação com SpringBoot.

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class ProdutorMensagem {

    @Resource
    private RocketMQTemplate mqTemplate;

    public TransactionSendResult enviarMsgTransacional(String payload, String topico, String tag) {
        log.info("Disparando mensagem: {}", payload);
        Message<String> mensagem = MessageBuilder.withPayload(payload).build();
        TransactionSendResult resultado = mqTemplate.sendMessageInTransaction("grupo_producer_springboot", topico, mensagem, tag);
        log.info("Resultado da transacao: {}", resultado.getLocalTransactionState());
        return resultado;
    }
}

Ouvinte de Transações do Produtor

O ouvinte de transações processa a lógica de negócios local e gerencia o estado da transação. O método executeLocalTransaction é invocado após o envio da mensagem preparatória. Se a execução local for bem-sucedida, o estado COMMIT é retornado; caso contrário, ROLLBACK é usado para desfazer a operação. O método checkLocalTransaction é chamado periodicamente pelo MQ para verificar transações pendentes, mas seu acionamento depende da versão e configuração.

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "grupo_producer_springboot")
public class OuvinteTransacaoProdutor implements RocketMQLocalTransactionListener {

    private final AtomicInteger contador = new AtomicInteger(0);
    private final ConcurrentHashMap<String, Object> cacheLocal = new ConcurrentHashMap<>();
    @Autowired
    private ServicoNegocioLocal servicoLocal;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            servicoLocal.executarTarefa(msg.getPayload().toString());
            log.info("Transacao local concluida. Mensagem: {}", msg);
            cacheLocal.put(msg.getHeaders().getId().toString(), msg.getPayload());
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("Falha na transacao local: {}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("Verificando estado da transacao...");
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}

Consumidor

O consumidor recebe as mensagens confirmadas. A anotação @RocketMQMessageListener permite filtrar por tag usando selectorExpression. No entanto, se a fila estiver indisponível, a mensagem pode ser encaminhada para outra tag, o que significa que o filtro por tag não garante consumo exclusivo.

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@RocketMQMessageListener(topic = "topico_springboot", consumerGroup = "grupo_consumidor_springboot")
public class ConsumidorMensagem implements RocketMQListener<MessageExt> {

    @Autowired
    private ServicoProcessamento processamento;

    @Override
    public void onMessage(MessageExt extensao) {
        try {
            String conteudo = new String(extensao.getBody(), "UTF-8");
            log.info("Mensagem recebida: {}, ID: {}", conteudo, extensao.getMsgId());
            processamento.processarMensagem(conteudo);
        } catch (Exception e) {
            log.error("Erro ao consumir mensagem: {}", e.getMessage());
        }
    }
}

Resultados de Execução

Ao enviar mensagens, os logs do produtor mostram o estado da transação. Por exemplo:

2019-12-04 17:36:43.711 INFO [produtor] ProdutorMensagem: Resultado da transacao: COMMIT_MESSAGE
2019-12-04 17:36:59.956 INFO [produtor] ProdutorMensagem: Disparando mensagem: primeiro
2019-12-04 17:36:59.958 INFO [servico] ServicoNegocioLocal: Executando tarefa com payload: [B@5dc81a2c
2019-12-04 17:36:59.958 INFO [ouvinte] OuvinteTransacaoProdutor: Transacao local concluida. Mensagem: GenericMessage [payload=byte[5], headers={rocketmq_TOPIC=topico_springboot, id=252178bf-1d37-2f33-0892-721a12c0fc23, rocketmq_TRANSACTION_ID=C0A80A8B264018B4AAC2133ACA340005}]
2019-12-04 17:36:59.958 INFO [produtor] ProdutorMensagem: Resultado da transacao: COMMIT_MESSAGE

Resultados do Consumidor

O consumidor processa as mensagens confirmadas, como mostrado nos logs:

2019-12-04 17:36:59.966 INFO [consumidor] ConsumidorMensagem: Mensagem recebida: primeiro, ID: C0A80A8B264018B4AAC2133ACA340005
2019-12-04 17:36:59.966 INFO [processamento] ServicoProcessamento: Processando mensagem: primeiro

Repositório de Código

O código completo está disponível em: https://gitee.com/ghostsugar/rocketMq

Tags: RocketMQ SpringBoot java Transações em Mensagens microsservicos

Publicado em 6-10 04:59 por Thomas