Pré-processamento de dados distribuídos usando Dask, Amazon ECS e Python (Parte 1)

Will badr Blocked Unblock Seguir Seguindo 18 de dezembro de 2018

A qualidade e a precisão dos modelos de aprendizado de máquina dependem de muitos fatores. Um dos fatores mais críticos é o pré-processamento do conjunto de dados antes de alimentá-lo no algoritmo de aprendizado de máquina que aprende com os dados. Portanto, é essencial que você forneça os dados corretos para o problema que deseja resolver. Por exemplo, alguns algoritmos não suportam valores nulos e devem ser transformados e processados antes do treinamento. Você também deve entender por que há valores ausentes e se há um padrão específico para esses valores e qual é sua influência.

Engenheiros de Dados e Cientistas de Dados freqüentemente usam ferramentas do ecossistema python, como Numpy e Pandas, para analisar, transformar e visualizar seus dados, projetados para serem bibliotecas de alto desempenho, intuitivas e eficientes. Realizar essas operações em um conjunto de dados pequeno de maneira rápida e escalonável não é um desafio, desde que o conjunto de dados possa caber na memória de uma única máquina. No entanto, se o conjunto de dados for grande demais e não couber em uma única máquina, os engenheiros de dados poderão ser forçados a reescrever seu código em ferramentas mais escaláveis, como Spark e SparkML, que podem ser suportadas computacionalmente por um grande cluster EMR.

Para superar esse problema, uso o Dask . O Dask aproveita a familiaridade das pessoas com bibliotecas famosas como o Pandas e você pode usá-lo para desenvolver código para processar dados de maneira escalável, paralela e distribuída.

Neste artigo, estou discutindo como criar um cluster sem servidor para pré-processar dados de maneira distribuída e paralela. Eu também estou usando o cluster distribuído do Dask que fornece paralelismo avançado para análise e processamento de dados. Para habilitar o desempenho em escala, otimizando o custo e mantendo a solução flexível, criarei um cluster sem servidor usando o AWS Fargate, que não fornece sobrecarga de gerenciamento de cluster e uma chamada de API para dimensionar o cluster para cima e para baixo e integrar o cluster com o Amazon SageMaker Jupyter caderno. (Ou qualquer outro IDE de preferência)

O que é o Dask Distributed?

Dask.distributed : é uma biblioteca leve e de código aberto para computação distribuída em Python. Também é um agendador de tarefas dinâmico, distribuído e gerenciado centralmente. Dask tem três componentes principais:

processo dask-scheduler: coordena as ações de vários trabalhadores. O agendador é assíncrono e orientado a eventos, respondendo simultaneamente a solicitações de computação de vários clientes e rastreando o progresso de vários funcionários.

processos dask-worker: Que são distribuídos por várias máquinas e as solicitações simultâneas de vários clientes.

processo dask-client: que é o principal ponto de entrada para usuários de dask.distributed

Diagrama da Solução:

Diagrama e Arquitetura da Solução

Eu preciso manter uma comunicação de baixa latência e configurações de rede simples entre o Juypter Notebook e o cluster Fargate. Por isso, estou criando o cluster do Notebook e o Dask Distributed na mesma Virtual Private Cloud (VPC).

Etapas de pré-implantação:

Primeiro, preciso criar um Repositório Elastic Container Registry (ECR) como um pré-requisito. Para fazer isso, eu vou para o Console da AWS -> Elastic Container Service (ECS) -> Selecionar Repositórios e, em seguida, “Criar repositório”.

(Você também pode pular esta etapa e criar um repositório github)

Precisamos dar um nome a ela – “ dask ” e clicar em “ Próximo passo ”:

Em seguida, passamos para a página " Introdução ", onde encontramos os comandos iniciais para criar e enviar a imagem do contêiner.

Agora que o repositório está pronto, mudarei para um “Terminal” de shell para fazer download, construir e marcar a imagem do contêiner e, em seguida, enviá-la para o ECR. Para fazer isso, estou executando os seguintes comandos shell:

 bash # git clone https://github.com/wmlba/ECS-Dask.git 
 bash # cd ECS-Dask; tar -xzf base-image.tar.gz; base-imagem cd 
 bash # `aws ecr get-login --no-inclui-email - região us-east-1` 
 bash # docker build -t dask. 
 bash # docker tag dask: última <ID da conta da AWS> .dkr.ecr.us-east-1.amazonaws.com / dask: latest 
 push # docker push <ID da conta da AWS> .dkr.ecr.us-east-1.amazonaws.com / dask: latest 

NOTA : Para executar os comandos acima, você precisa ter o Docker instalado localmente na máquina da qual você executa os comandos e as credenciais da AWS configuradas.

A imagem do docker que estou usando é ligeiramente modificada da publicada no repositório dask

Levará alguns minutos para criar a imagem e, em seguida, empurrá-la para o repositório ECR que foi criado em uma etapa anterior. Você pode verificar isso clicando no nome do repositório no console do ECS.

Implante a solução

Agora que tenho a imagem pronta, preciso implantar a solução usando um modelo do CloudFormation seguindo as etapas abaixo:

  1. lançar este modelo do CloudFormation na sua conta. Leva aproximadamente de 3 a 5 minutos para a pilha do CloudFormation concluir

A pilha do CloudFormation criará recursos como: Cluster Fargate, Definições de Tarefa, Serviços e Tarefas para o Dask worker e o Scheduler. Ele também criará uma função e política de execução do IAM para permitir o acesso ao repositório do Elastic Container Registry (ECR) e aos grupos de log do CloudWatch para logs. Todos os recursos serão criados na região us-east-1 por padrão.

2. Na página Especificar Detalhes, especifique uma sub-rede privada com um gateway NAT para o cluster. O Dask Scheduler e Workers irão se comunicar através de uma rede privada e o gateway NAT será necessário apenas para que o Serviço do ECS possa extrair a imagem ECR do repositório. Em seguida, selecione Próximo:

3. Na página Opções , selecione Avançar.

4. Na página Revisar , revise e confirme as configurações. Certifique-se de selecionar a caixa reconhecendo que o modelo criará recursos do AWS Identity and Access Management (IAM) com nomes personalizados. Para implantar a pilha, selecione Criar . Após alguns minutos, a criação da pilha deve estar completa.

Depois que a pilha é criada, você também pode confirmar se o cluster do ECS Dask está implantado em execução. Você pode verificar isso alternando para o Console do ECS -> Clique em Clusters -> Clique em Fargate-Dask-Cluster e, na guia de tarefas, deve haver duas tarefas em execução:

Agora que o Dask Cluster está pronto, vou criar o SageMaker Notebook para que eu possa começar a usar o cluster. Para fazer isso, mudo para o SageMaker Console -> Instâncias do Notebook -> Criar Instância do Notebook .

Em seguida, selecionarei as mesmas VPC e Sub-redes que foram selecionadas anteriormente no modelo do CloudFormation:

NOTA : Você pode selecionar qualquer outra sub-rede e grupo de segurança, desde que você habilite o acesso entre o bloco de notas SageMaker e o Dask Cluster.

Então, eu crio um novo caderno python3 clicando em New -> conda_python3 . Os pacotes do Dask são instalados por padrão no notebook SageMaker, mas é importante garantir que o pacote seja atualizado para a versão mais recente. Para verificar isso, vou executar o comando conda update no notebook:

NOTA : Se a versão do cliente for menor que a versão do planejador e do trabalhador, você encontrará erros ao iniciar o cliente.

A próxima etapa será criar o cliente e conectar-se ao cluster dask executando o código abaixo:

Observe que usei o nome DNS do agendador que foi atribuído automaticamente usando a funcionalidade de descoberta de serviço do ECS que usa as ações da API de nomeação automática do Route 53 para gerenciar as entradas de DNS do Route 53

Agora vamos fazer algumas operações nos dados usando o cluster, mas antes disso, eu aumentarei o número de trabalhadores no cluster para 7 trabalhadores. Para fazer isso, eu corro um comando no notebook como abaixo:

Após alguns segundos, o status das tarefas do trabalhador no Fargate Console será 'RUNNING'. Vou reiniciar o Dask Client para garantir que utilizemos a natureza de paralelismo do cluster.

Agora temos um cluster de 14 núcleos de CPU e 12 GB de memória (2 núcleos de CPU e 2 GB de memória para cada um dos 7 funcionários). Vamos fazer algumas operações intensivas de computação e memória e gerar alguns insights. Estou carregando um dataframe dask com os dados e calculando a distância da viagem e o agrupamento pelo número de passageiros.

Os resultados começam a aparecer após cerca de 2,5 minutos após a paralelização da tarefa em 7 trabalhadores diferentes e o carregamento de mais de 10 GB de dados em paralelo.

Visualização:

Capturas de tela do Bokeh Server na tarefa do Agendador que mostra as operações sendo encadeadas entre os trabalhadores. O painel pode ser acessado no navegador a partir do endereço IP do planejador e da porta 8787:

A captura de tela a seguir está mostrando a utilização de recursos (CPU e memória) para cada trabalhador:

Agora você deve estar pronto para fazer alguma mágica de pré-processamento!

Na parte 2 , mostrarei alguns códigos sobre a execução de análise e pré-processamento / engenharia de recursos e aprendizado de máquina usando o Dask Cluster que criamos.

Obrigado por ler o post. O feedback e a crítica construtiva são sempre bem-vindos. Eu vou ler todos os seus comentários.

-Vai