Coleta de Dados Financeiros e Educacionais com Selenium e Armazenamento em Banco de Dados

Coleta de Informações do Mercado de Ações Chinesas

Descrição do Projeto

Desenvolver um sistema automatizado para capturar dados de ações listadas nos mercados de Shanghai e Shenzhen, utilizando Selenium para navegação web e MySQL para persistência dos dados coletados.

Dependências Necessárias

  • Selenium WebDriver: automatiza a interação com navegadores, permitindo renderização completa de páginas dinâmicas e extração de conteúdo HTML
  • time: controle de temporização entre requisições, auxiliando na estabilidade da coleta e evitando bloqueios por frequência excessiva
  • mysql.connector: interface de comunicação entre Python e o servidor MySQL

Configuração do Ambiente de Armazenamento

CONFIG_COLETOR = { 'host': 'localhost', 'user': 'root', 'password': 'senha_segura_123', 'database': NOME_BD, 'charset': 'utf8mb4' }

def inicializar_banco(): conexao = None try: conexao = mysql.connector.connect(**DB_SERVER) if conexao.is_connected(): cursor = conexao.cursor() cursor.execute( f"CREATE DATABASE IF NOT EXISTS {NOME_BD} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;" ) cursor.execute(f"USE {NOME_BD};")

        ddl_tabela = f"""
        CREATE TABLE IF NOT EXISTS {NOME_TABELA} (
            registro_id INT AUTO_INCREMENT PRIMARY KEY,
            nome_mercado VARCHAR(50) NOT NULL,
            codigo_acao VARCHAR(10) NOT NULL,
            nome_empresa VARCHAR(50) NOT NULL,
            cotacao_atual DECIMAL(10, 3),
            variacao_percentual DECIMAL(8, 2),
            variacao_valor DECIMAL(10, 3),
            volume_negociado VARCHAR(50),
            valor_negociado VARCHAR(50),
            oscilacao DECIMAL(8, 2),
            preco_maximo DECIMAL(10, 3),
            preco_minimo DECIMAL(10, 3),
            preco_abertura DECIMAL(10, 3),
            fechamento_anterior DECIMAL(10, 3),
            momento_coleta DATETIME DEFAULT CURRENT_TIMESTAMP,
            UNIQUE KEY (codigo_acao, nome_mercado, momento_coleta)
        );
        """
        cursor.execute(ddl_tabela)
        return True

except mysql.connector.Error as erro:
    print(f"Falha na inicialização: {erro}")
    return False
finally:
    if conexao and conexao.is_connected():
        cursor.close()
        conexao.close()

def registrar_dados(cursor, dados): consulta = """ INSERT INTO cotacoes_acoes (nome_mercado, codigo_acao, nome_empresa, cotacao_atual, variacao_percentual, variacao_valor, volume_negociado, valor_negociado, oscilacao, preco_maximo, preco_minimo, preco_abertura, fechamento_anterior, momento_coleta) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE nome_empresa = VALUES(nome_empresa), cotacao_atual = VALUES(cotacao_atual), variacao_percentual = VALUES(variacao_percentual), variacao_valor = VALUES(variacao_valor), volume_negociado = VALUES(volume_negociado), valor_negociado = VALUES(valor_negociado), oscilacao = VALUES(oscilacao), preco_maximo = VALUES(preco_maximo), preco_minimo = VALUES(preco_minimo), preco_abertura = VALUES(preco_abertura), fechamento_anterior = VALUES(fechamento_anterior), momento_coleta = VALUES(momento_coleta); """ try: cursor.execute(consulta, dados) return True except mysql.connector.Error as err: print(f"Erro ao inserir: {err}") return False


</details>### Fluxo Principal de Coleta

1. **Navegação por mercados:** Iterar sobre os mercados definidos (HS\_A, SH\_A, SZ\_A), alternando via fragmento de URL (#). Utilizar WebDriverWait para garantir carregamento completo
2. **Extração de dados:** Localizar cada linha da tabela (tr) e capturar o texto de cada célula (td). Aplicar funções de normalização para tratar valores inválidos como '-' ou 'nan'
3. **Paginação:** Após extrair todos os dados da página atual, localizar o botão de próxima página via XPath e executar o clique via Selenium

<details><summary>Clique para visualizar o código de coleta</summary>```
def executar_coleta_mercado(nome, hash_mercado, navegador):
    print(f">>> Iniciando coleta: {nome}")
    url_completa = f"{URL_BASE}{hash_mercado}"
    navegador.get(url_completa)

    SELETOR_TABELA = "//div[@class='quotetable']"
    SELETOR_CORPO = f"{SELETOR_TABELA}/table/tbody"
    SELETOR_PAGINACAO = f"{SELETOR_TABELA}/div[@class='qtpager']"

    try:
        WebDriverWait(navegador, 20).until(
            EC.presence_of_element_located((By.XPATH, SELETOR_CORPO))
        )
        WebDriverWait(navegador, 20).until(
            EC.presence_of_element_located((By.XPATH, f"{SELETOR_CORPO}/tr"))
        )
    except TimeoutException:
        print(f"Timeout ao carregar {nome}")
        return []

    # Determinar total de páginas
    total_paginas = 1
    try:
        links_paginas = navegador.find_elements(By.XPATH, f"{SELETOR_PAGINACAO}/a")
        maior_num = 0
        for link in links_paginas:
            texto = link.text.strip()
            if texto.isdigit():
                maior_num = max(maior_num, int(texto))
        if maior_num > 0:
            total_paginas = maior_num
    except Exception:
        pass

    pagina_atual = 1
    resultados = []
    ultimo_codigo_primeira_linha = None

    while len(resultados) < LIMITE_POR_MERCADO and pagina_atual <= total_paginas:
        try:
            if pagina_atual > 1:
                WebDriverWait(navegador, 15).until(
                    lambda d: d.find_element(
                        By.XPATH, f"{SELETOR_CORPO}/tr[1]/td[2]"
                    ).text.strip() != ultimo_codigo_primeira_linha
                )
                time.sleep(1)

            corpo_tabela = WebDriverWait(navegador, 15).until(
                EC.presence_of_element_located((By.XPATH, SELETOR_CORPO))
            )
            time.sleep(1)

            linhas = corpo_tabela.find_elements(By.TAG_NAME, "tr")

            if linhas:
                primeira_linha = linhas[0].find_elements(By.TAG_NAME, "td")
                if len(primeira_linha) >= 2:
                    ultimo_codigo_primeira_linha = primeira_linha[1].text.strip()

            for linha in linhas:
                if len(resultados) >= LIMITE_POR_MERCADO:
                    break
                colunas = linha.find_elements(By.TAG_NAME, "td")
                if len(colunas) >= 14:
                    registro = (
                        nome,
                        colunas[1].text.strip(),
                        colunas[2].text.strip(),
                        converter_valor(colunas[3].text.strip()),
                        converter_valor(colunas[4].text.strip().replace('%', '')),
                        converter_valor(colunas[5].text.strip()),
                        formatar_volume(colunas[6].text.strip()),
                        formatar_volume(colunas[7].text.strip()),
                        converter_valor(colunas[8].text.strip().replace('%', '')),
                        converter_valor(colunas[9].text.strip()),
                        converter_valor(colunas[10].text.strip()),
                        converter_valor(colunas[11].text.strip()),
                        converter_valor(colunas[12].text.strip()),
                        datetime.now()
                    )
                    resultados.append(registro)

            if len(resultados) >= LIMITE_POR_MERCADO:
                break

            pagina_atual += 1
            if pagina_atual <= total_paginas:
                try:
                    btn_proximo = WebDriverWait(navegador, 10).until(
                        EC.element_to_be_clickable(
                            (By.XPATH, f"{SELETOR_PAGINACAO}/a[@title='下一页' and text()='>']")
                        )
                    )
                    navegador.execute_script("arguments[0].click();", btn_proximo)
                    time.sleep(3)
                except (TimeoutException, NoSuchElementException):
                    break

        except StaleElementReferenceException:
            print("Elemento obsoleto detectado, interrompendo coleta deste mercado")
            break
        except TimeoutException:
            print(f"Timeout na página {pagina_atual}")
            break

    print(f">>> {nome}: {len(resultados)} registros coletados")
    return resultados

Os dados foram armazenados em tabelas MySQL separadas por mercado: Ações HS, Ações SH e Ações SZ, contendo informações como código, nome, cotação, variação, volume e timestamps de coleta.

Dificuldades Encontradas

  • Incompatibilidade entre versão do WebDriver e navegador instalado, resolvida mediante download da versão correta do driver
  • Identificação precisa do seletor para o botão de paginação, que exigia combinação de atributo title e conteúdo textual específico

Extração de Dados de Cursos Educacionais Online

Objetivo

Desenvolver um coletor para capturar informações de cursos de uma plataforma educacional chinesa, incluindo identificador, nome, instituição, docentes, equipe, participantes, progresso e descrição do curso.

Bibliotecsa Utilizadas

  • Selenium WebDriver: para automação do navegador e interação com elementos dinâmicos
  • pymysql: driver para comunicação com MySQL
  • re: biblioteca de expressões regulares para extração de identificadores de URLs
        sql = """
        INSERT INTO cursos 
        (identificador, titulo, instituicao, docente_principal, equipe,
         total_alunos, andamento, descricao) 
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
        cursor.execute(sql, (
            info_curso['curso_id'],
            info_curso['titulo'],
            info_curso['instituicao'],
            info_curso['docente'],
            info_curso['equipe'],
            info_curso['alunos'],
            info_curso['andamento'],
            info_curso['descricao']
        ))
    conexao.commit()
except Exception as e:
    conexao.rollback()
    print(f"Erro no armazenamento: {e}")
finally:
    conexao.close()

</details>### Estratégia de Coleta

A abordagem adotada consiste em acessar o painel pessoal do usuário autenticado, onde os cursos inscritos são listados de forma organizada. Isso evita interferências causadas por atualizações dinâmicas da página principal.

Para localizar os links dos cursos, utiliza-se o XPath `//*[contains(@class, 'menu')]//a`, que captura elementos de navegação dentro da estrutura de menu da plataforma.

<details><summary>Clique para visualizar o código de coleta de cursos</summary>```
def coletar_cursos_painel():
    opcoes = webdriver.ChromeOptions()
    opcoes.add_argument('--disable-blink-features=AutomationControlled')
    opcoes.add_experimental_option('excludeSwitches', ['enable-logging'])

    navegador = webdriver.Chrome(options=opcoes)
    navegador.maximize_window()

    try:
        navegador.get("https://www.icourse163.org/home.htm")
        espera = WebDriverWait(navegador, 300)

        # Aguardar login manual do usuário
        print("Aguardando autenticação do usuário...")
        espera.until(
            lambda d: "home.htm" in d.current_url and "login" not in d.current_url
        )
        time.sleep(30)

        # Rolar página para renderizar todos os cards
        for _ in range(3):
            navegador.execute_script("window.scrollBy(0, 500);")
            time.sleep(0.5)

        # Extrair URLs dos cursos a partir do menu
        urls_cursos = []
        elementos_menu = navegador.find_elements(
            By.XPATH, "//*[contains(@class, 'menu')]//a"
        )
        for elem in elementos_menu:
            href = elem.get_attribute("href")
            url_limpa = limpar_url(href)
            if url_limpa and url_limpa not in urls_cursos:
                urls_cursos.append(url_limpa)

        urls_finais = urls_cursos[:5]

        # Funções auxiliares para extração segura
        def obter_texto(seletores):
            for sel in seletores:
                try:
                    el = navegador.find_element(By.CSS_SELECTOR, sel)
                    if el.text.strip():
                        return el.text.strip()
                except:
                    continue
            return "Não disponível"

        def obter_atributo(seletores, atributo):
            for sel in seletores:
                try:
                    return navegador.find_element(
                        By.CSS_SELECTOR, sel
                    ).get_attribute(atributo)
                except:
                    continue
            return "Não disponível"

        # Visitar cada página de curso e extrair informações
        for idx, link_curso in enumerate(urls_finais):
            navegador.get(link_curso)
            WebDriverWait(navegador, 10).until(
                EC.presence_of_element_located((By.CLASS_NAME, "course-title"))
            )
            time.sleep(1)

            dados = {
                'curso_id': link_curso.split("/")[-1],
                'titulo': obter_texto([".course-title", "h1"]),
                'instituicao': obter_texto([
                    ".school-box .school-name",
                    ".m-teachers a"
                ]),
                'docente': obter_atributo([
                    ".m-teachers_teacher-list .u-teacher-name"
                ], "title"),
                'equipe': obter_atributo([
                    ".m-teachers_teacher-list .u-teacher-name"
                ], "title"),
                'alunos': obter_texto([
                    ".course-enroll-info_course-info_term-progress .count",
                    ".course-enroll-info_course-info_term-num"
                ]),
                'andamento': obter_texto([
                    ".course-enroll-info_course-info_term-info_term-time"
                ]),
                'descricao': obter_texto([
                    "#j-rectxt2", ".m-infotxt", ".u-course-intro"
                ])[:800]
            }

            armazenar_curso(dados)
            time.sleep(2)

    except Exception as erro_geral:
        print(f"Erro no processo principal: {erro_geral}")
    finally:
        navegador.quit()

A coleta semi-automatizada mostrou-se eficiente, com período de espera de 30 segundos para autenticação manual seguida de extração automatizada dos dados do painel pessoal. A utilização de múltiplos seletores CSS para cada campo garante maior robustez contra mudanças na estrutura da página.


Processamento de Dados em Tempo Real com Flume e Kafka

Infraestrutura Utilizada

Configuração de um pipeline de ingestão de logs utilizando serviços de big data da nuvem Huawei, envolvendo coleta via Flume e transporte via Kafka.

Tarefas Executadas

1. Ativação do Serviço MapReduce

Provisionamento do cluster de processamento distribuído, configurando os recursos computacionais necessários para as operações de análise em tempo real.

2. Geração de Dados de Teste

Criação de script Python para produção de dados simulados:

# Script localizado em /opt/client/autodatagen.py
# Execução: python autodatagen.py "/tmp/flume_spooldir/test.txt" 100

3. Configuração do Apache Kafka

Preparação do ambiente e criação do tópico de mensagens:

# Carregar variáveis de ambiente
source /opt/Bigdata/client/bigdata_env

# Criar tópico para recebimento de dados
kafka-topics.sh --create --topic fludesc \
  --partitions 1 --replication-factor 1 \
  --bootstrap-server 192.174.2.105:9092

# Verificar tópicos existentes
kafka-topics.sh --list --bootstrap-server 192.174.2.105:9092

4. Instalação e Configuração do Cliente Flume

Instalação do agente Flume e reinicialização do serviço:

cd /opt/FlumeClient/fusioninsight-flume-1.11.0
sh bin/flume-manage.sh restart

Configuração do arquivo conf/properties.properties para definir fonte, canal e destino da coleta de dados.

5. Verificação do Pipeline Completo

Execução do script gerador de dados seguida da verificação do consumo no terminal do Flume, confirmando o fluxo end-to-end de ingestão e transporte de dados.

Desafios Técnicos

  • Complexidade na configuração dos serviços distribuídos e identificação de erros em etapas intermediárias
  • Dependência entre componentes do pipeline exigindo ordem específica de inicialização e configuração

Código-fonte completo disponível no repositório do projeto.

Tags: Selenium MySQL WebScraping Python ApacheKafka

Publicado em 6-19 22:00