Integrando DAMO-YOLO com MySQL: Arquitetura de Armazenamento para Dados Visuais em Larga Escala

Por que armazenar resultados de detecção em banco de dados?

Sistemas de visão computacional baseados em DAMO-YOLO frequentemente geram enormes volumes de resultados de inferência. Limitar-se a exibir detecções em tela ou gravar arquivos JSON dispersos compromete a rastreabilidade histórica, a análise de tendências e a tomada de decisões automatizadas.

Bancos de dados relacionais como o MySQL resovlem esses problemas ao oferecer armazenamento estruturado, consultas eficientes, suporte a transações e integração com sistemas corporatvios. Quando o volume atinge milhões de registros, recursos como índices e particionamento mantêm a performance em níveis aceitáveis.

Modelagem do schema de detecções

O resultado do DAMO-YOLO contempla classe, pontuação de confiança, coordenadas da bounding box e timestamp. A modelagem abaixo amplia esses campos para atender cenários reais de produção:

-- Tabela principal de detecções
CREATE TABLE yolo_inference_log (
    log_id BIGINT AUTO_INCREMENT PRIMARY KEY,
    media_uri VARCHAR(500) NOT NULL COMMENT 'Caminho da imagem ou vídeo',
    media_frame INT DEFAULT 0 COMMENT 'Índice do frame (0 para imagens)',
    inferred_at DATETIME NOT NULL COMMENT 'Momento da inferência',
    
    target_id INT NOT NULL COMMENT 'Identificador da classe',
    target_label VARCHAR(100) NOT NULL COMMENT 'Nome da classe',
    score DECIMAL(5, 4) NOT NULL COMMENT 'Confiança entre 0 e 1',
    
    -- Coordenadas normalizadas (0-1)
    bbox_x DECIMAL(10, 8) NOT NULL COMMENT 'Centro X da bbox',
    bbox_y DECIMAL(10, 8) NOT NULL COMMENT 'Centro Y da bbox',
    bbox_w DECIMAL(10, 8) NOT NULL COMMENT 'Largura da bbox',
    bbox_h DECIMAL(10, 8) NOT NULL COMMENT 'Altura da bbox',
    
    media_w INT NOT NULL COMMENT 'Largura original',
    media_h INT NOT NULL COMMENT 'Altura original',
    
    model_tag VARCHAR(50) COMMENT 'Versão do modelo',
    inference_duration INT COMMENT 'Tempo de inferência em ms',
    
    INDEX idx_time (inferred_at),
    INDEX idx_label (target_label),
    INDEX idx_score (score),
    INDEX idx_media (media_uri(255)),
    SPATIAL INDEX idx_spatial (bbox_x, bbox_y),
    
    created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Log de inferências do DAMO-YOLO';

Pontos-chave da modelagem:

  • Coordenadas normalizadas: garante consistência independente da resolução da mídia.
  • Dimensões originais: permite reconstruir coordenadas em pixels quando necessário.
  • Índice espacial: acelera consultas baseadas em posição dentro do quadro.
  • Versão do modelo: habilita comparações entre versões e testes A/B.

Estratégia de inserção em lote

Para um sistema com múltiplas câmeras gerando milhares de detecções por segundo, a inserção registro a registro torna-se um gargalo. A solução é utilizar inserção em lote (bulk insert), reduzindo o tráfego de rede e a sobrecarga de parsing SQL.

import mysql.connector
from mysql.connector import Error, pooling
from datetime import datetime
from typing import List, Dict

class YoloDbGateway:
    def __init__(self, db_host='localhost', db_name='yolo_db',
                 db_user='root', db_pass='secret'):
        self._pool = pooling.MySQLConnectionPool(
            pool_name="yolo_conn_pool",
            pool_size=8,
            host=db_host,
            database=db_name,
            user=db_user,
            password=db_pass
        )

    def _acquire(self):
        return self._pool.get_connection()

    def bulk_save(self, detections: List[Dict], chunk=200):
        if not detections:
            return
        conn = self._acquire()
        cur = conn.cursor()
        try:
            sql = """
                INSERT INTO yolo_inference_log 
                (media_uri, media_frame, inferred_at, target_id, target_label, 
                 score, bbox_x, bbox_y, bbox_w, bbox_h, 
                 media_w, media_h, model_tag, inference_duration)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            buffer = []
            for det in detections:
                buffer.append((
                    det.get('media_uri', ''),
                    det.get('media_frame', 0),
                    det.get('inferred_at', datetime.now()),
                    det.get('target_id', 0),
                    det.get('target_label', 'unknown'),
                    det.get('score', 0.0),
                    det.get('bbox_x', 0.0),
                    det.get('bbox_y', 0.0),
                    det.get('bbox_w', 0.0),
                    det.get('bbox_h', 0.0),
                    det.get('media_w', 0),
                    det.get('media_h', 0),
                    det.get('model_tag', 'damo-yolo-v1'),
                    det.get('inference_duration', 0)
                ))
                if len(buffer) >= chunk:
                    cur.executemany(sql, buffer)
                    conn.commit()
                    buffer.clear()
            if buffer:
                cur.executemany(sql, buffer)
                conn.commit()
        except Error as exc:
            conn.rollback()
            raise exc
        finally:
            cur.close()
            conn.close()

    def map_and_store(self, media_uri: str, raw_predictions: List[Dict], model_tag='damo-yolo-v1'):
        mapped = []
        for pred in raw_predictions:
            mapped.append({
                'media_uri': media_uri,
                'media_frame': 0,
                'inferred_at': datetime.now(),
                'target_id': pred['class_id'],
                'target_label': pred['class_name'],
                'score': float(pred['conf']),
                'bbox_x': float(pred['bbox'][0]),
                'bbox_y': float(pred['bbox'][1]),
                'bbox_w': float(pred['bbox'][2]),
                'bbox_h': float(pred['bbox'][3]),
                'media_w': pred.get('img_w', 640),
                'media_h': pred.get('img_h', 640),
                'model_tag': model_tag,
                'inference_duration': pred.get('latency_ms', 0)
            })
        self.bulk_save(mapped)

O uso de connection pooling evita a reabertura constante de conexões, enquanto o executemany mniimiza o número de viagens ao servidor. O tamanho do lote deve ser ajustado conforme a carga — valores entre 100 e 1000 costumam oferecer bom equilíbrio entre desempenho e consumo de memória.

Otimizações para alta concorrência

Segregação leitura/escrita

Aplicações de visão computacional frequentemente têm padrões de acesso desbalanceados. A replicação master-slave do MySQL permite direcionar escritas ao master e leituras aos slaves:

class YoloReadWriteGateway:
    def __init__(self, master_cfg: dict, slave_cfg: dict):
        self._writer = mysql.connector.connect(**master_cfg)
        self._reader = mysql.connector.connect(**slave_cfg)

    def insert(self, det: dict):
        cur = self._writer.cursor()
        cur.execute("INSERT INTO yolo_inference_log (...) VALUES (...)", (...,))
        self._writer.commit()
        cur.close()

    def fetch(self, sql: str, params: tuple):
        cur = self._reader.cursor()
        cur.execute(sql, params)
        rows = cur.fetchall()
        cur.close()
        return rows

Particionamento por tempo

Quando a tabela ultrapassa dezenas de milhões de registros, o particionamento por data reduz o escopo das consultas:

CREATE TABLE yolo_inference_partitioned (
    log_id BIGINT AUTO_INCREMENT,
    inferred_at DATETIME NOT NULL,
    -- demais colunas
    PRIMARY KEY (log_id, inferred_at)
)
PARTITION BY RANGE (TO_DAYS(inferred_at)) (
    PARTITION p_2024_01 VALUES LESS THAN (TO_DAYS('2024-02-01')),
    PARTITION p_2024_02 VALUES LESS THAN (TO_DAYS('2024-03-01')),
    PARTITION p_2024_03 VALUES LESS THAN (TO_DAYS('2024-04-01')),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);

Escrita assíncrona com fila

Para cenários onde a latência de escrita não pode impactar o pipeline de inferência, uma arquitetura com buffer em Redis e consumer em background suaviza picos de carga:

import redis
import json
import threading

class BufferedDbWriter:
    def __init__(self, redis_cfg: dict):
        self._redis = redis.Redis(**redis_cfg)
        self._gateway = YoloDbGateway()
        self._active = True
        threading.Thread(target=self._consume, daemon=True).start()

    def enqueue(self, detection: dict):
        self._redis.lpush('yolo:pending', json.dumps(detection))

    def _consume(self):
        batch = []
        while self._active:
            item = self._redis.brpop('yolo:pending', timeout=1)
            if item:
                batch.append(json.loads(item[1]))
                if len(batch) >= 200:
                    self._gateway.bulk_save(batch)
                    batch.clear()
            elif batch:
                self._gateway.bulk_save(batch)
                batch.clear()

    def shutdown(self):
        self._active = False

Indexação espaço-temporal

Consultas temporais

Índices compostos aceleram filtros que combinam tempo e categoria:

CREATE INDEX idx_time_label ON yolo_inference_log (inferred_at, target_label);

SELECT * FROM yolo_inference_log
WHERE inferred_at >= NOW() - INTERVAL 1 HOUR
  AND target_label = 'person'
ORDER BY inferred_at DESC;

Consultas espaciais

Para agrupar detecções por quadrante da imagem, uma coluna gerada evita cálculo em tempo de execução:

ALTER TABLE yolo_inference_log
ADD COLUMN quadrant ENUM('TL','TR','BL','BR','CC')
GENERATED ALWAYS AS (
    CASE 
        WHEN bbox_x < 0.5 AND bbox_y < 0.5 THEN 'TL'
        WHEN bbox_x >= 0.5 AND bbox_y < 0.5 THEN 'TR'
        WHEN bbox_x < 0.5 AND bbox_y >= 0.5 THEN 'BL'
        WHEN bbox_x >= 0.5 AND bbox_y >= 0.5 THEN 'BR'
        ELSE 'CC'
    END
) STORED;

CREATE INDEX idx_quadrant ON yolo_inference_log (quadrant);

-- Veículos no quadrante superior esquerdo
SELECT * FROM yolo_inference_log
WHERE quadrant = 'TL'
  AND target_label = 'car'
  AND inferred_at >= CURDATE();

Tabelas de agregação

CREATE TABLE yolo_stats_hourly (
    stat_hour DATETIME NOT NULL,
    target_label VARCHAR(100) NOT NULL,
    total_count INT NOT NULL,
    avg_score DECIMAL(5,4),
    PRIMARY KEY (stat_hour, target_label)
);

INSERT INTO yolo_stats_hourly
SELECT 
    DATE_FORMAT(inferred_at, '%Y-%m-%d %H:00:00'),
    target_label,
    COUNT(*),
    AVG(score)
FROM yolo_inference_log
WHERE inferred_at >= NOW() - INTERVAL 1 HOUR
GROUP BY stat_hour, target_label
ON DUPLICATE KEY UPDATE
    total_count = VALUES(total_count),
    avg_score = VALUES(avg_score);

Monitoramento e manutenção

Métricas essenciais

-- Consultas lentas
SHOW VARIABLES LIKE 'long_query_time';
SHOW STATUS LIKE 'Slow_queries';

-- Conexões ativas
SHOW STATUS LIKE 'Threads_connected';
SHOW VARIABLES LIKE 'max_connections';

-- Tamanho das tabelas
SELECT 
    table_name,
    ROUND(((data_length + index_length) / 1024 / 1024), 2) AS size_mb,
    table_rows
FROM information_schema.TABLES
WHERE table_schema = 'yolo_db'
ORDER BY size_mb DESC;

Manutenção programada

-- Reconstruir índices fragmentados
OPTIMIZE TABLE yolo_inference_log;

-- Atualizar estatísticas do otimizador
ANALYZE TABLE yolo_inference_log;

-- Arquivar dados antigos em lotes
INSERT INTO yolo_inference_archive
SELECT * FROM yolo_inference_log
WHERE inferred_at < NOW() - INTERVAL 3 MONTH
LIMIT 10000;

DELETE FROM yolo_inference_log
WHERE inferred_at < NOW() - INTERVAL 3 MONTH
LIMIT 10000;

Backup e validação

# Backup lógico completo
mysqldump -u root -p yolo_db yolo_inference_log > backup_$(date +%Y%m%d).sql

# Backup incremental via binlog
mysqlbinlog --start-datetime="2024-05-01 00:00:00" \
            --stop-datetime="2024-05-02 00:00:00" \
            /var/log/mysql/mysql-bin.000123 > incremental.sql

Backups devem ser restaurados periodicamente em ambientes de teste para confirmar sua integridade. Um backup não validado não pode ser considerado confiável.

Tags: DAMO-YOLO MySQL Bulk Insert Database Partitioning Python

Publicado em 7-1 20:38