Tanto o SQLite quanto o PostgreSQL oferecem suporte à operação UPSERT, que significa "inserir ou atualizar". Essa funcionalidade é ativada quando uma coluna com restrição de unicidade entra em conflito.
Sintaxe e Comportamento
A sintaxe básica para UPSERT difere ligeiramente entre PostgreSQL e SQLite:
- PostgreSQL:
INSERT ... ON CONFLICT (coluna) DO UPDATE/NOTHING - SQLite:
INSERT ... ON CONFLICT(coluna) DO UPDATE/NOTHING(note a ausência de espaço antes dos parênteses)
Comparativo de Cenários
| Cenário | PostgreSQL | SQLite | Observação |
|---|---|---|---|
| UPSERT Básico | ON CONFLICT (col) DO UPDATE SET ... |
ON CONFLICT(col) DO UPDATE SET ... |
Diferença sutil na sintaxe dos parênteses. |
| Ignorar Conflito | ON CONFLICT (col) DO NOTHING |
ON CONFLICT(col) DO NOTHING |
Sintaxe idêntica. |
| Referenciar Novos Valores | EXCLUDED.col |
excluded.col |
Diferença de capitalização; PostgreSQL usa maiúsculas, SQLite minúsculas. |
| Retornar Dados | RETURNING * |
RETURNING * |
Funcionalidade idêntica. |
| Atualização Condicional | WHERE condição |
Não suportado diretamente. | SQLite requer o uso de expressões CASE ou filtragem na aplicação. |
Pontos de Atenção
- A coluna de conflito deve possuir uma restrição de unicidade definida.
- Embora as sintaxes sejam semelhantes, as pequenas diferenças entre PostgreSQL e SQLite exigem cuidado ao usar SQL nativo.
- O SQLite não suporta a cláusula
WHEREem operações UPSERT; useCASEou lógica de aplicação para simular essa funcionalidade. - A cláusula
RETURNINGé suportada no SQLite a partir da versão 3.35.
Uso de EXCLUDED e RETURNING
EXCLUDED
A pseudo-tabela EXCLUDED permite referenciar os valores que seriam inseridos em caso de conflito.
INSERT INTO usuarios (email, nome, idade)
VALUES ('teste@exemplo.com', 'Novo Nome', 30)
ON CONFLICT (email) DO UPDATE SET
nome = EXCLUDED.nome, -- Refere-se ao novo valor 'Novo Nome'
idade = EXCLUDED.idade; -- Refere-se ao novo valor 30
| Cenário | Expressão | Significado | Valor Exemplo |
|---|---|---|---|
| Campo da Tabela Original | usuarios.nome |
Valor atual da linha em conflito. | 'Nome Antigo' |
| Campo do Novo Valor | EXCLUDED.nome |
Novo valor pretendido para inserção. | 'Novo Nome' |
| Cálculo Combinado | usuarios.idade + EXCLUDED.idade |
Combinação do valor original com o novo. | 25 + 30 = 55 |
Exemplo 1: Acumulando Estoque
-- Acumula estoque de produto: Estoque atual 100 + Nova quantidade 50 = 150
INSERT INTO produtos (sku, estoque)
VALUES ('IPHONE15', 50)
ON CONFLICT (sku) DO UPDATE SET
estoque = produtos.estoque + EXCLUDED.estoque -- Estoque atual + novo valor
RETURNING estoque;
Exemplo 2: Atualizando Apenas Campos Não Nulos
-- Se o novo valor for NULL, mantém o valor original
INSERT INTO usuarios (email, nome, idade)
VALUES ('teste@exemplo.com', 'Novo Nome', NULL)
ON CONFLICT (email) DO UPDATE SET
nome = COALESCE(EXCLUDED.nome, usuarios.nome), -- Mantém 'Novo Nome'
idade = COALESCE(EXCLUDED.idade, usuarios.idade); -- Mantém a idade original
Exemplo 3: Atualizando Timestamp
-- Atualiza o campo `atualizado_em` durante a atualização
INSERT INTO usuarios (email, nome)
VALUES ('teste@exemplo.com', 'Novo Nome')
ON CONFLICT (email) DO UPDATE SET
nome = EXCLUDED.nome,
atualizado_em = NOW() -- PostgreSQL
-- atualizado_em = CURRENT_TIMESTAMP -- SQLite
RETURNING
A cláusula RETURNING permite obter os dados da linha afetada (inserida ou atualizada) sem a necessidade de uma consulta SELECT adicional.
INSERT INTO usuarios (email, nome)
VALUES ('teste@exemplo.com', 'Fulano')
RETURNING id, email, nome, criado_em;
Exemplo 1: Obtendo o ID Pós-Inserção
-- PostgreSQL / SQLite 3.35+
sql = text("""
INSERT INTO usuarios (email, nome)
VALUES (:email, :nome)
RETURNING id, email, criado_em
""")
resultado = await sessao.execute(sql, {"email": "teste@exemplo.com", "nome": "Fulano"})
usuario = resultado.mappings().first()
print(usuario["id"]) -- Acessa o ID diretamente
Exemplo 2: Retorno Unificado de UPSERT
-- Retorna o estado final, seja por inserção ou atualização
INSERT INTO usuarios (email, nome, contador_login)
VALUES ('teste@exemplo.com', 'Fulano', 1)
ON CONFLICT (email) DO UPDATE SET
nome = EXCLUDED.nome,
contador_login = usuarios.contador_login + 1 -- Incrementa contador
RETURNING
id,
email,
nome,
contador_login,
CASE
WHEN xmax = 0 THEN 'inserido' -- PostgreSQL: xmax=0 indica inserção
ELSE 'atualizado'
END AS acao
Exemplo 3: Retorno em Operações em Lote
-- PostgreSQL suporta RETURNING em operações em lote
INSERT INTO usuarios (email, nome)
VALUES
('a@exemplo.com', 'A'),
('b@exemplo.com', 'B')
ON CONFLICT (email) DO UPDATE SET
nome = EXCLUDED.nome
RETURNING id, email, nome;
Processamento em Python para retornos em lote:
resultado = await sessao.execute(sql)
usuarios = [dict(linha) for linha in resultado.mappings().all()]
# [{'id': 1, 'email': 'a@exemplo.com', 'nome': 'A'}, ...]
Exemplo: Contador de Login de Usuário
async def registrar_login_usuario(sessao: AsyncSession, email: str, nome: str) -> dict:
"""
Registra o login de um usuário:
- Novo usuário: Insere com contador_login = 1.
- Usuário existente: Atualiza contador_login e last_login.
- Retorna o estado final e o tipo de ação (inserido/atualizado).
"""
sql = text("""
INSERT INTO usuarios (
email, nome, contador_login, ultimo_login, criado_em
) VALUES (
:email, :nome, 1, :agora, :agora
)
ON CONFLICT (email) DO UPDATE SET
nome = EXCLUDED.nome, -- Atualiza nome
contador_login = usuarios.contador_login + 1, -- Incrementa contador
ultimo_login = EXCLUDED.ultimo_login -- Atualiza último login
RETURNING
id,
email,
nome,
contador_login,
ultimo_login,
criado_em,
CASE
WHEN xmax = 0 THEN 'inserido'
ELSE 'atualizado'
END AS acao -- Específico do PostgreSQL para distinguir ações
""")
agora = datetime.utcnow()
resultado = await sessao.execute(
sql,
{"email": email, "nome": nome, "agora": agora}
)
linha = resultado.mappings().first()
return dict(linha) if linha else None
# Uso:
usuario_status = await registrar_login_usuario(sessao, "teste@exemplo.com", "Fulano")
print(f"{usuario_status['acao']} usuário {usuario_status['email']} com {usuario_status['contador_login']} logins")
# Saída potencial: inserido usuário teste@exemplo.com com 1 logins
# Ou: atualizado usuário teste@exemplo.com com 5 logins
Modelos de Exemplo
from sqlalchemy import Column, Integer, String, UniqueConstraint
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class Usuario(Base):
__tablename__ = "usuarios"
id = Column(Integer, primary_key=True, autoincrement=True)
email = Column(String(100), unique=True, nullable=False)
nome = Column(String(50))
idade = Column(Integer)
saldo = Column(Integer, default=0)
__table_args__ = (
UniqueConstraint("email", name="uq_usuarios_email"),
)
class Produto(Base):
__tablename__ = "produtos"
id = Column(Integer, primary_key=True)
sku = Column(String(50), unique=True, nullable=False)
nome = Column(String(100))
estoque = Column(Integer, default=0)
preco = Column(Integer)
Abordagem ORM com SQLAlchemy
Importe a função insert de acordo com o dialeto específico ou use a função genérica.
Exemplo Básico
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy import insert, func # func para NOW() ou CURRENT_TIMESTAMP
async def upsert_usuario_orm(sessao: AsyncSession, dados_usuario: dict) -> dict:
"""
Realiza UPSERT de usuário usando o ORM do SQLAlchemy.
Atualiza se o email já existir, caso contrário, insere.
"""
# Abordagem 1: Usando a função genérica insert (recomendado)
# SQLAlchemy seleciona a sintaxe correta automaticamente com base no dialeto do banco.
stmt = (
insert(Usuario)
.values(**dados_usuario)
.on_conflict_do_update(
index_elements=["email"], # Coluna(s) para detecção de conflito (unicidade)
set_={
"nome": dados_usuario["nome"],
"idade": dados_usuario.get("idade"),
# Assume-se que há uma coluna 'atualizado_em'
"atualizado_em": func.now()
}
)
.returning(Usuario) # Retorna a linha após inserção/atualização
)
resultado = await sessao.execute(stmt)
usuario = resultado.scalar_one() # Pega a única linha retornada
return {
"id": usuario.id,
"email": usuario.email,
"nome": usuario.nome,
"idade": usuario.idade
}
async def upsert_usuario_ignorar(sessao: AsyncSession, dados_usuario: dict) -> bool:
"""
Realiza UPSERT, mas ignora a linha em caso de conflito (DO NOTHING).
"""
stmt = (
insert(Usuario)
.values(**dados_usuario)
.on_conflict_do_nothing(
index_elements=["email"]
)
)
resultado = await sessao.execute(stmt)
# Retorna True se uma linha foi inserida, False se houve conflito e nada foi feito.
return resultado.rowcount > 0
Atualização Condicional com ORM
Permite atualizar campos específicos apenas sob certas condições.
async def upsert_usuario_condicional(sessao: AsyncSession, dados_usuario: dict) -> dict:
"""
UPSERT: Atualiza apenas campos não nulos em caso de conflito.
"""
stmt = (
insert(Usuario)
.values(**dados_usuario)
.on_conflict_do_update(
index_elements=["email"],
set_={
"nome": dados_usuario["nome"],
# Se 'idade' não for fornecido, mantém o valor original da coluna Usuario.idade
"idade": dados_usuario.get("idade", Usuario.idade),
},
# Opcional: adicionar condição WHERE específica para a atualização
where=Usuario.email == dados_usuario["email"]
)
.returning(Usuario)
)
resultado = await sessao.execute(stmt)
# Retorna um dicionário da linha afetada
return resultado.mappings().first()
UPSERT em Lote com ORM
async def upsert_em_lote_usuarios(sessao: AsyncSession, usuarios: list[dict]) -> int:
"""
Executa UPSERT em lote para múltiplos usuários.
"""
stmt = (
insert(Usuario)
.values(usuarios) # Passa a lista de dicionários diretamente
.on_conflict_do_update(
index_elements=["email"],
set_={
# Usa a sintaxe de referência a EXCLUDED dentro do ORM
"nome": insert(Usuario).excluded.nome,
"idade": insert(Usuario).excluded.idade,
}
)
)
resultado = await sessao.execute(stmt)
# Retorna o número de linhas afetadas (inseridas ou atualizadas)
return resultado.rowcount
Utilizando EXCLUDED no ORM
async def upsert_produto_com_estoque(sessao: AsyncSession, dados_produto: dict) -> dict:
"""
UPSERT de produto: em caso de conflito no SKU, acumula o estoque.
"""
stmt = (
insert(Produto)
.values(**dados_produto)
.on_conflict_do_update(
index_elements=["sku"],
set_={
# Acumula estoque: Estoque atual + Estoque novo
"estoque": Produto.estoque + insert(Produto).excluded.estoque,
# Atualiza outros campos
"nome": insert(Produto).excluded.nome,
"preco": insert(Produto).excluded.preco,
}
)
.returning(Produto) # Retorna os dados do produto após a operação
)
resultado = await sessao.execute(stmt)
# Retorna um dicionário dos dados do produto
return resultado.mappings().first()
Serviço de Usuário com UPSERT
class ServicoUsuario:
"""Serviço para gerenciar operações de usuário, incluindo UPSERT."""
def __init__(self, sessao: AsyncSession):
self.sessao = sessao
async def criar_ou_atualizar(self, email: str, nome: str, idade: int | None = None) -> dict:
"""Cria um novo usuário ou atualiza um existente."""
stmt = (
insert(Usuario)
.values(
email=email,
nome=nome,
idade=idade,
criado_em=datetime.utcnow() # Define data de criação
)
.on_conflict_do_update(
index_elements=["email"], # Chave de conflito
set_={
"nome": nome, # Atualiza nome
"idade": idade, # Atualiza idade
"atualizado_em": datetime.utcnow() # Define data de atualização
}
)
.returning(Usuario) # Retorna a linha resultante
)
resultado = await self.sessao.execute(stmt)
usuario = resultado.scalar_one() # Obtém a instância do usuário
return {
"id": usuario.id,
"email": usuario.email,
"nome": usuario.nome,
"idade": usuario.idade
}
async def criar_ou_atualizar_em_lote(self, usuarios_data: list[dict]) -> int:
"""Realiza criação ou atualização em lote."""
stmt = (
insert(Usuario)
.values(usuarios_data) # Insere múltiplos registros
.on_conflict_do_update(
index_elements=["email"], # Coluna de conflito
set_={
"nome": insert(Usuario).excluded.nome, # Usa valor do registro novo
"idade": insert(Usuario).excluded.idade, # Usa valor do registro novo
"atualizado_em": datetime.utcnow() # Define data de atualização
}
)
)
resultado = await self.sessao.execute(stmt)
return resultado.rowcount # Retorna número de linhas afetadas
async def criar_se_nao_existe(self, email: str, nome: str) -> bool:
"""Tenta criar um usuário apenas se o email não existir."""
stmt = (
insert(Usuario)
.values(
email=email,
nome=nome,
criado_em=datetime.utcnow()
)
.on_conflict_do_nothing(
index_elements=["email"] # Ignora se o email já existir
)
)
resultado = await self.sessao.execute(stmt)
# Retorna True se a inserção ocorreu, False caso contrário
return resultado.rowcount > 0
SQL Nativo
Exemplo Básico (PostgreSQL e SQLite)
async def upsert_usuario_pg(sessao: AsyncSession, dados_usuario: dict) -> dict | None:
"""
Executa operação UPSERT nativa no PostgreSQL.
"""
sql_pg = text("""
INSERT INTO usuarios (email, nome, idade, criado_em)
VALUES (:email, :nome, :idade, :criado_em)
ON CONFLICT (email) DO UPDATE
SET
nome = EXCLUDED.nome, -- Usa o valor novo em caso de conflito
idade = EXCLUDED.idade,
atualizado_em = NOW() -- Função NOW() do PostgreSQL
RETURNING id, email, nome, idade
""")
resultado = await sessao.execute(
sql_pg,
{
"email": dados_usuario["email"],
"nome": dados_usuario["nome"],
"idade": dados_usuario.get("idade"),
"criado_em": datetime.utcnow()
}
)
linha = resultado.mappings().first()
return dict(linha) if linha else None
async def upsert_usuario_sqlite(sessao: AsyncSession, dados_usuario: dict) -> dict | None:
"""
Executa operação UPSERT nativa no SQLite.
A sintaxe é muito similar à do PostgreSQL.
"""
sql_sqlite = text("""
INSERT INTO usuarios (email, nome, idade, criado_em)
VALUES (:email, :nome, :idade, :criado_em)
ON CONFLICT(email) DO UPDATE SET -- Nota: sem espaço antes dos parênteses
nome = excluded.nome, -- Usa 'excluded' em minúsculas no SQLite
idade = excluded.idade,
atualizado_em = CURRENT_TIMESTAMP -- Função CURRENT_TIMESTAMP do SQLite
RETURNING id, email, nome, idade
""")
resultado = await sessao.execute(
sql_sqlite,
{
"email": dados_usuario["email"],
"nome": dados_usuario["nome"],
"idade": dados_usuario.get("idade"),
"criado_em": datetime.utcnow()
}
)
linha = resultado.mappings().first()
return dict(linha) if linha else None
Ignorando Conflitos com SQL Nativo
async def inserir_ou_ignorar_usuario(sessao: AsyncSession, dados_usuario: dict) -> bool:
"""
Tenta inserir um usuário; se o email já existir, ignora a operação.
"""
# SQL para PostgreSQL (funciona de forma similar no SQLite)
sql = text("""
INSERT INTO usuarios (email, nome, idade, criado_em)
VALUES (:email, :nome, :idade, :criado_em)
ON CONFLICT (email) DO NOTHING
""")
resultado = await sessao.execute(
sql,
{
"email": dados_usuario["email"],
"nome": dados_usuario["nome"],
"idade": dados_usuario.get("idade"),
"criado_em": datetime.utcnow()
}
)
# Retorna True se uma linha foi inserida, False se houve conflito
return resultado.rowcount > 0
UPSERT em Lote com SQL Nativo
async def upsert_em_lote_produtos(sessao: AsyncSession, produtos: list[dict]) -> int:
"""
Executa UPSERT em lote para produtos usando SQL nativo.
"""
# SQL para PostgreSQL
sql_pg = text("""
INSERT INTO produtos (sku, nome, estoque, preco, criado_em)
VALUES (
:sku, :nome, :estoque, :preco, :criado_em
)
ON CONFLICT (sku) DO UPDATE SET
nome = EXCLUDED.nome,
estoque = produtos.estoque + EXCLUDED.estoque, -- Acumula estoque
preco = EXCLUDED.preco,
atualizado_em = NOW()
""")
# Para SQLite, a sintaxe seria similar, usando `excluded` e `CURRENT_TIMESTAMP`.
# Execução em lote (iterativa para demonstração)
for prod_data in produtos:
await sessao.execute(
sql_pg,
{
"sku": prod_data["sku"],
"nome": prod_data["nome"],
"estoque": prod_data.get("estoque", 0),
"preco": prod_data.get("preco", 0),
"criado_em": datetime.utcnow()
}
)
return len(produtos) # Retorna o número de produtos processados
Atualização Parcial e Condicional com SQL Nativo
async def upsert_usuario_inteligente(sessao: AsyncSession, dados_usuario: dict) -> dict | None:
"""
UPSERT 'inteligente':
- Atualiza 'idade' apenas se fornecida.
- Atualiza 'nome' apenas se fornecido.
- Atualiza 'atualizado_em'.
"""
sql = text("""
INSERT INTO usuarios (email, nome, idade, criado_em)
VALUES (:email, :nome, :idade, :criado_em)
ON CONFLICT (email) DO UPDATE SET
-- COALESCE mantém o valor original se o novo valor for NULL
nome = COALESCE(:nome, usuarios.nome),
idade = COALESCE(:idade, usuarios.idade),
atualizado_em = NOW() -- Ou CURRENT_TIMESTAMP para SQLite
RETURNING id, email, nome, idade, atualizado_em
""")
resultado = await sessao.execute(
sql,
{
"email": dados_usuario["email"],
# Passa os valores, que podem ser None, para COALESCE funcionar
"nome": dados_usuario.get("nome"),
"idade": dados_usuario.get("idade"),
"criado_em": datetime.utcnow()
}
)
linha = resultado.mappings().first()
return dict(linha) if linha else None
Registro/Login de Usuário: Atualiza Último Login
async def registrar_ou_logar(sessao: AsyncSession, email: str, nome: str) -> dict:
"""
Registra um novo usuário ou atualiza o último login de um existente.
"""
sql = text("""
INSERT INTO usuarios (email, nome, ultimo_login, criado_em)
VALUES (:email, :nome, :agora, :agora)
ON CONFLICT (email) DO UPDATE SET
ultimo_login = EXCLUDED.ultimo_login, -- Atualiza o timestamp do último login
nome = EXCLUDED.nome -- Opcional: atualiza o nome também
RETURNING id, email, nome, ultimo_login, criado_em
""")
agora = datetime.utcnow()
resultado = await sessao.execute(
sql,
{"email": email, "nome": nome, "agora": agora}
)
return dict(resultado.mappings().first())
Acúmulo de Estoque de Produtos
async def adicionar_estoque_produto(sessao: AsyncSession, sku: str, quantidade: int) -> bool:
"""
Adiciona estoque a um produto. Se o produto não existe, insere-o com a quantidade especificada.
"""
sql = text("""
INSERT INTO produtos (sku, estoque, criado_em)
VALUES (:sku, :quantidade, :agora)
ON CONFLICT (sku) DO UPDATE SET
estoque = produtos.estoque + EXCLUDED.estoque, -- Acumula a quantidade
atualizado_em = NOW() -- Ou CURRENT_TIMESTAMP
""")
resultado = await sessao.execute(
sql,
{
"sku": sku,
"quantidade": quantidade, # O valor de 'quantidade' é usado como 'estoque' na inserção e como 'EXCLUDED.estoque' na atualização
"agora": datetime.utcnow()
}
)
return resultado.rowcount > 0 # Retorna True se a linha foi inserida/atualizada
Acúmulo de Pontos de Usuário
async def adicionar_pontos_usuario(sessao: AsyncSession, user_id: int, pontos: int) -> dict | None:
"""
Acumula pontos de um usuário em uma tabela separada `user_points`.
"""
sql = text("""
INSERT INTO user_points (user_id, points, created_at)
VALUES (:user_id, :points, :now)
ON CONFLICT (user_id) DO UPDATE SET
points = user_points.points + EXCLUDED.points, -- Soma os pontos
updated_at = NOW() -- Ou CURRENT_TIMESTAMP
RETURNING user_id, points
""")
resultado = await sessao.execute(
sql,
{
"user_id": user_id,
"points": pontos,
"now": datetime.utcnow()
}
)
linha = resultado.mappings().first()
return dict(linha) if linha else None
Contador de Tags
async def incrementar_contador_tag(sessao: AsyncSession, tag_name: str) -> int:
"""
Incrementa o contador de uma tag. Se a tag não existe, cria com contador 1.
"""
sql = text("""
INSERT INTO tags (name, count, created_at)
VALUES (:name, 1, :now) -- Inicia com contador 1
ON CONFLICT (name) DO UPDATE SET
count = tags.count + 1, -- Incrementa o contador existente
updated_at = NOW() -- Ou CURRENT_TIMESTAMP
RETURNING count -- Retorna o novo valor do contador
""")
resultado = await sessao.execute(
sql,
{"name": tag_name, "now": datetime.utcnow()}
)
# Retorna o valor do contador ou 0 se algo der errado (embora improvável aqui)
return resultado.scalar() or 0