[PM-10682] introduce user state subject (#10426)

* includes first pass at standardized reactive dependency interfaces
This commit is contained in:
✨ Audrey ✨ 2024-08-07 11:06:28 -04:00 committed by GitHub
parent 3c7ca7e614
commit f8961e35e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 791 additions and 0 deletions

View File

@ -0,0 +1,125 @@
import { Observable } from "rxjs";
import { Policy } from "@bitwarden/common/admin-console/models/domain/policy";
import { UserId } from "@bitwarden/common/types/guid";
/** error emitted when the `SingleUserDependency` changes Ids */
export type UserChangedError = {
/** the userId pinned by the single user dependency */
expectedUserId: UserId;
/** the userId received in error */
actualUserId: UserId;
};
/** A pattern for types that depend upon a dynamic policy stream and return
* an observable.
*
* Consumers of this dependency should emit when `policy$`
* emits, provided that the latest message materially
* changes the output of the consumer. If `policy$` emits
* an unrecoverable error, the consumer should continue using
* the last-emitted policy. If `policy$` completes, the consumer
* should continue using the last-emitted policy.
*/
export type PolicyDependency = {
/** A stream that emits policies when subscribed and
* when the policy changes. The stream should not
* emit null or undefined.
*/
policy$: Observable<Policy[]>;
};
/** A pattern for types that depend upon a dynamic userid and return
* an observable.
*
* Consumers of this dependency should emit when `userId$` changes.
* If `userId$` completes, the consumer should also complete. If
* `userId$` emits an unrecoverable error, the consumer should
* also emit the error.
*/
export type UserDependency = {
/** A stream that emits a UserId when subscribed and when
* the userId changes. The stream should not emit null
* or undefined.
*/
userId$: Observable<UserId>;
};
/** A pattern for types that depend upon a fixed userid and return
* an observable.
*
* Consumers of this dependency should emit a `UserChangedError` if
* the value of `singleUserId$` changes. If `singleUserId$` completes,
* the consumer should also complete. If `singleUserId$` errors, the
* consumer should also emit the error.
*
* @remarks Check the consumer's documentation to determine how it
* responds to repeat emissions.
*/
export type SingleUserDependency = {
/** A stream that emits a UserId when subscribed and the user's account
* is unlocked, and completes when the account is locked or logged out.
* The stream should not emit null or undefined.
*/
singleUserId$: Observable<UserId>;
};
/** A pattern for types that emit values exclusively when the dependency
* emits a message.
*
* Consumers of this dependency should emit when `on$` emits. If `on$`
* completes, the consumer should also complete. If `on$`
* errors, the consumer should also emit the error.
*
* @remarks This dependency is useful when you have a nondeterministic
* or stateful algorithm that you would like to run when an event occurs.
*/
export type OnDependency = {
/** The stream that controls emissions
*/
on$: Observable<void>;
};
/** A pattern for types that emit when a dependency is `true`.
*
* Consumers of this dependency may emit when `when$` emits a true
* value. If `when$` completes, the consumer should also complete. If
* `when$` errors, the consumer should also emit the error.
*
* @remarks Check the consumer's documentation to determine how it
* responds to emissions.
*/
export type WhenDependency = {
/** The stream to observe for true emissions. */
when$: Observable<boolean>;
};
/** A pattern for types that allow their managed settings to
* be overridden.
*
* Consumers of this dependency should emit when `settings$`
* change. If `settings$` completes, the consumer should also
* complete. If `settings$` errors, the consumer should also
* emit the error.
*/
export type SettingsDependency<Settings> = {
/** A stream that emits settings when settings become available
* and when they change. If the settings are not available, the
* stream should wait to emit until they become available.
*/
settings$: Observable<Settings>;
};
/** A pattern for types that accept an arbitrary dependency and
* inject it into behavior-customizing functions.
*
* Unlike most other dependency types, this interface does not
* functionally constrain the behavior of the consumer.
*
* @remarks Consumers of this dependency wholly determine
* their response. Check the consumer's documentation
* to find this information.
*/
export type Dependencies<TCombine> = {
dependencies$: Observable<TCombine>;
};

View File

@ -0,0 +1,467 @@
import { BehaviorSubject, of, Subject } from "rxjs";
import { UserId } from "@bitwarden/common/types/guid";
import { awaitAsync, FakeSingleUserState } from "../../../spec";
import { UserStateSubject } from "./user-state-subject";
const SomeUser = "some user" as UserId;
type TestType = { foo: string };
describe("UserStateSubject", () => {
describe("dependencies", () => {
it("ignores repeated when$ emissions", async () => {
// this test looks for `nextValue` because a subscription isn't necessary for
// the subject to update
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const nextValue = jest.fn((_, next) => next);
const when$ = new BehaviorSubject(true);
const subject = new UserStateSubject(state, { singleUserId$, nextValue, when$ });
// the interleaved await asyncs are only necessary b/c `nextValue` is called asynchronously
subject.next({ foo: "next" });
await awaitAsync();
when$.next(true);
await awaitAsync();
when$.next(true);
when$.next(true);
await awaitAsync();
expect(nextValue).toHaveBeenCalledTimes(1);
});
it("ignores repeated singleUserId$ emissions", async () => {
// this test looks for `nextValue` because a subscription isn't necessary for
// the subject to update
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const nextValue = jest.fn((_, next) => next);
const when$ = new BehaviorSubject(true);
const subject = new UserStateSubject(state, { singleUserId$, nextValue, when$ });
// the interleaved await asyncs are only necessary b/c `nextValue` is called asynchronously
subject.next({ foo: "next" });
await awaitAsync();
singleUserId$.next(SomeUser);
await awaitAsync();
singleUserId$.next(SomeUser);
singleUserId$.next(SomeUser);
await awaitAsync();
expect(nextValue).toHaveBeenCalledTimes(1);
});
});
describe("next", () => {
it("emits the next value", async () => {
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
const expected: TestType = { foo: "next" };
let actual: TestType = null;
subject.subscribe((value) => {
actual = value;
});
subject.next(expected);
await awaitAsync();
expect(actual).toEqual(expected);
});
it("ceases emissions once complete", async () => {
const initialState = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
let actual: TestType = null;
subject.subscribe((value) => {
actual = value;
});
subject.complete();
subject.next({ foo: "ignored" });
await awaitAsync();
expect(actual).toEqual(initialState);
});
it("evaluates shouldUpdate", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const shouldUpdate = jest.fn(() => true);
const subject = new UserStateSubject(state, { singleUserId$, shouldUpdate });
const nextVal: TestType = { foo: "next" };
subject.next(nextVal);
await awaitAsync();
expect(shouldUpdate).toHaveBeenCalledWith(initialValue, nextVal, null);
});
it("evaluates shouldUpdate with a dependency", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const shouldUpdate = jest.fn(() => true);
const dependencyValue = { bar: "dependency" };
const subject = new UserStateSubject(state, {
singleUserId$,
shouldUpdate,
dependencies$: of(dependencyValue),
});
const nextVal: TestType = { foo: "next" };
subject.next(nextVal);
await awaitAsync();
expect(shouldUpdate).toHaveBeenCalledWith(initialValue, nextVal, dependencyValue);
});
it("emits a value when shouldUpdate returns `true`", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const shouldUpdate = jest.fn(() => true);
const subject = new UserStateSubject(state, { singleUserId$, shouldUpdate });
const expected: TestType = { foo: "next" };
let actual: TestType = null;
subject.subscribe((value) => {
actual = value;
});
subject.next(expected);
await awaitAsync();
expect(actual).toEqual(expected);
});
it("retains the current value when shouldUpdate returns `false`", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const shouldUpdate = jest.fn(() => false);
const subject = new UserStateSubject(state, { singleUserId$, shouldUpdate });
subject.next({ foo: "next" });
await awaitAsync();
let actual: TestType = null;
subject.subscribe((value) => {
actual = value;
});
expect(actual).toEqual(initialValue);
});
it("evaluates nextValue", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const nextValue = jest.fn((_, next) => next);
const subject = new UserStateSubject(state, { singleUserId$, nextValue });
const nextVal: TestType = { foo: "next" };
subject.next(nextVal);
await awaitAsync();
expect(nextValue).toHaveBeenCalledWith(initialValue, nextVal, null);
});
it("evaluates nextValue with a dependency", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const nextValue = jest.fn((_, next) => next);
const dependencyValue = { bar: "dependency" };
const subject = new UserStateSubject(state, {
singleUserId$,
nextValue,
dependencies$: of(dependencyValue),
});
const nextVal: TestType = { foo: "next" };
subject.next(nextVal);
await awaitAsync();
expect(nextValue).toHaveBeenCalledWith(initialValue, nextVal, dependencyValue);
});
it("evaluates nextValue when when$ is true", async () => {
// this test looks for `nextValue` because a subscription isn't necessary for
// the subject to update
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const nextValue = jest.fn((_, next) => next);
const when$ = new BehaviorSubject(true);
const subject = new UserStateSubject(state, { singleUserId$, nextValue, when$ });
const nextVal: TestType = { foo: "next" };
subject.next(nextVal);
await awaitAsync();
expect(nextValue).toHaveBeenCalled();
});
it("waits to evaluate nextValue until when$ is true", async () => {
// this test looks for `nextValue` because a subscription isn't necessary for
// the subject to update.
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const nextValue = jest.fn((_, next) => next);
const when$ = new BehaviorSubject(false);
const subject = new UserStateSubject(state, { singleUserId$, nextValue, when$ });
const nextVal: TestType = { foo: "next" };
subject.next(nextVal);
await awaitAsync();
expect(nextValue).not.toHaveBeenCalled();
when$.next(true);
await awaitAsync();
expect(nextValue).toHaveBeenCalled();
});
it("waits to evaluate nextValue until singleUserId$ emits", async () => {
// this test looks for `nextValue` because a subscription isn't necessary for
// the subject to update.
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new Subject<UserId>();
const nextValue = jest.fn((_, next) => next);
const subject = new UserStateSubject(state, { singleUserId$, nextValue });
const nextVal: TestType = { foo: "next" };
subject.next(nextVal);
await awaitAsync();
expect(nextValue).not.toHaveBeenCalled();
singleUserId$.next(SomeUser);
await awaitAsync();
expect(nextValue).toHaveBeenCalled();
});
});
describe("error", () => {
it("emits errors", async () => {
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
const expected: TestType = { foo: "error" };
let actual: TestType = null;
subject.subscribe({
error: (value) => {
actual = value;
},
});
subject.error(expected);
await awaitAsync();
expect(actual).toEqual(expected);
});
it("ceases emissions once errored", async () => {
const initialState = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
let actual: TestType = null;
subject.subscribe({
error: (value) => {
actual = value;
},
});
subject.error("expectedError");
subject.error("ignored");
await awaitAsync();
expect(actual).toEqual("expectedError");
});
it("ceases emissions once complete", async () => {
const initialState = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
let shouldNotRun = false;
subject.subscribe({
error: () => {
shouldNotRun = true;
},
});
subject.complete();
subject.error("ignored");
await awaitAsync();
expect(shouldNotRun).toBeFalsy();
});
});
describe("complete", () => {
it("emits completes", async () => {
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
let actual = false;
subject.subscribe({
complete: () => {
actual = true;
},
});
subject.complete();
await awaitAsync();
expect(actual).toBeTruthy();
});
it("ceases emissions once errored", async () => {
const initialState = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
let shouldNotRun = false;
subject.subscribe({
complete: () => {
shouldNotRun = true;
},
// prevent throw
error: () => {},
});
subject.error("occurred");
subject.complete();
await awaitAsync();
expect(shouldNotRun).toBeFalsy();
});
it("ceases emissions once complete", async () => {
const initialState = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
let timesRun = 0;
subject.subscribe({
complete: () => {
timesRun++;
},
});
subject.complete();
subject.complete();
await awaitAsync();
expect(timesRun).toEqual(1);
});
});
describe("subscribe", () => {
it("completes when singleUserId$ completes", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
let actual = false;
subject.subscribe({
complete: () => {
actual = true;
},
});
singleUserId$.complete();
await awaitAsync();
expect(actual).toBeTruthy();
});
it("completes when when$ completes", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const when$ = new BehaviorSubject(true);
const subject = new UserStateSubject(state, { singleUserId$, when$ });
let actual = false;
subject.subscribe({
complete: () => {
actual = true;
},
});
when$.complete();
await awaitAsync();
expect(actual).toBeTruthy();
});
// FIXME: add test for `this.state.catch` once `FakeSingleUserState` supports
// simulated errors
it("errors when singleUserId$ changes", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
const errorUserId = "error" as UserId;
let error = false;
subject.subscribe({
error: (e) => {
error = e;
},
});
singleUserId$.next(errorUserId);
await awaitAsync();
expect(error).toEqual({ expectedUserId: SomeUser, actualUserId: errorUserId });
});
it("errors when singleUserId$ errors", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const subject = new UserStateSubject(state, { singleUserId$ });
const expected = { error: "description" };
let actual = false;
subject.subscribe({
error: (e) => {
actual = e;
},
});
singleUserId$.error(expected);
await awaitAsync();
expect(actual).toEqual(expected);
});
it("errors when when$ errors", async () => {
const initialValue: TestType = { foo: "init" };
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
const singleUserId$ = new BehaviorSubject(SomeUser);
const when$ = new BehaviorSubject(true);
const subject = new UserStateSubject(state, { singleUserId$, when$ });
const expected = { error: "description" };
let actual = false;
subject.subscribe({
error: (e) => {
actual = e;
},
});
when$.error(expected);
await awaitAsync();
expect(actual).toEqual(expected);
});
});
});

View File

@ -0,0 +1,199 @@
import {
Observer,
SubjectLike,
Unsubscribable,
ReplaySubject,
filter,
map,
Subject,
takeUntil,
pairwise,
combineLatest,
distinctUntilChanged,
BehaviorSubject,
race,
ignoreElements,
endWith,
startWith,
} from "rxjs";
import { Simplify } from "type-fest";
import { SingleUserState } from "@bitwarden/common/platform/state";
import { Dependencies, SingleUserDependency, WhenDependency } from "../dependencies";
/** dependencies accepted by the user state subject */
export type UserStateSubjectDependencies<State, Dependency> = Simplify<
SingleUserDependency &
Partial<WhenDependency> &
Partial<Dependencies<Dependency>> & {
/** Compute the next stored value. If this is not set, values
* provided to `next` unconditionally override state.
* @param current the value stored in state
* @param next the value received by the user state subject's `next` member
* @param dependencies the latest value from `Dependencies<TCombine>`
* @returns the value to store in state
*/
nextValue?: (current: State, next: State, dependencies?: Dependency) => State;
/**
* Compute whether the state should update. If this is not set, values
* provided to `next` always update the state.
* @param current the value stored in state
* @param next the value received by the user state subject's `next` member
* @param dependencies the latest value from `Dependencies<TCombine>`
* @returns `true` if the value should be stored, otherwise `false`.
*/
shouldUpdate?: (value: State, next: State, dependencies?: Dependency) => boolean;
}
>;
/**
* Adapt a state provider to an rxjs subject.
*
* This subject buffers the last value it received in memory. The buffer is erased
* if the subject receives a complete or error event. It does not persist the buffer.
*
* Warning! The user state subject has a synchronous interface, but subscriptions are
* always asynchronous.
*
* @template State the state stored by the subject
* @template Dependencies use-specific dependencies provided by the user.
*/
export class UserStateSubject<State, Dependencies = null> implements SubjectLike<State> {
/**
* Instantiates the user state subject
* @param state the backing store of the subject
* @param dependencies tailor the subject's behavior for a particular
* purpose.
* @param dependencies.when$ blocks updates to the state subject until
* this becomes true. When this occurs, only the last-received update
* is applied. The blocked update is kept in memory. It does not persist
* to disk.
* @param dependencies.singleUserId$ writes block until the singleUserId$
* is available.
*/
constructor(
private state: SingleUserState<State>,
private dependencies: UserStateSubjectDependencies<State, Dependencies>,
) {
// normalize dependencies
const when$ = (this.dependencies.when$ ?? new BehaviorSubject(true)).pipe(
distinctUntilChanged(),
);
const userIdAvailable$ = this.dependencies.singleUserId$.pipe(
startWith(state.userId),
pairwise(),
map(([expectedUserId, actualUserId]) => {
if (expectedUserId === actualUserId) {
return true;
} else {
throw { expectedUserId, actualUserId };
}
}),
distinctUntilChanged(),
);
// observe completion
const whenComplete$ = when$.pipe(ignoreElements(), endWith(true));
const inputComplete$ = this.input.pipe(ignoreElements(), endWith(true));
const userIdComplete$ = this.dependencies.singleUserId$.pipe(ignoreElements(), endWith(true));
const completion$ = race(whenComplete$, inputComplete$, userIdComplete$);
// wire subscriptions
this.outputSubscription = this.state.state$.subscribe(this.output);
this.inputSubscription = combineLatest([this.input, when$, userIdAvailable$])
.pipe(
filter(([_, when]) => when),
map(([state]) => state),
takeUntil(completion$),
)
.subscribe({
next: (r) => this.onNext(r),
error: (e: unknown) => this.onError(e),
complete: () => this.onComplete(),
});
}
next(value: State) {
this.input?.next(value);
}
error(err: any) {
this.input?.error(err);
}
complete() {
this.input?.complete();
}
/** Subscribe to the subject's event stream
* @param observer listening for events
* @returns the subscription
*/
subscribe(observer: Partial<Observer<State>> | ((value: State) => void)): Unsubscribable {
return this.output.subscribe(observer);
}
// using subjects to ensure the right semantics are followed;
// if greater efficiency becomes desirable, consider implementing
// `SubjectLike` directly
private input = new Subject<State>();
private readonly output = new ReplaySubject<State>(1);
private inputSubscription: Unsubscribable;
private outputSubscription: Unsubscribable;
private onNext(value: State) {
const nextValue = this.dependencies.nextValue ?? ((_: State, next: State) => next);
const shouldUpdate = this.dependencies.shouldUpdate ?? ((_: State) => true);
this.state
.update(
(state, dependencies) => {
const next = nextValue(state, value, dependencies);
return next;
},
{
shouldUpdate(current, dependencies) {
const update = shouldUpdate(current, value, dependencies);
return update;
},
combineLatestWith: this.dependencies.dependencies$,
},
)
.catch((e: any) => this.onError(e));
}
private onError(value: any) {
if (!this.isDisposed) {
this.output.error(value);
}
this.dispose();
}
private onComplete() {
if (!this.isDisposed) {
this.output.complete();
}
this.dispose();
}
private get isDisposed() {
return this.input === null;
}
private dispose() {
if (!this.isDisposed) {
// clean up internal subscriptions
this.inputSubscription.unsubscribe();
this.outputSubscription.unsubscribe();
this.inputSubscription = null;
this.outputSubscription = null;
// drop input to ensure its value is removed from memory
this.input = null;
}
}
}