From f8961e35e7314c690d929960ddffaeb3e9b62c04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=9C=A8=20Audrey=20=E2=9C=A8?= Date: Wed, 7 Aug 2024 11:06:28 -0400 Subject: [PATCH] [PM-10682] introduce user state subject (#10426) * includes first pass at standardized reactive dependency interfaces --- libs/common/src/tools/dependencies.ts | 125 +++++ .../tools/state/user-state-subject.spec.ts | 467 ++++++++++++++++++ .../src/tools/state/user-state-subject.ts | 199 ++++++++ 3 files changed, 791 insertions(+) create mode 100644 libs/common/src/tools/dependencies.ts create mode 100644 libs/common/src/tools/state/user-state-subject.spec.ts create mode 100644 libs/common/src/tools/state/user-state-subject.ts diff --git a/libs/common/src/tools/dependencies.ts b/libs/common/src/tools/dependencies.ts new file mode 100644 index 0000000000..0488291b44 --- /dev/null +++ b/libs/common/src/tools/dependencies.ts @@ -0,0 +1,125 @@ +import { Observable } from "rxjs"; + +import { Policy } from "@bitwarden/common/admin-console/models/domain/policy"; +import { UserId } from "@bitwarden/common/types/guid"; + +/** error emitted when the `SingleUserDependency` changes Ids */ +export type UserChangedError = { + /** the userId pinned by the single user dependency */ + expectedUserId: UserId; + /** the userId received in error */ + actualUserId: UserId; +}; + +/** A pattern for types that depend upon a dynamic policy stream and return + * an observable. + * + * Consumers of this dependency should emit when `policy$` + * emits, provided that the latest message materially + * changes the output of the consumer. If `policy$` emits + * an unrecoverable error, the consumer should continue using + * the last-emitted policy. If `policy$` completes, the consumer + * should continue using the last-emitted policy. + */ +export type PolicyDependency = { + /** A stream that emits policies when subscribed and + * when the policy changes. The stream should not + * emit null or undefined. + */ + policy$: Observable; +}; + +/** A pattern for types that depend upon a dynamic userid and return + * an observable. + * + * Consumers of this dependency should emit when `userId$` changes. + * If `userId$` completes, the consumer should also complete. If + * `userId$` emits an unrecoverable error, the consumer should + * also emit the error. + */ +export type UserDependency = { + /** A stream that emits a UserId when subscribed and when + * the userId changes. The stream should not emit null + * or undefined. + */ + userId$: Observable; +}; + +/** A pattern for types that depend upon a fixed userid and return + * an observable. + * + * Consumers of this dependency should emit a `UserChangedError` if + * the value of `singleUserId$` changes. If `singleUserId$` completes, + * the consumer should also complete. If `singleUserId$` errors, the + * consumer should also emit the error. + * + * @remarks Check the consumer's documentation to determine how it + * responds to repeat emissions. + */ +export type SingleUserDependency = { + /** A stream that emits a UserId when subscribed and the user's account + * is unlocked, and completes when the account is locked or logged out. + * The stream should not emit null or undefined. + */ + singleUserId$: Observable; +}; + +/** A pattern for types that emit values exclusively when the dependency + * emits a message. + * + * Consumers of this dependency should emit when `on$` emits. If `on$` + * completes, the consumer should also complete. If `on$` + * errors, the consumer should also emit the error. + * + * @remarks This dependency is useful when you have a nondeterministic + * or stateful algorithm that you would like to run when an event occurs. + */ +export type OnDependency = { + /** The stream that controls emissions + */ + on$: Observable; +}; + +/** A pattern for types that emit when a dependency is `true`. + * + * Consumers of this dependency may emit when `when$` emits a true + * value. If `when$` completes, the consumer should also complete. If + * `when$` errors, the consumer should also emit the error. + * + * @remarks Check the consumer's documentation to determine how it + * responds to emissions. + */ +export type WhenDependency = { + /** The stream to observe for true emissions. */ + when$: Observable; +}; + +/** A pattern for types that allow their managed settings to + * be overridden. + * + * Consumers of this dependency should emit when `settings$` + * change. If `settings$` completes, the consumer should also + * complete. If `settings$` errors, the consumer should also + * emit the error. + */ +export type SettingsDependency = { + /** A stream that emits settings when settings become available + * and when they change. If the settings are not available, the + * stream should wait to emit until they become available. + */ + settings$: Observable; +}; + +/** A pattern for types that accept an arbitrary dependency and + * inject it into behavior-customizing functions. + * + * Unlike most other dependency types, this interface does not + * functionally constrain the behavior of the consumer. + * + * @remarks Consumers of this dependency wholly determine + * their response. Check the consumer's documentation + * to find this information. + */ +export type Dependencies = { + dependencies$: Observable; +}; diff --git a/libs/common/src/tools/state/user-state-subject.spec.ts b/libs/common/src/tools/state/user-state-subject.spec.ts new file mode 100644 index 0000000000..9d019abb0b --- /dev/null +++ b/libs/common/src/tools/state/user-state-subject.spec.ts @@ -0,0 +1,467 @@ +import { BehaviorSubject, of, Subject } from "rxjs"; + +import { UserId } from "@bitwarden/common/types/guid"; + +import { awaitAsync, FakeSingleUserState } from "../../../spec"; + +import { UserStateSubject } from "./user-state-subject"; + +const SomeUser = "some user" as UserId; +type TestType = { foo: string }; + +describe("UserStateSubject", () => { + describe("dependencies", () => { + it("ignores repeated when$ emissions", async () => { + // this test looks for `nextValue` because a subscription isn't necessary for + // the subject to update + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const nextValue = jest.fn((_, next) => next); + const when$ = new BehaviorSubject(true); + const subject = new UserStateSubject(state, { singleUserId$, nextValue, when$ }); + + // the interleaved await asyncs are only necessary b/c `nextValue` is called asynchronously + subject.next({ foo: "next" }); + await awaitAsync(); + when$.next(true); + await awaitAsync(); + when$.next(true); + when$.next(true); + await awaitAsync(); + + expect(nextValue).toHaveBeenCalledTimes(1); + }); + + it("ignores repeated singleUserId$ emissions", async () => { + // this test looks for `nextValue` because a subscription isn't necessary for + // the subject to update + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const nextValue = jest.fn((_, next) => next); + const when$ = new BehaviorSubject(true); + const subject = new UserStateSubject(state, { singleUserId$, nextValue, when$ }); + + // the interleaved await asyncs are only necessary b/c `nextValue` is called asynchronously + subject.next({ foo: "next" }); + await awaitAsync(); + singleUserId$.next(SomeUser); + await awaitAsync(); + singleUserId$.next(SomeUser); + singleUserId$.next(SomeUser); + await awaitAsync(); + + expect(nextValue).toHaveBeenCalledTimes(1); + }); + }); + + describe("next", () => { + it("emits the next value", async () => { + const state = new FakeSingleUserState(SomeUser, { foo: "init" }); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + const expected: TestType = { foo: "next" }; + + let actual: TestType = null; + subject.subscribe((value) => { + actual = value; + }); + subject.next(expected); + await awaitAsync(); + + expect(actual).toEqual(expected); + }); + + it("ceases emissions once complete", async () => { + const initialState = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialState); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + + let actual: TestType = null; + subject.subscribe((value) => { + actual = value; + }); + subject.complete(); + subject.next({ foo: "ignored" }); + await awaitAsync(); + + expect(actual).toEqual(initialState); + }); + + it("evaluates shouldUpdate", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const shouldUpdate = jest.fn(() => true); + const subject = new UserStateSubject(state, { singleUserId$, shouldUpdate }); + + const nextVal: TestType = { foo: "next" }; + subject.next(nextVal); + await awaitAsync(); + + expect(shouldUpdate).toHaveBeenCalledWith(initialValue, nextVal, null); + }); + + it("evaluates shouldUpdate with a dependency", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const shouldUpdate = jest.fn(() => true); + const dependencyValue = { bar: "dependency" }; + const subject = new UserStateSubject(state, { + singleUserId$, + shouldUpdate, + dependencies$: of(dependencyValue), + }); + + const nextVal: TestType = { foo: "next" }; + subject.next(nextVal); + await awaitAsync(); + + expect(shouldUpdate).toHaveBeenCalledWith(initialValue, nextVal, dependencyValue); + }); + + it("emits a value when shouldUpdate returns `true`", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const shouldUpdate = jest.fn(() => true); + const subject = new UserStateSubject(state, { singleUserId$, shouldUpdate }); + const expected: TestType = { foo: "next" }; + + let actual: TestType = null; + subject.subscribe((value) => { + actual = value; + }); + subject.next(expected); + await awaitAsync(); + + expect(actual).toEqual(expected); + }); + + it("retains the current value when shouldUpdate returns `false`", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const shouldUpdate = jest.fn(() => false); + const subject = new UserStateSubject(state, { singleUserId$, shouldUpdate }); + + subject.next({ foo: "next" }); + await awaitAsync(); + let actual: TestType = null; + subject.subscribe((value) => { + actual = value; + }); + + expect(actual).toEqual(initialValue); + }); + + it("evaluates nextValue", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const nextValue = jest.fn((_, next) => next); + const subject = new UserStateSubject(state, { singleUserId$, nextValue }); + + const nextVal: TestType = { foo: "next" }; + subject.next(nextVal); + await awaitAsync(); + + expect(nextValue).toHaveBeenCalledWith(initialValue, nextVal, null); + }); + + it("evaluates nextValue with a dependency", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const nextValue = jest.fn((_, next) => next); + const dependencyValue = { bar: "dependency" }; + const subject = new UserStateSubject(state, { + singleUserId$, + nextValue, + dependencies$: of(dependencyValue), + }); + + const nextVal: TestType = { foo: "next" }; + subject.next(nextVal); + await awaitAsync(); + + expect(nextValue).toHaveBeenCalledWith(initialValue, nextVal, dependencyValue); + }); + + it("evaluates nextValue when when$ is true", async () => { + // this test looks for `nextValue` because a subscription isn't necessary for + // the subject to update + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const nextValue = jest.fn((_, next) => next); + const when$ = new BehaviorSubject(true); + const subject = new UserStateSubject(state, { singleUserId$, nextValue, when$ }); + + const nextVal: TestType = { foo: "next" }; + subject.next(nextVal); + await awaitAsync(); + + expect(nextValue).toHaveBeenCalled(); + }); + + it("waits to evaluate nextValue until when$ is true", async () => { + // this test looks for `nextValue` because a subscription isn't necessary for + // the subject to update. + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const nextValue = jest.fn((_, next) => next); + const when$ = new BehaviorSubject(false); + const subject = new UserStateSubject(state, { singleUserId$, nextValue, when$ }); + + const nextVal: TestType = { foo: "next" }; + subject.next(nextVal); + await awaitAsync(); + expect(nextValue).not.toHaveBeenCalled(); + + when$.next(true); + await awaitAsync(); + expect(nextValue).toHaveBeenCalled(); + }); + + it("waits to evaluate nextValue until singleUserId$ emits", async () => { + // this test looks for `nextValue` because a subscription isn't necessary for + // the subject to update. + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new Subject(); + const nextValue = jest.fn((_, next) => next); + const subject = new UserStateSubject(state, { singleUserId$, nextValue }); + + const nextVal: TestType = { foo: "next" }; + subject.next(nextVal); + await awaitAsync(); + expect(nextValue).not.toHaveBeenCalled(); + singleUserId$.next(SomeUser); + await awaitAsync(); + + expect(nextValue).toHaveBeenCalled(); + }); + }); + + describe("error", () => { + it("emits errors", async () => { + const state = new FakeSingleUserState(SomeUser, { foo: "init" }); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + const expected: TestType = { foo: "error" }; + + let actual: TestType = null; + subject.subscribe({ + error: (value) => { + actual = value; + }, + }); + subject.error(expected); + await awaitAsync(); + + expect(actual).toEqual(expected); + }); + + it("ceases emissions once errored", async () => { + const initialState = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialState); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + + let actual: TestType = null; + subject.subscribe({ + error: (value) => { + actual = value; + }, + }); + subject.error("expectedError"); + subject.error("ignored"); + await awaitAsync(); + + expect(actual).toEqual("expectedError"); + }); + + it("ceases emissions once complete", async () => { + const initialState = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialState); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + + let shouldNotRun = false; + subject.subscribe({ + error: () => { + shouldNotRun = true; + }, + }); + subject.complete(); + subject.error("ignored"); + await awaitAsync(); + + expect(shouldNotRun).toBeFalsy(); + }); + }); + + describe("complete", () => { + it("emits completes", async () => { + const state = new FakeSingleUserState(SomeUser, { foo: "init" }); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + + let actual = false; + subject.subscribe({ + complete: () => { + actual = true; + }, + }); + subject.complete(); + await awaitAsync(); + + expect(actual).toBeTruthy(); + }); + + it("ceases emissions once errored", async () => { + const initialState = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialState); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + + let shouldNotRun = false; + subject.subscribe({ + complete: () => { + shouldNotRun = true; + }, + // prevent throw + error: () => {}, + }); + subject.error("occurred"); + subject.complete(); + await awaitAsync(); + + expect(shouldNotRun).toBeFalsy(); + }); + + it("ceases emissions once complete", async () => { + const initialState = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialState); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + + let timesRun = 0; + subject.subscribe({ + complete: () => { + timesRun++; + }, + }); + subject.complete(); + subject.complete(); + await awaitAsync(); + + expect(timesRun).toEqual(1); + }); + }); + + describe("subscribe", () => { + it("completes when singleUserId$ completes", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + + let actual = false; + subject.subscribe({ + complete: () => { + actual = true; + }, + }); + singleUserId$.complete(); + await awaitAsync(); + + expect(actual).toBeTruthy(); + }); + + it("completes when when$ completes", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const when$ = new BehaviorSubject(true); + const subject = new UserStateSubject(state, { singleUserId$, when$ }); + + let actual = false; + subject.subscribe({ + complete: () => { + actual = true; + }, + }); + when$.complete(); + await awaitAsync(); + + expect(actual).toBeTruthy(); + }); + + // FIXME: add test for `this.state.catch` once `FakeSingleUserState` supports + // simulated errors + + it("errors when singleUserId$ changes", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + const errorUserId = "error" as UserId; + + let error = false; + subject.subscribe({ + error: (e) => { + error = e; + }, + }); + singleUserId$.next(errorUserId); + await awaitAsync(); + + expect(error).toEqual({ expectedUserId: SomeUser, actualUserId: errorUserId }); + }); + + it("errors when singleUserId$ errors", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const subject = new UserStateSubject(state, { singleUserId$ }); + const expected = { error: "description" }; + + let actual = false; + subject.subscribe({ + error: (e) => { + actual = e; + }, + }); + singleUserId$.error(expected); + await awaitAsync(); + + expect(actual).toEqual(expected); + }); + + it("errors when when$ errors", async () => { + const initialValue: TestType = { foo: "init" }; + const state = new FakeSingleUserState(SomeUser, initialValue); + const singleUserId$ = new BehaviorSubject(SomeUser); + const when$ = new BehaviorSubject(true); + const subject = new UserStateSubject(state, { singleUserId$, when$ }); + const expected = { error: "description" }; + + let actual = false; + subject.subscribe({ + error: (e) => { + actual = e; + }, + }); + when$.error(expected); + await awaitAsync(); + + expect(actual).toEqual(expected); + }); + }); +}); diff --git a/libs/common/src/tools/state/user-state-subject.ts b/libs/common/src/tools/state/user-state-subject.ts new file mode 100644 index 0000000000..290103664b --- /dev/null +++ b/libs/common/src/tools/state/user-state-subject.ts @@ -0,0 +1,199 @@ +import { + Observer, + SubjectLike, + Unsubscribable, + ReplaySubject, + filter, + map, + Subject, + takeUntil, + pairwise, + combineLatest, + distinctUntilChanged, + BehaviorSubject, + race, + ignoreElements, + endWith, + startWith, +} from "rxjs"; +import { Simplify } from "type-fest"; + +import { SingleUserState } from "@bitwarden/common/platform/state"; + +import { Dependencies, SingleUserDependency, WhenDependency } from "../dependencies"; + +/** dependencies accepted by the user state subject */ +export type UserStateSubjectDependencies = Simplify< + SingleUserDependency & + Partial & + Partial> & { + /** Compute the next stored value. If this is not set, values + * provided to `next` unconditionally override state. + * @param current the value stored in state + * @param next the value received by the user state subject's `next` member + * @param dependencies the latest value from `Dependencies` + * @returns the value to store in state + */ + nextValue?: (current: State, next: State, dependencies?: Dependency) => State; + /** + * Compute whether the state should update. If this is not set, values + * provided to `next` always update the state. + * @param current the value stored in state + * @param next the value received by the user state subject's `next` member + * @param dependencies the latest value from `Dependencies` + * @returns `true` if the value should be stored, otherwise `false`. + */ + shouldUpdate?: (value: State, next: State, dependencies?: Dependency) => boolean; + } +>; + +/** + * Adapt a state provider to an rxjs subject. + * + * This subject buffers the last value it received in memory. The buffer is erased + * if the subject receives a complete or error event. It does not persist the buffer. + * + * Warning! The user state subject has a synchronous interface, but subscriptions are + * always asynchronous. + * + * @template State the state stored by the subject + * @template Dependencies use-specific dependencies provided by the user. + */ +export class UserStateSubject implements SubjectLike { + /** + * Instantiates the user state subject + * @param state the backing store of the subject + * @param dependencies tailor the subject's behavior for a particular + * purpose. + * @param dependencies.when$ blocks updates to the state subject until + * this becomes true. When this occurs, only the last-received update + * is applied. The blocked update is kept in memory. It does not persist + * to disk. + * @param dependencies.singleUserId$ writes block until the singleUserId$ + * is available. + */ + constructor( + private state: SingleUserState, + private dependencies: UserStateSubjectDependencies, + ) { + // normalize dependencies + const when$ = (this.dependencies.when$ ?? new BehaviorSubject(true)).pipe( + distinctUntilChanged(), + ); + const userIdAvailable$ = this.dependencies.singleUserId$.pipe( + startWith(state.userId), + pairwise(), + map(([expectedUserId, actualUserId]) => { + if (expectedUserId === actualUserId) { + return true; + } else { + throw { expectedUserId, actualUserId }; + } + }), + distinctUntilChanged(), + ); + + // observe completion + const whenComplete$ = when$.pipe(ignoreElements(), endWith(true)); + const inputComplete$ = this.input.pipe(ignoreElements(), endWith(true)); + const userIdComplete$ = this.dependencies.singleUserId$.pipe(ignoreElements(), endWith(true)); + const completion$ = race(whenComplete$, inputComplete$, userIdComplete$); + + // wire subscriptions + this.outputSubscription = this.state.state$.subscribe(this.output); + this.inputSubscription = combineLatest([this.input, when$, userIdAvailable$]) + .pipe( + filter(([_, when]) => when), + map(([state]) => state), + takeUntil(completion$), + ) + .subscribe({ + next: (r) => this.onNext(r), + error: (e: unknown) => this.onError(e), + complete: () => this.onComplete(), + }); + } + + next(value: State) { + this.input?.next(value); + } + + error(err: any) { + this.input?.error(err); + } + + complete() { + this.input?.complete(); + } + + /** Subscribe to the subject's event stream + * @param observer listening for events + * @returns the subscription + */ + subscribe(observer: Partial> | ((value: State) => void)): Unsubscribable { + return this.output.subscribe(observer); + } + + // using subjects to ensure the right semantics are followed; + // if greater efficiency becomes desirable, consider implementing + // `SubjectLike` directly + private input = new Subject(); + private readonly output = new ReplaySubject(1); + + private inputSubscription: Unsubscribable; + private outputSubscription: Unsubscribable; + + private onNext(value: State) { + const nextValue = this.dependencies.nextValue ?? ((_: State, next: State) => next); + const shouldUpdate = this.dependencies.shouldUpdate ?? ((_: State) => true); + + this.state + .update( + (state, dependencies) => { + const next = nextValue(state, value, dependencies); + return next; + }, + { + shouldUpdate(current, dependencies) { + const update = shouldUpdate(current, value, dependencies); + return update; + }, + combineLatestWith: this.dependencies.dependencies$, + }, + ) + .catch((e: any) => this.onError(e)); + } + + private onError(value: any) { + if (!this.isDisposed) { + this.output.error(value); + } + + this.dispose(); + } + + private onComplete() { + if (!this.isDisposed) { + this.output.complete(); + } + + this.dispose(); + } + + private get isDisposed() { + return this.input === null; + } + + private dispose() { + if (!this.isDisposed) { + // clean up internal subscriptions + this.inputSubscription.unsubscribe(); + this.outputSubscription.unsubscribe(); + this.inputSubscription = null; + this.outputSubscription = null; + + // drop input to ensure its value is removed from memory + this.input = null; + } + } +}