RxJava (). Filter {it.basic}

Uma maneira “reativa” de explicar alguns pontos-chave e conceitos sobre o ReactiveX

Paolo Brandi em AndroidPub Seg , a 8 de jul · 8 min ler

Este artigo não tem como objetivo explicar como o ReactiveX funciona, já existem muitos deles por aí (algum link no final para aqueles que podem estar interessados), mas ao invés disso ele quer ser uma referência para alguns conceitos básicos e terminologias que podem ser mais ou menos familiar.
Pode ser interessante para quem já está familiarizado com o mundo ReactiveX, mas às vezes se confunde ou para quem, em vez disso, é novo para ele e quer entender o significado de alguns termos, antes de entrar nele.

Apenas uma suposição antes de começar …

Este artigo refere-se à implementação Java da API ReactiveX, mais especificamente ao RxJava 2.x. Se você está interessado nas diferenças entre esta versão e seu irmão RxJava 1.x, por favor leia
isso .

Pronto? Vamos começar e subscrever este artigo, filtrando apenas o básico do RxJava 🙂

 Artigo observável = RxJava (). Filter {it.basic} 
article.subscribe {readNext ()}

vamos invocar readNext () toda vez que o artigo emite um novo conceito básico de Rx.

Abaixo da primeira emissão 🙂

readNext () – Uma introdução ao ReactiveX

Na página oficial do ReactiveX :

ReactiveX é às vezes chamado de "programação reativa funcional", mas isso é um equívoco. ReactiveX pode ser funcional e pode ser reativo, mas “programação reativa funcional” é um animal diferente. Um ponto principal de diferença é que a programação reativa funcional opera em valores que mudam continuamente ao longo do tempo, enquanto ReactiveX opera em valores discretos que são emitidos ao longo do tempo.

… Agora… segunda emissão…

readNext () – reativo

ReactiveX pode ser reativo.

Definição:
A programação reativa é um paradigma de programação declarativa que se ocupa dos fluxos de dados e da propagação de mudanças ao longo do tempo.
Com este paradigma é possível expressar fluxos de dados, e qualquer estado de mudança pode ser automaticamente propagado pelo modelo de execução subjacente através dos fluxos de dados.

… E… a terceira emissão… você entendeu agora, não?

readNext () – Funcional

ReactiveX pode ser funcional.

Definição:
A programação funcional é um paradigma de programação declarativa que trata a computação como a avaliação de funções matemáticas, usando funções “puras” que não produzem efeitos colaterais, evitam dados mutáveis e de estado variável – por exemplo, a mesma entrada sempre produzirá a mesma saída.

Em contraste com a programação imperativa, que é feita com instruções e seqüência de comandos para alterar o estado e onde os dados são mutáveis.
Na programação funcional, os dados são imutáveis e novos estados são criados a partir dos anteriores.

readNext () – eXtensões reativas

ReactiveX é uma biblioteca construída sobre o paradigma de programação reativa para compor programas assíncronos e baseados em eventos usando observáveis e abstraindo preocupações sobre coisas como threading de baixo nível, sincronização, thread-safety, estruturas de dados concorrentes e E / S sem bloqueio .

É chamado ReactiveX (também conhecido como Extensões reativas) porque:

  • ele estende o padrão do observador para suportar sequências de dados e / ou eventos
  • ele estende / combina a Programação Reativa com a Programação Funcional, adicionando operadores que permitem compor seqüências juntas declarativamente.

readNext () – RxOperators

ReactiveX oferece um grande conjunto de operadores para um tipo diferente de operações.
Em geral, os operadores podem ser encadeados uns aos outros, podem ser aplicados a fluxos de dados, podem transformar um tipo de dados para outro, podem ser compostos para produzir operadores mais complicados, podem ser estendidos para criar
operadores personalizados .

Os operadores podem ser agrupados por categoria :

  • Criando Observable como create (), just (), empty (), defer ()…
  • Transformando Observable como map (), flatMap ()…
  • Filtragem Observable como filter (), skip (), take ()…
  • Combinando Observable como merge (), zip (), switch ()…
  • Operadores de utilitário, como observeOn (), subscribeOn (), doOn ()…
  • Operadores condicionais, como contains (), takeIf ()…
  • Operadores matemáticos / agregados como max (), min (), concat ()…
  • e outros…

Aqui está uma lista completa de operadores e sua documentação que explica como cada um deles funciona e se destina a ser usado com a ajuda de diagramas de mármore típicos.

Exemplo de um diagrama em mármore para o mapa do operador ().

readNext () – Push Pattern

ReactiveX segue o padrão “ push ”, onde tudo é dado, fluxos de dados que podem ser emitidos por uma Fonte e consumidos por outras entidades chamadas Consumidores . Os fluxos de dados podem ser manipulados e combinados antes de serem consumidos.

readNext () – Fonte de dados

Com base na natureza da fonte, pode haver diferentes tipos de fluxos e consumidores.

readNext () – ObservableSource

No RxJava 2.x, ObservableSource é uma interface para um tipo de fonte que pode produzir 0 para muitos itens ou erro , o que permite que o consumidor, como o Observer, assine essa fonte de dados.

 / ** 
* Representa um {{não-contrapressão} básico
@link Observable} interface base de origem,
* consumível através de um {
@link Observer}.
*
*
@param < T > o tipo de elemento
*
@since 2.0
* /
interface pública ObservableSource <T> {

/ **
* Assina o Observer dado a esta instância ObservableSource.
*
@param observer the Observer, não null
*
@throws NullPointerException if { @code observer} é nulo
* /

void subscreve (@NonNull Observer <? super T> observador);
}

readNext () – observável

Observable é uma implementação do ObservableSource, que encerra o fluxo de dados e fornece um conjunto completo de RxOperators. Deve ser visto como um canal para conectar fontes e observadores.

readNext () – Observador

Um observador é o consumidor dos dados. No RxJava, é uma interface que fornece o mecanismo para ouvir cada emissão, erro e terminação do fluxo.

 Observador de interface pública <T> { 

/ **
* Fornece ao Observador um novo item para observar.
* <p>
* O {
@link Observable} pode chamar este método 0 ou mais vezes.
* <p>
* O {
@code Observable} não chamará esse método novamente depois de chamar { @link #onComplete} ou
* {
@link #onError}
*
*
@param t
* o item emitido pelo Observable
* /

void onNext (@NonNull T t);

/ **
* Notifica o Observer que o {
@link Observable} sofreu uma condição de erro.
* <p>
* Se o {
@link Observable} chama esse método. Depois, ele não chamará { @link #onNext} ou
* {
@link #onComplete}
*
*
@param e
* a exceção encontrada pelo Observable
* /

void onError (@NonNull Throwable e);

/ **
* Notifica o Observer que o {
@link Observable} terminou de enviar notificações baseadas em push.
* <p>
* O {
@link Observable} não chamará esse método se chamar { @link #onError}.
* /

void onComplete ();
}
  • onNext (): um Observable chama esse método sempre que o Observable emite um item. Este método toma como parâmetro o item emitido pelo Observable.
  • onError (): um Observable chama esse método para indicar que não conseguiu gerar os dados esperados ou encontrou algum outro erro. Ele não vai fazer mais chamadas para onNext ou onCompleted . O método onError toma como parâmetro uma indicação do que causou o erro.
  • onCompleted (): um Observable chama este método depois de ter chamado onNext pela última vez, se não encontrou nenhum erro.

readNext () – MaybeSource, Maybe e MaybeObserver

No RxJava 2.x, o MaybeSource é uma interface para um tipo de fonte que pode produzir um item ou erro de 0 a 1 , o que permite que o consumidor, como o MaybeObserver, assine essa fonte de dados. Talvez seja a implementação abstrata que fornece um conjunto de operadores.

 interface pública MaybeObserver <T> { 

/ **
* Notifica o MaybeObserver com um item e que o {
@link Maybe} terminou de enviar
* notificações baseadas em push.
* <p>
* O {
@link Maybe} não chamará esse método se chamar { @link #onError}.
*
*
@param t
* o item emitido pelo Talvez
* /

void onSuccess (@NonNull T t);

/ **
* Notifica o MaybeObserver que o {
@link Maybe} sofreu uma condição de erro.
* <p>
* Se o {
@link Maybe} chama esse método, ele não chamará { @link #onSuccess}.
*
*
@param e
* a exceção encontrada pelo Talvez
* /

void onError (@NonNull Throwable e);

/ **
* Chamado assim que o cálculo diferido for concluído normalmente.
* /

void onComplete ();
}

readNext () – SingleSource, Single e SingleObserver

No RxJava 2.x, SingleSource é uma interface para um tipo de fonte que pode produzir 1 item ou erro , o que permite que o consumidor, como o SingleObserver, assine essa fonte de dados. Único é a implementação abstrata que fornece um conjunto de operadores.

 interface pública SingleObserver <T> { 

/ **
* Notifica o SingleObserver com um único item e que o {
@link Single} terminou de enviar
* notificações baseadas em push.
* <p>
* O {
@link Single} não chamará esse método se chamar { @link #onError}.
*
*
@param t
* o item emitido pelo Single
* /

void onSuccess (@NonNull T t);

/ **
* Notifica o SingleObserver que o {
@link Single} sofreu uma condição de erro.
* <p>
* Se o {
@link Single} chama esse método, ele não chamará { @link #onSuccess}.
*
*
@param e
* a exceção encontrada pelo Single
* /

void onError (@NonNull Throwable e);
}

readNext () – CompletableSource, Completable e CompletableObserver

Em RxJava 2.x, CompletableSource é uma interface para um tipo de fonte que pode produzir 0 item ou erro , o que permite que o consumidor, como o CompletableObserver, assine essa fonte de dados. Completable é a implementação abstrata que fornece um conjunto de operadores.

 interface pública CompletableObserver { 

/ **
* Chamado assim que o cálculo diferido for concluído normalmente.
* /

void onComplete ();

/ **
* Chamado uma vez, se o cálculo adiado 'lançar' uma exceção.
*
@param e a exceção, não nula.
* /

void onError (@NonNull Throwable e);

readNext () – Publisher, Flowable e Subscriber

A relação entre Publisher, Flowable e Subscriber é a mesma que para ObservableSource, Observable e Observer, com a única diferença que um Flowable suporta Backpressure.

readNext () – Contrapressão

Contrapressão é a capacidade de lidar com uma fonte de dados em que a taxa de emissão é maior do que a taxa que um consumidor pode realmente consumir esses itens. Por exemplo, isso é verdadeiro para dados em tempo real, como coordenadas de GPS ou eventos de movimento. O ReactiveX fornece políticas diferentes para lidar com esses tipos de cenários. Flowable é o único fluxo de dados capaz de suportar a contrapressão. Para a natureza do problema, é compreensível que o motivo Talvez, Único e Completável não precise suportar esse recurso.

readNext () – Hot vs Cold Observable

  • Observáveis frios podem ser criados várias vezes e cada instância pode ser acionada por conta própria. Ele começa a emitir itens quando um observador se inscreve nele.
  • Observables quentes são como um "fluxo" de eventos em andamento. Os observadores podem ir e vir, mas o fluxo é criado e simplesmente continua.

readNext () – Agendadores

Agendadores são a maneira ReactiveX de lidar com tarefas assíncronas, abstraindo a simultaneidade, a sincronização e o gerenciamento de threads, permitindo alternar de um thread para outro de maneira fácil.

O RxJava possui vários agendadores padrão:

  • Schedulers.computation() : Execute um trabalho intensivo de computação em um número fixo de threads dedicados em segundo plano. A maioria dos operadores assíncronos usa isso como seu Scheduler padrão.
  • Schedulers.io() : executa operações semelhantes a E / S ou bloqueio em um conjunto de threads que muda dinamicamente.
  • Schedulers.single() : executa o trabalho em um único thread de maneira sequencial e FIFO.
  • Schedulers.trampoline() : executa o trabalho de maneira sequencial e FIFO em um dos threads participantes, geralmente para fins de teste.

readNext () – subscribeOn () vs observerOn ()

  • subscribeOn () define o Scheduler onde o trabalho de origem será executado. Como existe apenas uma fonte inicial para uma cadeia Observable, faz sentido ter apenas um operador subscribeOn () . Chamadas subseqüentes para subscribeOn () não afetarão ou substituirão a configuração anterior.
  • observeOn () define o Agendador onde todas as operações de recebimento de dados serão executadas. Em outras palavras, ele altera o Agendador para todos os operadores depois que ele é aplicado. Como pode haver muitas chamadas subsequentes para ele, ter várias chamadas observOn () em uma única cadeia faz sentido e, na verdade, é uma boa prática dependendo do tipo de tarefa que precisa ser executada.

readNext () – Upstream vs Downstream

 fonte 
.operador1 ()
.operador2 ()
.operador3 ()
.operador4 ()
.subscribe (consumidor)

Imagine-se no passo definido pelo operator2:

  • o montante é qualquer coisa para a source
  • downstream é qualquer coisa para o subscribe(consumer)

onComplete ()

Esse fluxo terminou 🙂 É por enquanto.
Se você está mais interessado em Rx e como funciona, você pode encontrar algumas referências que podem ajudá-lo.

Referências: