Dimensionando o Spark Streaming para registrar a ingestão de eventos

Como a Airbnb expandiu o streaming do Spark com um novo leitor Kafka balanceado que pode ingerir uma grande quantidade de eventos de registro do Kafka quase em tempo real

Hao Wang Blocked Desbloquear Seguir Seguindo 20 de novembro de 2018

por Hao Wang e Liyin Tang

Andando por um riacho durante uma experiência do Airbnb em Kuala Lumpur. Pesquisando, visualizando e registrando tais experiências, todas produzirão eventos de registro que serão processados por nossa estrutura de processamento de fluxo.

Registrando Ingestão de Eventos no Airbnb

Os eventos de registro são emitidos de clientes (como aplicativos móveis e navegador da Web) e serviços on-line com informações importantes e contexto sobre as ações ou operações. Cada evento contém uma informação específica. Por exemplo, quando um convidado procura por uma casa de praia em Malibu no Airbnb.com, um evento de pesquisa contendo a localização, datas de check-in e check-out, etc. seria gerado (e tornado anônimo para proteção de privacidade).

Na Airbnb, o registro de eventos é crucial para entendermos os hóspedes e os anfitriões e, em seguida, proporcionar-lhes uma experiência melhor. Ele informa a tomada de decisões nos negócios e impulsiona o desenvolvimento de produtos em funções de engenharia como Pesquisa, Experimentação, Pagamentos etc. Como exemplo, os eventos de registro são uma fonte importante para treinar modelos de aprendizado de máquina para classificação de busca de listagens.

Os eventos de registro são ingeridos no data warehouse quase em tempo real e servem como fonte para muitos trabalhos de ETL e análise. Eventos são publicados para Kafka de clientes e serviços. Um trabalho de streaming do Spark (criado no topo do Airstream , a estrutura de processamento de streaming do Airbnb) é continuamente lido por Kafka e grava os eventos no HBase para deduplicação. Por fim, os eventos são despejados do HBase em uma tabela Hive por hora. Como os eventos de registro são inseridos em muitos pipelines e alimentam vários painéis em toda a empresa, é extremamente importante garantir que eles sejam armazenados no data warehouse de maneira oportuna e atendam aos SLAs.

O volume de eventos gerado é enorme e está aumentando rapidamente. Isso representa sérios desafios para a infraestrutura de processamento existente, particularmente o trabalho de streaming do Spark que ingere eventos do Kafka para o HBase. Neste artigo, discutimos os desafios em dimensionar a infraestrutura e uma solução que pode suportar maior rendimento por uma ordem de magnitude e com melhor eficiência.

À medida que a empresa do Airbnb cresce, o número de eventos por semana aumenta.

Os desafios

1. Paralelismo Spark é determinado pelo número de partições Kafka

No atual conector Spark Kafka, há uma correspondência de um para um entre as partições Kafka e as tarefas Spark. Basicamente, uma tarefa Spark é instanciada para ler de uma partição Kafka para garantir a ordenação de eventos quando eles são processados no Spark. No entanto, com esse design, não podemos simplesmente dimensionar o trabalho de streaming do Spark, aumentando o paralelismo e alocando mais recursos.

Para aumentar o paralelismo e a taxa de transferência do Spark, é preciso designar mais partições do Kafka para tópicos com eventos grandes ou eventos altos de QPS. Infelizmente, esse é um processo bastante manual e não é escalonável quando há um grande número de tópicos (que continua aumentando).

Outro problema é que a designação de mais partições para um tópico no Kafka não se aplica retrospectivamente a eventos que já estão no Kafka. As partições adicionais estão disponíveis apenas para novos eventos. É impraticável para nós antecipar o pico de eventos e atribuir mais partições de antemão ao tópico do Kafka afetado. Um pico pode ocorrer a qualquer momento e pode ser devido a vários motivos, como novos recursos do produto ou feriados.

Quando o volume de eventos atinge um nível crítico, os grandes tópicos do Kafka geralmente não podem ser ingeridos com a rapidez necessária para o data warehouse. O problema é exacerbado pela distorção de dados nos eventos que discutiremos a seguir.

2. Inclinação no volume e tamanho do evento

Diferentes tipos de eventos estão sendo registrados com variações significativas em seu volume e tamanho. Alguns são realmente esparsos e alguns podem ter um QPS que é várias ordens de grandeza maior. Os tamanhos dos tipos de eventos podem variar de centenas de bytes a centenas de kilobytes. O gráfico abaixo mostra a grande variação do tamanho médio do evento para os tópicos do Kafka (observe que o eixo Y está na escala de log). Embora tentemos atribuir mais partições para eventos maiores, ainda há uma séria distorção nas partições de Kafka.

O skew é um problema sério em geral para aplicativos de dados. Nesse caso, algumas tarefas do Spark demorariam muito mais tempo para serem concluídas do que outras. Isso leva a inatividade de muitos executores e desperdício de recursos, já que um trabalho do Spark passa para o próximo estágio quando todas as tarefas em um estágio são concluídas. As partições de Kafka com os maiores eventos levariam muito tempo para ler se o tópico não tivesse partições suficientes. Isso resulta em atraso na tarefa de fluxo do Spark, pois os lotes são processados sequencialmente.

3. Quase em tempo real Ingestão e Headroom para Pegar Up

Devido aos desafios acima, há pouco espaço na taxa de transferência do trabalho de streaming do Spark. Uma vez que o trabalho está atrasado devido a vários problemas (como nós de dados ruins ou interrupção do Hive Metastore), leva muito tempo para recuperar o atraso.

Por exemplo, suponhamos que um trabalho com intervalo de 2 minutos processe um lote em 1 minuto em média. Se o trabalho estiver atrasado por 4 horas, levaria mais 4 horas para alcançá-lo. Se quisermos recuperar em 1 hora, isso requer um headroom de 4X (ou seja, processar cada lote em 24s). Além de se recuperar de incidentes, também é necessário um grande espaço para lidar com picos sazonais. Portanto, para uma ingestão quase em tempo real, é crucial ter espaço extra na taxa de transferência.

A solução

Em um sistema ideal, gostaríamos de poder escalonar horizontalmente os trabalhos de streaming do Spark (ou seja, alcançar uma taxa de transferência mais alta aumentando o paralelismo e alocando mais recursos). Também gostaríamos que esses trabalhos fossem balanceados em carga, de modo que cada tarefa levasse aproximadamente o mesmo tempo para ler de Kafka.

Para alcançar esses dois objetivos, nós da equipe da Airbnb Data Platform desenvolvemos um leitor Spark Kafka balanceado que satisfaz esses dois requisitos.

Leitor Spark Kafka Balanceado

Para o fluxo de processamento, a ordenação de eventos não é um requisito, uma vez que os eventos ingeridos são processados minimamente e, em seguida, armazenados no HBase individualmente. Isso nos permite repensar o modelo e procurar novas maneiras de resolver os problemas de dimensionamento. Como resultado, criamos um novo leitor Kafka balanceado para Spark que 1) permite um número arbitrário de divisões, de modo que o paralelismo pode ser aumentado para fornecer maior rendimento; 2) calcula as divisões com base no volume e no tamanho do evento.

Em um nível alto, o leitor Kafka balanceado funciona da seguinte maneira:

  1. Ele pré-calcula o tamanho médio do evento em cada tópico e o salva em um arquivo CSV.
  2. Quando o trabalho de streaming do Spark instancia o leitor Kafka equilibrado, ele passa um parâmetro adicional numberOfSplits para especificar o paralelismo desejado .
  3. Para cada partição Kafka, ele calcula o intervalo de deslocamento para leitura (do deslocamento atual para o deslocamento mais recente) e aplica a restrição maxRatePerPartition , se definida.
  4. Ele usa o algoritmo de particionamento balanceado (descrito na próxima seção) para atribuir subconjuntos de intervalo de deslocamento a divisões uniformemente.
  5. Cada tarefa Spark lê a partir de Kafka um ou mais intervalos de deslocamento de acordo com a divisão.

Abaixo está um exemplo simples com 2 tópicos de Kafka e 3 divisões. Os eventos no tópico A têm um QPS maior, mas um tamanho menor que o tópico B. O leitor Kafka equilibrado agruparia subconjuntos desses eventos juntos, de modo que cada divisão lê 1/3 dos dados de Kafka. Uma divisão (divisão 2) incluiria 4 eventos do tópico A e 1 evento do tópico B para que o tamanho total seja 8 kb para cada divisão.

Observe que a Etapa 1 pode ser aprimorada no futuro, calculando o tamanho médio do evento dinamicamente para que novos tópicos e tópicos com mudanças freqüentes no tamanho do evento sejam melhor explicados.

Algoritmo de Particionamento Balanceado

O problema de atribuir faixas de deslocamento a divisões uniformemente é muito semelhante ao problema de empacotamento de caixa difícil NP. Algoritmos sofisticados para soluções ótimas e algoritmos rápidos para soluções não ideais existem com complexidade computacional não linear. No entanto, eles não podem ser usados porque nosso problema é um pouco diferente em que 1) o número de divisões (ou compartimentos) é fixo; 2) um intervalo de deslocamento (ou item) pode ser dividido em partes menores.

Em vez de adaptar um algoritmo existente complexo, desenvolvemos um algoritmo simples, mas eficaz, que é ilustrado abaixo.

  1. Calcule o peso ideal por divisão, de acordo com a fórmula acima. Para novos tipos de eventos que não estão na lista pré-calculada, use o tamanho médio de todos os tipos de eventos.
  2. Começando da divisão 0. Para cada intervalo de compensação
  3. Atribuir à divisão atual se o peso total for menor que o peso por divisão
  4. Se não couber, separe-o e atribua o subconjunto do intervalo de deslocamento que se encaixa
  5. Se a divisão atual for maior que o peso por divisão , vá para a próxima divisão

Esse algoritmo é super rápido com O (número de divisões). Apenas atravessa as divisões e as partições de Kafka uma vez sequencialmente. O resultado é que os pesos para a maioria das divisões são extremamente balanceados, exceto a última divisão que pode ter muito menos peso (o que é bom porque estamos desperdiçando recursos de no máximo uma tarefa). Em um teste, o peso por divisão estimado é de 489.541.767 com 20k splits. Os pesos para as menores e maiores divisões são 278.068.116 e 489.725.277, respectivamente. A segunda divisão menor tem um peso de 489.541.772. Excluindo a menor divisão, a diferença entre o segundo menor e o maior divide é de 183.505 (apenas 0,04% do maior peso).

O algoritmo de particionamento equilibrado teve bom desempenho tanto no teste quanto na produção. A variação do tempo de execução da tarefa Spark (conforme mostrado no gráfico abaixo) é muito mais distribuída do que o leitor Spark Kafka original. A maioria das tarefas terminou em 2 minutos. Uma pequena parte deles demorou 2 a 3 minutos. Comparando com as amplas faixas de QPS e tamanho do evento, a pequena variação no tempo de execução da tarefa demonstra a incrível eficácia do algoritmo de particionamento balanceado. Ao levar em conta o tamanho e o volume do evento, ele garante que a carga de trabalho de distribuição seja distribuída uniformemente entre os executores.

Melhorias nos sistemas de upstream e downstream

O leitor Kafka equilibrado é uma peça crucial de escalonamento da ingestão de streaming de eventos de registro. Também é importante garantir que não haja outro gargalo nos sistemas upstream e downstream. Nesse caso, aprimoramos o Kafka e o HBase para aprimorar sua produtividade e confiabilidade. Para Kafka, os corretores foram migrados para o VPC, que tem um rendimento de 4X. Um job de streaming é configurado para monitorar o QPS por partição do Kafka para que, quando os volumes de eventos aumentarem, mais partições possam ser adicionadas em tempo hábil. Para o HBase a jusante, o número de regiões para a tabela HBase foi aumentado de 200 para 1000, de modo que o carregamento em massa dos eventos para o HBase pode ter um paralelismo mais alto (que é determinado pelo número de regiões).

Para o trabalho de streaming do Spark, a execução especulativa foi ativada para lidar melhor com problemas de confiabilidade na infraestrutura subjacente. Por exemplo, uma tarefa do Spark pode ficar presa devido à leitura de um nó de dados inválidos com discos defeituosos. Com a execução especulativa, o trabalho é muito menos provável de ser afetado por esses tipos de problemas.

Pensamentos finais

Graças ao leitor Kafka balanceado, os aplicativos Spark que consomem do Kafka agora são escalonáveis horizontalmente com paralelismo arbitrário. O algoritmo de particionamento equilibrado é simples e provou ser extremamente eficaz. Devido a esses aprimoramentos, a tarefa de streaming do Spark para a ingestão de eventos de registro pode manipular uma ordem de grandeza mais eventos que o anterior. A estabilidade do sistema melhorou tanto que não vimos nenhum atraso significativo desde que as mudanças foram implementadas.

Para o crescimento futuro do tráfego de eventos e picos, a tarefa de streaming do Spark para registrar a ingestão de eventos poderá lidar com eles de maneira suave e eficiente. Não há mais preocupação em distorcer os eventos. Se o trabalho estiver atrasado devido a problemas de infraestrutura subjacente, será capaz de recuperar rapidamente.

Os problemas que resolvemos aqui não são incomuns em aplicações Spark de larga escala e aplicativos de dados em geral. É importante entender cuidadosamente os dados em si e como eles são processados em cada etapa, o que pode revelar possíveis gargalos, desvios nos dados e oportunidades de otimização. Por exemplo, o Spark fornece uma boa interface do usuário mostrando o DAG para cada trabalho. A partir disso, pode-se entender como um trabalho é executado e se ele pode ser ajustado para um melhor desempenho via cache, repartição e etc.

Agradecimentos

O escalonamento da ingestão de fluxo de eventos de log envolve muitos sistemas upstream e downstream. O projeto foi um esforço coletivo de quatro equipes da Airbnb Data Platform e da Production Infrastructure. Não seria possível sem as imensas contribuições de Cong Zhu, Pala Muthiah, Jinyang Li, Ronnie Zhu e Gabe Lyons. Somos gratos pela ajuda entusiasta de Xu Zhang com Kafka. Gostaríamos de agradecer a Guang Yang, Jonathan Parks, Gurer Kiratli, Xinyao Hu e Aaron Siegel pelo seu apoio incomparável neste esforço.

Somos gratos a Gurer Kiratli e Xiaohan Zeng por sua ajuda na revisão deste post no blog.