Programação Concorrente em Python: Tudo sobre Threading

Introdução ao Módulo threading

O módulo multiprocessing foi completamente inspirado no módulo threading, e ambos possuem grande similarities em sua utilização. Portanto, não entraremos em detalhes aprofundados sobre ambos.

Documentação oficial: https://docs.python.org/3/library/threading.html

Duas Formas de Iniciar uma Thread

Forma 1: Função

from threading import Thread
import time

def cumprimentar(nome):
    time.sleep(2)
    print(f'{nome} disse olá')

if __name__ == '__main__':
    t = Thread(target=cumprimentar, args=('elton',))
    t.start()
    print('Thread principal')

Forma 2: Classe

from threading import Thread
import time

class Cumprimento(Thread):
    def __init__(self, nome):
        super().__init__()
        self.nome = nome

    def run(self):
        time.sleep(2)
        print(f'{self.nome} disse olá')

if __name__ == '__main__':
    t = Cumprimento('elton')
    t.start()
    print('Thread principal')

Diferenças entre Múltiplas Threads e Múltiplos Processos

Velocidade de Inicialização

Threads inicializam mais rapidamente que processos. Veja o exemplo:

from threading import Thread
from multiprocessing import Process
import os

def tarefa():
    print('olá')

if __name__ == '__main__':
    # Iniciar threads no processo principal
    t = Thread(target=tarefa)
    t.start()
    print('Thread/Processo principal')

    # O resultado será:
    # olá
    # Thread/Processo principal

    # Iniciar processos filhos
    t = Process(target=tarefa)
    t.start()
    print('Thread/Processo principal')

    # O resultado será:
    # Thread/Processo principal
    # olá

Identificador de Processo (PID)

from threading import Thread
from multiprocessing import Process
import os

def tarefa():
    print('olá', os.getpid())

if __name__ == '__main__':
    # Múltiplas threads - todas compartilham o mesmo PID do processo principal
    t1 = Thread(target=tarefa)
    t2 = Thread(target=tarefa)
    t1.start()
    t2.start()
    print('PID do processo principal:', os.getpid())

    # Múltiplos processos - cada um possui um PID diferente
    p1 = Process(target=tarefa)
    p2 = Process(target=tarefa)
    p1.start()
    p2.start()
    print('PID do processo principal:', os.getpid())

Compartilhamento de Dados

from threading import Thread
from multiprocessing import Process

def modificar_dado():
    global valor
    valor = 0

if __name__ == '__main__':
    # Com processos: cada um possui sua própria cópia dos dados
    # valor = 100
    # p = Process(target=modificar_dado)
    # p.start()
    # p.join()
    # print(valor)  # Permanece 100

    # Com threads: compartilham dados do mesmo processo
    valor = 1
    t = Thread(target=modificar_dado)
    t.start()
    t.join()
    print('Thread principal:', valor)  # Resultado: 0

Exercícios Práticos

Exercício 1: Servidor Socket Multi-threaded

# Servidor
import socket
import threading

servidor = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
servidor.bind(('127.0.0.1', 8080))
servidor.listen(5)

def atender_cliente(conn):
    while True:
        dados = conn.recv(1024)
        print(dados)
        conn.send(dados.upper())

if __name__ == '__main__':
    while True:
        conn, addr = servidor.accept()
        thread = threading.Thread(target=atender_cliente, args=(conn,))
        thread.start()

# Cliente
import socket

cliente = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cliente.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>: ').strip()
    if not msg:
        continue
    cliente.send(msg.encode('utf-8'))
    dados = cliente.recv(1024)
    print(dados)

Exercício 2: Três Tarefas Concurrentes

Uma tarefa recebe entrada do usuário, outra formata o texto para maiúsculas, e a terceira salva em arquivo:

from threading import Thread

mensagens = []
formatadas = []

def receber_entrada():
    while True:
        msg = input('>>: ').strip()
        if not msg:
            continue
        mensagens.append(msg)

def formatar_texto():
    while True:
        if mensagens:
            resultado = mensagens.pop()
            formatadas.append(resultado.upper())

def salvar_arquivo():
    while True:
        if formatadas:
            with open('dados.txt', 'a', encoding='utf-8') as arquivo:
                resultado = formatadas.pop()
                arquivo.write(f'{resultado}\n')

if __name__ == '__main__':
    t1 = Thread(target=receber_entrada)
    t2 = Thread(target=formatar_texto)
    t3 = Thread(target=salvar_arquivo)
    t1.start()
    t2.start()
    t3.start()

Métodos Relacionados às Threads

Métodos da instância Thread:
  # isAlive(): Retorna se a thread está em execução.
  # getName(): Retorna o nome da thread.
  # setName(): Define o nome da thread.

Métodos do módulo threading:
  # threading.current_thread(): Retorna a thread atual.
  # threading.enumerate(): Retorna lista de todas as threads em execução.
  # threading.active_count(): Retorna número de threads em execução.

from threading import Thread, current_thread, enumerate, active_count
import time

def tarefa():
    time.sleep(3)
    print(current_thread().getName())

if __name__ == '__main__':
    t = Thread(target=tarefa)
    t.start()

    print(current_thread().getName())
    print(current_thread())
    print(enumerate())
    print(active_count())
    print('Thread principal')

    # Resultado:
    # MainThread
    # <_MainThread(MainThread, started)>
    # [<_MainThread(MainThread, started)>, <Thread(Thread-1, started)>]
    # Thread principal
    # Thread-1

Esperando a Thread Finalizar

from threading import Thread
import time

def dizer_ola(nome):
    time.sleep(2)
    print(f'{nome} disse olá')

if __name__ == '__main__':
    t = Thread(target=dizer_ola, args=('egon',))
    t.start()
    t.join()  # Espera a thread terminar
    print('Thread principal')
    print(t.is_alive())

    # Resultado:
    # egon disse olá
    # Thread principal
    # False

Threads Demônio

Tanto processos quanto threads seguem a regra: o demônio aguarda a entidade principal finalizar para ser encerrado.

# 1. Para o processo principal, finalizar significa executar todo o código do processo

# 2. Para a thread principal, finalizar significa que todas as threads não-demoníaco terminam

from threading import Thread
import time

def dizer_ola(nome):
    time.sleep(2)
    print(f'{nome} disse olá')

if __name__ == '__main__':
    t = Thread(target=dizer_ola, args=('egon',))
    t.daemon = True  # Deve ser definido antes de start()
    t.start()
    print('Thread principal')
    print(t.is_alive())

    # Resultado:
    # Thread principal
    # True

from threading import Thread
import time

def funcao_a():
    print(123)
    time.sleep(1)
    print("fim123")

def funcao_b():
    print(456)
    time.sleep(3)
    print("fim456")

t1 = Thread(target=funcao_a)
t2 = Thread(target=funcao_b)

t1.daemon = True
t1.start()
t2.start()
print("principal-------")

Python GIL (Global Interpreter Lock)

O GIL (Global Interpreter Lock) é uma característica do interpretador CPython que garante que apenas uma thread execute código Python por vez. Isso ocorre porque o interpretador Python não é thread-safe por padrão.

Por que o GIL existe?

O Python utiliza gerenciamento automático de memória (coletor de lixo). Imagine que uma thread está apagando uma variável enquanto o coletor de lixo tenta limpar esse mesmo espaço de memória. Para evitar problemas assim, o Python simplesmente bloqueia tudo durante a execução de uma thread.

Lock de Sincronização

Importância do Lock

Pontos importantes:
# 1. Threads disputam o GIL (direito de execução), mas precisam do Lock para proteger dados compartilhados
# 2. join() espera tudo (execução totalmente serial), enquanto Lock serializa apenas a parte que modifica dados compartilhados
# 3. GIL e Lock são necessários para diferentes propósitos

GIL vs Lock

Por que precisamos de Lock se já temos GIL?

O GIL garante que apenas uma thread execute código Python por vez, mas não protege dados compartilhados. O Lock é necessário para proteger dados específicos que podem ser modificados por múltiplas threads.

Proteção de dados diferentes requer locks diferentes.

Exemplo sem Lock

from threading import Thread
import time

def decrementar():
    global numero
    temp = numero
    time.sleep(0.1)
    numero = temp - 1

if __name__ == '__main__':
    numero = 100
    threads = []
    for i in range(100):
        t = Thread(target=decrementar)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    print(numero)  # Resultado pode ser 99 (dado incorreto)

Exemplo com Lock

import threading

bloqueio = threading.Lock()

bloqueio.acquire()
# Operações com dados compartilhados
bloqueio.release()

from threading import Thread, Lock
import time

bloqueio = Lock()
numero = 100

def decrementar():
    global numero
    bloqueio.acquire()
    temp = numero
    time.sleep(0.1)
    numero = temp - 1
    bloqueio.release()

if __name__ == '__main__':
    threads = []
    for i in range(100):
        t = Thread(target=decrementar)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    print(numero)  # Resultado: 0 (dado correto)

Análise: Lock vs join()

# Sem lock: execução concorrente rápida, mas dados inseguros
from threading import Thread, Lock
import time

def tarefa():
    print(f'{Thread.current_thread().getName()} está executando')
    global numero
    temp = numero
    time.sleep(0.5)
    numero = temp - 1

if __name__ == '__main__':
    numero = 100
    inicio = time.time()
    threads = [Thread(target=tarefa) for _ in range(100)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    fim = time.time()
    print(f'Tempo: {fim - inicio}, Número: {numero}')

    # Tempo: ~0.52s, Número: 99 (incorreto)

# Com lock: parte com lock executa serialmente
from threading import Thread, Lock
import time

def tarefa():
    time.sleep(3)
    print(f'{Thread.current_thread().getName()} iniciando')
    global numero
    bloqueio.acquire()
    temp = numero
    time.sleep(0.5)
    numero = temp - 1
    bloqueio.release()

if __name__ == '__main__':
    numero = 100
    bloqueio = Lock()
    inicio = time.time()
    threads = [Thread(target=tarefa) for _ in range(100)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    fim = time.time()
    print(f'Tempo: {fim - inicio}, Número: {numero}')

    # Tempo: ~53s, Número: 0 (correto, mas lento)

# Usando join(): execução totalmente serial
from threading import Thread
import time

def tarefa():
    time.sleep(3)
    print(f'{Thread.current_thread().getName()} iniciando')
    global numero
    temp = numero
    time.sleep(0.5)
    numero = temp - 1

if __name__ == '__main__':
    numero = 100
    inicio = time.time()
    for i in range(100):
        t = Thread(target=tarefa)
        t.start()
        t.join()  # Espera cada thread terminar antes de iniciar a próxima
    fim = time.time()
    print(f'Tempo: {fim - inicio}, Número: {numero}')

    # Tempo: ~350s (!), Número: 0 (correto, mas muito lento)

Deadlock e Lock Recursivo

Deadlock

Deadlock ocorre quando duas ou mais threads bloqueiam-se mutuamente, esperando recursos que a outra possui.

from threading import Thread, Lock
import time

mutex_a = Lock()
mutex_b = Lock()

class MinhaThread(Thread):
    def run(self):
        self.funcao1()
        self.funcao2()

    def funcao1(self):
        mutex_a.acquire()
        print(f'{self.name} obteve A')

        mutex_b.acquire()
        print(f'{self.name} obteve B')
        mutex_b.release()
        mutex_a.release()

    def funcao2(self):
        mutex_b.acquire()
        print(f'{self.name} obteve B')
        time.sleep(2)

        mutex_a.acquire()
        print(f'{self.name} obteve A')
        mutex_a.release()
        mutex_b.release()

if __name__ == '__main__':
    for i in range(10):
        t = MinhaThread()
        t.start()

    # Thread-1 obteve A
    # Thread-1 obteve B
    # Thread-1 obteve B
    # Thread-2 obteve A
    # Bloqueia - deadlock!

Solução: Lock Recursivo (RLock)

O RLock permite que a mesma thread adquira o lock múltiplas vezes. Ele mantém um contador interno.

import threading

# Um RLock para todas as necessidades
mutex = threading.RLock()

# A mesma thread pode adquirir o lock várias vezes
mutex.acquire()
mutex.acquire()  # Não blocking para a mesma thread
# ... operações ...
mutex.release()
mutex.release()

Semaphore

Semaphore gerencia um contador interno. acquire() decrementa o contador, release() incrementa. Quando o contador é 0, acquire() bloqueia.

from threading import Thread, Semaphore
import threading
import time

def acessar_recurso():
    semaforo.acquire()
    print(f'{threading.current_thread().getName()} obteve acesso')
    time.sleep(3)
    semaforo.release()

if __name__ == '__main__':
    semaforo = Semaphore(5)  # Máximo 5 threads simultâneas
    for i in range(23):
        t = Thread(target=acessar_recurso)
        t.start()

Event

Event permite que threads aguardem até que um evento seja definido.

event.isSet(): Retorna o estado do evento
event.wait(): Bloqueia se o evento não estiver definido
event.set(): Define o evento como True
event.clear(): Reseta o evento para False

from threading import Thread, Event
import threading
import time
import random

def conectar_banco():
    tentativas = 1
    while not evento.is_set():
        if tentativas > 3:
            raise TimeoutError('Tempo de conexão esgotado')
        print(f'{threading.current_thread().getName()} - Tentativa {tentativas}')
        evento.wait(0.5)
        tentativas += 1
    print(f'{threading.current_thread().getName()} - Conectado com sucesso')

def verificar_banco():
    print(f'{threading.current_thread().getName()} verificando banco de dados')
    time.sleep(random.randint(2, 4))
    evento.set()

if __name__ == '__main__':
    evento = Event()
    conn1 = Thread(target=conectar_banco)
    conn2 = Thread(target=conectar_banco)
    verificacao = Thread(target=verificar_banco)

    conn1.start()
    conn2.start()
    verificacao.start()

Condition

Condition permite que threads aguardem até que uma condição seja satisfeita.

import threading

def executar(n):
    condicao.acquire()
    condicao.wait()
    print(f"Executando thread: {n}")
    condicao.release()

if __name__ == '__main__':
    condicao = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=executar, args=(i,))
        t.start()

    while True:
        entrada = input('>>>')
        if entrada == 'q':
            break
        condicao.acquire()
        condicao.notify(int(entrada))
        condicao.release()

def verificar_condicao():
    entrada = input('>>>')
    return entrada == '1'

def executar(n):
    condicao.acquire()
    condicao.wait_for(verificar_condicao)
    print(f"Executando thread: {n}")
    condicao.release()

if __name__ == '__main__':
    condicao = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=executar, args=(i,))
        t.start()

Timer

Timer executa uma função após um intervalo especificado.

from threading import Timer

def saudacao():
    print("Olá, mundo!")

t = Timer(1, saudacao)
t.start()  # Após 1 segundo, "Olá, mundo!" será impresso

from threading import Timer
import random

class GeradorCodigo:
    def __init__(self):
        self.cache = ''
        self.timer = None
        self.gerar_cache()

    def gerar_cache(self, intervalo=5):
        self.cache = self.criar_codigo()
        print(self.cache)
        self.timer = Timer(intervalo, self.gerar_cache)
        self.timer.start()

    def criar_codigo(self, n=4):
        resultado = ''
        for _ in range(n):
            digito = str(random.randint(0, 9))
            letra = chr(random.randint(65, 90))
            resultado += random.choice([digito, letra])
        return resultado

    def verificar(self):
        while True:
            entrada = input('>>: ').strip()
            if entrada.upper() == self.cache:
                print('Verificado com sucesso')
                self.timer.cancel()
                break

if __name__ == '__main__':
    obj = GeradorCodigo()
    obj.verificar()

Filas de Thread

O módulo queue fornece filas thread-safe.

FIFO (First In, First Out)

import queue

fila = queue.Queue()
fila.put('primeiro')
fila.put('segundo')
fila.put('terceiro')

print(fila.get())
print(fila.get())
print(fila.get())

# Resultado:
# primeiro
# segundo
# terceiro

LIFO (Last In, First Out)

import queue

fila = queue.LifoQueue()
fila.put('primeiro')
fila.put('segundo')
fila.put('terceiro')

print(fila.get())
print(fila.get())
print(fila.get())

# Resultado:
# terceiro
# segundo
# primeiro

Priority Queue

import queue

fila = queue.PriorityQueue()
fila.put((2, 'segundo'))
fila.put((1, 'primeiro'))
fila.put((3, 'terceiro'))

print(fila.get()[1])
print(fila.get()[1])
print(fila.get()[1])

# Resultado:
# primeiro
# segundo
# terceiro

Métodos da Queue

Queue.qsize()          - Retorna tamanho aproximado
Queue.empty()          - Retorna True se vazia
Queue.full()           - Retorna True se cheia
Queue.put(item)        - Insere item (bloqueia se cheia)
Queue.get()            - Remove e retorna item (bloqueia se vazia)
Queue.get_nowait()     - Não bloqueia
Queue.put_nowait()     - Não bloqueia
Queue.task_done()      - Indica tarefa completa
Queue.join()           - Bloqueia até todas as tarefas serem processadas

Módulo concurrent.futures

Este módulo fornece interfaces de alto nível para programação assíncrona.

Executor

submit(fn, *args, **kwargs) - Submete tarefa assíncrona
map(func, iterables)        - Submete múltiplas tarefas
shutdown(wait=True)         - Encerra o executor
result(timeout=None)        - Obtém resultado
add_done_callback(fn)       - Adiciona callback

ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def tarefa(n):
    print(f'Processo {os.getpid()} executando')
    time.sleep(random.randint(1, 3))
    return n ** 2

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)
    resultados = []
    for i in range(11):
        futuro = executor.submit(tarefa, i)
        resultados.append(futuro)
    executor.shutdown(wait=True)
    print('===>')
    for futuro in resultados:
        print(futuro.result())

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
import os
import time
import random

def tarefa(n):
    print(f'Thread {os.getpid()} executando')
    time.sleep(random.randint(1, 3))
    return n ** 2

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)
    executor.map(tarefa, range(1, 12))

Callbacks

from concurrent.futures import ProcessPoolExecutor
import requests

def obter_pagina(url):
    print(f'Obtendo {url}')
    resposta = requests.get(url)
    if resposta.status_code == 200:
        return {'url': url, 'texto': resposta.text}

def analisar_pagina(futuro):
    resultado = futuro.result()
    print(f'Analisando {resultado["url"]}')
    with open('resultado.txt', 'a') as arquivo:
        arquivo.write(f'URL: {resultado["url"]} - Tamanho: {len(resultado["texto"])}\n')

if __name__ == '__main__':
    urls = [
        'https://www.python.org',
        'https://www.google.com',
        'https://github.com'
    ]

    executor = ProcessPoolExecutor(3)
    for url in urls:
        executor.submit(obter_pagina, url).add_done_callback(analisar_pagina)

Tags: Python threading Concurrency Multithreading GIL

Publicado em 6-24 19:49