Ps/avoid state emit until updated (#7198)

* Add a small default time to limit timing failures

* Handle subscription race conditions

* Add Symbols to tracked emission types

This is a bit of a cheat, but Symbols can't be cloned, so
we need to nudge them to something we can handle.
They are rare enough that anyone hitting this is likely to
expect some special handling.

* Ref count state listeners to minimize storage activity

* Ensure statuses are updated

* Remove notes

* Use `test` when gramatically more proper

* Copy race and subscription improvements to single user

* Simplify observer initialization

* Correct parameter names

* Simplify update promises

test we don't accidentally deadlock along the `getFromState` path

* Fix save mock

* WIP: most tests working

* Avoid infinite update loop

* Avoid potential deadlocks with awaiting assigned promises

We were awaiting a promise assigned in a thenable. It turns out that
assignment occurs before all thenables are concatenated, which can cause
deadlocks. Likely, these were not showing up in tests because we're
using very quick memory storage.

* Fix update deadlock test

* Add user update tests

* Assert no double emit for multiple observers

* Add use intent to method name

* Ensure new subscriptions receive only newest data

TODO: is this worth doing for active user state?

* Remove unnecessary design requirement

We don't need to await an executing update promise, we
can support two emissions as long as the observable is
guaranteed to get the new data.

* Cleanup await spam

* test cleanup option behavior

* Remove unnecessary typecast

* Throw over coerce for definition options

* Fix state$ observable durability on cleanup
This commit is contained in:
Matt Gibson 2023-12-13 08:06:24 -05:00 committed by GitHub
parent d4c4d345d1
commit fd85d13b18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1207 additions and 207 deletions

View File

@ -59,7 +59,7 @@ export class FakeStorageService implements AbstractStorageService {
return Promise.resolve(this.store[key] != null); return Promise.resolve(this.store[key] != null);
} }
save<T>(key: string, obj: T, options?: StorageOptions): Promise<void> { save<T>(key: string, obj: T, options?: StorageOptions): Promise<void> {
this.mock.save(key, options); this.mock.save(key, obj, options);
this.store[key] = obj; this.store[key] = obj;
this.updatesSubject.next({ key: key, updateType: "save" }); this.updatesSubject.next({ key: key, updateType: "save" });
return Promise.resolve(); return Promise.resolve();

View File

@ -69,6 +69,10 @@ export function trackEmissions<T>(observable: Observable<T>): T[] {
case "boolean": case "boolean":
emissions.push(value); emissions.push(value);
break; break;
case "symbol":
// Cheating types to make symbols work at all
emissions.push(value.toString() as T);
break;
default: { default: {
emissions.push(clone(value)); emissions.push(clone(value));
} }
@ -85,7 +89,7 @@ function clone(value: any): any {
} }
} }
export async function awaitAsync(ms = 0) { export async function awaitAsync(ms = 1) {
if (ms < 1) { if (ms < 1) {
await Promise.resolve(); await Promise.resolve();
} else { } else {

View File

@ -2,7 +2,7 @@
* need to update test environment so trackEmissions works appropriately * need to update test environment so trackEmissions works appropriately
* @jest-environment ../shared/test.environment.ts * @jest-environment ../shared/test.environment.ts
*/ */
import { any, mock } from "jest-mock-extended"; import { any, anySymbol, mock } from "jest-mock-extended";
import { BehaviorSubject, firstValueFrom, of, timeout } from "rxjs"; import { BehaviorSubject, firstValueFrom, of, timeout } from "rxjs";
import { Jsonify } from "type-fest"; import { Jsonify } from "type-fest";
@ -11,7 +11,7 @@ import { FakeStorageService } from "../../../../spec/fake-storage.service";
import { AccountInfo, AccountService } from "../../../auth/abstractions/account.service"; import { AccountInfo, AccountService } from "../../../auth/abstractions/account.service";
import { AuthenticationStatus } from "../../../auth/enums/authentication-status"; import { AuthenticationStatus } from "../../../auth/enums/authentication-status";
import { UserId } from "../../../types/guid"; import { UserId } from "../../../types/guid";
import { KeyDefinition } from "../key-definition"; import { KeyDefinition, userKeyBuilder } from "../key-definition";
import { StateDefinition } from "../state-definition"; import { StateDefinition } from "../state-definition";
import { DefaultActiveUserState } from "./default-active-user-state"; import { DefaultActiveUserState } from "./default-active-user-state";
@ -32,9 +32,10 @@ class TestState {
} }
const testStateDefinition = new StateDefinition("fake", "disk"); const testStateDefinition = new StateDefinition("fake", "disk");
const cleanupDelayMs = 10;
const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", { const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", {
deserializer: TestState.fromJSON, deserializer: TestState.fromJSON,
cleanupDelayMs,
}); });
describe("DefaultActiveUserState", () => { describe("DefaultActiveUserState", () => {
@ -56,10 +57,14 @@ describe("DefaultActiveUserState", () => {
); );
}); });
const makeUserId = (id: string) => {
return id != null ? (`00000000-0000-1000-a000-00000000000${id}` as UserId) : undefined;
};
const changeActiveUser = async (id: string) => { const changeActiveUser = async (id: string) => {
const userId = id != null ? `00000000-0000-1000-a000-00000000000${id}` : undefined; const userId = makeUserId(id);
activeAccountSubject.next({ activeAccountSubject.next({
id: userId as UserId, id: userId,
email: `test${id}@example.com`, email: `test${id}@example.com`,
name: `Test User ${id}`, name: `Test User ${id}`,
status: AuthenticationStatus.Unlocked, status: AuthenticationStatus.Unlocked,
@ -90,7 +95,7 @@ describe("DefaultActiveUserState", () => {
const emissions = trackEmissions(userState.state$); const emissions = trackEmissions(userState.state$);
// User signs in // User signs in
changeActiveUser("1"); await changeActiveUser("1");
await awaitAsync(); await awaitAsync();
// Service does an update // Service does an update
@ -111,17 +116,17 @@ describe("DefaultActiveUserState", () => {
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
1, 1,
"user_00000000-0000-1000-a000-000000000001_fake_fake", "user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), any(), // options
); );
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
2, 2,
"user_00000000-0000-1000-a000-000000000001_fake_fake", "user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), any(), // options
); );
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
3, 3,
"user_00000000-0000-1000-a000-000000000002_fake_fake", "user_00000000-0000-1000-a000-000000000002_fake_fake",
any(), any(), // options
); );
// Should only have saved data for the first user // Should only have saved data for the first user
@ -129,7 +134,8 @@ describe("DefaultActiveUserState", () => {
expect(diskStorageService.mock.save).toHaveBeenNthCalledWith( expect(diskStorageService.mock.save).toHaveBeenNthCalledWith(
1, 1,
"user_00000000-0000-1000-a000-000000000001_fake_fake", "user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), updatedState,
any(), // options
); );
}); });
@ -183,15 +189,17 @@ describe("DefaultActiveUserState", () => {
}); });
it("should not emit a previous users value if that user is no longer active", async () => { it("should not emit a previous users value if that user is no longer active", async () => {
diskStorageService.internalUpdateStore({ const user1Data: Jsonify<TestState> = {
"user_00000000-0000-1000-a000-000000000001_fake_fake": {
date: "2020-09-21T13:14:17.648Z", date: "2020-09-21T13:14:17.648Z",
array: ["value"], array: ["value"],
} as Jsonify<TestState>, };
"user_00000000-0000-1000-a000-000000000002_fake_fake": { const user2Data: Jsonify<TestState> = {
date: "2020-09-21T13:14:17.648Z", date: "2020-09-21T13:14:17.648Z",
array: [], array: [],
} as Jsonify<TestState>, };
diskStorageService.internalUpdateStore({
"user_00000000-0000-1000-a000-000000000001_fake_fake": user1Data,
"user_00000000-0000-1000-a000-000000000002_fake_fake": user2Data,
}); });
// This starts one subscription on the observable for tracking emissions throughout // This starts one subscription on the observable for tracking emissions throughout
@ -203,7 +211,7 @@ describe("DefaultActiveUserState", () => {
// This should always return a value right await // This should always return a value right await
const value = await firstValueFrom(userState.state$); const value = await firstValueFrom(userState.state$);
expect(value).toBeTruthy(); expect(value).toEqual(user1Data);
// Make it such that there is no active user // Make it such that there is no active user
await changeActiveUser(undefined); await changeActiveUser(undefined);
@ -222,20 +230,34 @@ describe("DefaultActiveUserState", () => {
rejectedError = err; rejectedError = err;
}); });
expect(resolvedValue).toBeFalsy(); expect(resolvedValue).toBeUndefined();
expect(rejectedError).toBeTruthy(); expect(rejectedError).not.toBeUndefined();
expect(rejectedError.message).toBe("Timeout has occurred"); expect(rejectedError.message).toBe("Timeout has occurred");
// We need to figure out if something should be emitted // We need to figure out if something should be emitted
// when there becomes no active user, if we don't want that to emit // when there becomes no active user, if we don't want that to emit
// this value is correct. // this value is correct.
expect(emissions).toHaveLength(2); expect(emissions).toEqual([user1Data]);
});
it("should not emit twice if there are two listeners", async () => {
await changeActiveUser("1");
const emissions = trackEmissions(userState.state$);
const emissions2 = trackEmissions(userState.state$);
await awaitAsync();
expect(emissions).toEqual([
null, // Initial value
]);
expect(emissions2).toEqual([
null, // Initial value
]);
}); });
describe("update", () => { describe("update", () => {
const newData = { date: new Date(), array: ["test"] }; const newData = { date: new Date(), array: ["test"] };
beforeEach(async () => { beforeEach(async () => {
changeActiveUser("1"); await changeActiveUser("1");
}); });
it("should save on update", async () => { it("should save on update", async () => {
@ -315,6 +337,8 @@ describe("DefaultActiveUserState", () => {
return initialData; return initialData;
}); });
await awaitAsync();
await userState.update((state, dependencies) => { await userState.update((state, dependencies) => {
expect(state).toEqual(initialData); expect(state).toEqual(initialData);
return newData; return newData;
@ -329,4 +353,303 @@ describe("DefaultActiveUserState", () => {
]); ]);
}); });
}); });
describe("update races", () => {
const newData = { date: new Date(), array: ["test"] };
const userId = makeUserId("1");
beforeEach(async () => {
await changeActiveUser("1");
await awaitAsync();
});
test("subscriptions during an update should receive the current and latest", async () => {
const oldData = { date: new Date(2019, 1, 1), array: ["oldValue1"] };
await userState.update(() => {
return oldData;
});
const initialData = { date: new Date(2020, 1, 1), array: ["value1", "value2"] };
await userState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(userState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => {
emissions2 = trackEmissions(userState.state$);
await originalSave(key, obj);
});
const val = await userState.update(() => {
return newData;
});
await awaitAsync(10);
expect(val).toEqual(newData);
expect(emissions).toEqual([initialData, newData]);
expect(emissions2).toEqual([initialData, newData]);
});
test("subscription during an aborted update should receive the last value", async () => {
// Seed with interesting data
const initialData = { date: new Date(2020, 1, 1), array: ["value1", "value2"] };
await userState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(userState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const val = await userState.update(
(state) => {
return newData;
},
{
shouldUpdate: () => {
emissions2 = trackEmissions(userState.state$);
return false;
},
},
);
await awaitAsync();
expect(val).toEqual(initialData);
expect(emissions).toEqual([initialData]);
expect(emissions2).toEqual([initialData]);
});
test("updates should wait until previous update is complete", async () => {
trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest
.fn()
.mockImplementationOnce(async (key: string, obj: any) => {
let resolved = false;
await Promise.race([
userState.update(() => {
// deadlocks
resolved = true;
return newData;
}),
awaitAsync(100), // limit test to 100ms
]);
expect(resolved).toBe(false);
})
.mockImplementation((...args) => {
return originalSave(...args);
});
await userState.update(() => {
return newData;
});
});
test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT
const val = await userState.update((state) => {
return newData;
});
expect(val).toEqual(newData);
const call = diskStorageService.mock.save.mock.calls[0];
expect(call[0]).toEqual(`user_${userId}_fake_fake`);
expect(call[1]).toEqual(newData);
});
it("does not await updates if the active user changes", async () => {
const initialUserId = (await firstValueFrom(accountService.activeAccount$)).id;
expect(initialUserId).toBe(userId);
trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest
.fn()
.mockImplementationOnce(async (key: string, obj: any) => {
let resolved = false;
await changeActiveUser("2");
await Promise.race([
userState.update(() => {
// should not deadlock because we updated the user
resolved = true;
return newData;
}),
awaitAsync(100), // limit test to 100ms
]);
expect(resolved).toBe(true);
})
.mockImplementation((...args) => {
return originalSave(...args);
});
await userState.update(() => {
return newData;
});
});
it("stores updates for users in the correct place when active user changes mid-update", async () => {
trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise
const user2Data = { date: new Date(), array: ["user 2 data"] };
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest
.fn()
.mockImplementationOnce(async (key: string, obj: any) => {
let resolved = false;
await changeActiveUser("2");
await Promise.race([
userState.update(() => {
// should not deadlock because we updated the user
resolved = true;
return user2Data;
}),
awaitAsync(100), // limit test to 100ms
]);
expect(resolved).toBe(true);
await originalSave(key, obj);
})
.mockImplementation((...args) => {
return originalSave(...args);
});
await userState.update(() => {
return newData;
});
await awaitAsync();
expect(diskStorageService.mock.save).toHaveBeenCalledTimes(2);
const innerCall = diskStorageService.mock.save.mock.calls[0];
expect(innerCall[0]).toEqual(`user_${makeUserId("2")}_fake_fake`);
expect(innerCall[1]).toEqual(user2Data);
const outerCall = diskStorageService.mock.save.mock.calls[1];
expect(outerCall[0]).toEqual(`user_${makeUserId("1")}_fake_fake`);
expect(outerCall[1]).toEqual(newData);
});
});
describe("cleanup", () => {
const newData = { date: new Date(), array: ["test"] };
const userId = makeUserId("1");
let userKey: string;
beforeEach(async () => {
await changeActiveUser("1");
userKey = userKeyBuilder(userId, testKeyDefinition);
});
async function assertClean() {
const emissions = trackEmissions(userState["stateSubject"]);
const initial = structuredClone(emissions);
diskStorageService.save(userKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(emissions).toEqual(initial); // no longer listening to storage updates
}
it("should cleanup after last subscriber", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync(); // storage updates are behind a promise
subscription.unsubscribe();
expect(userState["subscriberCount"].getValue()).toBe(0);
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("should not cleanup if there are still subscribers", async () => {
const subscription1 = userState.state$.subscribe();
const sub2Emissions: TestState[] = [];
const subscription2 = userState.state$.subscribe((v) => sub2Emissions.push(v));
await awaitAsync(); // storage updates are behind a promise
subscription1.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
expect(userState["subscriberCount"].getValue()).toBe(1);
// Still be listening to storage updates
diskStorageService.save(userKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(sub2Emissions).toEqual([null, newData]);
subscription2.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("can re-initialize after cleanup", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync();
subscription.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
const emissions = trackEmissions(userState.state$);
await awaitAsync();
diskStorageService.save(userKey, newData);
await awaitAsync();
expect(emissions).toEqual([null, newData]);
});
it("should not cleanup if a subscriber joins during the cleanup delay", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync();
await diskStorageService.save(userKey, newData);
await awaitAsync();
subscription.unsubscribe();
expect(userState["subscriberCount"].getValue()).toBe(0);
// Do not wait long enough for cleanup
await awaitAsync(cleanupDelayMs / 2);
expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared
expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates
});
it("state$ observables are durable to cleanup", async () => {
const observable = userState.state$;
let subscription = observable.subscribe();
await diskStorageService.save(userKey, newData);
await awaitAsync();
subscription.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
subscription = observable.subscribe();
await diskStorageService.save(userKey, newData);
await awaitAsync();
expect(await firstValueFrom(observable)).toEqual(newData);
});
});
}); });

View File

@ -4,12 +4,12 @@ import {
map, map,
shareReplay, shareReplay,
switchMap, switchMap,
tap,
defer,
firstValueFrom, firstValueFrom,
combineLatestWith, combineLatestWith,
filter, filter,
timeout, timeout,
Subscription,
tap,
} from "rxjs"; } from "rxjs";
import { AccountService } from "../../../auth/abstractions/account.service"; import { AccountService } from "../../../auth/abstractions/account.service";
@ -31,13 +31,22 @@ const FAKE_DEFAULT = Symbol("fakeDefault");
export class DefaultActiveUserState<T> implements ActiveUserState<T> { export class DefaultActiveUserState<T> implements ActiveUserState<T> {
[activeMarker]: true; [activeMarker]: true;
private formattedKey$: Observable<string>; private formattedKey$: Observable<string>;
private updatePromise: Promise<T> | null = null;
private storageUpdateSubscription: Subscription;
private activeAccountUpdateSubscription: Subscription;
private subscriberCount = new BehaviorSubject<number>(0);
private stateObservable: Observable<T>;
private reinitialize = false;
protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject< protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject<
T | typeof FAKE_DEFAULT T | typeof FAKE_DEFAULT
>(FAKE_DEFAULT); >(FAKE_DEFAULT);
private stateSubject$ = this.stateSubject.asObservable(); private stateSubject$ = this.stateSubject.asObservable();
state$: Observable<T>; get state$() {
this.stateObservable = this.stateObservable ?? this.initializeObservable();
return this.stateObservable;
}
constructor( constructor(
protected keyDefinition: KeyDefinition<T>, protected keyDefinition: KeyDefinition<T>,
@ -51,62 +60,12 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
? userKeyBuilder(account.id, this.keyDefinition) ? userKeyBuilder(account.id, this.keyDefinition)
: null, : null,
), ),
tap(() => {
// We have a new key, so we should forget about previous update promises
this.updatePromise = null;
}),
shareReplay({ bufferSize: 1, refCount: false }), shareReplay({ bufferSize: 1, refCount: false }),
); );
const activeAccountData$ = this.formattedKey$.pipe(
switchMap(async (key) => {
if (key == null) {
return FAKE_DEFAULT;
}
return await getStoredValue(
key,
this.chosenStorageLocation,
this.keyDefinition.deserializer,
);
}),
// Share the execution
shareReplay({ refCount: false, bufferSize: 1 }),
);
const storageUpdates$ = this.chosenStorageLocation.updates$.pipe(
combineLatestWith(this.formattedKey$),
filter(([update, key]) => key !== null && update.key === key),
switchMap(async ([update, key]) => {
if (update.updateType === "remove") {
return null;
}
const data = await getStoredValue(
key,
this.chosenStorageLocation,
this.keyDefinition.deserializer,
);
return data;
}),
);
// Whomever subscribes to this data, should be notified of updated data
// if someone calls my update() method, or the active user changes.
this.state$ = defer(() => {
const accountChangeSubscription = activeAccountData$.subscribe((data) => {
this.stateSubject.next(data);
});
const storageUpdateSubscription = storageUpdates$.subscribe((data) => {
this.stateSubject.next(data);
});
return this.stateSubject$.pipe(
tap({
complete: () => {
accountChangeSubscription.unsubscribe();
storageUpdateSubscription.unsubscribe();
},
}),
);
})
// I fake the generic here because I am filtering out the other union type
// and this makes it so that typescript understands the true type
.pipe(filter<T>((value) => value != FAKE_DEFAULT));
} }
async update<TCombine>( async update<TCombine>(
@ -114,8 +73,34 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
options: StateUpdateOptions<T, TCombine> = {}, options: StateUpdateOptions<T, TCombine> = {},
): Promise<T> { ): Promise<T> {
options = populateOptionsWithDefault(options); options = populateOptionsWithDefault(options);
try {
if (this.updatePromise != null) {
await this.updatePromise;
}
this.updatePromise = this.internalUpdate(configureState, options);
const newState = await this.updatePromise;
return newState;
} finally {
this.updatePromise = null;
}
}
// TODO: this should be removed
async getFromState(): Promise<T> {
const key = await this.createKey(); const key = await this.createKey();
const currentState = await this.getGuaranteedState(key); return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer);
}
createDerived<TTo>(converter: Converter<T, TTo>): DerivedUserState<TTo> {
return new DefaultDerivedUserState<T, TTo>(converter, this.encryptService, this);
}
private async internalUpdate<TCombine>(
configureState: (state: T, dependency: TCombine) => T,
options: StateUpdateOptions<T, TCombine>,
) {
const key = await this.createKey();
const currentState = await this.getStateForUpdate(key);
const combinedDependencies = const combinedDependencies =
options.combineLatestWith != null options.combineLatestWith != null
? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout)))
@ -130,13 +115,59 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
return newState; return newState;
} }
async getFromState(): Promise<T> { private initializeObservable() {
const key = await this.createKey(); this.storageUpdateSubscription = this.chosenStorageLocation.updates$
return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); .pipe(
combineLatestWith(this.formattedKey$),
filter(([update, key]) => key !== null && update.key === key),
switchMap(async ([update, key]) => {
if (update.updateType === "remove") {
return null;
}
return await this.getState(key);
}),
)
.subscribe((v) => this.stateSubject.next(v));
this.activeAccountUpdateSubscription = this.formattedKey$
.pipe(
switchMap(async (key) => {
if (key == null) {
return FAKE_DEFAULT;
}
return await this.getState(key);
}),
)
.subscribe((v) => this.stateSubject.next(v));
this.subscriberCount.subscribe((count) => {
if (count === 0 && this.stateObservable != null) {
this.triggerCleanup();
}
});
return new Observable<T>((subscriber) => {
this.incrementSubscribers();
// reinitialize listeners after cleanup
if (this.reinitialize) {
this.reinitialize = false;
this.initializeObservable();
} }
createDerived<TTo>(converter: Converter<T, TTo>): DerivedUserState<TTo> { const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber);
return new DefaultDerivedUserState<T, TTo>(converter, this.encryptService, this); subscriber.unsubscribe = () => {
this.decrementSubscribers();
prevUnsubscribe();
};
return this.stateSubject
.pipe(
// Filter out fake default, which is used to indicate that state is not ready to be emitted yet.
filter((i) => i !== FAKE_DEFAULT),
)
.subscribe(subscriber);
});
} }
protected async createKey(): Promise<string> { protected async createKey(): Promise<string> {
@ -147,22 +178,47 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
return formattedKey; return formattedKey;
} }
protected async getGuaranteedState(key: string) { /** For use in update methods, does not wait for update to complete before yielding state.
* The expectation is that that await is already done
*/
protected async getStateForUpdate(key: string) {
const currentValue = this.stateSubject.getValue(); const currentValue = this.stateSubject.getValue();
return currentValue === FAKE_DEFAULT ? await this.seedInitial(key) : currentValue; return currentValue === FAKE_DEFAULT
? await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer)
: currentValue;
} }
private async seedInitial(key: string): Promise<T> { /** To be used in observables. Awaits updates to ensure they are complete */
const value = await getStoredValue( private async getState(key: string): Promise<T> {
key, if (this.updatePromise != null) {
this.chosenStorageLocation, await this.updatePromise;
this.keyDefinition.deserializer, }
); return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer);
this.stateSubject.next(value);
return value;
} }
protected saveToStorage(key: string, data: T): Promise<void> { protected saveToStorage(key: string, data: T): Promise<void> {
return this.chosenStorageLocation.save(key, data); return this.chosenStorageLocation.save(key, data);
} }
private incrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value + 1);
}
private decrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value - 1);
}
private triggerCleanup() {
setTimeout(() => {
if (this.subscriberCount.value === 0) {
this.updatePromise = null;
this.storageUpdateSubscription?.unsubscribe();
this.activeAccountUpdateSubscription?.unsubscribe();
this.subscriberCount.complete();
this.subscriberCount = new BehaviorSubject<number>(0);
this.stateSubject.next(FAKE_DEFAULT);
this.reinitialize = true;
}
}, this.keyDefinition.cleanupDelayMs);
}
} }

View File

@ -3,6 +3,7 @@
* @jest-environment ../shared/test.environment.ts * @jest-environment ../shared/test.environment.ts
*/ */
import { anySymbol } from "jest-mock-extended";
import { firstValueFrom, of } from "rxjs"; import { firstValueFrom, of } from "rxjs";
import { Jsonify } from "type-fest"; import { Jsonify } from "type-fest";
@ -28,9 +29,10 @@ class TestState {
} }
const testStateDefinition = new StateDefinition("fake", "disk"); const testStateDefinition = new StateDefinition("fake", "disk");
const cleanupDelayMs = 10;
const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", { const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", {
deserializer: TestState.fromJSON, deserializer: TestState.fromJSON,
cleanupDelayMs,
}); });
const globalKey = globalKeyBuilder(testKeyDefinition); const globalKey = globalKeyBuilder(testKeyDefinition);
@ -79,6 +81,19 @@ describe("DefaultGlobalState", () => {
expect(diskStorageService.mock.get).toHaveBeenCalledWith("global_fake_fake", undefined); expect(diskStorageService.mock.get).toHaveBeenCalledWith("global_fake_fake", undefined);
expect(state).toBeTruthy(); expect(state).toBeTruthy();
}); });
it("should not emit twice if there are two listeners", async () => {
const emissions = trackEmissions(globalState.state$);
const emissions2 = trackEmissions(globalState.state$);
await awaitAsync();
expect(emissions).toEqual([
null, // Initial value
]);
expect(emissions2).toEqual([
null, // Initial value
]);
});
}); });
describe("update", () => { describe("update", () => {
@ -133,6 +148,7 @@ describe("DefaultGlobalState", () => {
it("should not update if shouldUpdate returns false", async () => { it("should not update if shouldUpdate returns false", async () => {
const emissions = trackEmissions(globalState.state$); const emissions = trackEmissions(globalState.state$);
await awaitAsync(); // storage updates are behind a promise
const result = await globalState.update( const result = await globalState.update(
(state) => { (state) => {
@ -198,4 +214,212 @@ describe("DefaultGlobalState", () => {
expect(emissions).toEqual(expect.arrayContaining([initialState, newState])); expect(emissions).toEqual(expect.arrayContaining([initialState, newState]));
}); });
}); });
describe("update races", () => {
test("subscriptions during an update should receive the current and latest data", async () => {
const oldData = { date: new Date(2019, 1, 1) };
await globalState.update(() => {
return oldData;
});
const initialData = { date: new Date(2020, 1, 1) };
await globalState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(globalState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => {
emissions2 = trackEmissions(globalState.state$);
await originalSave(key, obj);
});
const val = await globalState.update(() => {
return newData;
});
await awaitAsync(10);
expect(val).toEqual(newData);
expect(emissions).toEqual([initialData, newData]);
expect(emissions2).toEqual([initialData, newData]);
});
test("subscription during an aborted update should receive the last value", async () => {
// Seed with interesting data
const initialData = { date: new Date(2020, 1, 1) };
await globalState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(globalState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const val = await globalState.update(
() => {
return newData;
},
{
shouldUpdate: () => {
emissions2 = trackEmissions(globalState.state$);
return false;
},
},
);
await awaitAsync();
expect(val).toEqual(initialData);
expect(emissions).toEqual([initialData]);
expect(emissions2).toEqual([initialData]);
});
test("updates should wait until previous update is complete", async () => {
trackEmissions(globalState.state$);
await awaitAsync(); // storage updates are behind a promise
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest
.fn()
.mockImplementationOnce(async () => {
let resolved = false;
await Promise.race([
globalState.update(() => {
// deadlocks
resolved = true;
return newData;
}),
awaitAsync(100), // limit test to 100ms
]);
expect(resolved).toBe(false);
})
.mockImplementation(originalSave);
await globalState.update((state) => {
return newData;
});
});
test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
expect(globalState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT
const val = await globalState.update((state) => {
return newData;
});
expect(val).toEqual(newData);
const call = diskStorageService.mock.save.mock.calls[0];
expect(call[0]).toEqual("global_fake_fake");
expect(call[1]).toEqual(newData);
});
});
describe("cleanup", () => {
async function assertClean() {
const emissions = trackEmissions(globalState["stateSubject"]);
const initial = structuredClone(emissions);
diskStorageService.save(globalKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(emissions).toEqual(initial); // no longer listening to storage updates
}
it("should cleanup after last subscriber", async () => {
const subscription = globalState.state$.subscribe();
await awaitAsync(); // storage updates are behind a promise
subscription.unsubscribe();
expect(globalState["subscriberCount"].getValue()).toBe(0);
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("should not cleanup if there are still subscribers", async () => {
const subscription1 = globalState.state$.subscribe();
const sub2Emissions: TestState[] = [];
const subscription2 = globalState.state$.subscribe((v) => sub2Emissions.push(v));
await awaitAsync(); // storage updates are behind a promise
subscription1.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
expect(globalState["subscriberCount"].getValue()).toBe(1);
// Still be listening to storage updates
diskStorageService.save(globalKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(sub2Emissions).toEqual([null, newData]);
subscription2.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("can re-initialize after cleanup", async () => {
const subscription = globalState.state$.subscribe();
await awaitAsync();
subscription.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
const emissions = trackEmissions(globalState.state$);
await awaitAsync();
diskStorageService.save(globalKey, newData);
await awaitAsync();
expect(emissions).toEqual([null, newData]);
});
it("should not cleanup if a subscriber joins during the cleanup delay", async () => {
const subscription = globalState.state$.subscribe();
await awaitAsync();
await diskStorageService.save(globalKey, newData);
await awaitAsync();
subscription.unsubscribe();
expect(globalState["subscriberCount"].getValue()).toBe(0);
// Do not wait long enough for cleanup
await awaitAsync(cleanupDelayMs / 2);
expect(globalState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared
expect(globalState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates
});
it("state$ observables are durable to cleanup", async () => {
const observable = globalState.state$;
let subscription = observable.subscribe();
await diskStorageService.save(globalKey, newData);
await awaitAsync();
subscription.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
subscription = observable.subscribe();
await diskStorageService.save(globalKey, newData);
await awaitAsync();
expect(await firstValueFrom(observable)).toEqual(newData);
});
});
}); });

View File

@ -1,12 +1,10 @@
import { import {
BehaviorSubject, BehaviorSubject,
Observable, Observable,
defer, Subscription,
filter, filter,
firstValueFrom, firstValueFrom,
shareReplay,
switchMap, switchMap,
tap,
timeout, timeout,
} from "rxjs"; } from "rxjs";
@ -23,54 +21,26 @@ const FAKE_DEFAULT = Symbol("fakeDefault");
export class DefaultGlobalState<T> implements GlobalState<T> { export class DefaultGlobalState<T> implements GlobalState<T> {
private storageKey: string; private storageKey: string;
private updatePromise: Promise<T> | null = null;
private storageUpdateSubscription: Subscription;
private subscriberCount = new BehaviorSubject<number>(0);
private stateObservable: Observable<T>;
private reinitialize = false;
protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject< protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject<
T | typeof FAKE_DEFAULT T | typeof FAKE_DEFAULT
>(FAKE_DEFAULT); >(FAKE_DEFAULT);
state$: Observable<T>; get state$() {
this.stateObservable = this.stateObservable ?? this.initializeObservable();
return this.stateObservable;
}
constructor( constructor(
private keyDefinition: KeyDefinition<T>, private keyDefinition: KeyDefinition<T>,
private chosenLocation: AbstractStorageService & ObservableStorageService, private chosenLocation: AbstractStorageService & ObservableStorageService,
) { ) {
this.storageKey = globalKeyBuilder(this.keyDefinition); this.storageKey = globalKeyBuilder(this.keyDefinition);
const storageUpdates$ = this.chosenLocation.updates$.pipe(
filter((update) => update.key === this.storageKey),
switchMap(async (update) => {
if (update.updateType === "remove") {
return null;
}
return await getStoredValue(
this.storageKey,
this.chosenLocation,
this.keyDefinition.deserializer,
);
}),
shareReplay({ bufferSize: 1, refCount: false }),
);
this.state$ = defer(() => {
const storageUpdateSubscription = storageUpdates$.subscribe((value) => {
this.stateSubject.next(value);
});
this.getFromState().then((s) => {
this.stateSubject.next(s);
});
return this.stateSubject.pipe(
tap({
complete: () => {
storageUpdateSubscription.unsubscribe();
},
}),
);
}).pipe(
shareReplay({ refCount: false, bufferSize: 1 }),
filter<T>((i) => i != FAKE_DEFAULT),
);
} }
async update<TCombine>( async update<TCombine>(
@ -78,7 +48,24 @@ export class DefaultGlobalState<T> implements GlobalState<T> {
options: StateUpdateOptions<T, TCombine> = {}, options: StateUpdateOptions<T, TCombine> = {},
): Promise<T> { ): Promise<T> {
options = populateOptionsWithDefault(options); options = populateOptionsWithDefault(options);
const currentState = await this.getGuaranteedState(); if (this.updatePromise != null) {
await this.updatePromise;
}
try {
this.updatePromise = this.internalUpdate(configureState, options);
const newState = await this.updatePromise;
return newState;
} finally {
this.updatePromise = null;
}
}
private async internalUpdate<TCombine>(
configureState: (state: T, dependency: TCombine) => T,
options: StateUpdateOptions<T, TCombine>,
): Promise<T> {
const currentState = await this.getStateForUpdate();
const combinedDependencies = const combinedDependencies =
options.combineLatestWith != null options.combineLatestWith != null
? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout)))
@ -93,16 +80,94 @@ export class DefaultGlobalState<T> implements GlobalState<T> {
return newState; return newState;
} }
private async getGuaranteedState() { private initializeObservable() {
this.storageUpdateSubscription = this.chosenLocation.updates$
.pipe(
filter((update) => update.key === this.storageKey),
switchMap(async (update) => {
if (update.updateType === "remove") {
return null;
}
return await this.getFromState();
}),
)
.subscribe((v) => this.stateSubject.next(v));
this.subscriberCount.subscribe((count) => {
if (count === 0 && this.stateObservable != null) {
this.triggerCleanup();
}
});
// Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to
// trigger populating it immediately.
this.getFromState().then((s) => {
this.stateSubject.next(s);
});
return new Observable<T>((subscriber) => {
this.incrementSubscribers();
// reinitialize listeners after cleanup
if (this.reinitialize) {
this.reinitialize = false;
this.initializeObservable();
}
const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber);
subscriber.unsubscribe = () => {
this.decrementSubscribers();
prevUnsubscribe();
};
return this.stateSubject
.pipe(
// Filter out fake default, which is used to indicate that state is not ready to be emitted yet.
filter<T>((i) => i != FAKE_DEFAULT),
)
.subscribe(subscriber);
});
}
/** For use in update methods, does not wait for update to complete before yielding state.
* The expectation is that that await is already done
*/
private async getStateForUpdate() {
const currentValue = this.stateSubject.getValue(); const currentValue = this.stateSubject.getValue();
return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue; return currentValue === FAKE_DEFAULT
? await getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer)
: currentValue;
} }
async getFromState(): Promise<T> { async getFromState(): Promise<T> {
if (this.updatePromise != null) {
return await this.updatePromise;
}
return await getStoredValue( return await getStoredValue(
this.storageKey, this.storageKey,
this.chosenLocation, this.chosenLocation,
this.keyDefinition.deserializer, this.keyDefinition.deserializer,
); );
} }
private incrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value + 1);
}
private decrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value - 1);
}
private triggerCleanup() {
setTimeout(() => {
if (this.subscriberCount.value === 0) {
this.updatePromise = null;
this.storageUpdateSubscription.unsubscribe();
this.subscriberCount.complete();
this.subscriberCount = new BehaviorSubject<number>(0);
this.stateSubject.next(FAKE_DEFAULT);
this.reinitialize = true;
}
}, this.keyDefinition.cleanupDelayMs);
}
} }

View File

@ -3,6 +3,7 @@
* @jest-environment ../shared/test.environment.ts * @jest-environment ../shared/test.environment.ts
*/ */
import { anySymbol } from "jest-mock-extended";
import { firstValueFrom, of } from "rxjs"; import { firstValueFrom, of } from "rxjs";
import { Jsonify } from "type-fest"; import { Jsonify } from "type-fest";
@ -30,21 +31,22 @@ class TestState {
} }
const testStateDefinition = new StateDefinition("fake", "disk"); const testStateDefinition = new StateDefinition("fake", "disk");
const cleanupDelayMs = 10;
const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", { const testKeyDefinition = new KeyDefinition<TestState>(testStateDefinition, "fake", {
deserializer: TestState.fromJSON, deserializer: TestState.fromJSON,
cleanupDelayMs,
}); });
const userId = Utils.newGuid() as UserId; const userId = Utils.newGuid() as UserId;
const userKey = userKeyBuilder(userId, testKeyDefinition); const userKey = userKeyBuilder(userId, testKeyDefinition);
describe("DefaultSingleUserState", () => { describe("DefaultSingleUserState", () => {
let diskStorageService: FakeStorageService; let diskStorageService: FakeStorageService;
let globalState: DefaultSingleUserState<TestState>; let userState: DefaultSingleUserState<TestState>;
const newData = { date: new Date() }; const newData = { date: new Date() };
beforeEach(() => { beforeEach(() => {
diskStorageService = new FakeStorageService(); diskStorageService = new FakeStorageService();
globalState = new DefaultSingleUserState( userState = new DefaultSingleUserState(
userId, userId,
testKeyDefinition, testKeyDefinition,
null, // Not testing anything with encrypt service null, // Not testing anything with encrypt service
@ -58,7 +60,7 @@ describe("DefaultSingleUserState", () => {
describe("state$", () => { describe("state$", () => {
it("should emit when storage updates", async () => { it("should emit when storage updates", async () => {
const emissions = trackEmissions(globalState.state$); const emissions = trackEmissions(userState.state$);
await diskStorageService.save(userKey, newData); await diskStorageService.save(userKey, newData);
await awaitAsync(); await awaitAsync();
@ -69,7 +71,7 @@ describe("DefaultSingleUserState", () => {
}); });
it("should not emit when update key does not match", async () => { it("should not emit when update key does not match", async () => {
const emissions = trackEmissions(globalState.state$); const emissions = trackEmissions(userState.state$);
await diskStorageService.save("wrong_key", newData); await diskStorageService.save("wrong_key", newData);
expect(emissions).toHaveLength(0); expect(emissions).toHaveLength(0);
@ -82,7 +84,7 @@ describe("DefaultSingleUserState", () => {
}); });
diskStorageService.internalUpdateStore(initialStorage); diskStorageService.internalUpdateStore(initialStorage);
const state = await firstValueFrom(globalState.state$); const state = await firstValueFrom(userState.state$);
expect(diskStorageService.mock.get).toHaveBeenCalledTimes(1); expect(diskStorageService.mock.get).toHaveBeenCalledTimes(1);
expect(diskStorageService.mock.get).toHaveBeenCalledWith( expect(diskStorageService.mock.get).toHaveBeenCalledWith(
`user_${userId}_fake_fake`, `user_${userId}_fake_fake`,
@ -94,7 +96,7 @@ describe("DefaultSingleUserState", () => {
describe("update", () => { describe("update", () => {
it("should save on update", async () => { it("should save on update", async () => {
const result = await globalState.update((state) => { const result = await userState.update((state) => {
return newData; return newData;
}); });
@ -103,10 +105,10 @@ describe("DefaultSingleUserState", () => {
}); });
it("should emit once per update", async () => { it("should emit once per update", async () => {
const emissions = trackEmissions(globalState.state$); const emissions = trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise await awaitAsync(); // storage updates are behind a promise
await globalState.update((state) => { await userState.update((state) => {
return newData; return newData;
}); });
@ -119,12 +121,12 @@ describe("DefaultSingleUserState", () => {
}); });
it("should provided combined dependencies", async () => { it("should provided combined dependencies", async () => {
const emissions = trackEmissions(globalState.state$); const emissions = trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise await awaitAsync(); // storage updates are behind a promise
const combinedDependencies = { date: new Date() }; const combinedDependencies = { date: new Date() };
await globalState.update( await userState.update(
(state, dependencies) => { (state, dependencies) => {
expect(dependencies).toEqual(combinedDependencies); expect(dependencies).toEqual(combinedDependencies);
return newData; return newData;
@ -143,9 +145,10 @@ describe("DefaultSingleUserState", () => {
}); });
it("should not update if shouldUpdate returns false", async () => { it("should not update if shouldUpdate returns false", async () => {
const emissions = trackEmissions(globalState.state$); const emissions = trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise
const result = await globalState.update( const result = await userState.update(
(state) => { (state) => {
return newData; return newData;
}, },
@ -160,18 +163,18 @@ describe("DefaultSingleUserState", () => {
}); });
it("should provide the update callback with the current State", async () => { it("should provide the update callback with the current State", async () => {
const emissions = trackEmissions(globalState.state$); const emissions = trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise await awaitAsync(); // storage updates are behind a promise
// Seed with interesting data // Seed with interesting data
const initialData = { date: new Date(2020, 1, 1) }; const initialData = { date: new Date(2020, 1, 1) };
await globalState.update((state, dependencies) => { await userState.update((state, dependencies) => {
return initialData; return initialData;
}); });
await awaitAsync(); await awaitAsync();
await globalState.update((state) => { await userState.update((state) => {
expect(state).toEqual(initialData); expect(state).toEqual(initialData);
return newData; return newData;
}); });
@ -193,14 +196,14 @@ describe("DefaultSingleUserState", () => {
initialStorage[userKey] = initialState; initialStorage[userKey] = initialState;
diskStorageService.internalUpdateStore(initialStorage); diskStorageService.internalUpdateStore(initialStorage);
const emissions = trackEmissions(globalState.state$); const emissions = trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise await awaitAsync(); // storage updates are behind a promise
const newState = { const newState = {
...initialState, ...initialState,
date: new Date(initialState.date.getFullYear(), initialState.date.getMonth() + 1), date: new Date(initialState.date.getFullYear(), initialState.date.getMonth() + 1),
}; };
const actual = await globalState.update((existingState) => newState); const actual = await userState.update((existingState) => newState);
await awaitAsync(); await awaitAsync();
@ -209,4 +212,212 @@ describe("DefaultSingleUserState", () => {
expect(emissions).toEqual(expect.arrayContaining([initialState, newState])); expect(emissions).toEqual(expect.arrayContaining([initialState, newState]));
}); });
}); });
describe("update races", () => {
test("subscriptions during an update should receive the current and latest data", async () => {
const oldData = { date: new Date(2019, 1, 1) };
await userState.update(() => {
return oldData;
});
const initialData = { date: new Date(2020, 1, 1) };
await userState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(userState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => {
emissions2 = trackEmissions(userState.state$);
await originalSave(key, obj);
});
const val = await userState.update(() => {
return newData;
});
await awaitAsync(10);
expect(val).toEqual(newData);
expect(emissions).toEqual([initialData, newData]);
expect(emissions2).toEqual([initialData, newData]);
});
test("subscription during an aborted update should receive the last value", async () => {
// Seed with interesting data
const initialData = { date: new Date(2020, 1, 1) };
await userState.update(() => {
return initialData;
});
await awaitAsync();
const emissions = trackEmissions(userState.state$);
await awaitAsync();
expect(emissions).toEqual([initialData]);
let emissions2: TestState[];
const val = await userState.update(
(state) => {
return newData;
},
{
shouldUpdate: () => {
emissions2 = trackEmissions(userState.state$);
return false;
},
},
);
await awaitAsync();
expect(val).toEqual(initialData);
expect(emissions).toEqual([initialData]);
expect(emissions2).toEqual([initialData]);
});
test("updates should wait until previous update is complete", async () => {
trackEmissions(userState.state$);
await awaitAsync(); // storage updates are behind a promise
const originalSave = diskStorageService.save.bind(diskStorageService);
diskStorageService.save = jest
.fn()
.mockImplementationOnce(async () => {
let resolved = false;
await Promise.race([
userState.update(() => {
// deadlocks
resolved = true;
return newData;
}),
awaitAsync(100), // limit test to 100ms
]);
expect(resolved).toBe(false);
})
.mockImplementation(originalSave);
await userState.update((state) => {
return newData;
});
});
test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT
const val = await userState.update((state) => {
return newData;
});
expect(val).toEqual(newData);
const call = diskStorageService.mock.save.mock.calls[0];
expect(call[0]).toEqual(`user_${userId}_fake_fake`);
expect(call[1]).toEqual(newData);
});
});
describe("cleanup", () => {
async function assertClean() {
const emissions = trackEmissions(userState["stateSubject"]);
const initial = structuredClone(emissions);
diskStorageService.save(userKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(emissions).toEqual(initial); // no longer listening to storage updates
}
it("should cleanup after last subscriber", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync(); // storage updates are behind a promise
subscription.unsubscribe();
expect(userState["subscriberCount"].getValue()).toBe(0);
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("should not cleanup if there are still subscribers", async () => {
const subscription1 = userState.state$.subscribe();
const sub2Emissions: TestState[] = [];
const subscription2 = userState.state$.subscribe((v) => sub2Emissions.push(v));
await awaitAsync(); // storage updates are behind a promise
subscription1.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
expect(userState["subscriberCount"].getValue()).toBe(1);
// Still be listening to storage updates
diskStorageService.save(userKey, newData);
await awaitAsync(); // storage updates are behind a promise
expect(sub2Emissions).toEqual([null, newData]);
subscription2.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
await assertClean();
});
it("can re-initialize after cleanup", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync();
subscription.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
const emissions = trackEmissions(userState.state$);
await awaitAsync();
diskStorageService.save(userKey, newData);
await awaitAsync();
expect(emissions).toEqual([null, newData]);
});
it("should not cleanup if a subscriber joins during the cleanup delay", async () => {
const subscription = userState.state$.subscribe();
await awaitAsync();
await diskStorageService.save(userKey, newData);
await awaitAsync();
subscription.unsubscribe();
expect(userState["subscriberCount"].getValue()).toBe(0);
// Do not wait long enough for cleanup
await awaitAsync(cleanupDelayMs / 2);
expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared
expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates
});
it("state$ observables are durable to cleanup", async () => {
const observable = userState.state$;
let subscription = observable.subscribe();
await diskStorageService.save(userKey, newData);
await awaitAsync();
subscription.unsubscribe();
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);
subscription = observable.subscribe();
await diskStorageService.save(userKey, newData);
await awaitAsync();
expect(await firstValueFrom(observable)).toEqual(newData);
});
});
}); });

View File

@ -1,12 +1,10 @@
import { import {
BehaviorSubject, BehaviorSubject,
Observable, Observable,
defer, Subscription,
filter, filter,
firstValueFrom, firstValueFrom,
shareReplay,
switchMap, switchMap,
tap,
timeout, timeout,
} from "rxjs"; } from "rxjs";
@ -23,16 +21,25 @@ import { Converter, SingleUserState } from "../user-state";
import { DefaultDerivedUserState } from "./default-derived-state"; import { DefaultDerivedUserState } from "./default-derived-state";
import { getStoredValue } from "./util"; import { getStoredValue } from "./util";
const FAKE_DEFAULT = Symbol("fakeDefault"); const FAKE_DEFAULT = Symbol("fakeDefault");
export class DefaultSingleUserState<T> implements SingleUserState<T> { export class DefaultSingleUserState<T> implements SingleUserState<T> {
private storageKey: string; private storageKey: string;
private updatePromise: Promise<T> | null = null;
private storageUpdateSubscription: Subscription;
private subscriberCount = new BehaviorSubject<number>(0);
private stateObservable: Observable<T>;
private reinitialize = false;
protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject< protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject<
T | typeof FAKE_DEFAULT T | typeof FAKE_DEFAULT
>(FAKE_DEFAULT); >(FAKE_DEFAULT);
state$: Observable<T>; get state$() {
this.stateObservable = this.stateObservable ?? this.initializeObservable();
return this.stateObservable;
}
constructor( constructor(
readonly userId: UserId, readonly userId: UserId,
@ -41,42 +48,6 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
private chosenLocation: AbstractStorageService & ObservableStorageService, private chosenLocation: AbstractStorageService & ObservableStorageService,
) { ) {
this.storageKey = userKeyBuilder(this.userId, this.keyDefinition); this.storageKey = userKeyBuilder(this.userId, this.keyDefinition);
const storageUpdates$ = this.chosenLocation.updates$.pipe(
filter((update) => update.key === this.storageKey),
switchMap(async (update) => {
if (update.updateType === "remove") {
return null;
}
return await getStoredValue(
this.storageKey,
this.chosenLocation,
this.keyDefinition.deserializer,
);
}),
shareReplay({ bufferSize: 1, refCount: false }),
);
this.state$ = defer(() => {
const storageUpdateSubscription = storageUpdates$.subscribe((value) => {
this.stateSubject.next(value);
});
this.getFromState().then((s) => {
this.stateSubject.next(s);
});
return this.stateSubject.pipe(
tap({
complete: () => {
storageUpdateSubscription.unsubscribe();
},
}),
);
}).pipe(
shareReplay({ refCount: false, bufferSize: 1 }),
filter<T>((i) => i != FAKE_DEFAULT),
);
} }
async update<TCombine>( async update<TCombine>(
@ -84,7 +55,28 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
options: StateUpdateOptions<T, TCombine> = {}, options: StateUpdateOptions<T, TCombine> = {},
): Promise<T> { ): Promise<T> {
options = populateOptionsWithDefault(options); options = populateOptionsWithDefault(options);
const currentState = await this.getGuaranteedState(); if (this.updatePromise != null) {
await this.updatePromise;
}
try {
this.updatePromise = this.internalUpdate(configureState, options);
const newState = await this.updatePromise;
return newState;
} finally {
this.updatePromise = null;
}
}
createDerived<TTo>(converter: Converter<T, TTo>): DerivedUserState<TTo> {
return new DefaultDerivedUserState<T, TTo>(converter, this.encryptService, this);
}
private async internalUpdate<TCombine>(
configureState: (state: T, dependency: TCombine) => T,
options: StateUpdateOptions<T, TCombine>,
): Promise<T> {
const currentState = await this.getStateForUpdate();
const combinedDependencies = const combinedDependencies =
options.combineLatestWith != null options.combineLatestWith != null
? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout)))
@ -99,20 +91,94 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
return newState; return newState;
} }
createDerived<TTo>(converter: Converter<T, TTo>): DerivedUserState<TTo> { private initializeObservable() {
return new DefaultDerivedUserState<T, TTo>(converter, this.encryptService, this); this.storageUpdateSubscription = this.chosenLocation.updates$
.pipe(
filter((update) => update.key === this.storageKey),
switchMap(async (update) => {
if (update.updateType === "remove") {
return null;
}
return await this.getFromState();
}),
)
.subscribe((v) => this.stateSubject.next(v));
this.subscriberCount.subscribe((count) => {
if (count === 0 && this.stateObservable != null) {
this.triggerCleanup();
}
});
// Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to
// trigger populating it immediately.
this.getFromState().then((s) => {
this.stateSubject.next(s);
});
return new Observable<T>((subscriber) => {
this.incrementSubscribers();
// reinitialize listeners after cleanup
if (this.reinitialize) {
this.reinitialize = false;
this.initializeObservable();
} }
private async getGuaranteedState() { const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber);
subscriber.unsubscribe = () => {
this.decrementSubscribers();
prevUnsubscribe();
};
return this.stateSubject
.pipe(
// Filter out fake default, which is used to indicate that state is not ready to be emitted yet.
filter<T>((i) => i != FAKE_DEFAULT),
)
.subscribe(subscriber);
});
}
/** For use in update methods, does not wait for update to complete before yielding state.
* The expectation is that that await is already done
*/
private async getStateForUpdate() {
const currentValue = this.stateSubject.getValue(); const currentValue = this.stateSubject.getValue();
return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue; return currentValue === FAKE_DEFAULT
? await getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer)
: currentValue;
} }
async getFromState(): Promise<T> { async getFromState(): Promise<T> {
if (this.updatePromise != null) {
return await this.updatePromise;
}
return await getStoredValue( return await getStoredValue(
this.storageKey, this.storageKey,
this.chosenLocation, this.chosenLocation,
this.keyDefinition.deserializer, this.keyDefinition.deserializer,
); );
} }
private incrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value + 1);
}
private decrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value - 1);
}
private triggerCleanup() {
setTimeout(() => {
if (this.subscriberCount.value === 0) {
this.updatePromise = null;
this.storageUpdateSubscription.unsubscribe();
this.subscriberCount.complete();
this.subscriberCount = new BehaviorSubject<number>(0);
this.stateSubject.next(FAKE_DEFAULT);
this.reinitialize = true;
}
}, this.keyDefinition.cleanupDelayMs);
}
} }

View File

@ -18,6 +18,37 @@ describe("KeyDefinition", () => {
}); });
}); });
describe("cleanupDelayMs", () => {
it("defaults to 1000ms", () => {
const keyDefinition = new KeyDefinition<boolean>(fakeStateDefinition, "fake", {
deserializer: (value) => value,
});
expect(keyDefinition).toBeTruthy();
expect(keyDefinition.cleanupDelayMs).toBe(1000);
});
it("can be overridden", () => {
const keyDefinition = new KeyDefinition<boolean>(fakeStateDefinition, "fake", {
deserializer: (value) => value,
cleanupDelayMs: 500,
});
expect(keyDefinition).toBeTruthy();
expect(keyDefinition.cleanupDelayMs).toBe(500);
});
it.each([0, -1])("throws on 0 or negative (%s)", (testValue: number) => {
expect(
() =>
new KeyDefinition<boolean>(fakeStateDefinition, "fake", {
deserializer: (value) => value,
cleanupDelayMs: testValue,
}),
).toThrow();
});
});
describe("record", () => { describe("record", () => {
it("runs custom deserializer for each record value", () => { it("runs custom deserializer for each record value", () => {
const recordDefinition = KeyDefinition.record<boolean>(fakeStateDefinition, "fake", { const recordDefinition = KeyDefinition.record<boolean>(fakeStateDefinition, "fake", {

View File

@ -19,6 +19,11 @@ type KeyDefinitionOptions<T> = {
* @returns The fully typed version of your state. * @returns The fully typed version of your state.
*/ */
readonly deserializer: (jsonValue: Jsonify<T>) => T; readonly deserializer: (jsonValue: Jsonify<T>) => T;
/**
* The number of milliseconds to wait before cleaning up the state after the last subscriber has unsubscribed.
* Defaults to 1000ms.
*/
readonly cleanupDelayMs?: number;
}; };
/** /**
@ -42,8 +47,12 @@ export class KeyDefinition<T> {
private readonly options: KeyDefinitionOptions<T>, private readonly options: KeyDefinitionOptions<T>,
) { ) {
if (options.deserializer == null) { if (options.deserializer == null) {
throw new Error(`'deserializer' is a required property on key ${this.errorKeyName}`);
}
if (options.cleanupDelayMs <= 0) {
throw new Error( throw new Error(
`'deserializer' is a required property on key ${stateDefinition.name} > ${key}`, `'cleanupDelayMs' must be greater than 0. Value of ${options.cleanupDelayMs} passed to key ${this.errorKeyName} `,
); );
} }
} }
@ -55,6 +64,13 @@ export class KeyDefinition<T> {
return this.options.deserializer; return this.options.deserializer;
} }
/**
* Gets the number of milliseconds to wait before cleaning up the state after the last subscriber has unsubscribed.
*/
get cleanupDelayMs() {
return this.options.cleanupDelayMs < 0 ? 0 : this.options.cleanupDelayMs ?? 1000;
}
/** /**
* Creates a {@link KeyDefinition} for state that is an array. * Creates a {@link KeyDefinition} for state that is an array.
* @param stateDefinition The state definition to be added to the KeyDefinition * @param stateDefinition The state definition to be added to the KeyDefinition
@ -137,6 +153,10 @@ export class KeyDefinition<T> {
? `${scope}_${userId}_${this.stateDefinition.name}_${this.key}` ? `${scope}_${userId}_${this.stateDefinition.name}_${this.key}`
: `${scope}_${this.stateDefinition.name}_${this.key}`; : `${scope}_${this.stateDefinition.name}_${this.key}`;
} }
private get errorKeyName() {
return `${this.stateDefinition.name} > ${this.key}`;
}
} }
export type StorageKey = Opaque<string, "StorageKey">; export type StorageKey = Opaque<string, "StorageKey">;