Norm Niemer Seguir 24 de julho · 4 min ler
Airflow e Luigi são ótimos para engenharia de dados, mas não otimizados para ciência de dados. O d6tflow traz DAGs ao estilo de fluxo de ar para a ciência de dados.
Fluxos de trabalho de ciência de dados são DAGs
Os fluxos de trabalho de ciência de dados geralmente se parecem com isso.
Esse fluxo de trabalho é semelhante aos fluxos de trabalho de engenharia de dados. Envolve encadear tarefas parametrizadas que passam múltiplas entradas e saídas entre si. Veja 4 razões pelas quais o seu código de aprendizado de máquina é provavelmente ruim, por que passar dados entre funções ou nomes de arquivos / bancos de dados de hardcoding sem explicitamente definir dependências de tarefas não é uma boa maneira de escrever código de ciência de dados.
# código de ciência de dados inválido
def process_data (data, do_preprocess):
data = do_stuff (data, do_preprocess)
data.to_pickle ('data.pkl') data = pd.read_csv ('data.csv')
process_data (data, True)
df_train = pd.read_pickle (df_train)
model = sklearn.svm.SVC ()
model.fit (df_train.iloc [:,: - 1], df_train ['y'])
P & D versus fluxos de trabalho de dados de produção
Usar o fluxo de ar e o luigi é um grande passo a frente da criação de código funcional para gerenciar fluxos de trabalho de dados. Embora os fluxos de trabalho de engenharia de dados e ciência de dados sejam semelhantes, eles têm a mesma nota. Ambas as bibliotecas foram projetadas para serem usadas por engenheiros de dados em configurações de produção nas quais o foco é:
- certificando-se de que tudo está correndo bem no tempo
- agendamento e coordenação
- recuperando-se de falhas
- qualidade de dados
Em contraste, o foco no fluxo de trabalho de pesquisa e desenvolvimento é:
- gerando insights
- velocidade de prototipagem
- avaliar a potência preditiva com diferentes modelos e parâmetros
- visualizando saída
Como resultado, o fluxo de trabalho de pesquisa e desenvolvimento:
- é menos bem definido
- envolve tentativa e erro
- requer redefinição freqüente de tarefas e saída como modelos, parâmetros e alteração de dados
- tira a saída do engenheiro de dados
Problemas com o fluxo de ar / luigi nas configurações de pesquisa e desenvolvimento
Como as duas bibliotecas são otimizadas para configurações de produção de engenharia de dados, o UX para uma configuração de pesquisa e desenvolvimento de ciência de dados não é ótimo:
- Código WET para leitura / gravação de dados
- Controlar manualmente nomes de arquivos ou nomes de tabelas de bancos de dados onde os dados são salvos
- Inconveniente para redefinir tarefas conforme os modelos, parâmetros e dados são alterados
- Inconveniente para acompanhar os resultados do modelo com diferentes configurações de parâmetros
Mantendo manualmente o controle de nomes de arquivos em fluxos de trabalho de dados complexos … Não escalável.
# exemplo de armazenamento manual em cache de quadros de dados e controle de arquivos
cfg_fpath_cc_base = cfg_fpath_base + 'vendor /'
cfg_fpath_cc_raw = cfg_fpath_cc_base + 'df_cc_raw.pkl'
cfg_fpath_cc_raw_recent2 = cfg_fpath_cc_base + 'df_cc_raw_recent2.pkl'
cfg_fpath_cc_yoy = cfg_fpath_cc_base + 'df_cc_yoy.pkl'
cfg_fpath_cc_yoy_bbg = cfg_fpath_cc_base + 'df_cc_yoy_bbg.pkl'
cfg_fpath_cc_yoy_fds = cfg_fpath_cc_base + 'df_cc_yoy_fds.pkl'
cfg_fpath_cc_var_fds = cfg_fpath_cc_base + 'df_cc_var_fds.pkl'
cfg_fpath_cc_yoy_recent2 = cfg_fpath_cc_base + 'df_cc_yoy_recent2.pkl'
cfg_fpath_cc_actual = cfg_fpath_cc_base + 'df_cc_sales_actual.pkl'
cfg_fpath_cc_monthly = cfg_fpath_cc_base + 'df_cc_monthly.pkl'
cfg_fpath_cc_yoy_cs2 = 'data / processed / df_cc_yoy_cs2.pq' # dados de compradores consistentes para a nova metodologia de 2018
Como o d6tflow é diferente do fluxo de ar / luigi
O d6tflow é otimizado para fluxos de trabalho de pesquisa e desenvolvimento de ciência de dados. Aqui estão os benefícios de usar o d6tflow na ciência de dados.
Benefício: as tarefas têm dados de entrada e saída
Em vez de carregar e salvar manualmente os dados, isso é terceirizado para a biblioteca. Isso é melhor e reduz a manutenção porque a localização dos dados de entrada / saída pode mudar sem ter que reescrever o código. Também torna mais fácil para o engenheiro de dados entregar dados ao cientista de dados.
class TaskProcess (d6tflow.tasks.TaskPqPandas): # define o formato de saída que o def requer (self):
return TaskGetData () # define dependência def run (self):
data = self.input (). load () # carrega dados de entrada
data = do_stuff (data) # process data
self.save (data) # salva os dados de saída
Benefício: facilmente invalidar tarefas
Cenários comuns de invalidação são implementados. Isso aumenta a velocidade de prototipagem à medida que você altera o código e os dados durante a tentativa e erro.
# force execution incluindo tarefas downstream
d6tflow.run (TaskTrain (), force = TaskGetData ()) # redefinir tarefa única
TaskGetData (). Invalidate () # redefinir todas as tarefas posteriores
d6tflow.invalidate_downstream (TaskGetData (), TaskTrain ()) # redefine todas as tarefas de envio
d6tflow.invalidate_upstream (TaskTrain ())
Benefício: Treine facilmente modelos usando diferentes parâmetros
Você pode executar de maneira inteligente o fluxo de trabalho depois de alterar um parâmetro. Os parâmetros são transmitidos da tarefa de destino para a tarefa de recebimento de dados relevante. Assim, você não precisa mais controlar manualmente quais tarefas atualizar, aumentando a velocidade de prototipagem e reduzindo os erros.
d6tflow.preview (TaskTrain (do_preprocess = False)) '' '
Task? - [TaskTrain - {'do_preprocess': 'False'} (PENDENTE)]
--? - [TaskPreprocess - {'do_preprocess': 'False'} (PENDENTE)]
--? - [TaskGetData- {} (COMPLETE)] => isso não muda e não precisa executar novamente
'' '
Benefício: compare facilmente os modelos
Diferentes modelos que foram treinados com diferentes parâmetros podem ser facilmente carregados e comparados.
df_train1 = TaskPreprocess (). output (). load ()
model1 = TaskTrain (). output (). load ()
print (sklearn.metrics.accuracy_score (df_train1 ['y'], model1.predict (df_train1.iloc [:,: - 1]))) df_train2 = TaskPreprocess (do_preprocess = False) .output (). load ()
model2 = TaskTrain (do_preprocess = False) .output (). load ()
print (sklearn.metrics.accuracy_score (df_train2 ['y'], model2.predict (df_train2.iloc [:,: - 1])))
Acelere o engenheiro de dados para o handover de cientista de dados
Para compartilhar rapidamente os arquivos de saída do fluxo de trabalho, você pode usar o d6tpipe . Consulte Compartilhando Fluxos de Trabalho e Saídas .
importar d6tflow.pipe d6tflow.pipe.init (api, 'nome do canal') # salvar saída de fluxo
pipe = d6tflow.pipes.get_pipe ()
pipe.pull () class Task2 (d6tflow.tasks.TaskPqPandas): def requer (auto):
return Task1 () # define dependência def run (self):
data = self.input (). load () # carrega dados do engenheiro de dados
Alternativamente, você pode salvar as saídas em um banco de dados usando o d6tflow premium .
d6tflow2.db.init ('postgresql + psycopg2: //usr: pwd @ localhost / db', 'schema_name') classe Task1 (d6tflow2.tasks.TaskSQLPandas): def run (self):
df = pd.DataFrame ()
self.save (df)
Por fim, o cientista de dados pode herdar tarefas que o engenheiro de dados gravou para carregar rapidamente os dados de origem.
import tasks_factors # tarefas de importação escritas pelo engenheiro de dados
import utils class Task1 (tasks_factors.Task1):
external = True # confia no engenheiro de dados para executar def run (self):
data = self.input (). load () # carrega dados do engenheiro de dados
Início rápido do d6tflow
Aqui está um exemplo completo de como usar o d6tflow para um fluxo de trabalho ML https://github.com/d6t/d6tflow#example-output
Modelo para projetos ML escaláveis
Um modelo de código d6tflow para projetos reais está disponível em https://github.com/d6t/d6tflow-template
- Entradas e saídas de múltiplas tarefas
- Herança de Parâmetro
- Tarefas modularizadas, executar e viz