introduce additional rx helpers (#11361)

This commit is contained in:
✨ Audrey ✨ 2024-10-02 14:47:22 -04:00 committed by GitHub
parent 398f9be351
commit d1c3a98efb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 450 additions and 1 deletions

View File

@ -0,0 +1,352 @@
import { EmptyError, Subject, tap } from "rxjs";
import { anyComplete, on, ready } from "./rx";
describe("anyComplete", () => {
it("emits true when its input completes", () => {
const input$ = new Subject<void>();
const emissions: boolean[] = [];
anyComplete(input$).subscribe((e) => emissions.push(e));
input$.complete();
expect(emissions).toEqual([true]);
});
it("completes when its input is already complete", () => {
const input = new Subject<void>();
input.complete();
let completed = false;
anyComplete(input).subscribe({ complete: () => (completed = true) });
expect(completed).toBe(true);
});
it("completes when any input completes", () => {
const input$ = new Subject<void>();
const completing$ = new Subject<void>();
let completed = false;
anyComplete([input$, completing$]).subscribe({ complete: () => (completed = true) });
completing$.complete();
expect(completed).toBe(true);
});
it("ignores emissions", () => {
const input$ = new Subject<number>();
const emissions: boolean[] = [];
anyComplete(input$).subscribe((e) => emissions.push(e));
input$.next(1);
input$.next(2);
input$.complete();
expect(emissions).toEqual([true]);
});
it("forwards errors", () => {
const input$ = new Subject<void>();
const expected = { some: "error" };
let error = null;
anyComplete(input$).subscribe({ error: (e: unknown) => (error = e) });
input$.error(expected);
expect(error).toEqual(expected);
});
});
describe("ready", () => {
it("connects when subscribed", () => {
const watch$ = new Subject<void>();
let connected = false;
const source$ = new Subject<number>().pipe(tap({ subscribe: () => (connected = true) }));
// precondition: ready$ should be cold
const ready$ = source$.pipe(ready(watch$));
expect(connected).toBe(false);
ready$.subscribe();
expect(connected).toBe(true);
});
it("suppresses source emissions until its watch emits", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const ready$ = source$.pipe(ready(watch$));
const results: number[] = [];
ready$.subscribe((n) => results.push(n));
// precondition: no emissions
source$.next(1);
expect(results).toEqual([]);
watch$.next();
expect(results).toEqual([1]);
});
it("suppresses source emissions until all watches emit", () => {
const watchA$ = new Subject<void>();
const watchB$ = new Subject<void>();
const source$ = new Subject<number>();
const ready$ = source$.pipe(ready([watchA$, watchB$]));
const results: number[] = [];
ready$.subscribe((n) => results.push(n));
// preconditions: no emissions
source$.next(1);
expect(results).toEqual([]);
watchA$.next();
expect(results).toEqual([]);
watchB$.next();
expect(results).toEqual([1]);
});
it("emits the last source emission when its watch emits", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const ready$ = source$.pipe(ready(watch$));
const results: number[] = [];
ready$.subscribe((n) => results.push(n));
// precondition: no emissions
source$.next(1);
expect(results).toEqual([]);
source$.next(2);
watch$.next();
expect(results).toEqual([2]);
});
it("emits all source emissions after its watch emits", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const ready$ = source$.pipe(ready(watch$));
const results: number[] = [];
ready$.subscribe((n) => results.push(n));
watch$.next();
source$.next(1);
source$.next(2);
expect(results).toEqual([1, 2]);
});
it("ignores repeated watch emissions", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const ready$ = source$.pipe(ready(watch$));
const results: number[] = [];
ready$.subscribe((n) => results.push(n));
watch$.next();
source$.next(1);
watch$.next();
source$.next(2);
watch$.next();
expect(results).toEqual([1, 2]);
});
it("completes when its source completes", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const ready$ = source$.pipe(ready(watch$));
let completed = false;
ready$.subscribe({ complete: () => (completed = true) });
source$.complete();
expect(completed).toBeTruthy();
});
it("errors when its source errors", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const ready$ = source$.pipe(ready(watch$));
const expected = { some: "error" };
let error = null;
ready$.subscribe({ error: (e: unknown) => (error = e) });
source$.error(expected);
expect(error).toEqual(expected);
});
it("errors when its watch errors", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const ready$ = source$.pipe(ready(watch$));
const expected = { some: "error" };
let error = null;
ready$.subscribe({ error: (e: unknown) => (error = e) });
watch$.error(expected);
expect(error).toEqual(expected);
});
it("errors when its watch completes before emitting", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const ready$ = source$.pipe(ready(watch$));
let error = null;
ready$.subscribe({ error: (e: unknown) => (error = e) });
watch$.complete();
expect(error).toBeInstanceOf(EmptyError);
});
});
describe("on", () => {
it("connects when subscribed", () => {
const watch$ = new Subject<void>();
let connected = false;
const source$ = new Subject<number>().pipe(tap({ subscribe: () => (connected = true) }));
// precondition: on$ should be cold
const on$ = source$.pipe(on(watch$));
expect(connected).toBeFalsy();
on$.subscribe();
expect(connected).toBeTruthy();
});
it("suppresses source emissions until `on` emits", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const results: number[] = [];
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
// precondition: on$ should be cold
source$.next(1);
expect(results).toEqual([]);
watch$.next();
expect(results).toEqual([1]);
});
it("repeats source emissions when `on` emits", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const results: number[] = [];
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
source$.next(1);
watch$.next();
watch$.next();
expect(results).toEqual([1, 1]);
});
it("updates source emissions when `on` emits", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const results: number[] = [];
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
source$.next(1);
watch$.next();
source$.next(2);
watch$.next();
expect(results).toEqual([1, 2]);
});
it("emits a value when `on` emits before the source is ready", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const results: number[] = [];
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
watch$.next();
source$.next(1);
expect(results).toEqual([1]);
});
it("ignores repeated `on` emissions before the source is ready", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const results: number[] = [];
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
watch$.next();
watch$.next();
source$.next(1);
expect(results).toEqual([1]);
});
it("emits only the latest source emission when `on` emits", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const results: number[] = [];
source$.pipe(on(watch$)).subscribe((n) => results.push(n));
source$.next(1);
watch$.next();
source$.next(2);
source$.next(3);
watch$.next();
expect(results).toEqual([1, 3]);
});
it("completes when its source completes", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
let complete: boolean = false;
source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) });
source$.complete();
expect(complete).toBeTruthy();
});
it("completes when its watch completes", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
let complete: boolean = false;
source$.pipe(on(watch$)).subscribe({ complete: () => (complete = true) });
watch$.complete();
expect(complete).toBeTruthy();
});
it("errors when its source errors", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const expected = { some: "error" };
let error = null;
source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) });
source$.error(expected);
expect(error).toEqual(expected);
});
it("errors when its watch errors", () => {
const watch$ = new Subject<void>();
const source$ = new Subject<number>();
const expected = { some: "error" };
let error = null;
source$.pipe(on(watch$)).subscribe({ error: (e: unknown) => (error = e) });
watch$.error(expected);
expect(error).toEqual(expected);
});
});

View File

@ -1,4 +1,18 @@
import { map, pipe } from "rxjs";
import {
concat,
concatMap,
connect,
endWith,
first,
ignoreElements,
map,
Observable,
pipe,
race,
ReplaySubject,
takeUntil,
zip,
} from "rxjs";
import { reduceCollection, distinctIfShallowMatch } from "@bitwarden/common/tools/rx";
@ -37,3 +51,86 @@ export function newDefaultEvaluator<Target>() {
return pipe(map((_) => new DefaultPolicyEvaluator<Target>()));
};
}
/** Create an observable that, once subscribed, emits `true` then completes when
* any input completes. If an input is already complete when the subscription
* occurs, it emits immediately.
* @param watch$ the observable(s) to watch for completion; if an array is passed,
* null and undefined members are ignored. If `watch$` is empty, `anyComplete`
* will never complete.
* @returns An observable that emits `true` when any of its inputs
* complete. The observable forwards the first error from its input.
* @remarks This method is particularly useful in combination with `takeUntil` and
* streams that are not guaranteed to complete on their own.
*/
export function anyComplete(watch$: Observable<any> | Observable<any>[]): Observable<any> {
if (Array.isArray(watch$)) {
const completes$ = watch$
.filter((w$) => !!w$)
.map((w$) => w$.pipe(ignoreElements(), endWith(true)));
const completed$ = race(completes$);
return completed$;
} else {
return watch$.pipe(ignoreElements(), endWith(true));
}
}
/**
* Create an observable that delays the input stream until all watches have
* emitted a value. The watched values are not included in the source stream.
* The last emission from the source is output when all the watches have
* emitted at least once.
* @param watch$ the observable(s) to watch for readiness. If `watch$` is empty,
* `ready` will never emit.
* @returns An observable that emits when the source stream emits. The observable
* errors if one of its watches completes before emitting. It also errors if one
* of its watches errors.
*/
export function ready<T>(watch$: Observable<any> | Observable<any>[]) {
const watching$ = Array.isArray(watch$) ? watch$ : [watch$];
return pipe(
connect<T, Observable<T>>((source$) => {
// this subscription is safe because `source$` connects only after there
// is an external subscriber.
const source = new ReplaySubject<T>(1);
source$.subscribe(source);
// `concat` is subscribed immediately after it's returned, at which point
// `zip` blocks until all items in `watching$` are ready. If that occurs
// after `source$` is hot, then the replay subject sends the last-captured
// emission through immediately. Otherwise, `ready` waits for the next
// emission
return concat(zip(watching$).pipe(first(), ignoreElements()), source).pipe(
takeUntil(anyComplete(source)),
);
}),
);
}
/**
* Create an observable that emits the latest value of the source stream
* when `watch$` emits. If `watch$` emits before the stream emits, then
* an emission occurs as soon as a value becomes ready.
* @param watch$ the observable that triggers emissions
* @returns An observable that emits when `watch$` emits. The observable
* errors if its source stream errors. It also errors if `on` errors. It
* completes if its watch completes.
*
* @remarks This works like `audit`, but it repeats emissions when
* watch$ fires.
*/
export function on<T>(watch$: Observable<any>) {
return pipe(
connect<T, Observable<T>>((source$) => {
const source = new ReplaySubject<T>(1);
source$.subscribe(source);
return watch$
.pipe(
ready(source),
concatMap(() => source.pipe(first())),
)
.pipe(takeUntil(anyComplete(source)));
}),
);
}