Como usar DAGs no estilo de fluxo de ar para fluxos de trabalho de ciência de dados altamente eficazes

Norm Niemer Seguir 24 de julho · 4 min ler

Airflow e Luigi são ótimos para engenharia de dados, mas não otimizados para ciência de dados. traz DAGs ao estilo de fluxo de ar para a ciência de dados.

Fluxos de trabalho de ciência de dados são DAGs

Os fluxos de trabalho de ciência de dados geralmente se parecem com isso.

Esse fluxo de trabalho é semelhante aos fluxos de trabalho de engenharia de dados. Envolve encadear tarefas parametrizadas que passam múltiplas entradas e saídas entre si. Veja por que passar dados entre funções ou nomes de arquivos / bancos de dados de hardcoding sem explicitamente definir dependências de tarefas não é uma boa maneira de escrever código de ciência de dados.

 # código de ciência de dados inválido 
def process_data (data, do_preprocess):
data = do_stuff (data, do_preprocess)
data.to_pickle ('data.pkl')
data = pd.read_csv ('data.csv')
process_data (data, True)
df_train = pd.read_pickle (df_train)
model = sklearn.svm.SVC ()
model.fit (df_train.iloc [:,: - 1], df_train ['y'])

P & D versus fluxos de trabalho de dados de produção

Usar o e o é um grande passo a frente da criação de código funcional para gerenciar fluxos de trabalho de dados. Embora os fluxos de trabalho de engenharia de dados e ciência de dados sejam semelhantes, eles têm a mesma nota. Ambas as bibliotecas foram projetadas para serem usadas por engenheiros de dados em configurações de produção nas quais o foco é:

  • certificando-se de que tudo está correndo bem no tempo
  • agendamento e coordenação
  • recuperando-se de falhas
  • qualidade de dados

Em contraste, o foco no fluxo de trabalho de pesquisa e desenvolvimento é:

  • gerando insights
  • velocidade de prototipagem
  • avaliar a potência preditiva com diferentes modelos e parâmetros
  • visualizando saída

Como resultado, o fluxo de trabalho de pesquisa e desenvolvimento:

  • é menos bem definido
  • envolve tentativa e erro
  • requer redefinição freqüente de tarefas e saída como modelos, parâmetros e alteração de dados
  • tira a saída do engenheiro de dados

Problemas com o fluxo de ar / luigi nas configurações de pesquisa e desenvolvimento

Como as duas bibliotecas são otimizadas para configurações de produção de engenharia de dados, o UX para uma configuração de pesquisa e desenvolvimento de ciência de dados não é ótimo:

  • Código WET para leitura / gravação de dados
  • Controlar manualmente nomes de arquivos ou nomes de tabelas de bancos de dados onde os dados são salvos
  • Inconveniente para redefinir tarefas conforme os modelos, parâmetros e dados são alterados
  • Inconveniente para acompanhar os resultados do modelo com diferentes configurações de parâmetros

Mantendo manualmente o controle de nomes de arquivos em fluxos de trabalho de dados complexos … Não escalável.

 # exemplo de armazenamento manual em cache de quadros de dados e controle de arquivos 
cfg_fpath_cc_base = cfg_fpath_base + 'vendor /'
cfg_fpath_cc_raw = cfg_fpath_cc_base + 'df_cc_raw.pkl'
cfg_fpath_cc_raw_recent2 = cfg_fpath_cc_base + 'df_cc_raw_recent2.pkl'
cfg_fpath_cc_yoy = cfg_fpath_cc_base + 'df_cc_yoy.pkl'
cfg_fpath_cc_yoy_bbg = cfg_fpath_cc_base + 'df_cc_yoy_bbg.pkl'
cfg_fpath_cc_yoy_fds = cfg_fpath_cc_base + 'df_cc_yoy_fds.pkl'
cfg_fpath_cc_var_fds = cfg_fpath_cc_base + 'df_cc_var_fds.pkl'
cfg_fpath_cc_yoy_recent2 = cfg_fpath_cc_base + 'df_cc_yoy_recent2.pkl'
cfg_fpath_cc_actual = cfg_fpath_cc_base + 'df_cc_sales_actual.pkl'
cfg_fpath_cc_monthly = cfg_fpath_cc_base + 'df_cc_monthly.pkl'
cfg_fpath_cc_yoy_cs2 = 'data / processed / df_cc_yoy_cs2.pq' # dados de compradores consistentes para a nova metodologia de 2018

Como o d6tflow é diferente do fluxo de ar / luigi

O d6tflow é otimizado para fluxos de trabalho de pesquisa e desenvolvimento de ciência de dados. Aqui estão os benefícios de usar o d6tflow na ciência de dados.

Benefício: as tarefas têm dados de entrada e saída

Em vez de carregar e salvar manualmente os dados, isso é terceirizado para a biblioteca. Isso é melhor e reduz a manutenção porque a localização dos dados de entrada / saída pode mudar sem ter que reescrever o código. Também torna mais fácil para o engenheiro de dados entregar dados ao cientista de dados.

 class TaskProcess (d6tflow.tasks.TaskPqPandas): # define o formato de saída que o def requer (self): 
return TaskGetData () # define dependência
def run (self):
data = self.input (). load () # carrega dados de entrada
data = do_stuff (data) # process data
self.save (data) # salva os dados de saída

Benefício: facilmente invalidar tarefas

Cenários comuns de invalidação são implementados. Isso aumenta a velocidade de prototipagem à medida que você altera o código e os dados durante a tentativa e erro.

 # force execution incluindo tarefas downstream 
d6tflow.run (TaskTrain (), force = TaskGetData ())
# redefinir tarefa única
TaskGetData (). Invalidate ()
# redefinir todas as tarefas posteriores
d6tflow.invalidate_downstream (TaskGetData (), TaskTrain ())
# redefine todas as tarefas de envio
d6tflow.invalidate_upstream (TaskTrain ())

Benefício: Treine facilmente modelos usando diferentes parâmetros

Você pode executar de maneira inteligente o fluxo de trabalho depois de alterar um parâmetro. Os parâmetros são transmitidos da tarefa de destino para a tarefa de recebimento de dados relevante. Assim, você não precisa mais controlar manualmente quais tarefas atualizar, aumentando a velocidade de prototipagem e reduzindo os erros.

 d6tflow.preview (TaskTrain (do_preprocess = False)) '' ' 
Task? - [TaskTrain - {'do_preprocess': 'False'} (PENDENTE)]
--? - [TaskPreprocess - {'do_preprocess': 'False'} (PENDENTE)]
--? - [TaskGetData- {} (COMPLETE)] => isso não muda e não precisa executar novamente
'' '

Benefício: compare facilmente os modelos

Diferentes modelos que foram treinados com diferentes parâmetros podem ser facilmente carregados e comparados.

 df_train1 = TaskPreprocess (). output (). load () 
model1 = TaskTrain (). output (). load ()
print (sklearn.metrics.accuracy_score (df_train1 ['y'], model1.predict (df_train1.iloc [:,: - 1])))
df_train2 = TaskPreprocess (do_preprocess = False) .output (). load ()
model2 = TaskTrain (do_preprocess = False) .output (). load ()
print (sklearn.metrics.accuracy_score (df_train2 ['y'], model2.predict (df_train2.iloc [:,: - 1])))

Acelere o engenheiro de dados para o handover de cientista de dados

Para compartilhar rapidamente os arquivos de saída do fluxo de trabalho, você pode usar o . Consulte .

 importar d6tflow.pipe d6tflow.pipe.init (api, 'nome do canal') # salvar saída de fluxo 
pipe = d6tflow.pipes.get_pipe ()
pipe.pull ()
class Task2 (d6tflow.tasks.TaskPqPandas): def requer (auto):
return Task1 () # define dependência
def run (self):
data = self.input (). load () # carrega dados do engenheiro de dados

Alternativamente, você pode salvar as saídas em um banco de dados usando o .

 d6tflow2.db.init ('postgresql + psycopg2: //usr: pwd @ localhost / db', 'schema_name') classe Task1 (d6tflow2.tasks.TaskSQLPandas): def run (self): 
df = pd.DataFrame ()
self.save (df)

Por fim, o cientista de dados pode herdar tarefas que o engenheiro de dados gravou para carregar rapidamente os dados de origem.

 import tasks_factors # tarefas de importação escritas pelo engenheiro de dados 
import utils
class Task1 (tasks_factors.Task1):
external = True # confia no engenheiro de dados para executar
def run (self):
data = self.input (). load () # carrega dados do engenheiro de dados

Início rápido do d6tflow

Aqui está um exemplo completo de como usar o d6tflow para um fluxo de trabalho ML

Modelo para projetos ML escaláveis

Um modelo de código d6tflow para projetos reais está disponível em

  • Entradas e saídas de múltiplas tarefas
  • Herança de Parâmetro
  • Tarefas modularizadas, executar e viz

Texto original em inglês.