Introdução ao Módulo threading
O módulo multiprocessing foi completamente inspirado no módulo threading, e ambos possuem grande similarities em sua utilização. Portanto, não entraremos em detalhes aprofundados sobre ambos.
Documentação oficial: https://docs.python.org/3/library/threading.html
Duas Formas de Iniciar uma Thread
Forma 1: Função
from threading import Thread
import time
def cumprimentar(nome):
time.sleep(2)
print(f'{nome} disse olá')
if __name__ == '__main__':
t = Thread(target=cumprimentar, args=('elton',))
t.start()
print('Thread principal')
Forma 2: Classe
from threading import Thread
import time
class Cumprimento(Thread):
def __init__(self, nome):
super().__init__()
self.nome = nome
def run(self):
time.sleep(2)
print(f'{self.nome} disse olá')
if __name__ == '__main__':
t = Cumprimento('elton')
t.start()
print('Thread principal')
Diferenças entre Múltiplas Threads e Múltiplos Processos
Velocidade de Inicialização
Threads inicializam mais rapidamente que processos. Veja o exemplo:
from threading import Thread
from multiprocessing import Process
import os
def tarefa():
print('olá')
if __name__ == '__main__':
# Iniciar threads no processo principal
t = Thread(target=tarefa)
t.start()
print('Thread/Processo principal')
# O resultado será:
# olá
# Thread/Processo principal
# Iniciar processos filhos
t = Process(target=tarefa)
t.start()
print('Thread/Processo principal')
# O resultado será:
# Thread/Processo principal
# olá
Identificador de Processo (PID)
from threading import Thread
from multiprocessing import Process
import os
def tarefa():
print('olá', os.getpid())
if __name__ == '__main__':
# Múltiplas threads - todas compartilham o mesmo PID do processo principal
t1 = Thread(target=tarefa)
t2 = Thread(target=tarefa)
t1.start()
t2.start()
print('PID do processo principal:', os.getpid())
# Múltiplos processos - cada um possui um PID diferente
p1 = Process(target=tarefa)
p2 = Process(target=tarefa)
p1.start()
p2.start()
print('PID do processo principal:', os.getpid())
Compartilhamento de Dados
from threading import Thread
from multiprocessing import Process
def modificar_dado():
global valor
valor = 0
if __name__ == '__main__':
# Com processos: cada um possui sua própria cópia dos dados
# valor = 100
# p = Process(target=modificar_dado)
# p.start()
# p.join()
# print(valor) # Permanece 100
# Com threads: compartilham dados do mesmo processo
valor = 1
t = Thread(target=modificar_dado)
t.start()
t.join()
print('Thread principal:', valor) # Resultado: 0
Exercícios Práticos
Exercício 1: Servidor Socket Multi-threaded
# Servidor
import socket
import threading
servidor = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
servidor.bind(('127.0.0.1', 8080))
servidor.listen(5)
def atender_cliente(conn):
while True:
dados = conn.recv(1024)
print(dados)
conn.send(dados.upper())
if __name__ == '__main__':
while True:
conn, addr = servidor.accept()
thread = threading.Thread(target=atender_cliente, args=(conn,))
thread.start()
# Cliente
import socket
cliente = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cliente.connect(('127.0.0.1', 8080))
while True:
msg = input('>>: ').strip()
if not msg:
continue
cliente.send(msg.encode('utf-8'))
dados = cliente.recv(1024)
print(dados)
Exercício 2: Três Tarefas Concurrentes
Uma tarefa recebe entrada do usuário, outra formata o texto para maiúsculas, e a terceira salva em arquivo:
from threading import Thread
mensagens = []
formatadas = []
def receber_entrada():
while True:
msg = input('>>: ').strip()
if not msg:
continue
mensagens.append(msg)
def formatar_texto():
while True:
if mensagens:
resultado = mensagens.pop()
formatadas.append(resultado.upper())
def salvar_arquivo():
while True:
if formatadas:
with open('dados.txt', 'a', encoding='utf-8') as arquivo:
resultado = formatadas.pop()
arquivo.write(f'{resultado}\n')
if __name__ == '__main__':
t1 = Thread(target=receber_entrada)
t2 = Thread(target=formatar_texto)
t3 = Thread(target=salvar_arquivo)
t1.start()
t2.start()
t3.start()
Métodos Relacionados às Threads
Métodos da instância Thread:
# isAlive(): Retorna se a thread está em execução.
# getName(): Retorna o nome da thread.
# setName(): Define o nome da thread.
Métodos do módulo threading:
# threading.current_thread(): Retorna a thread atual.
# threading.enumerate(): Retorna lista de todas as threads em execução.
# threading.active_count(): Retorna número de threads em execução.
from threading import Thread, current_thread, enumerate, active_count
import time
def tarefa():
time.sleep(3)
print(current_thread().getName())
if __name__ == '__main__':
t = Thread(target=tarefa)
t.start()
print(current_thread().getName())
print(current_thread())
print(enumerate())
print(active_count())
print('Thread principal')
# Resultado:
# MainThread
# <_MainThread(MainThread, started)>
# [<_MainThread(MainThread, started)>, <Thread(Thread-1, started)>]
# Thread principal
# Thread-1
Esperando a Thread Finalizar
from threading import Thread
import time
def dizer_ola(nome):
time.sleep(2)
print(f'{nome} disse olá')
if __name__ == '__main__':
t = Thread(target=dizer_ola, args=('egon',))
t.start()
t.join() # Espera a thread terminar
print('Thread principal')
print(t.is_alive())
# Resultado:
# egon disse olá
# Thread principal
# False
Threads Demônio
Tanto processos quanto threads seguem a regra: o demônio aguarda a entidade principal finalizar para ser encerrado.
# 1. Para o processo principal, finalizar significa executar todo o código do processo
# 2. Para a thread principal, finalizar significa que todas as threads não-demoníaco terminam
from threading import Thread
import time
def dizer_ola(nome):
time.sleep(2)
print(f'{nome} disse olá')
if __name__ == '__main__':
t = Thread(target=dizer_ola, args=('egon',))
t.daemon = True # Deve ser definido antes de start()
t.start()
print('Thread principal')
print(t.is_alive())
# Resultado:
# Thread principal
# True
from threading import Thread
import time
def funcao_a():
print(123)
time.sleep(1)
print("fim123")
def funcao_b():
print(456)
time.sleep(3)
print("fim456")
t1 = Thread(target=funcao_a)
t2 = Thread(target=funcao_b)
t1.daemon = True
t1.start()
t2.start()
print("principal-------")
Python GIL (Global Interpreter Lock)
O GIL (Global Interpreter Lock) é uma característica do interpretador CPython que garante que apenas uma thread execute código Python por vez. Isso ocorre porque o interpretador Python não é thread-safe por padrão.
Por que o GIL existe?
O Python utiliza gerenciamento automático de memória (coletor de lixo). Imagine que uma thread está apagando uma variável enquanto o coletor de lixo tenta limpar esse mesmo espaço de memória. Para evitar problemas assim, o Python simplesmente bloqueia tudo durante a execução de uma thread.
Lock de Sincronização
Importância do Lock
Pontos importantes:
# 1. Threads disputam o GIL (direito de execução), mas precisam do Lock para proteger dados compartilhados
# 2. join() espera tudo (execução totalmente serial), enquanto Lock serializa apenas a parte que modifica dados compartilhados
# 3. GIL e Lock são necessários para diferentes propósitos
GIL vs Lock
Por que precisamos de Lock se já temos GIL?
O GIL garante que apenas uma thread execute código Python por vez, mas não protege dados compartilhados. O Lock é necessário para proteger dados específicos que podem ser modificados por múltiplas threads.
Proteção de dados diferentes requer locks diferentes.
Exemplo sem Lock
from threading import Thread
import time
def decrementar():
global numero
temp = numero
time.sleep(0.1)
numero = temp - 1
if __name__ == '__main__':
numero = 100
threads = []
for i in range(100):
t = Thread(target=decrementar)
threads.append(t)
t.start()
for t in threads:
t.join()
print(numero) # Resultado pode ser 99 (dado incorreto)
Exemplo com Lock
import threading
bloqueio = threading.Lock()
bloqueio.acquire()
# Operações com dados compartilhados
bloqueio.release()
from threading import Thread, Lock
import time
bloqueio = Lock()
numero = 100
def decrementar():
global numero
bloqueio.acquire()
temp = numero
time.sleep(0.1)
numero = temp - 1
bloqueio.release()
if __name__ == '__main__':
threads = []
for i in range(100):
t = Thread(target=decrementar)
threads.append(t)
t.start()
for t in threads:
t.join()
print(numero) # Resultado: 0 (dado correto)
Análise: Lock vs join()
# Sem lock: execução concorrente rápida, mas dados inseguros
from threading import Thread, Lock
import time
def tarefa():
print(f'{Thread.current_thread().getName()} está executando')
global numero
temp = numero
time.sleep(0.5)
numero = temp - 1
if __name__ == '__main__':
numero = 100
inicio = time.time()
threads = [Thread(target=tarefa) for _ in range(100)]
for t in threads:
t.start()
for t in threads:
t.join()
fim = time.time()
print(f'Tempo: {fim - inicio}, Número: {numero}')
# Tempo: ~0.52s, Número: 99 (incorreto)
# Com lock: parte com lock executa serialmente
from threading import Thread, Lock
import time
def tarefa():
time.sleep(3)
print(f'{Thread.current_thread().getName()} iniciando')
global numero
bloqueio.acquire()
temp = numero
time.sleep(0.5)
numero = temp - 1
bloqueio.release()
if __name__ == '__main__':
numero = 100
bloqueio = Lock()
inicio = time.time()
threads = [Thread(target=tarefa) for _ in range(100)]
for t in threads:
t.start()
for t in threads:
t.join()
fim = time.time()
print(f'Tempo: {fim - inicio}, Número: {numero}')
# Tempo: ~53s, Número: 0 (correto, mas lento)
# Usando join(): execução totalmente serial
from threading import Thread
import time
def tarefa():
time.sleep(3)
print(f'{Thread.current_thread().getName()} iniciando')
global numero
temp = numero
time.sleep(0.5)
numero = temp - 1
if __name__ == '__main__':
numero = 100
inicio = time.time()
for i in range(100):
t = Thread(target=tarefa)
t.start()
t.join() # Espera cada thread terminar antes de iniciar a próxima
fim = time.time()
print(f'Tempo: {fim - inicio}, Número: {numero}')
# Tempo: ~350s (!), Número: 0 (correto, mas muito lento)
Deadlock e Lock Recursivo
Deadlock
Deadlock ocorre quando duas ou mais threads bloqueiam-se mutuamente, esperando recursos que a outra possui.
from threading import Thread, Lock
import time
mutex_a = Lock()
mutex_b = Lock()
class MinhaThread(Thread):
def run(self):
self.funcao1()
self.funcao2()
def funcao1(self):
mutex_a.acquire()
print(f'{self.name} obteve A')
mutex_b.acquire()
print(f'{self.name} obteve B')
mutex_b.release()
mutex_a.release()
def funcao2(self):
mutex_b.acquire()
print(f'{self.name} obteve B')
time.sleep(2)
mutex_a.acquire()
print(f'{self.name} obteve A')
mutex_a.release()
mutex_b.release()
if __name__ == '__main__':
for i in range(10):
t = MinhaThread()
t.start()
# Thread-1 obteve A
# Thread-1 obteve B
# Thread-1 obteve B
# Thread-2 obteve A
# Bloqueia - deadlock!
Solução: Lock Recursivo (RLock)
O RLock permite que a mesma thread adquira o lock múltiplas vezes. Ele mantém um contador interno.
import threading
# Um RLock para todas as necessidades
mutex = threading.RLock()
# A mesma thread pode adquirir o lock várias vezes
mutex.acquire()
mutex.acquire() # Não blocking para a mesma thread
# ... operações ...
mutex.release()
mutex.release()
Semaphore
Semaphore gerencia um contador interno. acquire() decrementa o contador, release() incrementa. Quando o contador é 0, acquire() bloqueia.
from threading import Thread, Semaphore
import threading
import time
def acessar_recurso():
semaforo.acquire()
print(f'{threading.current_thread().getName()} obteve acesso')
time.sleep(3)
semaforo.release()
if __name__ == '__main__':
semaforo = Semaphore(5) # Máximo 5 threads simultâneas
for i in range(23):
t = Thread(target=acessar_recurso)
t.start()
Event
Event permite que threads aguardem até que um evento seja definido.
event.isSet(): Retorna o estado do evento
event.wait(): Bloqueia se o evento não estiver definido
event.set(): Define o evento como True
event.clear(): Reseta o evento para False
from threading import Thread, Event
import threading
import time
import random
def conectar_banco():
tentativas = 1
while not evento.is_set():
if tentativas > 3:
raise TimeoutError('Tempo de conexão esgotado')
print(f'{threading.current_thread().getName()} - Tentativa {tentativas}')
evento.wait(0.5)
tentativas += 1
print(f'{threading.current_thread().getName()} - Conectado com sucesso')
def verificar_banco():
print(f'{threading.current_thread().getName()} verificando banco de dados')
time.sleep(random.randint(2, 4))
evento.set()
if __name__ == '__main__':
evento = Event()
conn1 = Thread(target=conectar_banco)
conn2 = Thread(target=conectar_banco)
verificacao = Thread(target=verificar_banco)
conn1.start()
conn2.start()
verificacao.start()
Condition
Condition permite que threads aguardem até que uma condição seja satisfeita.
import threading
def executar(n):
condicao.acquire()
condicao.wait()
print(f"Executando thread: {n}")
condicao.release()
if __name__ == '__main__':
condicao = threading.Condition()
for i in range(10):
t = threading.Thread(target=executar, args=(i,))
t.start()
while True:
entrada = input('>>>')
if entrada == 'q':
break
condicao.acquire()
condicao.notify(int(entrada))
condicao.release()
def verificar_condicao():
entrada = input('>>>')
return entrada == '1'
def executar(n):
condicao.acquire()
condicao.wait_for(verificar_condicao)
print(f"Executando thread: {n}")
condicao.release()
if __name__ == '__main__':
condicao = threading.Condition()
for i in range(10):
t = threading.Thread(target=executar, args=(i,))
t.start()
Timer
Timer executa uma função após um intervalo especificado.
from threading import Timer
def saudacao():
print("Olá, mundo!")
t = Timer(1, saudacao)
t.start() # Após 1 segundo, "Olá, mundo!" será impresso
from threading import Timer
import random
class GeradorCodigo:
def __init__(self):
self.cache = ''
self.timer = None
self.gerar_cache()
def gerar_cache(self, intervalo=5):
self.cache = self.criar_codigo()
print(self.cache)
self.timer = Timer(intervalo, self.gerar_cache)
self.timer.start()
def criar_codigo(self, n=4):
resultado = ''
for _ in range(n):
digito = str(random.randint(0, 9))
letra = chr(random.randint(65, 90))
resultado += random.choice([digito, letra])
return resultado
def verificar(self):
while True:
entrada = input('>>: ').strip()
if entrada.upper() == self.cache:
print('Verificado com sucesso')
self.timer.cancel()
break
if __name__ == '__main__':
obj = GeradorCodigo()
obj.verificar()
Filas de Thread
O módulo queue fornece filas thread-safe.
FIFO (First In, First Out)
import queue
fila = queue.Queue()
fila.put('primeiro')
fila.put('segundo')
fila.put('terceiro')
print(fila.get())
print(fila.get())
print(fila.get())
# Resultado:
# primeiro
# segundo
# terceiro
LIFO (Last In, First Out)
import queue
fila = queue.LifoQueue()
fila.put('primeiro')
fila.put('segundo')
fila.put('terceiro')
print(fila.get())
print(fila.get())
print(fila.get())
# Resultado:
# terceiro
# segundo
# primeiro
Priority Queue
import queue
fila = queue.PriorityQueue()
fila.put((2, 'segundo'))
fila.put((1, 'primeiro'))
fila.put((3, 'terceiro'))
print(fila.get()[1])
print(fila.get()[1])
print(fila.get()[1])
# Resultado:
# primeiro
# segundo
# terceiro
Métodos da Queue
Queue.qsize() - Retorna tamanho aproximado
Queue.empty() - Retorna True se vazia
Queue.full() - Retorna True se cheia
Queue.put(item) - Insere item (bloqueia se cheia)
Queue.get() - Remove e retorna item (bloqueia se vazia)
Queue.get_nowait() - Não bloqueia
Queue.put_nowait() - Não bloqueia
Queue.task_done() - Indica tarefa completa
Queue.join() - Bloqueia até todas as tarefas serem processadas
Módulo concurrent.futures
Este módulo fornece interfaces de alto nível para programação assíncrona.
Executor
submit(fn, *args, **kwargs) - Submete tarefa assíncrona
map(func, iterables) - Submete múltiplas tarefas
shutdown(wait=True) - Encerra o executor
result(timeout=None) - Obtém resultado
add_done_callback(fn) - Adiciona callback
ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import os
import time
import random
def tarefa(n):
print(f'Processo {os.getpid()} executando')
time.sleep(random.randint(1, 3))
return n ** 2
if __name__ == '__main__':
executor = ProcessPoolExecutor(max_workers=3)
resultados = []
for i in range(11):
futuro = executor.submit(tarefa, i)
resultados.append(futuro)
executor.shutdown(wait=True)
print('===>')
for futuro in resultados:
print(futuro.result())
ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import os
import time
import random
def tarefa(n):
print(f'Thread {os.getpid()} executando')
time.sleep(random.randint(1, 3))
return n ** 2
if __name__ == '__main__':
executor = ThreadPoolExecutor(max_workers=3)
executor.map(tarefa, range(1, 12))
Callbacks
from concurrent.futures import ProcessPoolExecutor
import requests
def obter_pagina(url):
print(f'Obtendo {url}')
resposta = requests.get(url)
if resposta.status_code == 200:
return {'url': url, 'texto': resposta.text}
def analisar_pagina(futuro):
resultado = futuro.result()
print(f'Analisando {resultado["url"]}')
with open('resultado.txt', 'a') as arquivo:
arquivo.write(f'URL: {resultado["url"]} - Tamanho: {len(resultado["texto"])}\n')
if __name__ == '__main__':
urls = [
'https://www.python.org',
'https://www.google.com',
'https://github.com'
]
executor = ProcessPoolExecutor(3)
for url in urls:
executor.submit(obter_pagina, url).add_done_callback(analisar_pagina)