El conocimiento es el nuevo dinero.
Aprender es la nueva manera en la que inviertes
Acceso Cursos

Creación de operadores personalizados en RxJS

· 6 min de lectura
Creación de operadores personalizados en RxJS

Los operadores son uno de los bloques de construcción de RxJS. La librería viene con muchos operadores, que pueden ser usados para lidiar con casi cualquier situación que podamos encontrar, pero hay veces en las que puede ser útil crear los nuestros propios.

En este artículo, vamos a aprender diferentes maneras de crear nuestros propios operadores; Pero antes de empezar, vamos a explicar lo que, de hecho, es un operador.

Esto puede sorprenderle, pero un operador es sólo un observable. ¿Qué distingue a los operadores de otros observables? Los operadores son observables que, como su nombre indica, operan sobre un observable fuente.

Tomemos el siguiente ejemplo:

import { interval } from 'rxjs';
import { map } from 'rxjs/operators';

interval(1000).pipe(
  map(num => num + 1)
).subscribe(...)

Tenemos la función interval que devuelve un observable. Usando ese observable como fuente, empleamos el método pipe(), pasándole la función map, que devuelve un operador.

Veamos la implementación del método pipe() para que podamos entender mejor su funcionamiento:

class Observable {
  pipe(...operators): Observable<any> {
    return operators.reduce((source, next) => next(source), this);
  }
}

El método pipe toma un array de operadores, hace un bucle sobre él, y cada vez invoca al siguiente operador, pasándole el resultado del anterior como fuente. Si utilizamos esto en nuestro ejemplo, obtendremos la siguiente expresión:

map(interval): Observable

Del examen de este código, podemos aprender que la construcción de un operador es tan simple como escribir una función que toma una fuente observable como entrada, y devuelve un observable:

function myOperator<T>(source: Observable<T>) {
  return source;
}

Enhorabuena, acabamos de crear un operador. Sí, es inútil, ya que devuelve el mismo observable que recibe, pero bueno, no deja de ser un operador:

interval(1000).pipe(
  myOperator
).subscribe(value => console.log(value));
view raw

Ahora, detengámonos un segundo y hablemos de un error común sobre este tema. Si nos fijamos en nuestro ejemplo, es posible que oigas a la gente describirlo como estar "suscrito al observable del intervalo".

Eso no es exacto; siempre estás suscrito al último operador de la cadena (es decir, al último elemento de la lista de operadores).

Así que en este ejemplo, estamos suscritos al observable que devuelve la función miOperador. Permítanme demostrar esto.

Vamos a devolver un observable diferente de nuestro operador:

 function myOperator<T>(source: Observable<T>) {
  return new Observable(subscriber => {
    subscriber.next(`🦄`);
    subscriber.complete();
  });
}

Si volvemos a ejecutar nuestro ejemplo, lo único que obtendremos será un 🦄. Eso es porque recibimos el observable de origen (en nuestro caso el generado por un intervalo), pero no hacemos nada con él (por ejemplo, suscribirnos a él).

Esto demuestra que estamos suscritos al observable que devuelve myOperator, y no al que devuelve el intervalo.

Esto me lleva al siguiente tema: la cadena de observables.

Tomemos nuestro primer ejemplo, un intervalo con un mapa:

import { interval } from 'rxjs';
import { map } from 'rxjs/operators';

interval(1000).pipe(
  map(num => num + 1)
).subscribe(...)

Si, como hemos señalado, en realidad estamos suscritos al observable que devuelve el operador map(), ¿cómo alcanzamos el observable que devuelve interval()?

La respuesta es sencilla: ¡no lo hacemos! Cada operador recibe su observable de origen, que es el que le precede, y ese operador es el que (en la mayoría de los casos) se suscribe a él.

Si tu quieres saber más acerca de RXJS, te dejo el siguiente video

Cuando llamamos a subscribe() estamos ejecutando el observable map, que a su vez se suscribe al observable interval. Cada vez que el observable fuente del intervalo emite un nuevo valor, éste llega a la función de suscripción del mapa.

Entonces, tras aplicar la función de proyección del mapa al valor, el observable del mapa emite el resultado de esa función a la última suscripción de la cadena de observables.

Ahora que entendemos cómo funciona ese mecanismo, construyamos algunos operadores.

Creación de un operador filterNil


Un caso de uso común es aquel en el que tenemos una fuente observable que puede emitir un valor nulo o indefinido, y queremos ignorar esos valores.

Vamos a crear un operador que gestione esta tarea por nosotros, que podremos aplicar a cualquier fuente:

function filterNil() {
  return function<T>(source: Observable<T>): Observable<T> {
    return new Observable(subscriber => {
      source.subscribe({
        next(value) {
          if(value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        }
      })
    });
  }
}

En este caso, a diferencia del anterior, creamos una función que devuelve un operador. La razón es que esto hace que nuestro código sea escalable, ya que se puede ampliar fácilmente para recibir argumentos en el futuro 🦊.

Una vez que recibimos una fuente, nos suscribimos a ella. Cuando la fuente emite, comprobamos si el valor es undefined o null si ese es el caso, lo ignoramos; De lo contrario, pasamos el valor al siguiente suscriptor.

Lo mismo ocurre con las notificaciones de error y completa. Las pasamos a través de la cadena; de lo contrario, no serán manejadas correctamente por el resto de la cadena. Si ejecutamos el siguiente ejemplo, veremos que no recibimos ninguna notificación para la emisión inicial:

interval(1000).pipe(
  map(value => value === 0 ? undefined : value),
  filterNil()
).subscribe(value => console.log(value));

Los más observadores se habrán dado cuenta de un problema importante: hemos creado una fuga de memoria. Recuerda que cada observable devuelve una función de desuscripción, responsable de realizar cualquier limpieza necesaria.

Esta función es llamada cada vez que alguien llama a unsubscribe(). En nuestro ejemplo, nos hemos suscrito a la fuente, pero nunca nos hemos dado de baja de ella.

Esto significa que la fuente de intervalo seguirá funcionando incluso después de que hayamos llamado al método unsubscribe para el observable resultante.

Entonces, ¿cómo podemos arreglar esto? La solución es tan sencilla como llamar al método de desuscripción de la fuente:

function filterNil() {
  return function<T>(source: Observable<T>): Observable<T> {
    return new Observable(subscriber => {
      const subscription = source.subscribe({
        next(value) {
          if(value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        }
      });

      return () => subscription.unsubscribe(); <======
    });
  }
}

En nuestro caso hay una forma más corta: podemos devolver directamente el resultado de la llamada a la suscripción de origen:

function filterNil() {
  return function<T>(source: Observable<T>): Observable<T> {
    return new Observable(subscriber => {
      retrun source.subscribe({
        next(value) {
          if(value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        ...
      });
    });
  }
}
view raw

Ahora cuando llamamos a unsubscribe() en el resultado, en realidad llama a unsubscribe() en el observable fuente.

Creación de Operadores a partir de Operadores Existentes


Ahora que hemos aprendido los fundamentos de la creación de un operador y lo fácil que es, podemos utilizar la forma corta, y en la mayoría de los casos, la forma recomendada de crearlos construirlos a partir de los existentes.

Vamos a recrear el operador filterNil utilizando, como habrás adivinado, el operador filter:

function filterNil() {
  return function<T>(source: Observable<T>) {
    return source.pipe(filter(value => value !== undefined && value !== null));
  }
}

Está muy bien, pero podemos ir un paso más allá. Dado que los operadores pipeables devuelven funciones, que como hemos aprendido, reciben un observable de origen, podemos hacer filterNil aún más corto:

function filterNil() {
  return filter(value => value !== undefined && value !== null)
}

Terminemos con algunos operadores útiles que podemos utilizar a diario:

debug - Creando bonitos logs para depuración


Vamos a crear un operador que utiliza la API de registro de la consola y registra cada una de las notificaciones con colores agradables:

function debug(tag: string) {
  return tap({
    next(value) {
      console.log(`%c[${tag}: Next]`, "background: #009688; color: #fff; padding: 3px; font-size: 9px;", value)
    },
    error(error) {
      console.log(`%[${tag}: Error]`, "background: #E91E63; color: #fff; padding: 3px; font-size: 9px;", error)
    },
    complete() {
      console.log(`%c[${tag}]: Complete`, "background: #00BCD4; color: #fff; padding: 3px; font-size: 9px;")
    }
  })
}

Veámoslo en acción:

optionalDebounce - Debounce o no Debounce


Cuando construimos componentes, hay veces que exponemos una entrada, permitiendo al usuario pasar un valor de debounce opcional para ser usado en alguna acción.

Si el usuario no pasa un valor, podemos omitir el uso del operador debounceTime:

function optionalDebounce<T>(time?: number) {
  return function<T>(source: Observable<T>): Observable<T> {
    return time === undefined ? source : source.pipe(debounceTime(time));
  };
}

filterKey - Obtenga sólo los eventos de teclado que desee


Este operador es para cuando necesitamos filtrar teclas específicas mientras escuchamos eventos de teclado:

type KeyboardEventKeys = 'Escape' | 'Enter';

function filterKey(key: KeyboardEventKeys) {
  return filter((event: KeyboardEvent) => event.key === key);
}
fromEvent(document, 'keyup')
  .pipe(
    filterKey('Escape')
  ).subscribe();

polling - ¡Es hora de volver a pedir datos al servidor!


A veces necesitamos realizar sondeos desde nuestro backend. Podemos crear un operador personalizado para esto que lo hará más legible y sin problemas para aquellos que no están muy familiarizados con RxJS:

function polling<T>(stream: Observable<T>, period: number, initialDelay = 0) {
  return timer(initialDelay, period).pipe(concatMapTo(stream));
}
polling(this.http.get('https://..'), 10000).subscribe()

Rxjs es un camino maravilloso en angular, recuerda que si quieres aprender más acerca de este tema te dejo el siguiente video