Configuração Básica para Acesso a Dados de Mercado
Para desenvolver aplicações financeiras que utilizam dados de ações americanas, é essencial estabelecer uma conexão eficiente com APIs de mercado. A seguir, apresenta-se uma implementação em Python para gerenciar requisições a uma API genérica de dados financeiros, com foco em robustze e tratamento de erros.
# market_client.py - Cliente base para APIs financeiras
import requests
import time
from typing import Dict, Optional, Any
# Configurações de conexão
MARKET_CONFIG = {
'api_endpoint': 'http://exemplo.com/api', # Endpoint base da API
'ws_endpoint': 'ws://exemplo.com/stream', # Endpoint WebSocket (não utilizado neste exemplo)
'request_headers': {
'User-Agent': 'FinancialDataClient/2.0',
'Accept-Encoding': 'gzip, deflate'
},
'timeout_seconds': 15,
'max_retries': 4
}
class FinancialDataConnector:
"""Conector genérico para APIs de dados financeiros."""
def __init__(self):
self.api_base = MARKET_CONFIG['api_endpoint']
self.session = requests.Session()
self.session.headers.update(MARKET_CONFIG['request_headers'])
def execute_api_call(self, resource_path: str, query_params: Dict[str, Any]) -> Optional[Dict]:
"""
Executa uma chamada HTTP com mecanismo de retentativa.
Args:
resource_path: Caminho do recurso na API.
query_params: Parâmetros de consulta.
Returns:
Dados da resposta ou None em caso de falha.
"""
for attempt in range(MARKET_CONFIG['max_retries']):
try:
response = self.session.get(
f"{self.api_base}{resource_path}",
params=query_params,
timeout=MARKET_CONFIG['timeout_seconds']
)
if response.status_code == 200:
# Decodificação automática de gzip
return response.json()
elif response.status_code == 429:
# Tratamento de limite de taxa
delay = (attempt + 1) * 1.5
time.sleep(delay)
continue
else:
return None
except requests.exceptions.RequestException as error:
if attempt < MARKET_CONFIG['max_retries'] - 1:
# Estratégia de backoff exponencial
time.sleep(2 ** attempt)
return None
Obtendo Cotações em Tempo Real de Ações Americanas
A obtenção de dados em tempo real é crucial para monitorametno de mercado. A implementação abaixo demonstra como buscar cotações individuais e em lote, com análise estruturada dos dados recebidos.
# realtime_data.py - Módulo de cotações em tempo real
from market_client import FinancialDataConnector
from typing import List, Dict, Optional
import time
class RealTimeStockFetcher(FinancialDataConnector):
"""Classe para obtenção de cotações em tempo real."""
def retrieve_quote(self, stock_symbol: str) -> Optional[Dict]:
"""
Busca a cotação atual de uma ação.
Args:
stock_symbol: Símbolo da ação (ex: 'GOOGL').
Returns:
Dados estruturados da cotação.
"""
endpoint = "/market/quote"
parameters = {'symbol': stock_symbol}
raw_response = self.execute_api_call(endpoint, parameters)
if raw_response and raw_response.get('status') == 'success':
return self.transform_quote_data(raw_response['data'])
return None
def fetch_batch_quotes(self, symbols: List[str]) -> Dict[str, Optional[Dict]]:
"""
Busca cotações para múltiplas ações com controle de frequência.
Args:
symbols: Lista de símbolos de ações.
Returns:
Mapeamento de símbolo para dados de cotação.
"""
results = {}
for symbol in symbols:
results[symbol] = self.retrieve_quote(symbol)
# Pausa para respeitar limites de taxa
time.sleep(0.4)
return results
def transform_quote_data(self, raw_data: Dict) -> Dict:
"""
Converte dados brutos em formato estruturado.
Args:
raw_data: Dados da API.
Returns:
Dicionário com campos padronizados.
"""
structured = {}
# Mapeamento de campos da API
field_mapping = {
'ticker': 'ticker',
'current_price': 'preco_atual',
'session_open': 'abertura_sessao',
'session_high': 'maxima_sessao',
'session_low': 'minima_sessao',
'previous_close': 'fechamento_anterior',
'volume_total': 'volume_total',
'price_change': 'variacao_preco',
'change_percent': 'percentual_variacao'
}
for api_key, struct_key in field_mapping.items():
structured[struct_key] = raw_data.get(api_key)
# Processamento de dados de profundidade
if 'order_book' in raw_data:
book_data = raw_data['order_book']
structured['ofertas_compra'] = []
for i in range(5):
bid_entry = book_data.get(f'bid_{i+1}')
if bid_entry:
structured['ofertas_compra'].append({
'preco': bid_entry.get('price'),
'quantidade': bid_entry.get('size')
})
structured['ofertas_venda'] = []
for i in range(5):
ask_entry = book_data.get(f'ask_{i+1}')
if ask_entry:
structured['ofertas_venda'].append({
'preco': ask_entry.get('price'),
'quantidade': ask_entry.get('size')
})
return structured
# Exemplo de utilização
def executar_exemplo_cotacoes():
"""Demonstração de obtenção de cotações."""
fetcher = RealTimeStockFetcher()
# Cotação individual
cotacao_apple = fetcher.retrieve_quote('AAPL')
if cotacao_apple:
print("Dados Apple:")
print(f"Ticker: {cotacao_apple['ticker']}")
print(f"Preço: {cotacao_apple['preco_atual']}")
print(f"Variação: {cotacao_apple['percentual_variacao']}%")
# Exibir book de ofertas
if 'ofertas_compra' in cotacao_apple and cotacao_apple['ofertas_compra']:
print("\nOfertas de Compra (Top 3):")
for idx, bid in enumerate(cotacao_apple['ofertas_compra'][:3]):
print(f" {idx+1}: ${bid['preco']} - {bid['quantidade']} unidades")
# Cotações em lote
simbolos = ['MSFT', 'AMZN']
resultados_lote = fetcher.fetch_batch_quotes(simbolos)
for simbolo, dados in resultados_lote.items():
if dados:
print(f"\n{simbolo}: ${dados['preco_atual']} ({dados['percentual_variacao']}%)")
if __name__ == "__main__":
executar_exemplo_cotacoes()
Acessando Dados Históricos de K-lines
Para análise técnica e backtesting, dados históricos de candles (K-lines) são indispansáveis. A classe abaixo gerencia a recuperação e transformação desses dados em diferentes temporalidades.
# kline_processor.py - Processador de dados de candles
from market_client import FinancialDataConnector
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional, Tuple
class HistoricalKlineManager(FinancialDataConnector):
"""Gerenciador de dados históricos de K-lines."""
# Mapeamento de intervalos de tempo
INTERVAL_MAPPING = {
'1min': {'id': '1m', 'descricao': '1 Minuto'},
'5min': {'id': '5m', 'descricao': '5 Minutos'},
'15min': {'id': '15m', 'descricao': '15 Minutos'},
'1hora': {'id': '1h', 'descricao': '1 Hora'},
'diario': {'id': '1d', 'descricao': 'Diário'},
'mensal': {'id': '1M', 'descricao': 'Mensal'}
}
# Limites de registros por intervalo
MAX_RECORDS = {
'1m': 500,
'5m': 250,
'15m': 200,
'1h': 180,
'1d': 120,
'1M': 60
}
def fetch_kline_data(self, stock_code: str, interval: str = 'diario',
record_count: int = 90) -> Optional[List]:
"""
Obtém dados de K-lines históricos.
Args:
stock_code: Código da ação.
interval: Intervalo de tempo (ex: 'diario').
record_count: Número de registros desejados.
Returns:
Lista de dados de K-lines ou None.
"""
interval_config = self.INTERVAL_MAPPING.get(interval)
if not interval_config:
print(f"Intervalo não suportado: {interval}")
return None
# Ajustar quantidade máxima
max_allowed = self.MAX_RECORDS.get(interval_config['id'], 100)
if record_count > max_allowed:
record_count = max_allowed
resource = "/historical/candles"
params = {
'ticker': stock_code,
'interval': interval_config['id'],
'limit': record_count
}
response_data = self.execute_api_call(resource, params)
if isinstance(response_data, list):
return self.normalize_kline_format(response_data)
return None
def normalize_kline_format(self, raw_klines: List) -> List[Dict]:
"""
Normaliza formato dos dados de K-lines.
Args:
raw_klines: Dados brutos da API.
Returns:
Lista de dicionários estruturados.
"""
normalized = []
for candle in raw_klines:
if len(candle) >= 6:
entry = {
'timestamp_ms': candle[0],
'preco_abertura': float(candle[1]),
'preco_maximo': float(candle[2]),
'preco_minimo': float(candle[3]),
'preco_fechamento': float(candle[4]),
'volume_negociado': float(candle[5]) if candle[5] else 0,
'horario_formatado': candle[6] if len(candle) > 6 else None
}
normalized.append(entry)
return normalized
def convert_to_analysis_dataframe(self, kline_list: List[Dict]) -> pd.DataFrame:
"""
Converte lista de K-lines para DataFrame com indicadores.
Args:
kline_list: Dados estruturados de K-lines.
Returns:
DataFrame do Pandas com colunas adicionais.
"""
if not kline_list:
return pd.DataFrame()
df = pd.DataFrame(kline_list)
# Converter timestamp para datetime
df['data_hora'] = pd.to_datetime(df['timestamp_ms'], unit='ms')
df.set_index('data_hora', inplace=True)
# Calcular indicadores técnicos
df['media_movel_5'] = df['preco_fechamento'].rolling(window=5).mean()
df['media_movel_20'] = df['preco_fechamento'].rolling(window=20).mean()
df['banda_superior'] = df['media_movel_20'] + 2 * df['preco_fechamento'].rolling(window=20).std()
df['banda_inferior'] = df['media_movel_20'] - 2 * df['preco_fechamento'].rolling(window=20).std()
# Calcular variação percentual
df['variacao_percentual'] = df['preco_fechamento'].pct_change() * 100
# Identificar candle color
df['cor_candle'] = df.apply(
lambda row: 'verde' if row['preco_fechamento'] >= row['preco_abertura'] else 'vermelho',
axis=1
)
return df
def generate_complete_analysis(self, stock_code: str, interval: str = 'diario',
count: int = 60) -> Optional[pd.DataFrame]:
"""
Gera análise completa com K-lines e indicadores.
Args:
stock_code: Código da ação.
interval: Intervalo de tempo.
count: Quantidade de registros.
Returns:
DataFrame completo para análise.
"""
raw_data = self.fetch_kline_data(stock_code, interval, count)
if not raw_data:
return None
normalized = self.normalize_kline_format(raw_data)
analysis_df = self.convert_to_analysis_dataframe(normalized)
return analysis_df
# Exemplo de análise de K-lines
def exemplo_analise_klines():
"""Demonstração de processamento de dados históricos."""
manager = HistoricalKlineManager()
# Análise diária da Amazon
df_amazon = manager.generate_complete_analysis('AMZN', interval='diario', count=45)
if df_amazon is not None and not df_amazon.empty:
print(f"Registros diários Amazon: {len(df_amazon)}")
# Estatísticas recentes
ultimo_candle = df_amazon.iloc[-1]
print(f"\nÚltimo dia ({ultimo_candle.name.strftime('%d/%m/%Y')}):")
print(f" Abertura: ${ultimo_candle['preco_abertura']:.2f}")
print(f" Máxima: ${ultimo_candle['preco_maximo']:.2f}")
print(f" Mínima: ${ultimo_candle['preco_minimo']:.2f}")
print(f" Fechamento: ${ultimo_candle['preco_fechamento']:.2f}")
print(f" Volume: {ultimo_candle['volume_negociado']:,.0f}")
print(f" MM5: ${ultimo_candle['media_movel_5']:.2f}")
print(f" MM20: ${ultimo_candle['media_movel_20']:.2f}")
print(f" Cor: {ultimo_candle['cor_candle']}")
# Identificar padrões
max_52_semanas = df_amazon['preco_maximo'].max()
min_52_semanas = df_amazon['preco_minimo'].min()
print(f"\n Máxima no período: ${max_52_semanas:.2f}")
print(f" Mínima no período: ${min_52_semanas:.2f}")
# Análise intraday (15 minutos) da Apple
print("\n" + "="*50)
df_apple_15m = manager.generate_complete_analysis('AAPL', interval='15min', count=30)
if df_apple_15m is not None and not df_apple_15m.empty:
print(f"\nDados de 15 minutos Apple (últimas 30 barras):")
print("Últimos 3 candles:")
for idx, row in df_apple_15m.tail(3).iterrows():
print(f" {idx.strftime('%H:%M')}: "
f"O=${row['preco_abertura']:.2f} "
f"H=${row['preco_maximo']:.2f} "
f"L=${row['preco_minimo']:.2f} "
f"C=${row['preco_fechamento']:.2f} "
f"({row['variacao_percentual']:.2f}%)")
if __name__ == "__main__":
exemplo_analise_klines()