Pré-processamento de dados distribuídos usando Dask, Amazon ECS e Python (Parte 2)

Usando o Dask para EDA e Hyperparameters Optimization (HPO)

Will badr Blocked Unblock Seguir Seguindo 3 de janeiro

Na Parte 1 desta série, expliquei como construir um cluster sem servidor do planejador e dos operadores do DASK no AWS Fargate . Escalar o número de trabalhadores para cima e para baixo é bastante simples. Você pode conseguir isso executando os comandos abaixo da AWS CLI:

 bash ~ # aws ecs serviço de atualização - serviço Dask-Workers - desejado-contagem 10 - cluster Fargate-Dask-Cluster> / dev / null 
 bash ~ # aws ecs serviço de atualização - serviço Dask-Scheduler - desejado-count 1 - cluster Fargate-Dask-Cluster> / dev / null 

Agora que ampliei o cluster Fargate sem servidor, vamos tentar uma análise exploratória de dados (EDA). Estou usando o famoso conjunto de dados NY Yellow Taxi 2017.

Primeiro, vamos carregar nosso dataframe na memória do cluster. Para fazer isso, basta importar a classe do dataframe da biblioteca dask e, em seguida, carregar os arquivos usando a função read_csv () :

A função client.persist () funciona mantendo os dados na memória de forma assíncrona e, em seguida, retorna imediatamente. Ele enviará os gráficos da tarefa ao cluster e retornará um objeto Futuro . Podemos, então, executar consultas rápidas no dataframe de saída. Vamos executar algumas análises:

1.1 Análise de Dataframe

Vamos obter algumas estatísticas descritivas no dataframe. O código deve ser exatamente o mesmo, exceto que, com os dataframes do Dask, você precisa adicionar uma função compute () para obter os resultados imediatamente no notebook. Como você pode ver, demorou cerca de 7 segundos para fazer alguns cálculos complexos no dataframe, como: (Calcule quantiles, mean, count, min, max e desvio padrão).

1.2 Conte a distância total da viagem e conte para cada fornecedor:

Demorou cerca de 5,5 segundos para terminar esta afirmação groupby () . Como você pode ver, o fornecedor 2 fez muito mais viagens que o fornecedor 1 e direcionou muito mais distância.

1.3 Contar os valores ausentes de cada recurso:

Nenhum valor ausente no conjunto de dados.

1.4 Exibir a correlação entre os recursos:

Existem algumas correlações claras, como a alta correlação positiva entre fare_amount e trip_distance, tip_amount e fare_amount. Preciso remover o recurso altamente correlacionado (total_amount) para evitar alguns problemas, como problemas de multicolinearidade .

Vamos tentar o 1.2 novamente, mas desta vez, eu vou persistir os dados na memória e vou deixar funcionar em segundo plano enquanto eu faço outras tarefas. Eu também imprimo a barra de progresso para ver quando a tarefa é finalizada.

Barra de progresso da função persistente

Observe que o prompt retorna imediatamente quando os dados estão sendo carregados / calculados em segundo plano usando os trabalhadores do cluster.

Quando a barra de progresso estiver concluída (toda verde), você poderá consultar a saída dos dados rapidamente. Observe que o prompt retorna imediatamente quando os dados estão sendo carregados / calculados em segundo plano usando os trabalhadores do cluster.

Usando o Dask for Machine Learning:

Uma das tarefas mais dispendiosas em computação no aprendizado de máquina é a otimização de hiperparâmetros (HPO). HPO é uma técnica usada para ajustar os parâmetros ML que não são aprendidos durante o processo de treinamento , como taxa de aprendizado, otimizador, regularização ou número de camadas ocultas em uma rede neural. Ele funciona explorando o espaço de busca de um intervalo pré-definido para cada um dos hiperparâmetros que você vai ajustar. Existem muitas bibliotecas e técnicas para executar o processo HPO, mas neste artigo, vou me concentrar em ajustar os hiperparâmetros usando a técnica GridSearch e a biblioteca Scikit-Learn em Python.

HPO usando Grid Search:

A técnica Grid Search é uma pesquisa exaustiva ao especificar manualmente um subconjunto do espaço hiperparâmetro de um algoritmo de aprendizado. O algoritmo passará por cada combinação dos hiperparâmetros visando (maximizando / minimizando) as métricas objetivas (Precisão / Perda). Isso acabará dando os melhores resultados, mas quanto mais hiperparâmetros você ajustar, mais complexo o processo de ajuste será obtido.

Pesquisa de grade bidimensional para HPO

HPO no Dask Cluster usando Grid Search:

Como o processo HPO é computacionalmente caro, nós o rodamos em um Dask Cluster para aproveitar a escala e a elasticidade. O Scikit-learn usa uma biblioteca muito poderosa chamada joblib para paralelizar processos em vários núcleos de CPU.

O Joblib também fornece uma interface para outros sistemas paralelos para se tornar um mecanismo de execução. Podemos conseguir isso usando o gerenciador de contexto parallel_backend para executar com milhares de núcleos em um cluster:

Primeiro, precisamos importar o joblib do sklearn externals e registrar o Dask Distributed como um mecanismo de backend paralelo para o joblib.

 from sklearn.externals.joblib import _dask, parallel_backend 
de sklearn.utils import register_parallel_backend
from sklearn.externals.joblib import parallel_backend
 register_parallel_backend ('distributed', _ dask.DaskDistributedBackend) 

Então, precisamos executar a seguinte linha para começar a usar o cluster como um mecanismo de execução:

 com parallel_backend ('distributed', scheduler_host = 'dask-Scheduler.local-dask: 8786'): 
<Normal sklearn Code>

Então sua lógica de código do sklearn permanece exatamente a mesma sem alterações.

Aqui está o código completo para usar o GridSearch HPO para encontrar os melhores hiperparâmetros para um Classificador de Floresta Aleatório que classificará os dígitos manuscritos do conjunto de dados MNIST:

 dos conjuntos de dados de importação do sklearn 
de sklearn.model_selection import train_test_split
de sklearn.model_selection import GridSearchCV
de sklearn.ensemble import RandomForestClassifier
 import numpy como np 
a partir do tempo de importação
 from sklearn.externals.joblib import _dask, parallel_backend 
de sklearn.utils import register_parallel_backend
register_parallel_backend ('distributed', _dask.DaskDistributedBackend)
 # Carregando o conjunto de dados do Digits 
digits = datasets.load_digits ()
 # Para aplicar um classificador a esses dados, precisamos nivelar a imagem para 
# transformar os dados em uma matriz (amostras, recurso):
n_samples = len (digits.images)
X = digits.images.reshape ((n_samples, -1))
y = digits.target
 # Divide o conjunto de dados em duas partes iguais 
X_train, X_test, y_train, y_test = train_test_split (X, y, test_size = 0.3, estado_aleatório = 0)
 clf = RandomForestClassifier (n_estimators = 20) 
 # use uma grade completa sobre todos os parâmetros 
param_grid = {"max_depth": [3,4,5,6, None],
"max_features": [1, 3, 10, None],
"min_samples_split": [2, 3, 10],
"bootstrap": [Verdadeiro, Falso],
"criterion": ["gini", "entropy"]}
 # pesquisa de grade de corrida 
grid_search = GridSearchCV (clf, param_grid = param_grid, cv = 8, iid = True)
 start = time () 
com parallel_backend ('distributed', scheduler_host = 'dask-Scheduler.local-dask: 8786'):
grid_search.fit (X, y)
clf.fit (X, y)
 print ("GridSearchCV levou% .2f segundos para% d configurações de parâmetro candidatas". 
% (time () - iniciar, len (grid_search.cv_results _ ['params'])))
 results = grid_search.cv_results_ 

# Retorna o índice da melhor pontuação de validação
idx = np.flatnonzero (resultados ['rank_test_score'] == 1)
print ("A melhor pontuação é:" + str (resultados ['mean_test_score'] [idx [0]]))

#print os parâmetros para o melhor trabalho
print ("Parâmetros: {0}". format (resultados ['params'] [idx [0]]))

A saída do código acima

Demorou cerca de 40 segundos em um cluster de 10 nós para encontrar a melhor combinação de hiperparâmetros para o classificador, enquanto que em uma única máquina (mesmo com múltiplos núcleos / múltiplas CPUs), levaria muitos minutos sem o recurso de paralelização.

Resumo:

Pela minha experiência com o Dask, é uma ótima biblioteca pré-processar grandes conjuntos de dados de maneira distribuída. Se você é um fã de Pandas e Numpy e tem problemas para ajustar seus dados na memória, então o Dask deve definitivamente ser o caminho a percorrer. É absolutamente uma ótima solução quando se trata de tarefas de aprendizagem de máquina sensíveis ao tempo e ao custo, como HPO, imputação de dados, pré-processamento de dados e análise exploratória.

Texto original em inglês.