fix update loop when overwriting state from buffer (#8834)

This commit is contained in:
✨ Audrey ✨ 2024-04-19 13:12:17 -04:00 committed by GitHub
parent fffef95c5e
commit 1e67014158
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 57 additions and 62 deletions

View File

@ -87,9 +87,13 @@ export class BufferedKeyDefinition<Input, Output = Input, Dependency = true> {
}
/** Checks whether the input type can be converted to the output type.
* @returns `true` if the definition is valid, otherwise `false`.
* @returns `true` if the definition is defined and valid, otherwise `false`.
*/
isValid(input: Input, dependency: Dependency) {
if (input === null) {
return Promise.resolve(false);
}
const isValid = this.options?.isValid;
if (isValid) {
return isValid(input, dependency);

View File

@ -75,14 +75,16 @@ describe("BufferedState", () => {
it("rolls over pending values from the buffered state immediately by default", async () => {
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
await outputState.update(() => ({ foo: true, bar: false }));
const initialValue = { foo: true, bar: false };
await outputState.update(() => initialValue);
const bufferedState = new BufferedState(provider, BUFFER_KEY, outputState);
const bufferedValue = { foo: true, bar: true };
await provider.setUserState(BUFFER_KEY.toKeyDefinition(), bufferedValue, SomeUser);
const result = await firstValueFrom(bufferedState.state$);
const result = await trackEmissions(bufferedState.state$);
await awaitAsync();
expect(result).toEqual(bufferedValue);
expect(result).toEqual([initialValue, bufferedValue]);
});
// also important for data migrations
@ -131,14 +133,16 @@ describe("BufferedState", () => {
});
const provider = new FakeStateProvider(accountService);
const outputState = provider.getUser(SomeUser, SOME_KEY);
await outputState.update(() => ({ foo: true, bar: false }));
const initialValue = { foo: true, bar: false };
await outputState.update(() => initialValue);
const bufferedState = new BufferedState(provider, bufferedKey, outputState);
const bufferedValue = { foo: true, bar: true };
await provider.setUserState(bufferedKey.toKeyDefinition(), bufferedValue, SomeUser);
const result = await firstValueFrom(bufferedState.state$);
const result = await trackEmissions(bufferedState.state$);
await awaitAsync();
expect(result).toEqual(bufferedValue);
expect(result).toEqual([initialValue, bufferedValue]);
});
it("reads from the output state when shouldOverwrite returns a falsy value", async () => {
@ -274,7 +278,7 @@ describe("BufferedState", () => {
await bufferedState.buffer(bufferedValue);
await awaitAsync();
expect(result).toEqual([firstValue, firstValue]);
expect(result).toEqual([firstValue]);
});
it("replaces the output state when its dependency becomes true", async () => {
@ -296,7 +300,7 @@ describe("BufferedState", () => {
dependency.next(true);
await awaitAsync();
expect(result).toEqual([firstValue, firstValue, bufferedValue]);
expect(result).toEqual([firstValue, bufferedValue]);
});
it.each([[null], [undefined]])("ignores `%p`", async (bufferedValue) => {
@ -325,11 +329,13 @@ describe("BufferedState", () => {
await outputState.update(() => firstValue);
const bufferedState = new BufferedState(provider, bufferedKey, outputState);
const result = trackEmissions(bufferedState.state$);
const stateResult = trackEmissions(bufferedState.state$);
await bufferedState.buffer({ foo: true, bar: true });
await awaitAsync();
const bufferedResult = await firstValueFrom(bufferedState.bufferedState$);
expect(result).toEqual([firstValue, firstValue]);
expect(stateResult).toEqual([firstValue]);
expect(bufferedResult).toBeNull();
});
it("overwrites the output when isValid returns true", async () => {

View File

@ -1,4 +1,4 @@
import { Observable, combineLatest, concatMap, filter, map, of } from "rxjs";
import { Observable, combineLatest, concatMap, filter, map, of, concat, merge } from "rxjs";
import {
StateProvider,
@ -33,68 +33,53 @@ export class BufferedState<Input, Output, Dependency> implements SingleUserState
private output: SingleUserState<Output>,
dependency$: Observable<Dependency> = null,
) {
this.bufferState = provider.getUser(output.userId, key.toKeyDefinition());
this.bufferedState = provider.getUser(output.userId, key.toKeyDefinition());
const watching = [
this.bufferState.state$,
this.output.state$,
dependency$ ?? of(true as unknown as Dependency),
] as const;
this.state$ = combineLatest(watching).pipe(
concatMap(async ([input, output, dependency]) => {
const normalized = input ?? null;
const canOverwrite = normalized !== null && key.shouldOverwrite(dependency);
if (canOverwrite) {
await this.updateOutput(dependency);
// prevent duplicate updates by suppressing the update
return [false, output] as const;
// overwrite the output value
const hasValue$ = concat(of(null), this.bufferedState.state$).pipe(
map((buffer) => (buffer ?? null) !== null),
);
const overwriteDependency$ = (dependency$ ?? of(true as unknown as Dependency)).pipe(
map((dependency) => [key.shouldOverwrite(dependency), dependency] as const),
);
const overwrite$ = combineLatest([hasValue$, overwriteDependency$]).pipe(
concatMap(async ([hasValue, [shouldOverwrite, dependency]]) => {
if (hasValue && shouldOverwrite) {
await this.overwriteOutput(dependency);
}
return [true, output] as const;
return [false, null] as const;
}),
filter(([updated]) => updated),
);
// drive overwrites only when there's a subscription;
// the output state determines when emissions occur
const output$ = this.output.state$.pipe(map((output) => [true, output] as const));
this.state$ = merge(overwrite$, output$).pipe(
filter(([emit]) => emit),
map(([, output]) => output),
);
this.combinedState$ = this.state$.pipe(map((state) => [this.output.userId, state]));
this.bufferState$ = this.bufferState.state$;
this.bufferedState$ = this.bufferedState.state$;
}
private bufferState: SingleUserState<Input>;
private bufferedState: SingleUserState<Input>;
private async updateOutput(dependency: Dependency) {
// retrieve the latest input value
let input: Input;
await this.bufferState.update((state) => state, {
shouldUpdate: (state) => {
input = state;
return false;
},
private async overwriteOutput(dependency: Dependency) {
// take the latest value from the buffer
let buffered: Input;
await this.bufferedState.update((state) => {
buffered = state ?? null;
return null;
});
// bail if this update lost the race with the last update
if (input === null) {
return;
// update the output state
const isValid = await this.key.isValid(buffered, dependency);
if (isValid) {
const output = await this.key.map(buffered, dependency);
await this.output.update(() => output);
}
// destroy invalid data and bail
if (!(await this.key.isValid(input, dependency))) {
await this.bufferState.update(() => null);
return;
}
// overwrite anything left to the output; the updates need to be awaited with `Promise.all`
// so that `inputState.update(() => null)` runs before `shouldUpdate` reads the value (above).
// This lets the emission from `this.outputState.update` renter the `concatMap`. If the
// awaits run in sequence, it can win the race and cause a double emission.
const output = await this.key.map(input, dependency);
await Promise.all([this.output.update(() => output), this.bufferState.update(() => null)]);
return;
}
/** {@link SingleUserState.userId} */
@ -119,14 +104,14 @@ export class BufferedState<Input, Output, Dependency> implements SingleUserState
async buffer(value: Input): Promise<void> {
const normalized = value ?? null;
if (normalized !== null) {
await this.bufferState.update(() => normalized);
await this.bufferedState.update(() => normalized);
}
}
/** The data presently being buffered. This emits the pending value each time
* new buffer data is provided. It emits null when the buffer is empty.
*/
readonly bufferState$: Observable<Input>;
readonly bufferedState$: Observable<Input>;
/** Updates the output state.
* @param configureState a callback that returns an updated output