El conocimiento es el nuevo dinero.
Aprender es la nueva manera en la que inviertes
Acceso Cursos

Cómo utilizar RxJS en Qwik

¿Cómo podemos utilizar RxJS sobre Qwik de una manera que siga la filosofía de Qwik de optimizar cada bit de código para que se descargue en el cliente lo más tarde posible?

· 17 min de lectura
Cómo utilizar RxJS en Qwik

RxJS no está soportado en Qwik, pero si queremos codificar reactivamente en Qwik hoy, necesitamos RxJS.

El modelo de suscripción de RxJS encaja bien con la filosofía de Qwik de división fina del código: Los flujos de RxJS no hacen nada hasta que alguien se suscribe, así que ¿para qué realizar el trabajo o cargar el código para hacerlo si nadie está escuchando?

NOTA: Ya tenemos el lanzamiento de QwikV1. Si deseas conocer que nos trae Qwik te dejo el siguiente blog.

Pero como no podemos serializar RxJS directamente, cuando la aplicación se renderiza en el servidor debe pasar la información de suscripción por toda la cadena RxJS, almacenándola en almacenes Qwik. Entonces, cuando se carga en el cliente, podemos comprobar si un evento tiene suscriptores antes de cargar cualquiera de los RxJS para procesarlo.

También podemos aplazar la carga de RxJS aguas abajo hasta que lleguen nuevos valores. Así que si tenemos un typeahead con un BehaviorSubject, un debounceTime y un switchMap, no necesitaremos cargar el código para el BehaviorSubject o debounceTime hasta que el usuario escriba, o para el switchMap hasta que un valor pase el filtro. Así es como se comportaría Qwik si lo implementáramos sin RxJS, sólo usando almacenes Qwik y useWatch$s.

Si tu quieres ver que es lo que nos trae Qwik te dejo el siguiente video

Este es el comportamiento ideal. 🤣

Desafortunadamente, después de descender a una madriguera de conejo bastante profunda, todavía no tenía una implementación que pudiera funcionar para todas las situaciones. Esto sigue siendo un reto abierto. Incluiré mis notas al final de este artículo para quien quiera seguirme.

Esto definitivamente merece un paquete NPM una vez resuelto, y ahora mismo quiero centrarme en StateAdapt, así que no voy a invertir más tiempo en esto por ahora.

Sin embargo, todavía hay una manera de utilizar RxJS en Qwik. Y creo que es bastante buena.

La solución que funciona ahora 🙏🏻

Dado que aún no sabemos cómo serializar RxJS, tenemos que configurar RxJS desde cero en el cliente. No podemos dividir los flujos y cargar cada segmento perezosamente, pero al menos podemos esperar hasta que se requiera RxJS en algún lugar del flujo antes de cargar todo el flujo. Eso sigue siendo mejor que lo que otros frameworks podrían hacer sin mucho trabajo o sintaxis inconveniente, así que no deberíamos estar demasiado tristes.

A veces necesitaremos un useClientEffect$ para obtener algún comportamiento RxJS inmediato, pero la API ideal debería facilitar el cambio entre eso y el gancho perezoso useWatch$, y deberíamos preferir useWatch$ siempre que sea posible.

Cada vez que definimos un observable, necesitamos definir los observables de los que depende. También necesitamos todos los observables que dependen de él, para que podamos averiguar si hay una suscripción en la parte inferior.

De esta manera tendremos el comportamiento esperado/habitual, perezoso de RxJS de no hacer ningún trabajo innecesario hasta que algo se suscriba. Esto significa que una vez que se necesita un observable, todo el grafo reactivo de observables que contiene ese observable necesita ser cargado.

La forma de activar la carga de código en Qwik es establecer el estado dentro de un almacén Qwik y utilizar un gancho para reaccionar a ese cambio de estado. Así que cada observable necesita un almacén correspondiente y useWatch$ que reaccione al almacén cargando y ejecutando el código que define el observable.

Cuando se carga un observable, necesitamos una reacción en cadena que haga que se cargue todo el código del observable en el gráfico, y luego que los observables se definan de arriba a abajo.

Así que, para determinar la arquitectura exacta de este sistema, veamos las diferentes situaciones que podrían desencadenar primero algún comportamiento de RxJS.

Ejemplo 1: Entrada de usuario
Imagina un typeahead que utiliza este RxJS:

const search$ = new BehaviorSubject('');

const results$ = search$.pipe(
  debounceTime(500),
  filter((search) => !!search.length),
  distinctUntilChanged(),
  switchMap((search) => fetchItems(search)),
);

Los almacenes Qwik pueden mantener el estado igual que un BehaviorSubject, por lo que no necesitamos definir el BehaviorSubject hasta que el usuario escriba.

Cuando el usuario escribe, podemos establecer la propiedad value de un almacén Qwik y realizar un seguimiento dentro de un useWatch$. Dentro del useWatch$ podemos definir el BehaviorSubject.

Inmediatamente después de definir el BehaviorSubject, necesitamos definir el observable results$. Necesitamos un gancho genérico que pueda reaccionar a cualquier observable padre que se defina. Así que tenemos que asignar el observable padre a una propiedad de Qwik store que pueda ser rastreada. ¿Pero cómo podemos hacer esto si los observables no pueden ser serializados? Bueno, resulta que los almacenes Qwik pueden contener valores no serializables si los pasas primero por la función noSerializede Qwik.

Así que, cuando el usuario escriba, tengamos una propiedad que podamos definir como

store.rx = noSerialize(new BehaviorSubject(initialValue));

Ahora podemos tener toda la carga de RxJS aguas abajo porque está rastreando las propiedades rx de sus almacenes observables de dependencia.

Cada recuadro representa una tienda Qwik. Cuando se vuelve azul, su propiedad rx ha sido definida. Hemos añadido un retraso de 2000 ms a esto para que puedas ver cómo se propaga por la cadena de tiendas Qwik RxJS.

Aquí está la sintaxis para hacer que esto suceda:

export default component$(() => {
  console.log("rerendering");
  const searchA = useSubject("searchA");

  const results = useStream$(
    () =>
      rx(searchA).pipe(
        debounceTime(500),
        filter((search) => !!search.length),
        distinctUntilChanged(),
        switchMap(search => fetchItems(search))
      ),
    [[] as Result[], [searchA]]
  );

  const resultsOut = useSubscribe(results);

  return (
    <>
      {* ... *}
    </>
  );
});

Hay mucho que explicar aquí, así que vamos a ir a través de él y el código fuente para hacer que suceda.

useSubject


useSubject es nuestro gancho personalizado que define el almacén BehaviorSubject. Antes de ver el código fuente, aquí hay algunas utilidades simples que utiliza:

RxStore es la forma de los almacenes Qwik que estamos creando para RxJS:

// utils/rx-store.type.ts
import { NoSerialize } from "@builder.io/qwik";
import { BehaviorSubject, Observable } from "rxjs";

export interface RxStore<T> {
  value: T;
  rx: NoSerialize<BehaviorSubject<T> | Observable<T>>;
  activated: boolean;
}

propagationDelay es una constante que estoy usando en unos cuantos archivos, así que he creado un archivo dedicado para ella, pero puedes ignorarla y eliminar el setTimeout a su alrededor siempre que la veas.

createRx es una simple función de utilidad que hice para asegurar que un BehaviorSubject está definido en la propiedad rx de un RxStore.

import { noSerialize } from "@builder.io/qwik";
import { BehaviorSubject } from "rxjs";
import { RxStore } from "./rx-store.type";

export function createRx<T>(store: RxStore<T>, value: T) {
  return noSerialize(store.rx || new BehaviorSubject<T>(value));
}

Ahora estamos listos para useSubject:

// utils/use-subject.function.ts
import { useStore, useWatch$ } from "@builder.io/qwik";
import { BehaviorSubject } from "rxjs";
import { createRx } from "./create-rx.function";
import { propagationDelay } from "./propagation-delay.const";
import { RxStore } from "./rx-store.type";

export function useSubject<T>(initialState: T): RxStore<T> {
  const store: RxStore<T> = useStore({
    rx: undefined,
    value: initialState,
    activated: false,
  });

  useWatch$(({ track }) => {
    const value = track(store, "value");
    if (value !== initialState) {
      store.activated = true;

      if (store.rx) {
        (store.rx as BehaviorSubject<T>).next(value);
      } else {
        store.rx = createRx(store, value);
      }
    }
  });

  useWatch$(({ track }) => {
    const activated = track(store, "activated"); // If dependent needs this, it will set activated = true
    if (activated) {
      setTimeout(() => {
        store.rx = createRx(store, store.value);
      }, propagationDelay);
    }
  });

  return store;
}

Los almacenes hijos reaccionarán cuando se defina rx, y los almacenes padres reaccionarán cuando se establezca activado a true. useStream$ explicará esto con más detalle.

useStream$


Antes de examinar useStream$, debemos considerar otra situación: Observables combinados.

Digamos que results$ switch mapea a otro observable definido con un Qwik store. Para definir results$, necesitamos activar el código del observable interno para que sea recuperado.

Mi primera idea fue crear una propiedad suscrita para que los observables de abajo pudieran establecerla y desencadenar la carga del código, pero eso se confundiría con las suscripciones reales de RxJS, y no son lo mismo: el observable interno debe definirse antes de obtener una suscripción real. Por lo tanto, vamos a hacer una propiedad activada, y tener el mismo useWatch$ que define el seguimiento observable.

Recuerda que si quieres aprender más acerca de Qwik, puedes visitar el curso en

Este es el aspecto de ese comportamiento:

El contorno rojo es cuando la tienda se ha activado. De nuevo, he añadido un retraso de 2000 para que puedas ver cómo funciona todo esto. Normalmente no habrá retraso.

Ahora estamos listos para el código que hace que esto funcione. useStream$ también utiliza un montón de utilidades, así que vamos a repasarlas.

qrlToRx es una función que convierte las promesas QRL de Qwik en observables. Esto es útil siempre que necesites pasar código no serializable a un flujo RxJS a través de una función.

// utils/qrl-to-rx.function.ts
import { QRL } from "@builder.io/qwik";
import { from, Observable, switchAll } from "rxjs";

export function qrlToRx<T>(fn: QRL<() => Observable<T>>) {
  return () => from(fn()).pipe(switchAll());
}

useRxStore es una función que define una tienda Qwik para RxJS, pero sin la lógica de BehaviorSubject:

// utils/use-rx-store.function.ts
import { useStore } from "@builder.io/qwik";
import { RxStore } from "./rx-store.type";

export function useRxStore<T>(initialState: T): RxStore<T> {
  const store = useStore({
    rx: undefined,
    value: initialState,
    activated: false,
  });

  return store;
}

Bien, ahora estamos listos para la gran función useStream$. He añadido comentarios a lo largo de explicar lo que hace, ya que es tan grande.

// utils/use-stream.function.ts
import {
  implicit$FirstArg,
  noSerialize,
  QRL,
  useWatch$,
} from "@builder.io/qwik";
import { Observable, tap } from "rxjs";
import { propagationDelay } from "./propagation-delay.const";
import { qrlToRx } from "./qrl-to-rx.function";
import { RxStore } from "./rx-store.type";
import { useRxStore } from "./use-rx-store.function";

export function useStreamQrl<T>(
  fn: QRL<() => Observable<T>>,
  [initialState, deps]: [T, RxStore<any>[]]
): RxStore<T> {
  const store = useRxStore(initialState);

  useWatch$(({ track }) => {
    // Tracking store.activated and all dep rx's
    // Will not run until a child activates store or a parent rx gets defined
    // If activated or parent defined, activate all parents
    // If all parents defined, define this

    const activated = track(store, "activated");
    const [someDefined, allDefined, allActivated] = deps.reduce(
      (acc, source) => {
        const rx = track(source, "rx");
        acc[0] = acc[0] || !!rx;
        acc[1] = acc[1] && !!rx;
        acc[2] = acc[2] && source.activated;
        return acc;
      },
      [false, true, true]
    );

    if (someDefined) {
      setTimeout(() => {
        store.activated = true;
      }, propagationDelay);
    }

    if (activated && !allActivated) {
      // Activated from parent => someDefined becomes true
      // Activated from child => activated becomes true
      setTimeout(() => {
        deps.forEach(source => {
          source.activated = true;
        });
      }, propagationDelay);
    }

    if (allDefined) {
      setTimeout(() => {
        store.activated = true;
        store.rx = noSerialize(
          qrlToRx(fn)().pipe(tap(value => (store.value = value)))
        );
      }, propagationDelay);
    }
  });

  return store;
}

export const useStream$ = implicit$FirstArg(useStreamQrl);

Ha sido complicado, así que espero que tenga sentido. Comenta si algo no está claro.

useSubscribe


Finalmente, necesitamos suscribirnos al stream.

De nuevo, este tiene algunas utilidades. No es obvio todavía, pero hay una razón por la que los separé. Pero primero veamos el gancho en sí:

import { useStore, useWatch$ } from "@builder.io/qwik";
import { RxStore } from "./rx-store.type";
import { subscribe } from "./subscribe.function";

export function useSubscribe<T>(store: RxStore<T>) {
  const outputStore = useStore({ value: store.value });

  useWatch$(args => {
    subscribe(args, store, outputStore);
  });

  return outputStore;
}

Así que tenemos un RxStore de entrada (con una propiedad rx) al que suscribirnos, y un nuevo almacén Qwik que contendrá los resultados.

Aquí está subscribe:

// utils/subscribe.function.ts
import { RxStore } from "./rx-store.type";
import { WatchArgs } from "./watch-args.type";

export function subscribe<T>(
  { track, cleanup }: WatchArgs,
  store: RxStore<T>,
  outputStore: { value: T }
) {
  const rx = track(store, "rx");
  if (rx) {
    const sub = rx.subscribe((val) => (outputStore.value = val));
    cleanup(() => sub.unsubscribe());
  }
}

Ejemplo nº 2: Suscripción del cliente


¿Qué pasa si tenemos un temporizador que queremos que se inicie tan pronto como el componente sea visible en el cliente?

Podemos definir un hook useSubscribeClient$ que haga lo mismo que el useSubscribe$ anterior, pero que también tenga un useClientEffect$ que establezca la propiedad activada del almacén del flujo padre.

Esto es lo que parece:

Y aquí está la implementación. Es lo mismo que useSubscribe, pero añadimos un efecto cliente para activar el observable padre inmediatamente:

import { useClientEffect$ } from "@builder.io/qwik";
import { RxStore } from "./rx-store.type";
import { useSubscribe } from "./use-subscribe.function";

export function useSubscribeClient<T>(store: RxStore<T>) {
  useClientEffect$(() => {
    store.activated = true;
  });
  return useSubscribe(store);
}

¿Cómo definimos el temporizador? Lo trataremos como un observable independiente con una suscripción propia, pero por defecto esperaremos a que sea activado por observables inferiores. Podemos llamarlo useStreamToSubject$.

He aquí cómo lo hemos implementado:

// utils/use-stream-to-subject.function.ts
import { implicit$FirstArg, QRL, useWatch$ } from "@builder.io/qwik";
import { Observable } from "rxjs";
import { qrlToRx } from "./qrl-to-rx.function";
import { RxStore } from "./rx-store.type";
import { streamToSubject } from "./stream-to-subject.function";
import { useSubject } from "./use-subject.function";

export function useStreamToSubjectQrl<T>(
  fn: QRL<() => Observable<T>>,
  initialState: T
): RxStore<T> {
  const store = useSubject(initialState);
  useWatch$((args) => streamToSubject(args, store, qrlToRx(fn)()));
  return store;
}
export const useStreamToSubject$ = implicit$FirstArg(useStreamToSubjectQrl);

Sólo hay una función que aún no hemos revisado:

// utils/stream-to-subject.function.ts
import { BehaviorSubject, finalize, Observable } from "rxjs";
import { RxStore } from "./rx-store.type";
import { WatchArgs } from "./watch-args.type";

export function streamToSubject<T>(
  { cleanup, track }: WatchArgs,
  store: RxStore<T>,
  obs$: Observable<T>
) {
  track(store, "activated");
  const sub = obs$
    .pipe(finalize(() => (store.rx as BehaviorSubject<T>)?.complete?.()))
    .subscribe(n => (store.value = n));
  cleanup(() => sub.unsubscribe());
}

Como puedes ver, espera a ser activado desde abajo, luego llama a la función que se le ha pasado para definir el observable. Un ejemplo de uso:

const intervalStore = useStreamToSubject$(() => interval(3 * 1000), 0);

Ejemplo 3: Emisiones observables independientes


¿Qué pasa si no queremos cargar todo el flujo de RxJS hasta que el temporizador emita?

Como mencioné al principio de este artículo, no tengo una solución general para la carga perezosa de segmentos del mismo gráfico observable de forma independiente, pero para el caso de un observable que pueda ser tratado de forma independiente y que obviamente tenga una suscripción, podemos definir un hook que se suscriba a un observable y asigne los valores emitidos a una propiedad value de Qwik store, de forma similar a como manejamos el input de usuario.

¿Es una buena idea, desde el punto de vista de la organización del código? No tengo ni idea. Pero así es como se ve el comportamiento:

Y aquí está la aplicación:

// utils/use-stream-to-subject-client.function.ts
import { implicit$FirstArg, QRL, useClientEffect$ } from "@builder.io/qwik";
import { Observable } from "rxjs";
import { qrlToRx } from "./qrl-to-rx.function";
import { RxStore } from "./rx-store.type";
import { streamToSubjectClient } from "./stream-to-subject-client.function";
import { useSubject } from "./use-subject.function";

export function useStreamToSubjectClientQrl<T>(
  fn: QRL<() => Observable<T>>,
  initialState: T
): RxStore<T> {
  const store = useSubject(initialState);
  useClientEffect$((args) => streamToSubjectClient(args, store, qrlToRx(fn)()));
  return store;
}
export const useStreamToSubjectClient$ = implicit$FirstArg(
  useStreamToSubjectClientQrl
);

Muy similar a useStreamToSubject, pero llama a streamToSubjectClient en su lugar, y lo hace dentro de un useClientEffect$ para que se ejecute inmediatamente.

Aquí está la implementación:

// utils/stream-to-subject-client.function.ts
import { Observable } from "rxjs";
import { RxStore } from "./rx-store.type";
import { WatchArgs } from "./watch-args.type";

export function streamToSubjectClient<T>(
  { cleanup }: WatchArgs,
  store: RxStore<T>,
  obs$: Observable<T>
) {
  const sub = obs$.subscribe(n => (store.value = n));
  cleanup(() => sub.unsubscribe());
}

Ejemplo 4: Qwik store como observable


Las tiendas Qwik son sincrónicamente reactivas, por lo que habrá muchas características que no necesiten RxJS. Sin embargo, si acabamos necesitando alguna reactividad asíncrona de RxJS, ¿no estaría bien poder añadirla y usar una tienda Qwik normal como si fuera una tienda Qwik RxJS?

Queremos ser capaces de tratar cualquier propiedad como un observable, por lo que tendremos que proporcionar tanto la tienda como la propiedad que queremos "encadenar" con nuestro código RxJS.

Así que el código se vería así:

const normalQwikStore = useStore({ prop: 'val' });
const rxStoreFromQwik = useSubjectFromStore([normalQwikStore, 'prop']);
// or
const store = useStore({ value: 'val' });
const rxStore = useSubjectFromStore(store);

Ahora rxStore está listo para ser usado dentro de usePipe$.

Y aquí está esa implementación:

// utils/use-subject-from-store.function.ts
import { useWatch$ } from "@builder.io/qwik";
import { RxStore } from "./rx-store.type";
import { useSubject } from "./use-subject.function";

export interface UseSubjectFromStore<T> {
  <Prop extends string, Store extends Record<Prop, T>>(
    input: { value: T } | [Store, Prop]
  ): RxStore<T>;
}

export function useSubjectFromStore<T>(
  input: Parameters<UseSubjectFromStore<T>>[0]
) {
  const isArray = Array.isArray(input);
  const initialValue = isArray ? input[0][input[1]] : input.value;
  const store = useSubject(initialValue);

  useWatch$(({ track }) => {
    const value = isArray ? track(input[0], input[1]) : track(input, "value");
    store.value = value;
  });

  return store;
}

Ejemplo 5: Sólo operador

RxJS tiene 114 operadores. Aunque su principal ventaja es permitir que el código se estructure de forma reactiva, esos 114 operadores pueden añadir mucha comodidad. Entonces, ¿qué pasa si queremos utilizar principalmente tiendas Qwik, pero a veces sólo utilizar un único operador de RxJS de vez en cuando?

Este enfoque básicamente ignora la pereza de RxJS asumiendo que todo ya tiene una suscripción. Para las entradas toma un operador, luego un almacén Qwik y una tupla de propiedades; y emite otro almacén Qwik, con la propiedad value conteniendo el valor emitido.

const store = useStore({ n: 0 });
const nEven = usePipe(() => filter(n => n % 2), [store, 'n']);

Una vez más, podemos asumir una propiedad por defecto de valor para el almacén de entrada.

Sólo estamos tomando en un solo operador, pero podríamos utilizar con operadores personalizados para darnos más RxJS en un solo almacén:

const results = usePipe$(
    () => stream =>
      stream.pipe(
        tap(console.log),
        map(n => n * 10),
        filter(n => n >= 100),
      ),
    normalQwikStore,
  );

Ahora, aquí está la implementación:

// utils/use-pipe.function.ts
import { implicit$FirstArg, QRL, useStore, useWatch$ } from "@builder.io/qwik";
import { OperatorFunction } from "rxjs";
import {
  UseSubjectFromStore,
  useSubjectFromStore,
} from "./use-subject-from-store.function";

export function usePipeQrl<T, U>(
  operatorQrl: QRL<() => OperatorFunction<T, U>>,
  input: Parameters<UseSubjectFromStore<T>>[0]
) {
  const rxStore = useSubjectFromStore(input);
  const outputStore = useStore({ value: undefined as any }); // Final operator output

  useWatch$(({ track, cleanup }) => {
    const rx = track(rxStore, "rx");
    if (rx) {
      operatorQrl().then(operator => {
        const sub = operator(rx).subscribe(
          value => (outputStore.value = value)
        );
        cleanup(() => sub.unsubscribe());
      });
    }
  });

  return outputStore;
}

export const usePipe$ = implicit$FirstArg(usePipeQrl);

Esta podría ser la forma más prometedora de utilizar RxJS en Qwik hoy en día, ya que pone la reactividad de Qwik en el centro.

Podría ser que el enfoque ideal para la reactividad asíncrona en Qwik no sea RxJS puro, sino principalmente un modelo de gancho Qwik. Con ganchos, llamarlos implica interés en sus valores, por lo que es similar a la suscripción, sólo que no tan granular.

Así que si eso se convierte en el nuevo modelo de "suscripción", entonces RxJS sólo se convierte en una biblioteca de utilidad que nos permite codificar declarativamente para la lógica asíncrona sin tener que implementar todo eso por nuestra cuenta por adelantado. Y ya que está centrado en Qwik, con el tiempo, cada llamada usePipe$ podría ser reemplazada por una solución más nativa de Qwik, tal vez involucrando señales, para obtener un renderizado más fino.

Notas


La solución ideal


Qwik parece seguir evolucionando rápidamente. ¿Habrá que rediseñar algo con esto de RxJS? ¿Cómo puede RxJS trabajar óptimamente con señales?

Aparentemente las promesas no resueltas pronto serán serializables en Qwik. ¿Qué aspecto tiene eso? Puede que valga la pena esperar a que el polvo se asiente un poco más antes de invertir demasiado tiempo en nada todavía. Sin embargo, estas notas pueden seguir siendo relevantes.

Para implementar el comportamiento ideal, tenemos que tratar cada segmento del vapor RxJS de forma independiente, con su propia suscripción interna, y la salida a un almacén Qwik para que podamos desencadenar un useWatch$ para cargar y ejecutar el siguiente segmento RxJS después de que el segmento actual haya hecho su trabajo.

También necesitamos poder cargar los flujos de abajo hacia arriba, para que la información de suscripción inicial pueda propagarse correctamente. Por ejemplo, si un observable se define usando un switchMap, el observable interno puede no estar suscrito inmediatamente, por lo que en el cliente deberíamos ignorar los eventos en la parte superior de ese flujo hasta que el switchMap se ejecute y se pase una suscripción hacia arriba.

El proceso general es el siguiente Un hook useSubscribe se ejecuta inicialmente en el servidor, generando un rastro del estado de suscripción hasta la parte superior de los flujos.

La aplicación se serializa, incluyendo estos rastros de suscripción. Cuando la aplicación se reanuda en el cliente, sólo se cargan los flujos RxJS que se encuentran en las rutas de suscripción, y sólo una vez que son necesarios para procesar nuevos eventos.

A medida que los valores bajan por los pipes RxJS, se carga el código RxJS en cada segmento, y si los flujos se combinan, las suscripciones se propagan hasta la parte superior de los flujos adyacentes según sea necesario, en el mismo proceso que se hizo inicialmente para el primer flujo.

Me imagino una sintaxis como esta:

const search = useSubject$('');
const debouncedSearch = useStream$(() => rx => rx(search).pipe(debounceTime(500)));

rx nos permite evitar proporcionar los almacenes Qwik como dependencias explícitas, haciendo esta sintaxis más DRY; en realidad la única forma en que podemos proporcionar almacenes Qwik como dependencias adecuadas es implícitamente dentro del flujo Rx de todos modos, porque no queremos suscribirnos ansiosamente a nada.

Considera este ejemplo:

const mode = useSubject$(Mode.Simple); // enum
const simple = useObservable(simple$);
const complex = useObservable(complex$);

const results$ = useStream$(() => rx => rx(mode).pipe(
  switchMap(m => m === Mode.Condense ? rx(simple$): rx(complex$)),
));

No queremos que Qwik establezca una suscripción a complex$ a menos que mode.value se establezca en Mode.Complex.

La única manera de propagar correctamente las suscripciones hasta los almacenes Qwik correctos es utilizando el propio mecanismo de suscripción de RxJS.

Entonces, ¿cómo sabemos cuándo RxJS realmente se suscribe y realiza un efecto secundario? Eso sería con defer. Luego podemos utilizar un finalize para limpiar después.

Por lo tanto, todo esto se puede ejecutar en el servidor inicialmente, y configurar las pistas correctamente, a continuación, sólo despierta en el cliente cuando lo que está siendo rastreado cambios.

Pero, ¿qué cambia? Cuando se define un observable padre, todavía no estamos interesados en él, sólo nos importa cuando emite un valor.

Así que tenemos que establecer una propiedad Qwik store al final de eso, para que pueda ser rastreado. Así que para cada flujo RxJS, necesitamos un almacén para ir junto con él, y una suscripción como esta:

store.sub = stream$.subscribe(value => store.value = value)

Por lo tanto, rx necesita rastrear la propiedad de valor de la tienda padre.

Pero aquí hay una situación complicada encima de todo esto: ¿Qué pasa si el evento que hace que el RxJS se cargue por primera vez en el cliente termina pasando un valor aguas abajo que cambia las suscripciones a otros almacenes?

¿Cómo actualizamos los almacenes Qwik para que actualicen estas suscripciones y no carguen RxJS innecesarios o provoquen errores por el trabajo inesperado de RxJS?

Necesitamos un contexto global que rastree cada tienda y sus suscripciones. Eso significa que cada tienda necesita un ID único. Eso puede ser proporcionado en el contexto también, así que cada vez que usamos un gancho RxJS, tenemos que llamar useContext para obtener un id para la nueva tienda Qwik correspondiente con la tubería RxJS.

Luego, cada vez que se actualicen las suscripciones RxJS, actualizaremos este almacén Qwik global con la información de suscripción correspondiente. Esto puede ser rastreado, por lo que una tienda puede detectar si algo de repente se suscribe a ella. Entonces puede activar su RxJS y establecer su propiedad value.

Así rx podría tener una implementación que devolvería algo como esto:

defer(() => {
  // Get parent store from global context; increment subscriber count
  // Track parent store `value` property
  // Add store id to a dep list in this store

  // return BehaviorSubject representing parent 
}).pipe(finalize(() => {
  // Get parent store from global context; decrement subscriber count
  // Set store.value to undefined?
  // Remove store id from dep list in this store
  // 
}))

Olvidé explicar por qué tengo este comentario ahí: // Añadir id de tienda a una lista dep de esta tienda.

¿Por qué la tienda actual también necesita una lista de las tiendas de las que depende? Necesitamos llamar a track dentro de nuestro defer para que Qwik llame a nuestro useWatch$ en el cliente cuando el observable padre emita un valor.

Esto nos da la posibilidad de cargar el RxJS y definirlo en el cliente. Necesitamos apartar los IDs de dependencia del almacén antes de suscribirnos al nuevo flujo, porque los valores iniciales en el cliente podrían causar un cambio en las dependencias, y necesitaremos actualizar el contexto global con esa información de suscripción cambiada.

Pero una vez que definimos el flujo RxJS, no va a ser llamado de nuevo; así que cuando una tienda padre actualiza un valor, no va a ser rastreado la próxima vez. Esto podría ser un problema.

Tal vez también podamos rastrear las dependencias y utilizarlas para rastrear el contexto global...

¿pero eso significa que también tenemos que almacenar los valores emitidos por RxJS en el contexto global? ¿Y podemos realmente hacer este seguimiento de forma dinámica? ¿Y es lo suficientemente eficiente?

Gracias por llegar hasta el final de este blog quiero recordarte que todo esto es gratis y posible gracias a que tu compartes. Un fuerte abrazo y recuerda que el conocimiento es poder.

Invertir en conocimientos produce siempre los mejores beneficios. (Benjamín Franklin)

Fuente

Artículos Relacionados

Mejora tu VSCODE (PRODUCTIVDAD)
· 5 min de lectura
Generando certificados SSL TOTALMENTE GRATIS
· 3 min de lectura