diff --git a/libs/common/src/tools/generator/state/buffered-key-definition.ts b/libs/common/src/tools/generator/state/buffered-key-definition.ts index 5457410f80..1f11280839 100644 --- a/libs/common/src/tools/generator/state/buffered-key-definition.ts +++ b/libs/common/src/tools/generator/state/buffered-key-definition.ts @@ -87,9 +87,13 @@ export class BufferedKeyDefinition { } /** Checks whether the input type can be converted to the output type. - * @returns `true` if the definition is valid, otherwise `false`. + * @returns `true` if the definition is defined and valid, otherwise `false`. */ isValid(input: Input, dependency: Dependency) { + if (input === null) { + return Promise.resolve(false); + } + const isValid = this.options?.isValid; if (isValid) { return isValid(input, dependency); diff --git a/libs/common/src/tools/generator/state/buffered-state.spec.ts b/libs/common/src/tools/generator/state/buffered-state.spec.ts index 7f9722d384..46e132c1bd 100644 --- a/libs/common/src/tools/generator/state/buffered-state.spec.ts +++ b/libs/common/src/tools/generator/state/buffered-state.spec.ts @@ -75,14 +75,16 @@ describe("BufferedState", () => { it("rolls over pending values from the buffered state immediately by default", async () => { const provider = new FakeStateProvider(accountService); const outputState = provider.getUser(SomeUser, SOME_KEY); - await outputState.update(() => ({ foo: true, bar: false })); + const initialValue = { foo: true, bar: false }; + await outputState.update(() => initialValue); const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState); const bufferedValue = { foo: true, bar: true }; await provider.setUserState(BUFFER_KEY.toKeyDefinition(), bufferedValue, SomeUser); - const result = await firstValueFrom(bufferedState.state$); + const result = await trackEmissions(bufferedState.state$); + await awaitAsync(); - expect(result).toEqual(bufferedValue); + expect(result).toEqual([initialValue, bufferedValue]); }); // also important for data migrations @@ -131,14 +133,16 @@ describe("BufferedState", () => { }); const provider = new FakeStateProvider(accountService); const outputState = provider.getUser(SomeUser, SOME_KEY); - await outputState.update(() => ({ foo: true, bar: false })); + const initialValue = { foo: true, bar: false }; + await outputState.update(() => initialValue); const bufferedState = new BufferedState(provider, bufferedKey, outputState); const bufferedValue = { foo: true, bar: true }; await provider.setUserState(bufferedKey.toKeyDefinition(), bufferedValue, SomeUser); - const result = await firstValueFrom(bufferedState.state$); + const result = await trackEmissions(bufferedState.state$); + await awaitAsync(); - expect(result).toEqual(bufferedValue); + expect(result).toEqual([initialValue, bufferedValue]); }); it("reads from the output state when shouldOverwrite returns a falsy value", async () => { @@ -274,7 +278,7 @@ describe("BufferedState", () => { await bufferedState.buffer(bufferedValue); await awaitAsync(); - expect(result).toEqual([firstValue, firstValue]); + expect(result).toEqual([firstValue]); }); it("replaces the output state when its dependency becomes true", async () => { @@ -296,7 +300,7 @@ describe("BufferedState", () => { dependency.next(true); await awaitAsync(); - expect(result).toEqual([firstValue, firstValue, bufferedValue]); + expect(result).toEqual([firstValue, bufferedValue]); }); it.each([[null], [undefined]])("ignores `%p`", async (bufferedValue) => { @@ -325,11 +329,13 @@ describe("BufferedState", () => { await outputState.update(() => firstValue); const bufferedState = new BufferedState(provider, bufferedKey, outputState); - const result = trackEmissions(bufferedState.state$); + const stateResult = trackEmissions(bufferedState.state$); await bufferedState.buffer({ foo: true, bar: true }); await awaitAsync(); + const bufferedResult = await firstValueFrom(bufferedState.bufferedState$); - expect(result).toEqual([firstValue, firstValue]); + expect(stateResult).toEqual([firstValue]); + expect(bufferedResult).toBeNull(); }); it("overwrites the output when isValid returns true", async () => { diff --git a/libs/common/src/tools/generator/state/buffered-state.ts b/libs/common/src/tools/generator/state/buffered-state.ts index 42b14b815c..bb4de645e9 100644 --- a/libs/common/src/tools/generator/state/buffered-state.ts +++ b/libs/common/src/tools/generator/state/buffered-state.ts @@ -1,4 +1,4 @@ -import { Observable, combineLatest, concatMap, filter, map, of } from "rxjs"; +import { Observable, combineLatest, concatMap, filter, map, of, concat, merge } from "rxjs"; import { StateProvider, @@ -33,68 +33,53 @@ export class BufferedState implements SingleUserState private output: SingleUserState, dependency$: Observable = null, ) { - this.bufferState = provider.getUser(output.userId, key.toKeyDefinition()); + this.bufferedState = provider.getUser(output.userId, key.toKeyDefinition()); - const watching = [ - this.bufferState.state$, - this.output.state$, - dependency$ ?? of(true as unknown as Dependency), - ] as const; - - this.state$ = combineLatest(watching).pipe( - concatMap(async ([input, output, dependency]) => { - const normalized = input ?? null; - - const canOverwrite = normalized !== null && key.shouldOverwrite(dependency); - if (canOverwrite) { - await this.updateOutput(dependency); - - // prevent duplicate updates by suppressing the update - return [false, output] as const; + // overwrite the output value + const hasValue$ = concat(of(null), this.bufferedState.state$).pipe( + map((buffer) => (buffer ?? null) !== null), + ); + const overwriteDependency$ = (dependency$ ?? of(true as unknown as Dependency)).pipe( + map((dependency) => [key.shouldOverwrite(dependency), dependency] as const), + ); + const overwrite$ = combineLatest([hasValue$, overwriteDependency$]).pipe( + concatMap(async ([hasValue, [shouldOverwrite, dependency]]) => { + if (hasValue && shouldOverwrite) { + await this.overwriteOutput(dependency); } - - return [true, output] as const; + return [false, null] as const; }), - filter(([updated]) => updated), + ); + + // drive overwrites only when there's a subscription; + // the output state determines when emissions occur + const output$ = this.output.state$.pipe(map((output) => [true, output] as const)); + this.state$ = merge(overwrite$, output$).pipe( + filter(([emit]) => emit), map(([, output]) => output), ); this.combinedState$ = this.state$.pipe(map((state) => [this.output.userId, state])); - this.bufferState$ = this.bufferState.state$; + this.bufferedState$ = this.bufferedState.state$; } - private bufferState: SingleUserState; + private bufferedState: SingleUserState; - private async updateOutput(dependency: Dependency) { - // retrieve the latest input value - let input: Input; - await this.bufferState.update((state) => state, { - shouldUpdate: (state) => { - input = state; - return false; - }, + private async overwriteOutput(dependency: Dependency) { + // take the latest value from the buffer + let buffered: Input; + await this.bufferedState.update((state) => { + buffered = state ?? null; + return null; }); - // bail if this update lost the race with the last update - if (input === null) { - return; + // update the output state + const isValid = await this.key.isValid(buffered, dependency); + if (isValid) { + const output = await this.key.map(buffered, dependency); + await this.output.update(() => output); } - - // destroy invalid data and bail - if (!(await this.key.isValid(input, dependency))) { - await this.bufferState.update(() => null); - return; - } - - // overwrite anything left to the output; the updates need to be awaited with `Promise.all` - // so that `inputState.update(() => null)` runs before `shouldUpdate` reads the value (above). - // This lets the emission from `this.outputState.update` renter the `concatMap`. If the - // awaits run in sequence, it can win the race and cause a double emission. - const output = await this.key.map(input, dependency); - await Promise.all([this.output.update(() => output), this.bufferState.update(() => null)]); - - return; } /** {@link SingleUserState.userId} */ @@ -119,14 +104,14 @@ export class BufferedState implements SingleUserState async buffer(value: Input): Promise { const normalized = value ?? null; if (normalized !== null) { - await this.bufferState.update(() => normalized); + await this.bufferedState.update(() => normalized); } } /** The data presently being buffered. This emits the pending value each time * new buffer data is provided. It emits null when the buffer is empty. */ - readonly bufferState$: Observable; + readonly bufferedState$: Observable; /** Updates the output state. * @param configureState a callback that returns an updated output