Usando o Docker e o Pyspark

Bryant Dean Crocker Blocked Unblock Seguir Seguindo 9 de janeiro

Recentemente eu tenho jogado com o Pyspark um pouco e decidi escrever uma postagem no blog sobre o uso do Pyspark e do Spark SQL. O Spark é uma ótima ferramenta de código aberto para a inserção de dados e aprendizado de máquina em clusters de computação distribuída. O Pyspark é a API do Python para o Spark.

O Pyspark pode ser um pouco difícil de ser instalado e executado em sua máquina. O Docker é uma maneira rápida e fácil de obter um ambiente de ignição funcionando em sua máquina local e é como eu executo o Pyspark na minha máquina.

O que é o Docker?

Vou começar dando uma introdução ao Docker. De acordo com a wikipedia, “o Docker é um programa de computador que realiza virtualização em nível de sistema operacional , também conhecido como 'conteinerização'”. Para simplificar bastante, o Docker cria um sistema operacional walled off linux para executar o software no sistema operacional de suas máquinas, chamado de container. Para aqueles familiarizados com máquinas virtuais, um contêiner é basicamente um vm sem um hypervisor. Esses contêineres podem ser pré-configurados com scripts para instalar software específico e fornecer funcionalidade personalizada. O Dockerhub é um site que contém vários contêineres docker pré-configurados que podem ser executados rapidamente em seu computador. Um deles é o jupyter / pysparknotebook. Esta é a imagem do docker que usaremos hoje.

Iniciando o contêiner do Docker:

Configurar um contêiner Docker em sua máquina local é bem simples. Basta baixar a janela de encaixe do site da janela de encaixe e executar o seguinte comando no terminal:

 docker run -it -p 8888: 8888 jupyter / pyspark-notebook 

navegue até http: // localhost: 8888 no seu navegador e você verá a seguinte tela:

No seu terminal você deve ver um token:

copie e cole este token, os números após “/? token =”, no livro de texto do token e defina uma senha para o servidor de notebook Jupyter na caixa Nova Senha.

Com isso você está pronto para ir! O Spark já está instalado no contêiner. Você está pronto para abrir um caderno e começar a escrever algum código de ignição. Incluirei uma cópia do bloco de anotações, mas recomendo que você digite o código deste artigo em um novo bloco de anotações Jupyter em seu computador local. Isso ajuda você a aprender.

Para parar o contêiner docker e o servidor de notebook Jupyter, basta digitar control + c no terminal que o está executando.

Noções básicas do Pyspark

Spark é uma estrutura de computação de cluster de código aberto escrita principalmente em scala com APIs em R, python, scala e java. Ele é feito principalmente para análise de dados em grande escala e aprendizado de máquina que não cabe na memória local. Neste breve tutorial, não usarei um conjunto de dados grande demais para caber na memória. Este tutorial utiliza o guia oficial de obtenção: https://spark.apache.org/docs/latest/sql-getting-started.html .

Tipos de dados de faísca:

Existem dois tipos principais de dados no ecossistema de faíscas, conjuntos de dados distribuídos resilientes ou RDDs (que são como um cruzamento entre uma lista python e um dicionário) e dataframes (quadros de dados muito parecidos com R e python). Ambos os tipos de dados na ignição são particionados e imutáveis (o que significa que você não pode alterar o objeto, um novo é retornado). Neste tutorial, vou me concentrar no tipo de dados do dataframe.

O conjunto de dados:

O conjunto de dados que eu irei usar é um conjunto de dados um pouco grande para o fornecedor de Vermont do portal Socrata de dados abertos de Vermont. Pode ser baixado facilmente seguindo o link.

Configurando uma sessão do Spark:

Esse snippet de código inicializa o ambiente pyspark no contêiner docker e importa bibliotecas básicas para computação numérica.

 # importar bibliotecas necessárias 
importar pandas como pd
import numpy
import matplotlib.pyplot como plt
da importação pyspark.sql SparkSession
 # criar sparksession 
faísca = SparkSession
.builder
.appName ("Pysparkexample")
.config ("spark.some.config.option", "algum valor")
.getOrCreate ()

Lendo em um CSV:

Eu queria começar comparando a leitura em um CSV com pandas vs Spark. O Spark acaba lendo no CSV muito mais rápido que os pandas. Isso demonstra como os dataframes do Spark são muito mais rápidos quando comparados aos equivalentes de pandas.

 #time lendo em dados com faísca 
%% timeit
df = spark.read.csv ('Vermont_Vendor_Payments (1) .csv', header = 'true')
 # tempo de leitura em dados com pandas 
%% timeit
df_pandas = pd.read_csv ('Vermont_Vendor_Payments (1) .csv', low_memory = False)

Métodos Básicos de Spark:

como com os pandas, acessamos os nomes das colunas com o atributo .columns do dataframe.

 #nós podemos usar o atributo columns assim como com os pandas 
colunas = df.columns
print ('Os nomes das colunas são:')
para i em colunas:
imprimir (i)

Podemos obter o número de linhas usando o método .count () e podemos obter o número de colunas tomando o tamanho dos nomes das colunas.

 print ('O número total de linhas é:', df.count (), ' nO número total de colunas é:', len (df.columns)) 

O método .show () imprime as primeiras 20 linhas do dataframe por padrão. Eu escolhi apenas imprimir 5 neste artigo.

 # mostre as primeiras 5 linhas 
df.show (5)

O método .head () também pode ser usado para exibir a primeira linha. Isso imprime muito melhor no notebook.

 # mostrar primeira linha 
df.head ()

Como nos pandas, podemos chamar o método describe para obter resumos numéricos básicos dos dados. Precisamos usar o método show para imprimi-lo no notebook. Isso não imprime muito bem no notebook.

 df.describe (). show () 

Consultando os dados:

Um dos pontos fortes do Spark é que ele pode ser consultado com a respectiva biblioteca Spark de cada idioma ou com o Spark SQL. Vou demonstrar algumas consultas usando as opções pythonic e SQL.

O código a seguir registra a tabela temporária e seleciona algumas colunas usando a sintaxe SQL:

 # Vou começar criando uma consulta de tabela temporária com SQL 
df.createOrReplaceTempView ('VermontVendor')
spark.sql (
'' '
SELECT `Quarter Ending`, Departamento, Valor, Estado FROM VermontVendor
LIMITE 10
'' '
).exposição()

Este código executa praticamente a mesma operação usando a sintaxe pythonic:

 df.select ('Quarter Ending', 'Departamento', 'Valor', 'Estado'). show (10) 

Uma coisa a notar é que a solução pythonic é significativamente menos código. Eu gosto de SQL e é sintaxe, então eu prefiro a interface SQL sobre o pythonic.

Eu posso filtrar as colunas selecionadas na minha consulta usando a cláusula SQL WHERE

 spark.sql ( 
'' '
 SELECT `Quarter Ending`, Departamento, Valor, Estado FROM VermontVendor 
WHERE Department = 'Education'
LIMITE 10
 '' ' 
).exposição()

Um resultado semelhante pode ser obtido com o método .filter () na API python.

 df.select ('Quarter Ending', 'Departamento', 'Quantidade', 'Estado'). filter (df ['Departamento'] == 'Educação'). show (10) 

Plotagem

Infelizmente, não é possível criar gráficos diretamente com um dataframe do Spark. A solução mais simples é simplesmente usar o método .toPandas () para converter o resultado de cálculos do Spark em um dataframe do pandas. Eu dou alguns exemplos abaixo.

 plot_df = spark.sql ( 
'' '
 SELECT Department, SUM (Quantidade) como Total FROM VermontVendor 
Departamento GROUP BY
ORDER BY Total DESC
LIMITE 10
 '' ' 
) .toPandas ()

fig, ax = plt.subplots (1,1, figsize = (10,6))
plot_df.plot (x = 'Departamento', y = 'Total', tipo = 'barh', cor = 'C0', ax = ax, legenda = Falso)
ax.set_xlabel ('Departamento', tamanho = 16)
ax.set_ylabel ('Total', tamanho = 16)
plt.savefig ('barplot.png')
plt.show ()
 import numpy como np 
Importar seaborn como sns
plot_df2 = spark.sql (
'' '
SELECT Department, SUM (Quantidade) como Total FROM VermontVendor
Departamento GROUP BY
'' '
) .toPandas ()
plt.figure (figsize = (10,6))
sns.distplot (np.log (plot_df2 ['Total']))
plt.title ('Histograma de totais de log para todos os departamentos no conjunto de dados', tamanho = 16)
plt.ylabel ('Density', size = 16)
plt.xlabel ('Log Total', tamanho = 16)
plt.savefig ('distplot.png')
plt.show ()

Iniciando seu contêiner docker novamente:

Depois de iniciar e sair do contêiner docker pela primeira vez, você iniciará de forma diferente para usos futuros, já que o contêiner já foi executado.

Passe o seguinte comando para retornar todos os nomes de contêineres:

 docker ps -a 

Obtenha o ID do contêiner do terminal:

Em seguida, execute o docker start com o ID do contêiner para iniciar o contêiner:

 início de janela 903f152e92c5 

Seu servidor de notebook Jupyter será executado novamente em http: // localhost: 8888 .

O código completo com mais alguns exemplos pode ser encontrado no meu github:

https://github.com/crocker456/PlayingWithPyspark

Fontes:

PySpark 2.0 O tamanho ou a forma de um DataFrame
Obrigado por contribuir com uma resposta ao Stack Overflow! Algumas de suas respostas anteriores não foram bem recebidas, e você está… stackoverflow.com
Introdução – Spark 2.4.0 Documentation
Editar descrição spark.apache.org