Integração com API de Dados Financeiros para Ações dos EUA: Implementação de Dados em Tempo Real e K-lines

Configuração Básica para Acesso a Dados de Mercado

Para desenvolver aplicações financeiras que utilizam dados de ações americanas, é essencial estabelecer uma conexão eficiente com APIs de mercado. A seguir, apresenta-se uma implementação em Python para gerenciar requisições a uma API genérica de dados financeiros, com foco em robustze e tratamento de erros.

# market_client.py - Cliente base para APIs financeiras
import requests
import time
from typing import Dict, Optional, Any

# Configurações de conexão
MARKET_CONFIG = {
    'api_endpoint': 'http://exemplo.com/api',  # Endpoint base da API
    'ws_endpoint': 'ws://exemplo.com/stream',  # Endpoint WebSocket (não utilizado neste exemplo)
    'request_headers': {
        'User-Agent': 'FinancialDataClient/2.0',
        'Accept-Encoding': 'gzip, deflate'
    },
    'timeout_seconds': 15,
    'max_retries': 4
}

class FinancialDataConnector:
    """Conector genérico para APIs de dados financeiros."""
    
    def __init__(self):
        self.api_base = MARKET_CONFIG['api_endpoint']
        self.session = requests.Session()
        self.session.headers.update(MARKET_CONFIG['request_headers'])
    
    def execute_api_call(self, resource_path: str, query_params: Dict[str, Any]) -> Optional[Dict]:
        """
        Executa uma chamada HTTP com mecanismo de retentativa.
        
        Args:
            resource_path: Caminho do recurso na API.
            query_params: Parâmetros de consulta.
        
        Returns:
            Dados da resposta ou None em caso de falha.
        """
        for attempt in range(MARKET_CONFIG['max_retries']):
            try:
                response = self.session.get(
                    f"{self.api_base}{resource_path}",
                    params=query_params,
                    timeout=MARKET_CONFIG['timeout_seconds']
                )
                
                if response.status_code == 200:
                    # Decodificação automática de gzip
                    return response.json()
                elif response.status_code == 429:
                    # Tratamento de limite de taxa
                    delay = (attempt + 1) * 1.5
                    time.sleep(delay)
                    continue
                else:
                    return None
                    
            except requests.exceptions.RequestException as error:
                if attempt < MARKET_CONFIG['max_retries'] - 1:
                    # Estratégia de backoff exponencial
                    time.sleep(2 ** attempt)
        
        return None

Obtendo Cotações em Tempo Real de Ações Americanas

A obtenção de dados em tempo real é crucial para monitorametno de mercado. A implementação abaixo demonstra como buscar cotações individuais e em lote, com análise estruturada dos dados recebidos.

# realtime_data.py - Módulo de cotações em tempo real
from market_client import FinancialDataConnector
from typing import List, Dict, Optional
import time

class RealTimeStockFetcher(FinancialDataConnector):
    """Classe para obtenção de cotações em tempo real."""
    
    def retrieve_quote(self, stock_symbol: str) -> Optional[Dict]:
        """
        Busca a cotação atual de uma ação.
        
        Args:
            stock_symbol: Símbolo da ação (ex: 'GOOGL').
        
        Returns:
            Dados estruturados da cotação.
        """
        endpoint = "/market/quote"
        parameters = {'symbol': stock_symbol}
        
        raw_response = self.execute_api_call(endpoint, parameters)
        
        if raw_response and raw_response.get('status') == 'success':
            return self.transform_quote_data(raw_response['data'])
        
        return None
    
    def fetch_batch_quotes(self, symbols: List[str]) -> Dict[str, Optional[Dict]]:
        """
        Busca cotações para múltiplas ações com controle de frequência.
        
        Args:
            symbols: Lista de símbolos de ações.
        
        Returns:
            Mapeamento de símbolo para dados de cotação.
        """
        results = {}
        for symbol in symbols:
            results[symbol] = self.retrieve_quote(symbol)
            # Pausa para respeitar limites de taxa
            time.sleep(0.4)
        
        return results
    
    def transform_quote_data(self, raw_data: Dict) -> Dict:
        """
        Converte dados brutos em formato estruturado.
        
        Args:
            raw_data: Dados da API.
        
        Returns:
            Dicionário com campos padronizados.
        """
        structured = {}
        
        # Mapeamento de campos da API
        field_mapping = {
            'ticker': 'ticker',
            'current_price': 'preco_atual',
            'session_open': 'abertura_sessao',
            'session_high': 'maxima_sessao',
            'session_low': 'minima_sessao',
            'previous_close': 'fechamento_anterior',
            'volume_total': 'volume_total',
            'price_change': 'variacao_preco',
            'change_percent': 'percentual_variacao'
        }
        
        for api_key, struct_key in field_mapping.items():
            structured[struct_key] = raw_data.get(api_key)
        
        # Processamento de dados de profundidade
        if 'order_book' in raw_data:
            book_data = raw_data['order_book']
            
            structured['ofertas_compra'] = []
            for i in range(5):
                bid_entry = book_data.get(f'bid_{i+1}')
                if bid_entry:
                    structured['ofertas_compra'].append({
                        'preco': bid_entry.get('price'),
                        'quantidade': bid_entry.get('size')
                    })
            
            structured['ofertas_venda'] = []
            for i in range(5):
                ask_entry = book_data.get(f'ask_{i+1}')
                if ask_entry:
                    structured['ofertas_venda'].append({
                        'preco': ask_entry.get('price'),
                        'quantidade': ask_entry.get('size')
                    })
        
        return structured


# Exemplo de utilização
def executar_exemplo_cotacoes():
    """Demonstração de obtenção de cotações."""
    fetcher = RealTimeStockFetcher()
    
    # Cotação individual
    cotacao_apple = fetcher.retrieve_quote('AAPL')
    if cotacao_apple:
        print("Dados Apple:")
        print(f"Ticker: {cotacao_apple['ticker']}")
        print(f"Preço: {cotacao_apple['preco_atual']}")
        print(f"Variação: {cotacao_apple['percentual_variacao']}%")
        
        # Exibir book de ofertas
        if 'ofertas_compra' in cotacao_apple and cotacao_apple['ofertas_compra']:
            print("\nOfertas de Compra (Top 3):")
            for idx, bid in enumerate(cotacao_apple['ofertas_compra'][:3]):
                print(f"  {idx+1}: ${bid['preco']} - {bid['quantidade']} unidades")
    
    # Cotações em lote
    simbolos = ['MSFT', 'AMZN']
    resultados_lote = fetcher.fetch_batch_quotes(simbolos)
    
    for simbolo, dados in resultados_lote.items():
        if dados:
            print(f"\n{simbolo}: ${dados['preco_atual']} ({dados['percentual_variacao']}%)")


if __name__ == "__main__":
    executar_exemplo_cotacoes()

Acessando Dados Históricos de K-lines

Para análise técnica e backtesting, dados históricos de candles (K-lines) são indispansáveis. A classe abaixo gerencia a recuperação e transformação desses dados em diferentes temporalidades.

# kline_processor.py - Processador de dados de candles
from market_client import FinancialDataConnector
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional, Tuple

class HistoricalKlineManager(FinancialDataConnector):
    """Gerenciador de dados históricos de K-lines."""
    
    # Mapeamento de intervalos de tempo
    INTERVAL_MAPPING = {
        '1min': {'id': '1m', 'descricao': '1 Minuto'},
        '5min': {'id': '5m', 'descricao': '5 Minutos'},
        '15min': {'id': '15m', 'descricao': '15 Minutos'},
        '1hora': {'id': '1h', 'descricao': '1 Hora'},
        'diario': {'id': '1d', 'descricao': 'Diário'},
        'mensal': {'id': '1M', 'descricao': 'Mensal'}
    }
    
    # Limites de registros por intervalo
    MAX_RECORDS = {
        '1m': 500,
        '5m': 250,
        '15m': 200,
        '1h': 180,
        '1d': 120,
        '1M': 60
    }
    
    def fetch_kline_data(self, stock_code: str, interval: str = 'diario', 
                         record_count: int = 90) -> Optional[List]:
        """
        Obtém dados de K-lines históricos.
        
        Args:
            stock_code: Código da ação.
            interval: Intervalo de tempo (ex: 'diario').
            record_count: Número de registros desejados.
        
        Returns:
            Lista de dados de K-lines ou None.
        """
        interval_config = self.INTERVAL_MAPPING.get(interval)
        if not interval_config:
            print(f"Intervalo não suportado: {interval}")
            return None
        
        # Ajustar quantidade máxima
        max_allowed = self.MAX_RECORDS.get(interval_config['id'], 100)
        if record_count > max_allowed:
            record_count = max_allowed
        
        resource = "/historical/candles"
        params = {
            'ticker': stock_code,
            'interval': interval_config['id'],
            'limit': record_count
        }
        
        response_data = self.execute_api_call(resource, params)
        
        if isinstance(response_data, list):
            return self.normalize_kline_format(response_data)
        
        return None
    
    def normalize_kline_format(self, raw_klines: List) -> List[Dict]:
        """
        Normaliza formato dos dados de K-lines.
        
        Args:
            raw_klines: Dados brutos da API.
        
        Returns:
            Lista de dicionários estruturados.
        """
        normalized = []
        
        for candle in raw_klines:
            if len(candle) >= 6:
                entry = {
                    'timestamp_ms': candle[0],
                    'preco_abertura': float(candle[1]),
                    'preco_maximo': float(candle[2]),
                    'preco_minimo': float(candle[3]),
                    'preco_fechamento': float(candle[4]),
                    'volume_negociado': float(candle[5]) if candle[5] else 0,
                    'horario_formatado': candle[6] if len(candle) > 6 else None
                }
                normalized.append(entry)
        
        return normalized
    
    def convert_to_analysis_dataframe(self, kline_list: List[Dict]) -> pd.DataFrame:
        """
        Converte lista de K-lines para DataFrame com indicadores.
        
        Args:
            kline_list: Dados estruturados de K-lines.
        
        Returns:
            DataFrame do Pandas com colunas adicionais.
        """
        if not kline_list:
            return pd.DataFrame()
        
        df = pd.DataFrame(kline_list)
        
        # Converter timestamp para datetime
        df['data_hora'] = pd.to_datetime(df['timestamp_ms'], unit='ms')
        df.set_index('data_hora', inplace=True)
        
        # Calcular indicadores técnicos
        df['media_movel_5'] = df['preco_fechamento'].rolling(window=5).mean()
        df['media_movel_20'] = df['preco_fechamento'].rolling(window=20).mean()
        df['banda_superior'] = df['media_movel_20'] + 2 * df['preco_fechamento'].rolling(window=20).std()
        df['banda_inferior'] = df['media_movel_20'] - 2 * df['preco_fechamento'].rolling(window=20).std()
        
        # Calcular variação percentual
        df['variacao_percentual'] = df['preco_fechamento'].pct_change() * 100
        
        # Identificar candle color
        df['cor_candle'] = df.apply(
            lambda row: 'verde' if row['preco_fechamento'] >= row['preco_abertura'] else 'vermelho',
            axis=1
        )
        
        return df
    
    def generate_complete_analysis(self, stock_code: str, interval: str = 'diario',
                                   count: int = 60) -> Optional[pd.DataFrame]:
        """
        Gera análise completa com K-lines e indicadores.
        
        Args:
            stock_code: Código da ação.
            interval: Intervalo de tempo.
            count: Quantidade de registros.
        
        Returns:
            DataFrame completo para análise.
        """
        raw_data = self.fetch_kline_data(stock_code, interval, count)
        if not raw_data:
            return None
        
        normalized = self.normalize_kline_format(raw_data)
        analysis_df = self.convert_to_analysis_dataframe(normalized)
        
        return analysis_df


# Exemplo de análise de K-lines
def exemplo_analise_klines():
    """Demonstração de processamento de dados históricos."""
    manager = HistoricalKlineManager()
    
    # Análise diária da Amazon
    df_amazon = manager.generate_complete_analysis('AMZN', interval='diario', count=45)
    
    if df_amazon is not None and not df_amazon.empty:
        print(f"Registros diários Amazon: {len(df_amazon)}")
        
        # Estatísticas recentes
        ultimo_candle = df_amazon.iloc[-1]
        print(f"\nÚltimo dia ({ultimo_candle.name.strftime('%d/%m/%Y')}):")
        print(f"  Abertura: ${ultimo_candle['preco_abertura']:.2f}")
        print(f"  Máxima: ${ultimo_candle['preco_maximo']:.2f}")
        print(f"  Mínima: ${ultimo_candle['preco_minimo']:.2f}")
        print(f"  Fechamento: ${ultimo_candle['preco_fechamento']:.2f}")
        print(f"  Volume: {ultimo_candle['volume_negociado']:,.0f}")
        print(f"  MM5: ${ultimo_candle['media_movel_5']:.2f}")
        print(f"  MM20: ${ultimo_candle['media_movel_20']:.2f}")
        print(f"  Cor: {ultimo_candle['cor_candle']}")
        
        # Identificar padrões
        max_52_semanas = df_amazon['preco_maximo'].max()
        min_52_semanas = df_amazon['preco_minimo'].min()
        print(f"\n  Máxima no período: ${max_52_semanas:.2f}")
        print(f"  Mínima no período: ${min_52_semanas:.2f}")
    
    # Análise intraday (15 minutos) da Apple
    print("\n" + "="*50)
    df_apple_15m = manager.generate_complete_analysis('AAPL', interval='15min', count=30)
    
    if df_apple_15m is not None and not df_apple_15m.empty:
        print(f"\nDados de 15 minutos Apple (últimas 30 barras):")
        print("Últimos 3 candles:")
        
        for idx, row in df_apple_15m.tail(3).iterrows():
            print(f"  {idx.strftime('%H:%M')}: "
                  f"O=${row['preco_abertura']:.2f} "
                  f"H=${row['preco_maximo']:.2f} "
                  f"L=${row['preco_minimo']:.2f} "
                  f"C=${row['preco_fechamento']:.2f} "
                  f"({row['variacao_percentual']:.2f}%)")


if __name__ == "__main__":
    exemplo_analise_klines()

Tags: Python api-financeira dados-tempo-real K-lines acoes-americanas

Publicado em 6-8 22:10 por Thomas