Implementação de um Componente de Roteamento Personalizado para Sharding de Banco de Dados

A implementação de sharding (fragmentação) de dados em sistemas distribuídos é uma estratégia comum para escalar aplicações. Este processo envolve a divisão de um banco de dados lógico em múltiplos bancos de dados físicos ou a divisão de tabelas grandes em tabelas menores, distribuindo-os por diferentes instâncias. Para gerenciar essa distribuição de forma transparente para a aplicação, é necessário um componente de roteamento customizado.

1. Definição da Anotação de Roteamento

O ponto de partida é uma anotação personalizada que sinalizará os métodos onde a lógica de roteamento de banco de dados e/ou tabela deve ser aplicada. Esta anotação será utilizada por um aspecto (AOP) para interceptar as chamadas e determinar o destino correto da operação.

import java.lang.annotation.*;

/**
 * Anotação para indicar que um método requer roteamento de banco de dados.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface DatabaseRoute {
    /**
     * Chave usada para calcular o roteamento do banco de dados e da tabela.
     * Pode ser um nome de campo no objeto de argumento do método.
     * @return A chave de roteamento.
     */
    String routingKey() default "";
}

Esta anotação será aplicada a métodos de interfaces de repositório (DAO/Mapper), por exemplo:

import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface IUserRepository {
    @DatabaseRoute(routingKey = "userId")
    User findUserById(User request);

    @DatabaseRoute(routingKey = "userId")
    void registerUser(User request);
}

2. Configuração e Inicialização de Fontes de Dados Dinâmicas

Para suportar o roteamento dinâmico, precisamos de uma forma de gerenciar múltiplas fontes de dados (data sources) e alternar entre elas em tempo de execução. O Spring Framework oferece o AbstractRoutingDataSource para este propósito. A configuração das fontes de dados pode ser lida a partir de um arquivo de propriedades.

2.1. Propriedades de Configuração do Sharding

Definimos uma classe para mapear as propriedades de configuração do nosso roteador de sharding, usando @ConfigurationProperties.

import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.List;
import java.util.Map;

@ConfigurationProperties(prefix = "app.sharding")
public class ShardingConfigProperties {
    private int databaseCount;
    private int tableCount;
    private String defaultDataSource;
    private List<String> dataSourceNames;
    private Map<String, Map<String, String>> dataSourceConfigs;

    // Getters e Setters
    public int getDatabaseCount() { return databaseCount; }
    public void setDatabaseCount(int databaseCount) { this.databaseCount = databaseCount; }
    public int getTableCount() { return tableCount; }
    public void setTableCount(int tableCount) { this.tableCount = tableCount; }
    public String getDefaultDataSource() { return defaultDataSource; }
    public void setDefaultDataSource(String defaultDataSource) { this.defaultDataSource = defaultDataSource; }
    public List<String> getDataSourceNames() { return dataSourceNames; }
    public void setDataSourceNames(List<String> dataSourceNames) { this.dataSourceNames = dataSourceNames; }
    public Map<String, Map<String, String>> getDataSourceConfigs() { return dataSourceConfigs; }
    public void setDataSourceConfigs(Map<String, Map<String, String>> dataSourceConfigs) { this.dataSourceConfigs = dataSourceConfigs; }
}

Exemplo de arquivo application.properties:

app.sharding.databaseCount=2
app.sharding.tableCount=4
app.sharding.defaultDataSource=master
app.sharding.dataSourceNames=master,shard01,shard02

app.sharding.dataSourceConfigs.master.url=jdbc:mysql://localhost:3306/db_master?useUnicode=true&characterEncoding=utf8
app.sharding.dataSourceConfigs.master.username=root
app.sharding.dataSourceConfigs.master.password=root

app.sharding.dataSourceConfigs.shard01.url=jdbc:mysql://localhost:3307/db_shard01?useUnicode=true&characterEncoding=utf8
app.sharding.dataSourceConfigs.shard01.username=root
app.sharding.dataSourceConfigs.shard01.password=root

app.sharding.dataSourceConfigs.shard02.url=jdbc:mysql://localhost:3308/db_shard02?useUnicode=true&characterEncoding=utf8
app.sharding.dataSourceConfigs.shard02.username=root
app.sharding.dataSourceConfigs.shard02.password=root

2.2. Implementação da Fonte de Dados Dinâmica

Cria-se uma subclasse de AbstractRoutingDataSource para alternar entre as fontes de dados registradas, baseando-se em uma chave armazenada no ThreadLocal.

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return RoutingContext.getCurrentDatabaseKey();
    }
}

2.3. Contexto de Roteamento (ThreadLocal)

Um utilitário simples baseado em ThreadLocal para armazenar as chaves de banco de dados e tabela durante a execução de uma thread. Isso garante que cada requisição tenha seu próprio contexto de roteamento.

public class RoutingContext {
    private static final ThreadLocal<String> DATABASE_KEY_HOLDER = new ThreadLocal<>();
    private static final ThreadLocal<String> TABLE_KEY_HOLDER = new ThreadLocal<>();

    public static void setDatabaseKey(String dbKey) {
        DATABASE_KEY_HOLDER.set(dbKey);
    }

    public static String getCurrentDatabaseKey() {
        return DATABASE_KEY_HOLDER.get();
    }

    public static void clearDatabaseKey() {
        DATABASE_KEY_HOLDER.remove();
    }

    public static void setTableKey(String tbKey) {
        TABLE_KEY_HOLDER.set(tbKey);
    }

    public static String getCurrentTableKey() {
        return TABLE_KEY_HOLDER.get();
    }

    public static void clearTableKey() {
        TABLE_KEY_HOLDER.remove();
    }
}

2.4. Configuração do Bean da Fonte de Dados

Um @Configuration para instanciar e configurar o DynamicRoutingDataSource, carregando as configurações do ShardingConfigProperties.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DataSourceConfig {

    private final ShardingConfigProperties shardingConfig;

    public DataSourceConfig(ShardingConfigProperties shardingConfig) {
        this.shardingConfig = shardingConfig;
    }

    @Bean
    public DataSource dynamicDataSource() {
        Map<Object, Object> targetDataSources = new HashMap<>();
        DataSource defaultSource = null;

        for (String dbName : shardingConfig.getDataSourceNames()) {
            Map<String, String> props = shardingConfig.getDataSourceConfigs().get(dbName);
            if (props == null) {
                throw new IllegalStateException("Configurações não encontradas para a fonte de dados: " + dbName);
            }
            DriverManagerDataSource dataSource = new DriverManagerDataSource();
            dataSource.setUrl(props.get("url"));
            dataSource.setUsername(props.get("username"));
            dataSource.setPassword(props.get("password"));
            targetDataSources.put(dbName, dataSource);

            if (dbName.equals(shardingConfig.getDefaultDataSource())) {
                defaultSource = dataSource;
            }
        }

        if (defaultSource == null) {
            throw new IllegalStateException("Fonte de dados padrão não configurada!");
        }

        DynamicRoutingDataSource routingDataSource = new DynamicRoutingDataSource();
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(defaultSource);
        return routingDataSource;
    }
}

3. Interceptação AOP para Lógica de Roteamento

Um aspecto Spring AOP interceptará métodos anotados com @DatabaseRoute. Ele extrairá a chave de roteamento, calculará o índice do banco de dados e da tabela e definirá esses valores no RoutingContext antes que o método original seja executado. Após a execução, ele limpará o contexto.

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.lang.reflect.Field;
import java.lang.reflect.Method;

@Aspect
@Component
public class ShardingRoutingAspect {

    private final Logger logger = LoggerFactory.getLogger(ShardingRoutingAspect.class);
    private final ShardingConfigProperties shardingConfig;

    public ShardingRoutingAspect(ShardingConfigProperties shardingConfig) {
        this.shardingConfig = shardingConfig;
    }

    @Pointcut("@annotation(com.example.sharding.annotation.DatabaseRoute)")
    public void databaseRoutePointcut() {}

    @Around("databaseRoutePointcut() && @annotation(databaseRoute)")
    public Object applyRouting(ProceedingJoinPoint joinPoint, DatabaseRoute databaseRoute) throws Throwable {
        String keyAttribute = databaseRoute.routingKey();
        if (StringUtils.isEmpty(keyAttribute)) {
            throw new IllegalArgumentException("A chave de roteamento (routingKey) não pode ser vazia na anotação @DatabaseRoute.");
        }

        String routingValue = extractRoutingValue(keyAttribute, joinPoint.getArgs());
        if (StringUtils.isEmpty(routingValue)) {
            throw new IllegalArgumentException("O valor da chave de roteamento não foi encontrado nos argumentos do método.");
        }

        // Cálculo do hash e índices
        int totalShards = shardingConfig.getDatabaseCount() * shardingConfig.getTableCount();
        int hashCode = routingValue.hashCode();
        // Função de perturbação para melhor distribuição do hash
        int shardingIndex = (totalShards - 1) & (hashCode ^ (hashCode >>> 16));

        // Calcular índices de DB e Tabela
        int dbIndex = shardingIndex / shardingConfig.getTableCount() + 1; // DBs são indexados a partir de 1
        int tableIndex = shardingIndex % shardingConfig.getTableCount();  // Tabelas são indexadas a partir de 0

        // Formata os índices para uso (ex: "shard01", "user_03")
        String formattedDbKey = String.format("shard%02d", dbIndex);
        String formattedTableKey = String.format("tbl%02d", tableIndex);

        RoutingContext.setDatabaseKey(formattedDbKey);
        RoutingContext.setTableKey(formattedTableKey);
        logger.info("Roteamento aplicado: Método: {} -> DB: {} Tabela: {}", getMethodName(joinPoint), formattedDbKey, formattedTableKey);

        try {
            return joinPoint.proceed();
        } finally {
            RoutingContext.clearDatabaseKey();
            RoutingContext.clearTableKey();
        }
    }

    private String extractRoutingValue(String keyAttribute, Object[] args) throws IllegalAccessException, NoSuchFieldException {
        if (args == null || args.length == 0) {
            return null;
        }
        // Tenta encontrar a chave no primeiro argumento, assumindo que é um objeto complexo
        Object arg = args[0];
        try {
            Field field = arg.getClass().getDeclaredField(keyAttribute);
            field.setAccessible(true);
            Object value = field.get(arg);
            return value != null ? value.toString() : null;
        } catch (NoSuchFieldException e) {
            // Se a chave não for um campo, tenta tratar o próprio argumento como a chave
            // Essa lógica pode ser expandida para lidar com diferentes tipos de argumentos
            return arg.toString(); // Retorna o toString() do argumento como valor de roteamento
        }
    }

    private String getMethodName(ProceedingJoinPoint joinPoint) {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method method = methodSignature.getMethod();
        return method.getName();
    }
}

4. Interceptor MyBatis para Sharding de Tabelas

Para o sharding de tabelas, o nome da tabela no SQL gerado precisa ser modificado. Um interceptor MyBatis pode fazer isso, detectando um padrão de nome de tabela e anexando o sufixo da tabela obtido do RoutingContext.

4.1. Anotação para Habilitar Sharding de Tabela

Esta anotação opcional pode ser usada em interfaces de Mapper para indicar se o sharding de tabela deve ser aplicado aos métodos dessa interface.

import java.lang.annotation.*;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface EnableTableSharding {
    boolean value() default true;
}

4.2. Implementação do Interceptor MyBatis

import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.DefaultReflectorFactory;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.factory.DefaultObjectFactory;
import org.apache.ibatis.reflection.wrapper.DefaultObjectWrapperFactory;
import org.apache.ibatis.session.Configuration; // Import Configuration
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds; // Import RowBounds
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.Statement;
import java.lang.reflect.Field;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Intercepts({
    @Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})
    // Outras assinaturas podem ser adicionadas conforme necessário, mas "prepare" é o mais comum para modificar SQL.
})
public class MyBatisTableShardingInterceptor implements Interceptor {

    private final Logger logger = LoggerFactory.getLogger(MyBatisTableShardingInterceptor.class);
    // Padrão para encontrar nomes de tabela após FROM, INTO, UPDATE, JOIN
    private static final Pattern TABLE_NAME_PATTERN = Pattern.compile(
            "\\b(FROM|INTO|UPDATE|JOIN)\\s+(`?)([a-zA-Z0-9_]+)(`?)\\b", Pattern.CASE_INSENSITIVE
    );

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        StatementHandler handler = (StatementHandler) invocation.getTarget();
        MetaObject metaObject = MetaObject.forObject(handler,
                new DefaultObjectFactory(), new DefaultObjectWrapperFactory(), new DefaultReflectorFactory());

        MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement");
        String mapperMethodId = mappedStatement.getId();
        String className = mapperMethodId.substring(0, mapperMethodId.lastIndexOf("."));

        // Verifica se a anotação @EnableTableSharding está presente na classe do Mapper
        Class<?> mapperClass = Class.forName(className);
        EnableTableSharding shardingAnnotation = mapperClass.getAnnotation(EnableTableSharding.class);

        // Se a anotação não estiver presente ou estiver desabilitada, prossegue sem modificação
        if (shardingAnnotation == null || !shardingAnnotation.value()) {
            return invocation.proceed();
        }

        String tableSuffix = RoutingContext.getCurrentTableKey();
        if (tableSuffix == null || tableSuffix.isEmpty()) {
            logger.warn("Contexto de tabela não encontrado para sharding de tabela. SQL não será modificado para: {}", mapperMethodId);
            return invocation.proceed();
        }

        BoundSql boundSql = handler.getBoundSql();
        String originalSql = boundSql.getSql();
        String modifiedSql = originalSql;

        Matcher matcher = TABLE_NAME_PATTERN.matcher(originalSql);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            // group(3) é o nome da tabela, e groups 2 e 4 são as crases (backticks) se existirem
            matcher.appendReplacement(sb, matcher.group(1) + " " + matcher.group(2) + matcher.group(3) + "_" + tableSuffix + matcher.group(4));
        }
        matcher.appendTail(sb);
        modifiedSql = sb.toString();

        if (!originalSql.equals(modifiedSql)) {
            logger.debug("SQL Original: {}", originalSql);
            logger.debug("SQL Modificado: {}", modifiedSql);
            // Atualiza o SQL via reflexão
            Field sqlField = BoundSql.class.getDeclaredField("sql");
            sqlField.setAccessible(true);
            sqlField.set(boundSql, modifiedSql);
            sqlField.setAccessible(false);
        }

        return invocation.proceed();
    }

    @Override
    public Object plugin(Object target) {
        return Plugin.wrap(target, this);
    }

    @Override
    public void setProperties(Properties properties) {
        // Não há propriedades específicas para configurar neste interceptor
    }
}

Para ativar este interceptor, adicione-o à configuração do MyBatis em application.properties ou application.yml:

mybatis.configuration.interceptors[0]=com.example.sharding.interceptor.MyBatisTableShardingInterceptor

5. Exemplo de Uso de Repositório

Com todas as peças no lugar, um repositório pode ser configurado para usar o roteamento de banco de dados e sharding de tabelas.

import org.apache.ibatis.annotations.Mapper;
import com.example.sharding.annotation.DatabaseRoute;
import com.example.sharding.annotation.EnableTableSharding;
import java.util.List;

@Mapper
@EnableTableSharding(value = true) // Habilita o sharding de tabela para esta interface
public interface IProductRepository {

    @DatabaseRoute(routingKey = "productId")
    void insertProduct(Product product);

    @DatabaseRoute(routingKey = "productId")
    Product findProductById(String productId);

    // Se um método não especifica routingKey, a lógica de extração no aspecto
    // tentará inferir o valor ou lançará uma exceção se não conseguir.
    @DatabaseRoute(routingKey = "ownerId")
    List<Product> findProductsByOwner(String ownerId);
}

Tags: Spring AOP MyBatis Sharding Database Routing Dynamic Data Source

Publicado em 6-5 03:02 por Thomas