Desmistificando o Apache Airflow: Orquestração de Fluxo de Dados para Análise de Ações

Wander Buraslan
6 min readMay 18, 2024

--

Planejando e Criando Fluxos de Orquestração de Dados da Petrobras para Análise de Ações com Apache Airflow

A Petrobras perdeu R$ 34 bilhões em valor de mercado nesta quarta-feira (15), após o anúncio de que Jean Paul Prates foi demitido da presidência da empresa. A petroleira encerrou o pregão com um valor de mercado de R$ 509 bilhões, contra R$ 543 bilhões de ontem. G1 — Economia, maio de 2024.

Durante a última semana, vi algumas notícias relacionadas a demissão de Jean Paul Prates da presidência da Petrobras (PETR3), e como investidor da PETR3 comecei a ficar curioso sobre como eu poderia analisar alguns dados da Petrobras, especificamente, queria acompanhar diáriamente o cenário da Petrobras após a mudança de presidencia.

Para resolver isso, decidi utilizar a API da Alpha Vantage para obter os dados.

Este conjunto de APIs oferece dados de patrimônio global em quatro resoluções temporais distintas: diária, semanal, mensal e intradiária, com mais de 20 anos de histórico. Inclui também um endpoint leve para cotações de tickers e diversas funções utilitárias, como pesquisa de tickers e verificação do status de abertura/fechamento do mercado, proporcionando maior conveniência. Alpha Vantage

Pois bem, como podemos ver, aparentemente meu problema estaria resolvido a partir do momento que eu estivesse com a API em mãos, pois assim basta trazer os dados que desejo.

Bom… aí que está o problema, eu simplemente odiei ter que executar isso no VS Code diariamente. E foi assim que o Apache Airflow apareceu.

POR QUE UTILIZAR O APACHE AIRFLOW?

Airflow™ é uma plataforma de orquestração de fluxo de trabalho em lote. A estrutura Airflow contém operadores para conexão com muitas tecnologias e é facilmente extensível para conexão com uma nova tecnologia. Se seus fluxos de trabalho tiverem início e fim claros e forem executados em intervalos regulares, eles poderão ser programados como um Airflow DAG.

O Airflow é responsável por agendar, gerenciar e monitorar scripts. Uma das funcionalidades que profissionais da área de dados atribuem ao Airflow está dentro dessas três responsabilidades mencionadas. Dessa forma, realizar processos de Extração, Transformação e Leitura de Dados acaba sendo uma tarefa muito interessante dentro do Airflow.

O QUE FAREMOS AQUI?

Já fiz uma introdução bem superficial sobre o que é o Apache Airflow, e agora precisamos entender como organizar isso tudo, inicialmente a API será responsabilizada por trazer os dados para o nosso Pipeline (trajeto) através de um request, e logo após iremos desenvolver uma função de extração e tratamento de dados (aqui irei colocar em um único módulo).

MÃOS A OBRA!

CARREGAR DADOS

Na extração de dados, um aspecto importante a considerar ao analisar a documentação da API é o nosso pedido (request). A mágica acontece no pedido quando você deseja acessar dados de uma ação específica. Aqui, por exemplo, vamos nos concentrar nas cotações da Petrobras. Veja o exemplo abaixo:

#Chamamos o nome da ação
symbol = "PETR3.SA"

#Buscamos o dado aplicando um metodo request (Get - busca)
url = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&interval=5min&apikey=your_api_key"
dados_api = requests.get(url)
data = dados_api.json()

Os dados JSON da API vão ter tudo o que a gente precisa, mas pro jeito que eu tô pensando para o projeto, o melhor é a gente pegar as informações que a gente deseja e criar um DataFrame do Pandas pra guardar esses dados de um jeito mais fácil de trabalhar.

def data_extract_api():
#Chamamos o nome da ação que desejamos
symbol = "PETR3.SA"

#Buscamos o dado aplicando um metodo Get (busca)
url = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&interval=5min&apikey=your_api_key"
dados_api = requests.get(url)
data = dados_api.json()

#Guardamos o atributo do nome da ação e data de atualização da extração do dado
symbol = data['Meta Data']['2. Symbol']
last_refreshed = data['Meta Data']['3. Last Refreshed']

#Separamos do Json o parâmetro de Data referente ao dicionário da API
time_series_data = data['Time Series (Daily)']
formatted_data = []

#Aqui organizaremos as informações
#Chamamos os parâmetros que desejamos do Json
#Realizamos um processo de adição a variavel list
for date, values in time_series_data.items():
row = {'Date': date, 'Symbol': symbol, 'Last Refreshed': last_refreshed}
row.update({key.split('. ')[1]: value for key, value in values.items()})
formatted_data.append(row)

return pd.DataFrame(data)

O que precisamos definir agora são alguns conceitos vinculados à Pipeline de Dados dentro do Apache Airflow. Entre esses conceitos estão os DAGs, Scheduler, Queue, entre outros.

Não se preocupe, o objetivo aqui é realmente apresentar esses conceitos e criar um case interessante.

CONCEITOS GERAIS

Inicialmente, temos o conceito de Directed Acyclic Graph (DAG). Uma DAG pode ser vista como uma lista de tarefas que precisamos executar em ordem, como se fosse um checklist. Tecnicamente, uma DAG é um grafo acíclico dirigido, composto por vértices que representam nossas tarefas e arestas que representam as dependências e relações entre elas.

Exemplificação de DAG em um grafo acíclico dirigido

Como mencionado anteriormente, temos as Tasks, que são as atividades a serem realizadas. Elas estão dentro de um conjunto maior, as nossas DAGs. O mais legal disso tudo é que realmente se assemelha a um checklist; não é à toa que chamamos de fluxo, pois definimos a ordem em que as tarefas serão realizadas, tecnicamente falando, instanciadas.

Por fim, temos o Schedule, que é nosso agendador, nosso segurança que fica monitorando as DAGs. A responsabilidade dele é fazer com que as atividades sejam executadas no momento que desejamos.

⚠️ Atenção!
Lembrando que toda essa explicação está disponível na documentação do Apache Airflow, vale muito a pena ir lá, corre!!!

Agora, temos o essencial para entender a aplicação do fluxo de dados para obter informações sobre as ações que desejamos.

FLUXO DE TRABALHO COMO CÓDIGO

A principal característica dos fluxos de trabalho do Airflow é que todos os fluxos de trabalho são definidos em código Python. “Fluxos de trabalho como código” serve a vários propósitos: Dinâmico, Extensível e Flexível.

No código abaixo, definimos alguns parâmetros relacionados ao que chamamos de gerente da DAG.

Adicionamos um intervalo de datas e emails para receber notificações sobre erros e execuções das nossas tarefas.

from airflow.utils.dates import days_ago

arguments = {
'start_date':days_ago(0),
'email':'emailteste@gmail.com',
'email_on_failure':False,
'email_on_retry':False,
'retries':1,
'retry_delay':timedelta(minutes=5),
}

De acordo com os conceitos de DAG, a seguir temos uma implementação superficial de como podemos construir a etapa de monitoramento e gerenciamento de projetos a partir dela.

Lembrando que a DAG é o nosso conjunto de atividades. Ela terá nome, gerente, descrição e horário, como podemos ver no código a seguir.

from airflow import DAG

stock_dag = DAG(
'extractDataStock',
default_args=arguments,
description='Extract Stock Prices yFinance',
schedule_interval = '20 00 * * *',
catchup=False,
tags=['extract','stock_price','etl']
)

Por fim, é muito importante explicar os operadores. Assim como o gerente supervisiona as atividades, os operadores são responsáveis por definir exatamente o que cada atividade irá realizar.

Estou sendo bem superficial na explicação, mas, por exemplo, os operadores são excelentes em termos de orientação ao reuso, pois podem ser reutilizados em outros momentos.

stock_etl = PythonOperator(
task_id='transient',
python_callable=data_extract_api,
dag=stock_dag,
)

Levando em consideração que você possui o Apache Airflow instalado em seu ambiente, basta salvar o código em um arquivo .py, colar na pasta de dags e, após a execução manual no Airflow, verificar se o log mostra os dados históricos da PETR3.

CONCLUSÃO

O objetivo principal deste artigo é mostrar como uma aplicação simples gera um entendimento sobre uma ferramenta muito utilizada em processos de engenharia de dados.

Com base nos problemas relatados, buscamos entender como integrar uma API que gera dados em JSON e como orientar esses dados para um fluxo de extração e tratamento de dados. Porém, mesmo tenhamos uma API pronta para nos disponibilizar os dados, o Apache Airflow torna-se um personagem principal nisso tudo.

O Apache Airflow desempenha um papel interessante como gerenciador do fluxo como um todo. Ao consultar a documentação, por exemplo, você perceberá que o Airflow, além do que foi demonstrado aqui, oferece uma variedade de recursos para monitoramento das DAGs desenvolvidas.

No entanto, ao entendermos as finalidades para as quais podemos aplicar tudo isso, a partir de um fluxo de dados completamente preparado, poderemos utilizá-lo em dashboards ou análises exploratórias, bem como orientar outras aplicações.

REFRÊNCIAS

--

--

Wander Buraslan

Estudante de Engenharia de Software em busca de desenvolvimento profissional, com o objetivo de analisar dados e compartilhar resultado com as pessoas.