diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts b/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts index 8a1143e29f..9a26ae1e50 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts @@ -108,24 +108,32 @@ describe("DefaultActiveUserState", () => { // #3 switched state to initial state for user2 expect(emissions).toEqual([state1, updatedState, state2]); - // Should be called three time to get state, once for each user and once for the update - expect(diskStorageService.mock.get).toHaveBeenCalledTimes(3); + // Should be called 4 time to get state, update state for user, emitting update, and switching users + expect(diskStorageService.mock.get).toHaveBeenCalledTimes(4); + // Initial subscribe to state$ expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 1, "user_00000000-0000-1000-a000-000000000001_fake_fake", any(), // options ); + // The updating of state for user1 expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 2, "user_00000000-0000-1000-a000-000000000001_fake_fake", any(), // options ); + // The emission from being actively subscribed to user1 expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 3, + "user_00000000-0000-1000-a000-000000000001_fake_fake", + any(), // options + ); + // Switch to user2 + expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( + 4, "user_00000000-0000-1000-a000-000000000002_fake_fake", any(), // options ); - // Should only have saved data for the first user expect(diskStorageService.mock.save).toHaveBeenCalledTimes(1); expect(diskStorageService.mock.save).toHaveBeenNthCalledWith( diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.ts b/libs/common/src/platform/state/implementations/default-active-user-state.ts index 595717d3f2..de84f058d3 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.ts @@ -47,7 +47,7 @@ export class DefaultActiveUserState implements ActiveUserState { // We only care about the UserId but we do want to know about no user as well. map((a) => a?.id), // To avoid going to storage when we don't need to, only get updates when there is a true change. - distinctUntilChanged(), + distinctUntilChanged((a, b) => (a == null || b == null ? a == b : a === b)), // Treat null and undefined as equal ); const userChangeAndInitial$ = this.activeUserId$.pipe( @@ -157,15 +157,22 @@ export class DefaultActiveUserState implements ActiveUserState { * The expectation is that that await is already done */ protected async getStateForUpdate() { - const [userId, data] = await firstValueFrom( - this.combinedState$.pipe( + const userId = await firstValueFrom( + this.activeUserId$.pipe( timeout({ first: 1000, - with: () => throwError(() => new Error("No active user at this time.")), + with: () => throwError(() => new Error("Timeout while retrieving active user.")), }), ), ); - return [userKeyBuilder(userId, this.keyDefinition), data] as const; + if (userId == null) { + throw new Error("No active user at this time."); + } + const fullKey = userKeyBuilder(userId, this.keyDefinition); + return [ + fullKey, + await getStoredValue(fullKey, this.chosenStorageLocation, this.keyDefinition.deserializer), + ] as const; } protected saveToStorage(key: string, data: T): Promise { diff --git a/libs/common/src/platform/state/implementations/default-global-state.spec.ts b/libs/common/src/platform/state/implementations/default-global-state.spec.ts index 35ce0fa098..7bcac23ffc 100644 --- a/libs/common/src/platform/state/implementations/default-global-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-global-state.spec.ts @@ -3,7 +3,6 @@ * @jest-environment ../shared/test.environment.ts */ -import { anySymbol } from "jest-mock-extended"; import { firstValueFrom, of } from "rxjs"; import { Jsonify } from "type-fest"; @@ -309,29 +308,11 @@ describe("DefaultGlobalState", () => { return newData; }); }); - - test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { - expect(globalState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT - const val = await globalState.update((state) => { - return newData; - }); - - expect(val).toEqual(newData); - const call = diskStorageService.mock.save.mock.calls[0]; - expect(call[0]).toEqual("global_fake_fake"); - expect(call[1]).toEqual(newData); - }); }); describe("cleanup", () => { - async function assertClean() { - const emissions = trackEmissions(globalState["stateSubject"]); - const initial = structuredClone(emissions); - - diskStorageService.save(globalKey, newData); - await awaitAsync(); // storage updates are behind a promise - - expect(emissions).toEqual(initial); // no longer listening to storage updates + function assertClean() { + expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(0); } it("should cleanup after last subscriber", async () => { @@ -339,11 +320,10 @@ describe("DefaultGlobalState", () => { await awaitAsync(); // storage updates are behind a promise subscription.unsubscribe(); - expect(globalState["subscriberCount"].getValue()).toBe(0); // Wait for cleanup await awaitAsync(cleanupDelayMs * 2); - await assertClean(); + assertClean(); }); it("should not cleanup if there are still subscribers", async () => { @@ -357,7 +337,7 @@ describe("DefaultGlobalState", () => { // Wait for cleanup await awaitAsync(cleanupDelayMs * 2); - expect(globalState["subscriberCount"].getValue()).toBe(1); + expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1); // Still be listening to storage updates diskStorageService.save(globalKey, newData); @@ -368,7 +348,7 @@ describe("DefaultGlobalState", () => { // Wait for cleanup await awaitAsync(cleanupDelayMs * 2); - await assertClean(); + assertClean(); }); it("can re-initialize after cleanup", async () => { @@ -396,12 +376,11 @@ describe("DefaultGlobalState", () => { await awaitAsync(); subscription.unsubscribe(); - expect(globalState["subscriberCount"].getValue()).toBe(0); + expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1); // Do not wait long enough for cleanup await awaitAsync(cleanupDelayMs / 2); - expect(globalState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared - expect(globalState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates + expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1); }); it("state$ observables are durable to cleanup", async () => { diff --git a/libs/common/src/platform/state/implementations/default-global-state.ts b/libs/common/src/platform/state/implementations/default-global-state.ts index 73a3fe5d04..db29619407 100644 --- a/libs/common/src/platform/state/implementations/default-global-state.ts +++ b/libs/common/src/platform/state/implementations/default-global-state.ts @@ -1,11 +1,14 @@ import { - BehaviorSubject, Observable, - Subscription, + ReplaySubject, + defer, filter, firstValueFrom, + merge, + share, switchMap, timeout, + timer, } from "rxjs"; import { @@ -17,30 +20,43 @@ import { KeyDefinition, globalKeyBuilder } from "../key-definition"; import { StateUpdateOptions, populateOptionsWithDefault } from "../state-update-options"; import { getStoredValue } from "./util"; -const FAKE_DEFAULT = Symbol("fakeDefault"); export class DefaultGlobalState implements GlobalState { private storageKey: string; private updatePromise: Promise | null = null; - private storageUpdateSubscription: Subscription; - private subscriberCount = new BehaviorSubject(0); - private stateObservable: Observable; - private reinitialize = false; - protected stateSubject: BehaviorSubject = new BehaviorSubject< - T | typeof FAKE_DEFAULT - >(FAKE_DEFAULT); - - get state$() { - this.stateObservable = this.stateObservable ?? this.initializeObservable(); - return this.stateObservable; - } + readonly state$: Observable; constructor( private keyDefinition: KeyDefinition, private chosenLocation: AbstractStorageService & ObservableStorageService, ) { this.storageKey = globalKeyBuilder(this.keyDefinition); + const initialStorageGet$ = defer(() => { + return getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer); + }); + + const latestStorage$ = this.chosenLocation.updates$.pipe( + filter((s) => s.key === this.storageKey), + switchMap(async (storageUpdate) => { + if (storageUpdate.updateType === "remove") { + return null; + } + + return await getStoredValue( + this.storageKey, + this.chosenLocation, + this.keyDefinition.deserializer, + ); + }), + ); + + this.state$ = merge(initialStorageGet$, latestStorage$).pipe( + share({ + connector: () => new ReplaySubject(1), + resetOnRefCountZero: () => timer(this.keyDefinition.cleanupDelayMs), + }), + ); } async update( @@ -80,63 +96,15 @@ export class DefaultGlobalState implements GlobalState { return newState; } - private initializeObservable() { - this.storageUpdateSubscription = this.chosenLocation.updates$ - .pipe( - filter((update) => update.key === this.storageKey), - switchMap(async (update) => { - if (update.updateType === "remove") { - return null; - } - return await this.getFromState(); - }), - ) - .subscribe((v) => this.stateSubject.next(v)); - - this.subscriberCount.subscribe((count) => { - if (count === 0 && this.stateObservable != null) { - this.triggerCleanup(); - } - }); - - // Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to - // trigger populating it immediately. - this.getFromState().then((s) => { - this.stateSubject.next(s); - }); - - return new Observable((subscriber) => { - this.incrementSubscribers(); - - // reinitialize listeners after cleanup - if (this.reinitialize) { - this.reinitialize = false; - this.initializeObservable(); - } - - const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); - subscriber.unsubscribe = () => { - this.decrementSubscribers(); - prevUnsubscribe(); - }; - - return this.stateSubject - .pipe( - // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. - filter((i) => i != FAKE_DEFAULT), - ) - .subscribe(subscriber); - }); - } - /** For use in update methods, does not wait for update to complete before yielding state. * The expectation is that that await is already done */ private async getStateForUpdate() { - const currentValue = this.stateSubject.getValue(); - return currentValue === FAKE_DEFAULT - ? await getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer) - : currentValue; + return await getStoredValue( + this.storageKey, + this.chosenLocation, + this.keyDefinition.deserializer, + ); } async getFromState(): Promise { @@ -149,25 +117,4 @@ export class DefaultGlobalState implements GlobalState { this.keyDefinition.deserializer, ); } - - private incrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value + 1); - } - - private decrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value - 1); - } - - private triggerCleanup() { - setTimeout(() => { - if (this.subscriberCount.value === 0) { - this.updatePromise = null; - this.storageUpdateSubscription.unsubscribe(); - this.subscriberCount.complete(); - this.subscriberCount = new BehaviorSubject(0); - this.stateSubject.next(FAKE_DEFAULT); - this.reinitialize = true; - } - }, this.keyDefinition.cleanupDelayMs); - } } diff --git a/libs/common/src/platform/state/implementations/default-single-user-state.ts b/libs/common/src/platform/state/implementations/default-single-user-state.ts index 13541c6c4b..8cfccef291 100644 --- a/libs/common/src/platform/state/implementations/default-single-user-state.ts +++ b/libs/common/src/platform/state/implementations/default-single-user-state.ts @@ -28,8 +28,8 @@ export class DefaultSingleUserState implements SingleUserState { private storageKey: string; private updatePromise: Promise | null = null; - state$: Observable; - combinedState$: Observable>; + readonly state$: Observable; + readonly combinedState$: Observable>; constructor( readonly userId: UserId, @@ -107,6 +107,10 @@ export class DefaultSingleUserState implements SingleUserState { * The expectation is that that await is already done */ private async getStateForUpdate() { - return await firstValueFrom(this.state$); + return await getStoredValue( + this.storageKey, + this.chosenLocation, + this.keyDefinition.deserializer, + ); } }