Coleta de Informações do Mercado de Ações Chinesas
Descrição do Projeto
Desenvolver um sistema automatizado para capturar dados de ações listadas nos mercados de Shanghai e Shenzhen, utilizando Selenium para navegação web e MySQL para persistência dos dados coletados.
Dependências Necessárias
- Selenium WebDriver: automatiza a interação com navegadores, permitindo renderização completa de páginas dinâmicas e extração de conteúdo HTML
- time: controle de temporização entre requisições, auxiliando na estabilidade da coleta e evitando bloqueios por frequência excessiva
- mysql.connector: interface de comunicação entre Python e o servidor MySQL
Configuração do Ambiente de Armazenamento
CONFIG_COLETOR = { 'host': 'localhost', 'user': 'root', 'password': 'senha_segura_123', 'database': NOME_BD, 'charset': 'utf8mb4' }
def inicializar_banco(): conexao = None try: conexao = mysql.connector.connect(**DB_SERVER) if conexao.is_connected(): cursor = conexao.cursor() cursor.execute( f"CREATE DATABASE IF NOT EXISTS {NOME_BD} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;" ) cursor.execute(f"USE {NOME_BD};")
ddl_tabela = f"""
CREATE TABLE IF NOT EXISTS {NOME_TABELA} (
registro_id INT AUTO_INCREMENT PRIMARY KEY,
nome_mercado VARCHAR(50) NOT NULL,
codigo_acao VARCHAR(10) NOT NULL,
nome_empresa VARCHAR(50) NOT NULL,
cotacao_atual DECIMAL(10, 3),
variacao_percentual DECIMAL(8, 2),
variacao_valor DECIMAL(10, 3),
volume_negociado VARCHAR(50),
valor_negociado VARCHAR(50),
oscilacao DECIMAL(8, 2),
preco_maximo DECIMAL(10, 3),
preco_minimo DECIMAL(10, 3),
preco_abertura DECIMAL(10, 3),
fechamento_anterior DECIMAL(10, 3),
momento_coleta DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY (codigo_acao, nome_mercado, momento_coleta)
);
"""
cursor.execute(ddl_tabela)
return True
except mysql.connector.Error as erro:
print(f"Falha na inicialização: {erro}")
return False
finally:
if conexao and conexao.is_connected():
cursor.close()
conexao.close()
def registrar_dados(cursor, dados): consulta = """ INSERT INTO cotacoes_acoes (nome_mercado, codigo_acao, nome_empresa, cotacao_atual, variacao_percentual, variacao_valor, volume_negociado, valor_negociado, oscilacao, preco_maximo, preco_minimo, preco_abertura, fechamento_anterior, momento_coleta) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE nome_empresa = VALUES(nome_empresa), cotacao_atual = VALUES(cotacao_atual), variacao_percentual = VALUES(variacao_percentual), variacao_valor = VALUES(variacao_valor), volume_negociado = VALUES(volume_negociado), valor_negociado = VALUES(valor_negociado), oscilacao = VALUES(oscilacao), preco_maximo = VALUES(preco_maximo), preco_minimo = VALUES(preco_minimo), preco_abertura = VALUES(preco_abertura), fechamento_anterior = VALUES(fechamento_anterior), momento_coleta = VALUES(momento_coleta); """ try: cursor.execute(consulta, dados) return True except mysql.connector.Error as err: print(f"Erro ao inserir: {err}") return False
</details>### Fluxo Principal de Coleta
1. **Navegação por mercados:** Iterar sobre os mercados definidos (HS\_A, SH\_A, SZ\_A), alternando via fragmento de URL (#). Utilizar WebDriverWait para garantir carregamento completo
2. **Extração de dados:** Localizar cada linha da tabela (tr) e capturar o texto de cada célula (td). Aplicar funções de normalização para tratar valores inválidos como '-' ou 'nan'
3. **Paginação:** Após extrair todos os dados da página atual, localizar o botão de próxima página via XPath e executar o clique via Selenium
<details><summary>Clique para visualizar o código de coleta</summary>```
def executar_coleta_mercado(nome, hash_mercado, navegador):
print(f">>> Iniciando coleta: {nome}")
url_completa = f"{URL_BASE}{hash_mercado}"
navegador.get(url_completa)
SELETOR_TABELA = "//div[@class='quotetable']"
SELETOR_CORPO = f"{SELETOR_TABELA}/table/tbody"
SELETOR_PAGINACAO = f"{SELETOR_TABELA}/div[@class='qtpager']"
try:
WebDriverWait(navegador, 20).until(
EC.presence_of_element_located((By.XPATH, SELETOR_CORPO))
)
WebDriverWait(navegador, 20).until(
EC.presence_of_element_located((By.XPATH, f"{SELETOR_CORPO}/tr"))
)
except TimeoutException:
print(f"Timeout ao carregar {nome}")
return []
# Determinar total de páginas
total_paginas = 1
try:
links_paginas = navegador.find_elements(By.XPATH, f"{SELETOR_PAGINACAO}/a")
maior_num = 0
for link in links_paginas:
texto = link.text.strip()
if texto.isdigit():
maior_num = max(maior_num, int(texto))
if maior_num > 0:
total_paginas = maior_num
except Exception:
pass
pagina_atual = 1
resultados = []
ultimo_codigo_primeira_linha = None
while len(resultados) < LIMITE_POR_MERCADO and pagina_atual <= total_paginas:
try:
if pagina_atual > 1:
WebDriverWait(navegador, 15).until(
lambda d: d.find_element(
By.XPATH, f"{SELETOR_CORPO}/tr[1]/td[2]"
).text.strip() != ultimo_codigo_primeira_linha
)
time.sleep(1)
corpo_tabela = WebDriverWait(navegador, 15).until(
EC.presence_of_element_located((By.XPATH, SELETOR_CORPO))
)
time.sleep(1)
linhas = corpo_tabela.find_elements(By.TAG_NAME, "tr")
if linhas:
primeira_linha = linhas[0].find_elements(By.TAG_NAME, "td")
if len(primeira_linha) >= 2:
ultimo_codigo_primeira_linha = primeira_linha[1].text.strip()
for linha in linhas:
if len(resultados) >= LIMITE_POR_MERCADO:
break
colunas = linha.find_elements(By.TAG_NAME, "td")
if len(colunas) >= 14:
registro = (
nome,
colunas[1].text.strip(),
colunas[2].text.strip(),
converter_valor(colunas[3].text.strip()),
converter_valor(colunas[4].text.strip().replace('%', '')),
converter_valor(colunas[5].text.strip()),
formatar_volume(colunas[6].text.strip()),
formatar_volume(colunas[7].text.strip()),
converter_valor(colunas[8].text.strip().replace('%', '')),
converter_valor(colunas[9].text.strip()),
converter_valor(colunas[10].text.strip()),
converter_valor(colunas[11].text.strip()),
converter_valor(colunas[12].text.strip()),
datetime.now()
)
resultados.append(registro)
if len(resultados) >= LIMITE_POR_MERCADO:
break
pagina_atual += 1
if pagina_atual <= total_paginas:
try:
btn_proximo = WebDriverWait(navegador, 10).until(
EC.element_to_be_clickable(
(By.XPATH, f"{SELETOR_PAGINACAO}/a[@title='下一页' and text()='>']")
)
)
navegador.execute_script("arguments[0].click();", btn_proximo)
time.sleep(3)
except (TimeoutException, NoSuchElementException):
break
except StaleElementReferenceException:
print("Elemento obsoleto detectado, interrompendo coleta deste mercado")
break
except TimeoutException:
print(f"Timeout na página {pagina_atual}")
break
print(f">>> {nome}: {len(resultados)} registros coletados")
return resultados
Os dados foram armazenados em tabelas MySQL separadas por mercado: Ações HS, Ações SH e Ações SZ, contendo informações como código, nome, cotação, variação, volume e timestamps de coleta.
Dificuldades Encontradas
- Incompatibilidade entre versão do WebDriver e navegador instalado, resolvida mediante download da versão correta do driver
- Identificação precisa do seletor para o botão de paginação, que exigia combinação de atributo title e conteúdo textual específico
Extração de Dados de Cursos Educacionais Online
Objetivo
Desenvolver um coletor para capturar informações de cursos de uma plataforma educacional chinesa, incluindo identificador, nome, instituição, docentes, equipe, participantes, progresso e descrição do curso.
Bibliotecsa Utilizadas
- Selenium WebDriver: para automação do navegador e interação com elementos dinâmicos
- pymysql: driver para comunicação com MySQL
- re: biblioteca de expressões regulares para extração de identificadores de URLs
sql = """
INSERT INTO cursos
(identificador, titulo, instituicao, docente_principal, equipe,
total_alunos, andamento, descricao)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql, (
info_curso['curso_id'],
info_curso['titulo'],
info_curso['instituicao'],
info_curso['docente'],
info_curso['equipe'],
info_curso['alunos'],
info_curso['andamento'],
info_curso['descricao']
))
conexao.commit()
except Exception as e:
conexao.rollback()
print(f"Erro no armazenamento: {e}")
finally:
conexao.close()
</details>### Estratégia de Coleta
A abordagem adotada consiste em acessar o painel pessoal do usuário autenticado, onde os cursos inscritos são listados de forma organizada. Isso evita interferências causadas por atualizações dinâmicas da página principal.
Para localizar os links dos cursos, utiliza-se o XPath `//*[contains(@class, 'menu')]//a`, que captura elementos de navegação dentro da estrutura de menu da plataforma.
<details><summary>Clique para visualizar o código de coleta de cursos</summary>```
def coletar_cursos_painel():
opcoes = webdriver.ChromeOptions()
opcoes.add_argument('--disable-blink-features=AutomationControlled')
opcoes.add_experimental_option('excludeSwitches', ['enable-logging'])
navegador = webdriver.Chrome(options=opcoes)
navegador.maximize_window()
try:
navegador.get("https://www.icourse163.org/home.htm")
espera = WebDriverWait(navegador, 300)
# Aguardar login manual do usuário
print("Aguardando autenticação do usuário...")
espera.until(
lambda d: "home.htm" in d.current_url and "login" not in d.current_url
)
time.sleep(30)
# Rolar página para renderizar todos os cards
for _ in range(3):
navegador.execute_script("window.scrollBy(0, 500);")
time.sleep(0.5)
# Extrair URLs dos cursos a partir do menu
urls_cursos = []
elementos_menu = navegador.find_elements(
By.XPATH, "//*[contains(@class, 'menu')]//a"
)
for elem in elementos_menu:
href = elem.get_attribute("href")
url_limpa = limpar_url(href)
if url_limpa and url_limpa not in urls_cursos:
urls_cursos.append(url_limpa)
urls_finais = urls_cursos[:5]
# Funções auxiliares para extração segura
def obter_texto(seletores):
for sel in seletores:
try:
el = navegador.find_element(By.CSS_SELECTOR, sel)
if el.text.strip():
return el.text.strip()
except:
continue
return "Não disponível"
def obter_atributo(seletores, atributo):
for sel in seletores:
try:
return navegador.find_element(
By.CSS_SELECTOR, sel
).get_attribute(atributo)
except:
continue
return "Não disponível"
# Visitar cada página de curso e extrair informações
for idx, link_curso in enumerate(urls_finais):
navegador.get(link_curso)
WebDriverWait(navegador, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "course-title"))
)
time.sleep(1)
dados = {
'curso_id': link_curso.split("/")[-1],
'titulo': obter_texto([".course-title", "h1"]),
'instituicao': obter_texto([
".school-box .school-name",
".m-teachers a"
]),
'docente': obter_atributo([
".m-teachers_teacher-list .u-teacher-name"
], "title"),
'equipe': obter_atributo([
".m-teachers_teacher-list .u-teacher-name"
], "title"),
'alunos': obter_texto([
".course-enroll-info_course-info_term-progress .count",
".course-enroll-info_course-info_term-num"
]),
'andamento': obter_texto([
".course-enroll-info_course-info_term-info_term-time"
]),
'descricao': obter_texto([
"#j-rectxt2", ".m-infotxt", ".u-course-intro"
])[:800]
}
armazenar_curso(dados)
time.sleep(2)
except Exception as erro_geral:
print(f"Erro no processo principal: {erro_geral}")
finally:
navegador.quit()
A coleta semi-automatizada mostrou-se eficiente, com período de espera de 30 segundos para autenticação manual seguida de extração automatizada dos dados do painel pessoal. A utilização de múltiplos seletores CSS para cada campo garante maior robustez contra mudanças na estrutura da página.
Processamento de Dados em Tempo Real com Flume e Kafka
Infraestrutura Utilizada
Configuração de um pipeline de ingestão de logs utilizando serviços de big data da nuvem Huawei, envolvendo coleta via Flume e transporte via Kafka.
Tarefas Executadas
1. Ativação do Serviço MapReduce
Provisionamento do cluster de processamento distribuído, configurando os recursos computacionais necessários para as operações de análise em tempo real.
2. Geração de Dados de Teste
Criação de script Python para produção de dados simulados:
# Script localizado em /opt/client/autodatagen.py
# Execução: python autodatagen.py "/tmp/flume_spooldir/test.txt" 100
3. Configuração do Apache Kafka
Preparação do ambiente e criação do tópico de mensagens:
# Carregar variáveis de ambiente
source /opt/Bigdata/client/bigdata_env
# Criar tópico para recebimento de dados
kafka-topics.sh --create --topic fludesc \
--partitions 1 --replication-factor 1 \
--bootstrap-server 192.174.2.105:9092
# Verificar tópicos existentes
kafka-topics.sh --list --bootstrap-server 192.174.2.105:9092
4. Instalação e Configuração do Cliente Flume
Instalação do agente Flume e reinicialização do serviço:
cd /opt/FlumeClient/fusioninsight-flume-1.11.0
sh bin/flume-manage.sh restart
Configuração do arquivo conf/properties.properties para definir fonte, canal e destino da coleta de dados.
5. Verificação do Pipeline Completo
Execução do script gerador de dados seguida da verificação do consumo no terminal do Flume, confirmando o fluxo end-to-end de ingestão e transporte de dados.
Desafios Técnicos
- Complexidade na configuração dos serviços distribuídos e identificação de erros em etapas intermediárias
- Dependência entre componentes do pipeline exigindo ordem específica de inicialização e configuração
Código-fonte completo disponível no repositório do projeto.