Utilizando Operações UPSERT com SQLAlchemy

Tanto o SQLite quanto o PostgreSQL oferecem suporte à operação UPSERT, que significa "inserir ou atualizar". Essa funcionalidade é ativada quando uma coluna com restrição de unicidade entra em conflito.

Sintaxe e Comportamento

A sintaxe básica para UPSERT difere ligeiramente entre PostgreSQL e SQLite:

  • PostgreSQL: INSERT ... ON CONFLICT (coluna) DO UPDATE/NOTHING
  • SQLite: INSERT ... ON CONFLICT(coluna) DO UPDATE/NOTHING (note a ausência de espaço antes dos parênteses)

Comparativo de Cenários

Cenário PostgreSQL SQLite Observação
UPSERT Básico ON CONFLICT (col) DO UPDATE SET ... ON CONFLICT(col) DO UPDATE SET ... Diferença sutil na sintaxe dos parênteses.
Ignorar Conflito ON CONFLICT (col) DO NOTHING ON CONFLICT(col) DO NOTHING Sintaxe idêntica.
Referenciar Novos Valores EXCLUDED.col excluded.col Diferença de capitalização; PostgreSQL usa maiúsculas, SQLite minúsculas.
Retornar Dados RETURNING * RETURNING * Funcionalidade idêntica.
Atualização Condicional WHERE condição Não suportado diretamente. SQLite requer o uso de expressões CASE ou filtragem na aplicação.

Pontos de Atenção

  • A coluna de conflito deve possuir uma restrição de unicidade definida.
  • Embora as sintaxes sejam semelhantes, as pequenas diferenças entre PostgreSQL e SQLite exigem cuidado ao usar SQL nativo.
  • O SQLite não suporta a cláusula WHERE em operações UPSERT; use CASE ou lógica de aplicação para simular essa funcionalidade.
  • A cláusula RETURNING é suportada no SQLite a partir da versão 3.35.

Uso de EXCLUDED e RETURNING

EXCLUDED

A pseudo-tabela EXCLUDED permite referenciar os valores que seriam inseridos em caso de conflito.


INSERT INTO usuarios (email, nome, idade)
VALUES ('teste@exemplo.com', 'Novo Nome', 30)
ON CONFLICT (email) DO UPDATE SET
    nome = EXCLUDED.nome,   -- Refere-se ao novo valor 'Novo Nome'
    idade = EXCLUDED.idade; -- Refere-se ao novo valor 30
Cenário Expressão Significado Valor Exemplo
Campo da Tabela Original usuarios.nome Valor atual da linha em conflito. 'Nome Antigo'
Campo do Novo Valor EXCLUDED.nome Novo valor pretendido para inserção. 'Novo Nome'
Cálculo Combinado usuarios.idade + EXCLUDED.idade Combinação do valor original com o novo. 25 + 30 = 55

Exemplo 1: Acumulando Estoque


-- Acumula estoque de produto: Estoque atual 100 + Nova quantidade 50 = 150
INSERT INTO produtos (sku, estoque)
VALUES ('IPHONE15', 50)
ON CONFLICT (sku) DO UPDATE SET
    estoque = produtos.estoque + EXCLUDED.estoque  -- Estoque atual + novo valor
RETURNING estoque;

Exemplo 2: Atualizando Apenas Campos Não Nulos


-- Se o novo valor for NULL, mantém o valor original
INSERT INTO usuarios (email, nome, idade)
VALUES ('teste@exemplo.com', 'Novo Nome', NULL)
ON CONFLICT (email) DO UPDATE SET
    nome = COALESCE(EXCLUDED.nome, usuarios.nome),  -- Mantém 'Novo Nome'
    idade = COALESCE(EXCLUDED.idade, usuarios.idade); -- Mantém a idade original

Exemplo 3: Atualizando Timestamp


-- Atualiza o campo `atualizado_em` durante a atualização
INSERT INTO usuarios (email, nome)
VALUES ('teste@exemplo.com', 'Novo Nome')
ON CONFLICT (email) DO UPDATE SET
    nome = EXCLUDED.nome,
    atualizado_em = NOW()  -- PostgreSQL
    -- atualizado_em = CURRENT_TIMESTAMP  -- SQLite

RETURNING

A cláusula RETURNING permite obter os dados da linha afetada (inserida ou atualizada) sem a necessidade de uma consulta SELECT adicional.


INSERT INTO usuarios (email, nome)
VALUES ('teste@exemplo.com', 'Fulano')
RETURNING id, email, nome, criado_em;

Exemplo 1: Obtendo o ID Pós-Inserção


-- PostgreSQL / SQLite 3.35+
sql = text("""
    INSERT INTO usuarios (email, nome)
    VALUES (:email, :nome)
    RETURNING id, email, criado_em
""")

resultado = await sessao.execute(sql, {"email": "teste@exemplo.com", "nome": "Fulano"})
usuario = resultado.mappings().first()
print(usuario["id"])  -- Acessa o ID diretamente

Exemplo 2: Retorno Unificado de UPSERT


-- Retorna o estado final, seja por inserção ou atualização
INSERT INTO usuarios (email, nome, contador_login)
VALUES ('teste@exemplo.com', 'Fulano', 1)
ON CONFLICT (email) DO UPDATE SET
    nome = EXCLUDED.nome,
    contador_login = usuarios.contador_login + 1  -- Incrementa contador
RETURNING
    id,
    email,
    nome,
    contador_login,
    CASE
        WHEN xmax = 0 THEN 'inserido'  -- PostgreSQL: xmax=0 indica inserção
        ELSE 'atualizado'
    END AS acao

Exemplo 3: Retorno em Operações em Lote


-- PostgreSQL suporta RETURNING em operações em lote
INSERT INTO usuarios (email, nome)
VALUES
    ('a@exemplo.com', 'A'),
    ('b@exemplo.com', 'B')
ON CONFLICT (email) DO UPDATE SET
    nome = EXCLUDED.nome
RETURNING id, email, nome;

Processamento em Python para retornos em lote:


resultado = await sessao.execute(sql)
usuarios = [dict(linha) for linha in resultado.mappings().all()]
# [{'id': 1, 'email': 'a@exemplo.com', 'nome': 'A'}, ...]

Exemplo: Contador de Login de Usuário


async def registrar_login_usuario(sessao: AsyncSession, email: str, nome: str) -> dict:
    """
    Registra o login de um usuário:
    - Novo usuário: Insere com contador_login = 1.
    - Usuário existente: Atualiza contador_login e last_login.
    - Retorna o estado final e o tipo de ação (inserido/atualizado).
    """
    sql = text("""
        INSERT INTO usuarios (
            email, nome, contador_login, ultimo_login, criado_em
        ) VALUES (
            :email, :nome, 1, :agora, :agora
        )
        ON CONFLICT (email) DO UPDATE SET
            nome = EXCLUDED.nome,                          -- Atualiza nome
            contador_login = usuarios.contador_login + 1,  -- Incrementa contador
            ultimo_login = EXCLUDED.ultimo_login           -- Atualiza último login
        RETURNING
            id,
            email,
            nome,
            contador_login,
            ultimo_login,
            criado_em,
            CASE
                WHEN xmax = 0 THEN 'inserido'
                ELSE 'atualizado'
            END AS acao  -- Específico do PostgreSQL para distinguir ações
    """)

    agora = datetime.utcnow()
    resultado = await sessao.execute(
        sql,
        {"email": email, "nome": nome, "agora": agora}
    )

    linha = resultado.mappings().first()
    return dict(linha) if linha else None

# Uso:
usuario_status = await registrar_login_usuario(sessao, "teste@exemplo.com", "Fulano")
print(f"{usuario_status['acao']} usuário {usuario_status['email']} com {usuario_status['contador_login']} logins")
# Saída potencial: inserido usuário teste@exemplo.com com 1 logins
# Ou: atualizado usuário teste@exemplo.com com 5 logins

Modelos de Exemplo


from sqlalchemy import Column, Integer, String, UniqueConstraint
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

class Usuario(Base):
    __tablename__ = "usuarios"

    id = Column(Integer, primary_key=True, autoincrement=True)
    email = Column(String(100), unique=True, nullable=False)
    nome = Column(String(50))
    idade = Column(Integer)
    saldo = Column(Integer, default=0)

    __table_args__ = (
        UniqueConstraint("email", name="uq_usuarios_email"),
    )

class Produto(Base):
    __tablename__ = "produtos"

    id = Column(Integer, primary_key=True)
    sku = Column(String(50), unique=True, nullable=False)
    nome = Column(String(100))
    estoque = Column(Integer, default=0)
    preco = Column(Integer)

Abordagem ORM com SQLAlchemy

Importe a função insert de acordo com o dialeto específico ou use a função genérica.

Exemplo Básico


from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy import insert, func # func para NOW() ou CURRENT_TIMESTAMP

async def upsert_usuario_orm(sessao: AsyncSession, dados_usuario: dict) -> dict:
    """
    Realiza UPSERT de usuário usando o ORM do SQLAlchemy.
    Atualiza se o email já existir, caso contrário, insere.
    """

    # Abordagem 1: Usando a função genérica insert (recomendado)
    # SQLAlchemy seleciona a sintaxe correta automaticamente com base no dialeto do banco.
    stmt = (
        insert(Usuario)
        .values(**dados_usuario)
        .on_conflict_do_update(
            index_elements=["email"],  # Coluna(s) para detecção de conflito (unicidade)
            set_={
                "nome": dados_usuario["nome"],
                "idade": dados_usuario.get("idade"),
                # Assume-se que há uma coluna 'atualizado_em'
                "atualizado_em": func.now()
            }
        )
        .returning(Usuario)  # Retorna a linha após inserção/atualização
    )

    resultado = await sessao.execute(stmt)
    usuario = resultado.scalar_one() # Pega a única linha retornada

    return {
        "id": usuario.id,
        "email": usuario.email,
        "nome": usuario.nome,
        "idade": usuario.idade
    }

async def upsert_usuario_ignorar(sessao: AsyncSession, dados_usuario: dict) -> bool:
    """
    Realiza UPSERT, mas ignora a linha em caso de conflito (DO NOTHING).
    """
    stmt = (
        insert(Usuario)
        .values(**dados_usuario)
        .on_conflict_do_nothing(
            index_elements=["email"]
        )
    )

    resultado = await sessao.execute(stmt)
    # Retorna True se uma linha foi inserida, False se houve conflito e nada foi feito.
    return resultado.rowcount > 0

Atualização Condicional com ORM

Permite atualizar campos específicos apenas sob certas condições.


async def upsert_usuario_condicional(sessao: AsyncSession, dados_usuario: dict) -> dict:
    """
    UPSERT: Atualiza apenas campos não nulos em caso de conflito.
    """
    stmt = (
        insert(Usuario)
        .values(**dados_usuario)
        .on_conflict_do_update(
            index_elements=["email"],
            set_={
                "nome": dados_usuario["nome"],
                # Se 'idade' não for fornecido, mantém o valor original da coluna Usuario.idade
                "idade": dados_usuario.get("idade", Usuario.idade),
            },
            # Opcional: adicionar condição WHERE específica para a atualização
            where=Usuario.email == dados_usuario["email"]
        )
        .returning(Usuario)
    )

    resultado = await sessao.execute(stmt)
    # Retorna um dicionário da linha afetada
    return resultado.mappings().first()

UPSERT em Lote com ORM


async def upsert_em_lote_usuarios(sessao: AsyncSession, usuarios: list[dict]) -> int:
    """
    Executa UPSERT em lote para múltiplos usuários.
    """
    stmt = (
        insert(Usuario)
        .values(usuarios) # Passa a lista de dicionários diretamente
        .on_conflict_do_update(
            index_elements=["email"],
            set_={
                # Usa a sintaxe de referência a EXCLUDED dentro do ORM
                "nome": insert(Usuario).excluded.nome,
                "idade": insert(Usuario).excluded.idade,
            }
        )
    )

    resultado = await sessao.execute(stmt)
    # Retorna o número de linhas afetadas (inseridas ou atualizadas)
    return resultado.rowcount

Utilizando EXCLUDED no ORM


async def upsert_produto_com_estoque(sessao: AsyncSession, dados_produto: dict) -> dict:
    """
    UPSERT de produto: em caso de conflito no SKU, acumula o estoque.
    """
    stmt = (
        insert(Produto)
        .values(**dados_produto)
        .on_conflict_do_update(
            index_elements=["sku"],
            set_={
                # Acumula estoque: Estoque atual + Estoque novo
                "estoque": Produto.estoque + insert(Produto).excluded.estoque,
                # Atualiza outros campos
                "nome": insert(Produto).excluded.nome,
                "preco": insert(Produto).excluded.preco,
            }
        )
        .returning(Produto) # Retorna os dados do produto após a operação
    )

    resultado = await sessao.execute(stmt)
    # Retorna um dicionário dos dados do produto
    return resultado.mappings().first()

Serviço de Usuário com UPSERT


class ServicoUsuario:
    """Serviço para gerenciar operações de usuário, incluindo UPSERT."""

    def __init__(self, sessao: AsyncSession):
        self.sessao = sessao

    async def criar_ou_atualizar(self, email: str, nome: str, idade: int | None = None) -> dict:
        """Cria um novo usuário ou atualiza um existente."""
        stmt = (
            insert(Usuario)
            .values(
                email=email,
                nome=nome,
                idade=idade,
                criado_em=datetime.utcnow() # Define data de criação
            )
            .on_conflict_do_update(
                index_elements=["email"], # Chave de conflito
                set_={
                    "nome": nome, # Atualiza nome
                    "idade": idade, # Atualiza idade
                    "atualizado_em": datetime.utcnow() # Define data de atualização
                }
            )
            .returning(Usuario) # Retorna a linha resultante
        )

        resultado = await self.sessao.execute(stmt)
        usuario = resultado.scalar_one() # Obtém a instância do usuário

        return {
            "id": usuario.id,
            "email": usuario.email,
            "nome": usuario.nome,
            "idade": usuario.idade
        }

    async def criar_ou_atualizar_em_lote(self, usuarios_data: list[dict]) -> int:
        """Realiza criação ou atualização em lote."""
        stmt = (
            insert(Usuario)
            .values(usuarios_data) # Insere múltiplos registros
            .on_conflict_do_update(
                index_elements=["email"], # Coluna de conflito
                set_={
                    "nome": insert(Usuario).excluded.nome, # Usa valor do registro novo
                    "idade": insert(Usuario).excluded.idade, # Usa valor do registro novo
                    "atualizado_em": datetime.utcnow() # Define data de atualização
                }
            )
        )

        resultado = await self.sessao.execute(stmt)
        return resultado.rowcount # Retorna número de linhas afetadas

    async def criar_se_nao_existe(self, email: str, nome: str) -> bool:
        """Tenta criar um usuário apenas se o email não existir."""
        stmt = (
            insert(Usuario)
            .values(
                email=email,
                nome=nome,
                criado_em=datetime.utcnow()
            )
            .on_conflict_do_nothing(
                index_elements=["email"] # Ignora se o email já existir
            )
        )

        resultado = await self.sessao.execute(stmt)
        # Retorna True se a inserção ocorreu, False caso contrário
        return resultado.rowcount > 0

SQL Nativo

Exemplo Básico (PostgreSQL e SQLite)


async def upsert_usuario_pg(sessao: AsyncSession, dados_usuario: dict) -> dict | None:
    """
    Executa operação UPSERT nativa no PostgreSQL.
    """
    sql_pg = text("""
        INSERT INTO usuarios (email, nome, idade, criado_em)
        VALUES (:email, :nome, :idade, :criado_em)
        ON CONFLICT (email) DO UPDATE
        SET
            nome = EXCLUDED.nome,      -- Usa o valor novo em caso de conflito
            idade = EXCLUDED.idade,
            atualizado_em = NOW()      -- Função NOW() do PostgreSQL
        RETURNING id, email, nome, idade
    """)

    resultado = await sessao.execute(
        sql_pg,
        {
            "email": dados_usuario["email"],
            "nome": dados_usuario["nome"],
            "idade": dados_usuario.get("idade"),
            "criado_em": datetime.utcnow()
        }
    )

    linha = resultado.mappings().first()
    return dict(linha) if linha else None

async def upsert_usuario_sqlite(sessao: AsyncSession, dados_usuario: dict) -> dict | None:
    """
    Executa operação UPSERT nativa no SQLite.
    A sintaxe é muito similar à do PostgreSQL.
    """
    sql_sqlite = text("""
        INSERT INTO usuarios (email, nome, idade, criado_em)
        VALUES (:email, :nome, :idade, :criado_em)
        ON CONFLICT(email) DO UPDATE SET -- Nota: sem espaço antes dos parênteses
            nome = excluded.nome,        -- Usa 'excluded' em minúsculas no SQLite
            idade = excluded.idade,
            atualizado_em = CURRENT_TIMESTAMP -- Função CURRENT_TIMESTAMP do SQLite
        RETURNING id, email, nome, idade
    """)

    resultado = await sessao.execute(
        sql_sqlite,
        {
            "email": dados_usuario["email"],
            "nome": dados_usuario["nome"],
            "idade": dados_usuario.get("idade"),
            "criado_em": datetime.utcnow()
        }
    )

    linha = resultado.mappings().first()
    return dict(linha) if linha else None

Ignorando Conflitos com SQL Nativo


async def inserir_ou_ignorar_usuario(sessao: AsyncSession, dados_usuario: dict) -> bool:
    """
    Tenta inserir um usuário; se o email já existir, ignora a operação.
    """
    # SQL para PostgreSQL (funciona de forma similar no SQLite)
    sql = text("""
        INSERT INTO usuarios (email, nome, idade, criado_em)
        VALUES (:email, :nome, :idade, :criado_em)
        ON CONFLICT (email) DO NOTHING
    """)

    resultado = await sessao.execute(
        sql,
        {
            "email": dados_usuario["email"],
            "nome": dados_usuario["nome"],
            "idade": dados_usuario.get("idade"),
            "criado_em": datetime.utcnow()
        }
    )

    # Retorna True se uma linha foi inserida, False se houve conflito
    return resultado.rowcount > 0

UPSERT em Lote com SQL Nativo


async def upsert_em_lote_produtos(sessao: AsyncSession, produtos: list[dict]) -> int:
    """
    Executa UPSERT em lote para produtos usando SQL nativo.
    """
    # SQL para PostgreSQL
    sql_pg = text("""
        INSERT INTO produtos (sku, nome, estoque, preco, criado_em)
        VALUES (
            :sku, :nome, :estoque, :preco, :criado_em
        )
        ON CONFLICT (sku) DO UPDATE SET
            nome = EXCLUDED.nome,
            estoque = produtos.estoque + EXCLUDED.estoque,  -- Acumula estoque
            preco = EXCLUDED.preco,
            atualizado_em = NOW()
    """)

    # Para SQLite, a sintaxe seria similar, usando `excluded` e `CURRENT_TIMESTAMP`.

    # Execução em lote (iterativa para demonstração)
    for prod_data in produtos:
        await sessao.execute(
            sql_pg,
            {
                "sku": prod_data["sku"],
                "nome": prod_data["nome"],
                "estoque": prod_data.get("estoque", 0),
                "preco": prod_data.get("preco", 0),
                "criado_em": datetime.utcnow()
            }
        )

    return len(produtos) # Retorna o número de produtos processados

Atualização Parcial e Condicional com SQL Nativo


async def upsert_usuario_inteligente(sessao: AsyncSession, dados_usuario: dict) -> dict | None:
    """
    UPSERT 'inteligente':
    - Atualiza 'idade' apenas se fornecida.
    - Atualiza 'nome' apenas se fornecido.
    - Atualiza 'atualizado_em'.
    """
    sql = text("""
        INSERT INTO usuarios (email, nome, idade, criado_em)
        VALUES (:email, :nome, :idade, :criado_em)
        ON CONFLICT (email) DO UPDATE SET
            -- COALESCE mantém o valor original se o novo valor for NULL
            nome = COALESCE(:nome, usuarios.nome),
            idade = COALESCE(:idade, usuarios.idade),
            atualizado_em = NOW() -- Ou CURRENT_TIMESTAMP para SQLite
        RETURNING id, email, nome, idade, atualizado_em
    """)

    resultado = await sessao.execute(
        sql,
        {
            "email": dados_usuario["email"],
            # Passa os valores, que podem ser None, para COALESCE funcionar
            "nome": dados_usuario.get("nome"),
            "idade": dados_usuario.get("idade"),
            "criado_em": datetime.utcnow()
        }
    )

    linha = resultado.mappings().first()
    return dict(linha) if linha else None

Registro/Login de Usuário: Atualiza Último Login


async def registrar_ou_logar(sessao: AsyncSession, email: str, nome: str) -> dict:
    """
    Registra um novo usuário ou atualiza o último login de um existente.
    """
    sql = text("""
        INSERT INTO usuarios (email, nome, ultimo_login, criado_em)
        VALUES (:email, :nome, :agora, :agora)
        ON CONFLICT (email) DO UPDATE SET
            ultimo_login = EXCLUDED.ultimo_login, -- Atualiza o timestamp do último login
            nome = EXCLUDED.nome                  -- Opcional: atualiza o nome também
        RETURNING id, email, nome, ultimo_login, criado_em
    """)

    agora = datetime.utcnow()
    resultado = await sessao.execute(
        sql,
        {"email": email, "nome": nome, "agora": agora}
    )

    return dict(resultado.mappings().first())

Acúmulo de Estoque de Produtos


async def adicionar_estoque_produto(sessao: AsyncSession, sku: str, quantidade: int) -> bool:
    """
    Adiciona estoque a um produto. Se o produto não existe, insere-o com a quantidade especificada.
    """
    sql = text("""
        INSERT INTO produtos (sku, estoque, criado_em)
        VALUES (:sku, :quantidade, :agora)
        ON CONFLICT (sku) DO UPDATE SET
            estoque = produtos.estoque + EXCLUDED.estoque, -- Acumula a quantidade
            atualizado_em = NOW() -- Ou CURRENT_TIMESTAMP
    """)

    resultado = await sessao.execute(
        sql,
        {
            "sku": sku,
            "quantidade": quantidade, # O valor de 'quantidade' é usado como 'estoque' na inserção e como 'EXCLUDED.estoque' na atualização
            "agora": datetime.utcnow()
        }
    )

    return resultado.rowcount > 0 # Retorna True se a linha foi inserida/atualizada

Acúmulo de Pontos de Usuário


async def adicionar_pontos_usuario(sessao: AsyncSession, user_id: int, pontos: int) -> dict | None:
    """
    Acumula pontos de um usuário em uma tabela separada `user_points`.
    """
    sql = text("""
        INSERT INTO user_points (user_id, points, created_at)
        VALUES (:user_id, :points, :now)
        ON CONFLICT (user_id) DO UPDATE SET
            points = user_points.points + EXCLUDED.points, -- Soma os pontos
            updated_at = NOW() -- Ou CURRENT_TIMESTAMP
        RETURNING user_id, points
    """)

    resultado = await sessao.execute(
        sql,
        {
            "user_id": user_id,
            "points": pontos,
            "now": datetime.utcnow()
        }
    )

    linha = resultado.mappings().first()
    return dict(linha) if linha else None

Contador de Tags


async def incrementar_contador_tag(sessao: AsyncSession, tag_name: str) -> int:
    """
    Incrementa o contador de uma tag. Se a tag não existe, cria com contador 1.
    """
    sql = text("""
        INSERT INTO tags (name, count, created_at)
        VALUES (:name, 1, :now) -- Inicia com contador 1
        ON CONFLICT (name) DO UPDATE SET
            count = tags.count + 1, -- Incrementa o contador existente
            updated_at = NOW() -- Ou CURRENT_TIMESTAMP
        RETURNING count -- Retorna o novo valor do contador
    """)

    resultado = await sessao.execute(
        sql,
        {"name": tag_name, "now": datetime.utcnow()}
    )

    # Retorna o valor do contador ou 0 se algo der errado (embora improvável aqui)
    return resultado.scalar() or 0

Tags: SQLAlchemy postgresql sqlite UPSERT banco de dados

Publicado em 6-14 02:11 por Thomas