From d1c3a98efb24c266783d9b1f8b1d635e288ae8ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=9C=A8=20Audrey=20=E2=9C=A8?= Date: Wed, 2 Oct 2024 14:47:22 -0400 Subject: [PATCH] introduce additional rx helpers (#11361) --- libs/tools/generator/core/src/rx.spec.ts | 352 +++++++++++++++++++++++ libs/tools/generator/core/src/rx.ts | 99 ++++++- 2 files changed, 450 insertions(+), 1 deletion(-) create mode 100644 libs/tools/generator/core/src/rx.spec.ts diff --git a/libs/tools/generator/core/src/rx.spec.ts b/libs/tools/generator/core/src/rx.spec.ts new file mode 100644 index 0000000000..b98e79bb07 --- /dev/null +++ b/libs/tools/generator/core/src/rx.spec.ts @@ -0,0 +1,352 @@ +import { EmptyError, Subject, tap } from "rxjs"; + +import { anyComplete, on, ready } from "./rx"; + +describe("anyComplete", () => { + it("emits true when its input completes", () => { + const input$ = new Subject(); + + const emissions: boolean[] = []; + anyComplete(input$).subscribe((e) => emissions.push(e)); + input$.complete(); + + expect(emissions).toEqual([true]); + }); + + it("completes when its input is already complete", () => { + const input = new Subject(); + input.complete(); + + let completed = false; + anyComplete(input).subscribe({ complete: () => (completed = true) }); + + expect(completed).toBe(true); + }); + + it("completes when any input completes", () => { + const input$ = new Subject(); + const completing$ = new Subject(); + + let completed = false; + anyComplete([input$, completing$]).subscribe({ complete: () => (completed = true) }); + completing$.complete(); + + expect(completed).toBe(true); + }); + + it("ignores emissions", () => { + const input$ = new Subject(); + + const emissions: boolean[] = []; + anyComplete(input$).subscribe((e) => emissions.push(e)); + input$.next(1); + input$.next(2); + input$.complete(); + + expect(emissions).toEqual([true]); + }); + + it("forwards errors", () => { + const input$ = new Subject(); + const expected = { some: "error" }; + + let error = null; + anyComplete(input$).subscribe({ error: (e: unknown) => (error = e) }); + input$.error(expected); + + expect(error).toEqual(expected); + }); +}); + +describe("ready", () => { + it("connects when subscribed", () => { + const watch$ = new Subject(); + let connected = false; + const source$ = new Subject().pipe(tap({ subscribe: () => (connected = true) })); + + // precondition: ready$ should be cold + const ready$ = source$.pipe(ready(watch$)); + expect(connected).toBe(false); + + ready$.subscribe(); + + expect(connected).toBe(true); + }); + + it("suppresses source emissions until its watch emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const results: number[] = []; + ready$.subscribe((n) => results.push(n)); + + // precondition: no emissions + source$.next(1); + expect(results).toEqual([]); + + watch$.next(); + + expect(results).toEqual([1]); + }); + + it("suppresses source emissions until all watches emit", () => { + const watchA$ = new Subject(); + const watchB$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready([watchA$, watchB$])); + const results: number[] = []; + ready$.subscribe((n) => results.push(n)); + + // preconditions: no emissions + source$.next(1); + expect(results).toEqual([]); + watchA$.next(); + expect(results).toEqual([]); + + watchB$.next(); + + expect(results).toEqual([1]); + }); + + it("emits the last source emission when its watch emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const results: number[] = []; + ready$.subscribe((n) => results.push(n)); + + // precondition: no emissions + source$.next(1); + expect(results).toEqual([]); + + source$.next(2); + watch$.next(); + + expect(results).toEqual([2]); + }); + + it("emits all source emissions after its watch emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const results: number[] = []; + ready$.subscribe((n) => results.push(n)); + + watch$.next(); + source$.next(1); + source$.next(2); + + expect(results).toEqual([1, 2]); + }); + + it("ignores repeated watch emissions", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const results: number[] = []; + ready$.subscribe((n) => results.push(n)); + + watch$.next(); + source$.next(1); + watch$.next(); + source$.next(2); + watch$.next(); + + expect(results).toEqual([1, 2]); + }); + + it("completes when its source completes", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + let completed = false; + ready$.subscribe({ complete: () => (completed = true) }); + + source$.complete(); + + expect(completed).toBeTruthy(); + }); + + it("errors when its source errors", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const expected = { some: "error" }; + let error = null; + ready$.subscribe({ error: (e: unknown) => (error = e) }); + + source$.error(expected); + + expect(error).toEqual(expected); + }); + + it("errors when its watch errors", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + const expected = { some: "error" }; + let error = null; + ready$.subscribe({ error: (e: unknown) => (error = e) }); + + watch$.error(expected); + + expect(error).toEqual(expected); + }); + + it("errors when its watch completes before emitting", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const ready$ = source$.pipe(ready(watch$)); + let error = null; + ready$.subscribe({ error: (e: unknown) => (error = e) }); + + watch$.complete(); + + expect(error).toBeInstanceOf(EmptyError); + }); +}); + +describe("on", () => { + it("connects when subscribed", () => { + const watch$ = new Subject(); + let connected = false; + const source$ = new Subject().pipe(tap({ subscribe: () => (connected = true) })); + + // precondition: on$ should be cold + const on$ = source$.pipe(on(watch$)); + expect(connected).toBeFalsy(); + + on$.subscribe(); + + expect(connected).toBeTruthy(); + }); + + it("suppresses source emissions until `on` emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + + // precondition: on$ should be cold + source$.next(1); + expect(results).toEqual([]); + + watch$.next(); + + expect(results).toEqual([1]); + }); + + it("repeats source emissions when `on` emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + source$.next(1); + + watch$.next(); + watch$.next(); + + expect(results).toEqual([1, 1]); + }); + + it("updates source emissions when `on` emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + + source$.next(1); + watch$.next(); + source$.next(2); + watch$.next(); + + expect(results).toEqual([1, 2]); + }); + + it("emits a value when `on` emits before the source is ready", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + + watch$.next(); + source$.next(1); + + expect(results).toEqual([1]); + }); + + it("ignores repeated `on` emissions before the source is ready", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + + watch$.next(); + watch$.next(); + source$.next(1); + + expect(results).toEqual([1]); + }); + + it("emits only the latest source emission when `on` emits", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const results: number[] = []; + source$.pipe(on(watch$)).subscribe((n) => results.push(n)); + source$.next(1); + + watch$.next(); + + source$.next(2); + source$.next(3); + watch$.next(); + + expect(results).toEqual([1, 3]); + }); + + it("completes when its source completes", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + let complete: boolean = false; + source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) }); + + source$.complete(); + + expect(complete).toBeTruthy(); + }); + + it("completes when its watch completes", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + let complete: boolean = false; + source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) }); + + watch$.complete(); + + expect(complete).toBeTruthy(); + }); + + it("errors when its source errors", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const expected = { some: "error" }; + let error = null; + source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) }); + + source$.error(expected); + + expect(error).toEqual(expected); + }); + + it("errors when its watch errors", () => { + const watch$ = new Subject(); + const source$ = new Subject(); + const expected = { some: "error" }; + let error = null; + source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) }); + + watch$.error(expected); + + expect(error).toEqual(expected); + }); +}); diff --git a/libs/tools/generator/core/src/rx.ts b/libs/tools/generator/core/src/rx.ts index 070d34d37d..851b6cfe7c 100644 --- a/libs/tools/generator/core/src/rx.ts +++ b/libs/tools/generator/core/src/rx.ts @@ -1,4 +1,18 @@ -import { map, pipe } from "rxjs"; +import { + concat, + concatMap, + connect, + endWith, + first, + ignoreElements, + map, + Observable, + pipe, + race, + ReplaySubject, + takeUntil, + zip, +} from "rxjs"; import { reduceCollection, distinctIfShallowMatch } from "@bitwarden/common/tools/rx"; @@ -37,3 +51,86 @@ export function newDefaultEvaluator() { return pipe(map((_) => new DefaultPolicyEvaluator())); }; } + +/** Create an observable that, once subscribed, emits `true` then completes when + * any input completes. If an input is already complete when the subscription + * occurs, it emits immediately. + * @param watch$ the observable(s) to watch for completion; if an array is passed, + * null and undefined members are ignored. If `watch$` is empty, `anyComplete` + * will never complete. + * @returns An observable that emits `true` when any of its inputs + * complete. The observable forwards the first error from its input. + * @remarks This method is particularly useful in combination with `takeUntil` and + * streams that are not guaranteed to complete on their own. + */ +export function anyComplete(watch$: Observable | Observable[]): Observable { + if (Array.isArray(watch$)) { + const completes$ = watch$ + .filter((w$) => !!w$) + .map((w$) => w$.pipe(ignoreElements(), endWith(true))); + const completed$ = race(completes$); + return completed$; + } else { + return watch$.pipe(ignoreElements(), endWith(true)); + } +} + +/** + * Create an observable that delays the input stream until all watches have + * emitted a value. The watched values are not included in the source stream. + * The last emission from the source is output when all the watches have + * emitted at least once. + * @param watch$ the observable(s) to watch for readiness. If `watch$` is empty, + * `ready` will never emit. + * @returns An observable that emits when the source stream emits. The observable + * errors if one of its watches completes before emitting. It also errors if one + * of its watches errors. + */ +export function ready(watch$: Observable | Observable[]) { + const watching$ = Array.isArray(watch$) ? watch$ : [watch$]; + return pipe( + connect>((source$) => { + // this subscription is safe because `source$` connects only after there + // is an external subscriber. + const source = new ReplaySubject(1); + source$.subscribe(source); + + // `concat` is subscribed immediately after it's returned, at which point + // `zip` blocks until all items in `watching$` are ready. If that occurs + // after `source$` is hot, then the replay subject sends the last-captured + // emission through immediately. Otherwise, `ready` waits for the next + // emission + return concat(zip(watching$).pipe(first(), ignoreElements()), source).pipe( + takeUntil(anyComplete(source)), + ); + }), + ); +} + +/** + * Create an observable that emits the latest value of the source stream + * when `watch$` emits. If `watch$` emits before the stream emits, then + * an emission occurs as soon as a value becomes ready. + * @param watch$ the observable that triggers emissions + * @returns An observable that emits when `watch$` emits. The observable + * errors if its source stream errors. It also errors if `on` errors. It + * completes if its watch completes. + * + * @remarks This works like `audit`, but it repeats emissions when + * watch$ fires. + */ +export function on(watch$: Observable) { + return pipe( + connect>((source$) => { + const source = new ReplaySubject(1); + source$.subscribe(source); + + return watch$ + .pipe( + ready(source), + concatMap(() => source.pipe(first())), + ) + .pipe(takeUntil(anyComplete(source))); + }), + ); +}