[PM-8292] Fixup `ForegroundSyncService` (#9292)

* Change `object` to `Record<string, unknown>`

* Change `object` to `Record<string, unknown>` Pt. 2

* Update ForegroundSyncService

- Manage finish message in the listener to more gaurantee a message back
- Make the timeout much longer
- Allow it to throw if the background sync service threw

---------

Co-authored-by: Cesar Gonzalez <cesar.a.gonzalezcs@gmail.com>
This commit is contained in:
Justin Baur 2024-05-29 12:12:58 -04:00 committed by GitHub
parent beb930902a
commit a6df923416
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 328 additions and 86 deletions

View File

@ -334,7 +334,7 @@ export default class MainBackground {
ssoLoginService: SsoLoginServiceAbstraction;
billingAccountProfileStateService: BillingAccountProfileStateService;
// eslint-disable-next-line rxjs/no-exposed-subjects -- Needed to give access to services module
intraprocessMessagingSubject: Subject<Message<object>>;
intraprocessMessagingSubject: Subject<Message<Record<string, unknown>>>;
userAutoUnlockKeyService: UserAutoUnlockKeyService;
scriptInjectorService: BrowserScriptInjectorService;
kdfConfigService: kdfConfigServiceAbstraction;
@ -384,7 +384,7 @@ export default class MainBackground {
this.keyGenerationService = new KeyGenerationService(this.cryptoFunctionService);
this.storageService = new BrowserLocalStorageService();
this.intraprocessMessagingSubject = new Subject<Message<object>>();
this.intraprocessMessagingSubject = new Subject<Message<Record<string, unknown>>>();
this.messagingService = MessageSender.combine(
new SubjectMessageSender(this.intraprocessMessagingSubject),
@ -840,7 +840,12 @@ export default class MainBackground {
this.authService,
);
this.syncServiceListener = new SyncServiceListener(this.syncService, messageListener);
this.syncServiceListener = new SyncServiceListener(
this.syncService,
messageListener,
this.messagingService,
this.logService,
);
}
this.eventUploadService = new EventUploadService(
this.apiService,
@ -1170,7 +1175,7 @@ export default class MainBackground {
this.contextMenusBackground?.init();
await this.idleBackground.init();
this.webRequestBackground?.startListening();
this.syncServiceListener?.startListening();
this.syncServiceListener?.listener$().subscribe();
return new Promise<void>((resolve) => {
setTimeout(async () => {

View File

@ -15,9 +15,9 @@ const HANDLED_ERRORS: Record<string, ErrorHandler> = {
export class ChromeMessageSender implements MessageSender {
constructor(private readonly logService: LogService) {}
send<T extends object>(
send<T extends Record<string, unknown>>(
commandDefinition: string | CommandDefinition<T>,
payload: object | T = {},
payload: Record<string, unknown> | T = {},
): void {
const command = getCommand(commandDefinition);
chrome.runtime.sendMessage(Object.assign(payload, { command: command }), () => {

View File

@ -0,0 +1,130 @@
import { mock } from "jest-mock-extended";
import { Subject } from "rxjs";
import { ApiService } from "@bitwarden/common/abstractions/api.service";
import { AccountService } from "@bitwarden/common/auth/abstractions/account.service";
import { AuthService } from "@bitwarden/common/auth/abstractions/auth.service";
import { LogService } from "@bitwarden/common/platform/abstractions/log.service";
import { StateService } from "@bitwarden/common/platform/abstractions/state.service";
import { MessageListener, MessageSender } from "@bitwarden/common/platform/messaging";
import { SendApiService } from "@bitwarden/common/tools/send/services/send-api.service.abstraction";
import { InternalSendService } from "@bitwarden/common/tools/send/services/send.service.abstraction";
import { CipherService } from "@bitwarden/common/vault/abstractions/cipher.service";
import { CollectionService } from "@bitwarden/common/vault/abstractions/collection.service";
import { FolderApiServiceAbstraction } from "@bitwarden/common/vault/abstractions/folder/folder-api.service.abstraction";
import { InternalFolderService } from "@bitwarden/common/vault/abstractions/folder/folder.service.abstraction";
import { DO_FULL_SYNC, ForegroundSyncService, FullSyncMessage } from "./foreground-sync.service";
import { FullSyncFinishedMessage } from "./sync-service.listener";
describe("ForegroundSyncService", () => {
const stateService = mock<StateService>();
const folderService = mock<InternalFolderService>();
const folderApiService = mock<FolderApiServiceAbstraction>();
const messageSender = mock<MessageSender>();
const logService = mock<LogService>();
const cipherService = mock<CipherService>();
const collectionService = mock<CollectionService>();
const apiService = mock<ApiService>();
const accountService = mock<AccountService>();
const authService = mock<AuthService>();
const sendService = mock<InternalSendService>();
const sendApiService = mock<SendApiService>();
const messageListener = mock<MessageListener>();
const sut = new ForegroundSyncService(
stateService,
folderService,
folderApiService,
messageSender,
logService,
cipherService,
collectionService,
apiService,
accountService,
authService,
sendService,
sendApiService,
messageListener,
);
beforeEach(() => {
jest.resetAllMocks();
});
describe("fullSync", () => {
const getAndAssertRequestId = (doFullSyncMessage: Omit<FullSyncMessage, "requestId">) => {
expect(messageSender.send).toHaveBeenCalledWith(
DO_FULL_SYNC,
// We don't know the request id since that is created internally
expect.objectContaining(doFullSyncMessage),
);
const message = messageSender.send.mock.calls[0][1];
if (!("requestId" in message) || typeof message.requestId !== "string") {
throw new Error("requestId property of type string was expected on the sent message.");
}
return message.requestId;
};
it("correctly relays a successful fullSync", async () => {
const messages = new Subject<FullSyncFinishedMessage>();
messageListener.messages$.mockReturnValue(messages);
const fullSyncPromise = sut.fullSync(true, false);
expect(sut.syncInProgress).toBe(true);
const requestId = getAndAssertRequestId({ forceSync: true, allowThrowOnError: false });
// Pretend the sync has finished
messages.next({ successfully: true, errorMessage: null, requestId: requestId });
const result = await fullSyncPromise;
expect(sut.syncInProgress).toBe(false);
expect(result).toBe(true);
});
it("correctly relays an unsuccessful fullSync but does not throw if allowThrowOnError = false", async () => {
const messages = new Subject<FullSyncFinishedMessage>();
messageListener.messages$.mockReturnValue(messages);
const fullSyncPromise = sut.fullSync(false, false);
expect(sut.syncInProgress).toBe(true);
const requestId = getAndAssertRequestId({ forceSync: false, allowThrowOnError: false });
// Pretend the sync has finished
messages.next({
successfully: false,
errorMessage: "Error while syncing",
requestId: requestId,
});
const result = await fullSyncPromise;
expect(sut.syncInProgress).toBe(false);
expect(result).toBe(false);
});
it("correctly relays an unsuccessful fullSync but and will throw if allowThrowOnError = true", async () => {
const messages = new Subject<FullSyncFinishedMessage>();
messageListener.messages$.mockReturnValue(messages);
const fullSyncPromise = sut.fullSync(true, true);
expect(sut.syncInProgress).toBe(true);
const requestId = getAndAssertRequestId({ forceSync: true, allowThrowOnError: true });
// Pretend the sync has finished
messages.next({
successfully: false,
errorMessage: "Error while syncing",
requestId: requestId,
});
await expect(fullSyncPromise).rejects.toThrow("Error while syncing");
expect(sut.syncInProgress).toBe(false);
});
});
});

View File

@ -1,4 +1,4 @@
import { firstValueFrom, timeout } from "rxjs";
import { filter, firstValueFrom, of, timeout } from "rxjs";
import { ApiService } from "@bitwarden/common/abstractions/api.service";
import { AccountService } from "@bitwarden/common/auth/abstractions/account.service";
@ -10,6 +10,7 @@ import {
MessageListener,
MessageSender,
} from "@bitwarden/common/platform/messaging";
import { Utils } from "@bitwarden/common/platform/misc/utils";
import { CoreSyncService } from "@bitwarden/common/platform/sync/internal";
import { SendApiService } from "@bitwarden/common/tools/send/services/send-api.service.abstraction";
import { InternalSendService } from "@bitwarden/common/tools/send/services/send.service.abstraction";
@ -18,11 +19,11 @@ import { CollectionService } from "@bitwarden/common/vault/abstractions/collecti
import { FolderApiServiceAbstraction } from "@bitwarden/common/vault/abstractions/folder/folder-api.service.abstraction";
import { InternalFolderService } from "@bitwarden/common/vault/abstractions/folder/folder.service.abstraction";
const SYNC_COMPLETED = new CommandDefinition<{ successfully: boolean }>("syncCompleted");
export const DO_FULL_SYNC = new CommandDefinition<{
forceSync: boolean;
allowThrowOnError: boolean;
}>("doFullSync");
import { FULL_SYNC_FINISHED } from "./sync-service.listener";
export type FullSyncMessage = { forceSync: boolean; allowThrowOnError: boolean; requestId: string };
export const DO_FULL_SYNC = new CommandDefinition<FullSyncMessage>("doFullSync");
export class ForegroundSyncService extends CoreSyncService {
constructor(
@ -59,18 +60,29 @@ export class ForegroundSyncService extends CoreSyncService {
async fullSync(forceSync: boolean, allowThrowOnError: boolean = false): Promise<boolean> {
this.syncInProgress = true;
try {
const requestId = Utils.newGuid();
const syncCompletedPromise = firstValueFrom(
this.messageListener.messages$(SYNC_COMPLETED).pipe(
this.messageListener.messages$(FULL_SYNC_FINISHED).pipe(
filter((m) => m.requestId === requestId),
timeout({
first: 10_000,
first: 30_000,
// If we haven't heard back in 30 seconds, just pretend we heard back about an unsuccesful sync.
with: () => {
throw new Error("Timeout while doing a fullSync call.");
this.logService.warning(
"ForegroundSyncService did not receive a message back in a reasonable time.",
);
return of({ successfully: false, errorMessage: "Sync timed out." });
},
}),
),
);
this.messageSender.send(DO_FULL_SYNC, { forceSync, allowThrowOnError });
this.messageSender.send(DO_FULL_SYNC, { forceSync, allowThrowOnError, requestId });
const result = await syncCompletedPromise;
if (allowThrowOnError && result.errorMessage != null) {
throw new Error(result.errorMessage);
}
return result.successfully;
} finally {
this.syncInProgress = false;

View File

@ -0,0 +1,60 @@
import { mock } from "jest-mock-extended";
import { Subject, firstValueFrom } from "rxjs";
import { LogService } from "@bitwarden/common/platform/abstractions/log.service";
import { MessageListener, MessageSender } from "@bitwarden/common/platform/messaging";
import { tagAsExternal } from "@bitwarden/common/platform/messaging/helpers";
import { SyncService } from "@bitwarden/common/vault/abstractions/sync/sync.service.abstraction";
import { FullSyncMessage } from "./foreground-sync.service";
import { FULL_SYNC_FINISHED, SyncServiceListener } from "./sync-service.listener";
describe("SyncServiceListener", () => {
const syncService = mock<SyncService>();
const messageListener = mock<MessageListener>();
const messageSender = mock<MessageSender>();
const logService = mock<LogService>();
const messages = new Subject<FullSyncMessage>();
messageListener.messages$.mockReturnValue(messages.asObservable().pipe(tagAsExternal()));
const sut = new SyncServiceListener(syncService, messageListener, messageSender, logService);
describe("listener$", () => {
it.each([true, false])(
"calls full sync and relays outcome when sync is [successfully = %s]",
async (value) => {
const listener = sut.listener$();
const emissionPromise = firstValueFrom(listener);
syncService.fullSync.mockResolvedValueOnce(value);
messages.next({ forceSync: true, allowThrowOnError: false, requestId: "1" });
await emissionPromise;
expect(syncService.fullSync).toHaveBeenCalledWith(true, false);
expect(messageSender.send).toHaveBeenCalledWith(FULL_SYNC_FINISHED, {
successfully: value,
errorMessage: null,
requestId: "1",
});
},
);
it("calls full sync and relays error message through messaging", async () => {
const listener = sut.listener$();
const emissionPromise = firstValueFrom(listener);
syncService.fullSync.mockRejectedValueOnce(new Error("SyncError"));
messages.next({ forceSync: true, allowThrowOnError: false, requestId: "1" });
await emissionPromise;
expect(syncService.fullSync).toHaveBeenCalledWith(true, false);
expect(messageSender.send).toHaveBeenCalledWith(FULL_SYNC_FINISHED, {
successfully: false,
errorMessage: "SyncError",
requestId: "1",
});
});
});
});

View File

@ -1,25 +1,58 @@
import { Subscription, concatMap, filter } from "rxjs";
import { Observable, concatMap, filter } from "rxjs";
import { MessageListener, isExternalMessage } from "@bitwarden/common/platform/messaging";
import { LogService } from "@bitwarden/common/platform/abstractions/log.service";
import {
CommandDefinition,
MessageListener,
MessageSender,
isExternalMessage,
} from "@bitwarden/common/platform/messaging";
import { SyncService } from "@bitwarden/common/vault/abstractions/sync/sync.service.abstraction";
import { DO_FULL_SYNC } from "./foreground-sync.service";
export type FullSyncFinishedMessage = {
successfully: boolean;
errorMessage: string;
requestId: string;
};
export const FULL_SYNC_FINISHED = new CommandDefinition<FullSyncFinishedMessage>(
"fullSyncFinished",
);
export class SyncServiceListener {
constructor(
private readonly syncService: SyncService,
private readonly messageListener: MessageListener,
private readonly messageSender: MessageSender,
private readonly logService: LogService,
) {}
startListening(): Subscription {
return this.messageListener
.messages$(DO_FULL_SYNC)
.pipe(
filter((message) => isExternalMessage(message)),
concatMap(async ({ forceSync, allowThrowOnError }) => {
await this.syncService.fullSync(forceSync, allowThrowOnError);
}),
)
.subscribe();
listener$(): Observable<void> {
return this.messageListener.messages$(DO_FULL_SYNC).pipe(
filter((message) => isExternalMessage(message)),
concatMap(async ({ forceSync, allowThrowOnError, requestId }) => {
await this.doFullSync(forceSync, allowThrowOnError, requestId);
}),
);
}
private async doFullSync(forceSync: boolean, allowThrowOnError: boolean, requestId: string) {
try {
const result = await this.syncService.fullSync(forceSync, allowThrowOnError);
this.messageSender.send(FULL_SYNC_FINISHED, {
successfully: result,
errorMessage: null,
requestId,
});
} catch (err) {
this.logService.warning("Error while doing full sync in SyncServiceListener", err);
this.messageSender.send(FULL_SYNC_FINISHED, {
successfully: false,
errorMessage: err?.message ?? "Unknown Sync Error",
requestId,
});
}
}
}

View File

@ -1,5 +1,6 @@
import { map, share } from "rxjs";
import { Message } from "@bitwarden/common/platform/messaging";
import { tagAsExternal } from "@bitwarden/common/platform/messaging/internal";
import { fromChromeEvent } from "../browser/from-chrome-event";
@ -20,7 +21,7 @@ export const fromChromeRuntimeMessaging = () => {
return message;
}),
tagAsExternal,
tagAsExternal<Message<Record<string, unknown>>>(),
share(),
);
};

View File

@ -524,7 +524,7 @@ const safeProviders: SafeProvider[] = [
}),
safeProvider({
provide: MessageListener,
useFactory: (subject: Subject<Message<object>>, ngZone: NgZone) =>
useFactory: (subject: Subject<Message<Record<string, unknown>>>, ngZone: NgZone) =>
new MessageListener(
merge(
subject.asObservable(), // For messages in the same context
@ -535,7 +535,7 @@ const safeProviders: SafeProvider[] = [
}),
safeProvider({
provide: MessageSender,
useFactory: (subject: Subject<Message<object>>, logService: LogService) =>
useFactory: (subject: Subject<Message<Record<string, unknown>>>, logService: LogService) =>
MessageSender.combine(
new SubjectMessageSender(subject), // For sending messages in the same context
new ChromeMessageSender(logService), // For sending messages to different contexts
@ -550,14 +550,14 @@ const safeProviders: SafeProvider[] = [
// we need the same instance that our in memory background is utilizing.
return getBgService("intraprocessMessagingSubject")();
} else {
return new Subject<Message<object>>();
return new Subject<Message<Record<string, unknown>>>();
}
},
deps: [],
}),
safeProvider({
provide: MessageSender,
useFactory: (subject: Subject<Message<object>>, logService: LogService) =>
useFactory: (subject: Subject<Message<Record<string, unknown>>>, logService: LogService) =>
MessageSender.combine(
new SubjectMessageSender(subject), // For sending messages in the same context
new ChromeMessageSender(logService), // For sending messages to different contexts
@ -576,7 +576,7 @@ const safeProviders: SafeProvider[] = [
// There isn't a locally created background so we will communicate with
// the true background through chrome apis, in that case, we can just create
// one for ourself.
return new Subject<Message<object>>();
return new Subject<Message<Record<string, unknown>>>();
}
},
deps: [],

View File

@ -151,7 +151,7 @@ const safeProviders: SafeProvider[] = [
}),
safeProvider({
provide: MessageSender,
useFactory: (subject: Subject<Message<object>>) =>
useFactory: (subject: Subject<Message<Record<string, unknown>>>) =>
MessageSender.combine(
new ElectronRendererMessageSender(), // Communication with main process
new SubjectMessageSender(subject), // Communication with ourself
@ -160,7 +160,7 @@ const safeProviders: SafeProvider[] = [
}),
safeProvider({
provide: MessageListener,
useFactory: (subject: Subject<Message<object>>) =>
useFactory: (subject: Subject<Message<Record<string, unknown>>>) =>
new MessageListener(
merge(
subject.asObservable(), // For messages from the same context

View File

@ -223,7 +223,7 @@ export class Main {
this.updaterMain = new UpdaterMain(this.i18nService, this.windowMain);
this.trayMain = new TrayMain(this.windowMain, this.i18nService, this.desktopSettingsService);
const messageSubject = new Subject<Message<object>>();
const messageSubject = new Subject<Message<Record<string, unknown>>>();
this.messagingService = MessageSender.combine(
new SubjectMessageSender(messageSubject), // For local messages
new ElectronMainMessagingService(this.windowMain),

View File

@ -2,9 +2,9 @@ import { MessageSender, CommandDefinition } from "@bitwarden/common/platform/mes
import { getCommand } from "@bitwarden/common/platform/messaging/internal";
export class ElectronRendererMessageSender implements MessageSender {
send<T extends object>(
send<T extends Record<string, unknown>>(
commandDefinition: CommandDefinition<T> | string,
payload: object | T = {},
payload: Record<string, unknown> | T = {},
): void {
const command = getCommand(commandDefinition);
ipc.platform.sendMessage(Object.assign({}, { command: command }, payload));

View File

@ -8,8 +8,8 @@ import { tagAsExternal } from "@bitwarden/common/platform/messaging/internal";
* @returns An observable stream of messages.
*/
export const fromIpcMessaging = () => {
return fromEventPattern<Message<object>>(
return fromEventPattern<Message<Record<string, unknown>>>(
(handler) => ipc.platform.onMessage.addListener(handler),
(handler) => ipc.platform.onMessage.removeListener(handler),
).pipe(tagAsExternal, share());
).pipe(tagAsExternal(), share());
};

View File

@ -87,7 +87,10 @@ export class ElectronMainMessagingService implements MessageSender {
});
}
send<T extends object>(commandDefinition: CommandDefinition<T> | string, arg: T | object = {}) {
send<T extends Record<string, unknown>>(
commandDefinition: CommandDefinition<T> | string,
arg: T | Record<string, unknown> = {},
) {
const command = getCommand(commandDefinition);
const message = Object.assign({}, { command: command }, arg);
if (this.windowMain.win != null) {

View File

@ -49,7 +49,7 @@ export const SYSTEM_THEME_OBSERVABLE = new SafeInjectionToken<Observable<ThemeTy
"SYSTEM_THEME_OBSERVABLE",
);
export const DEFAULT_VAULT_TIMEOUT = new SafeInjectionToken<VaultTimeout>("DEFAULT_VAULT_TIMEOUT");
export const INTRAPROCESS_MESSAGING_SUBJECT = new SafeInjectionToken<Subject<Message<object>>>(
"INTRAPROCESS_MESSAGING_SUBJECT",
);
export const INTRAPROCESS_MESSAGING_SUBJECT = new SafeInjectionToken<
Subject<Message<Record<string, unknown>>>
>("INTRAPROCESS_MESSAGING_SUBJECT");
export const CLIENT_TYPE = new SafeInjectionToken<ClientType>("CLIENT_TYPE");

View File

@ -649,7 +649,7 @@ const safeProviders: SafeProvider[] = [
safeProvider({
provide: BroadcasterService,
useClass: DefaultBroadcasterService,
deps: [MessageSender, MessageListener],
deps: [MessageListener],
}),
safeProvider({
provide: VaultTimeoutSettingsServiceAbstraction,
@ -1165,17 +1165,19 @@ const safeProviders: SafeProvider[] = [
}),
safeProvider({
provide: INTRAPROCESS_MESSAGING_SUBJECT,
useFactory: () => new Subject<Message<object>>(),
useFactory: () => new Subject<Message<Record<string, unknown>>>(),
deps: [],
}),
safeProvider({
provide: MessageListener,
useFactory: (subject: Subject<Message<object>>) => new MessageListener(subject.asObservable()),
useFactory: (subject: Subject<Message<Record<string, unknown>>>) =>
new MessageListener(subject.asObservable()),
deps: [INTRAPROCESS_MESSAGING_SUBJECT],
}),
safeProvider({
provide: MessageSender,
useFactory: (subject: Subject<Message<object>>) => new SubjectMessageSender(subject),
useFactory: (subject: Subject<Message<Record<string, unknown>>>) =>
new SubjectMessageSender(subject),
deps: [INTRAPROCESS_MESSAGING_SUBJECT],
}),
safeProvider({

View File

@ -6,10 +6,6 @@ export interface MessageBase {
* @deprecated Use the observable from the appropriate service instead.
*/
export abstract class BroadcasterService {
/**
* @deprecated Use the observable from the appropriate service instead.
*/
abstract send(message: MessageBase, id?: string): void;
/**
* @deprecated Use the observable from the appropriate service instead.
*/

View File

@ -12,7 +12,7 @@ describe("helpers", () => {
});
it("can get the command from a message definition", () => {
const commandDefinition = new CommandDefinition<object>("myCommand");
const commandDefinition = new CommandDefinition<Record<string, unknown>>("myCommand");
const command = getCommand(commandDefinition);
@ -22,9 +22,9 @@ describe("helpers", () => {
describe("tag integration", () => {
it("can tag and identify as tagged", async () => {
const messagesSubject = new Subject<Message<object>>();
const messagesSubject = new Subject<Message<Record<string, unknown>>>();
const taggedMessages = messagesSubject.asObservable().pipe(tagAsExternal);
const taggedMessages = messagesSubject.asObservable().pipe(tagAsExternal());
const firstValuePromise = firstValueFrom(taggedMessages);
@ -39,7 +39,7 @@ describe("helpers", () => {
describe("isExternalMessage", () => {
it.each([null, { command: "myCommand", test: "object" }, undefined] as Message<
Record<string, unknown>
>[])("returns false when value is %s", (value: Message<object>) => {
>[])("returns false when value is %s", (value: Message<Record<string, unknown>>) => {
expect(isExternalMessage(value)).toBe(false);
});
});

View File

@ -1,8 +1,10 @@
import { MonoTypeOperatorFunction, map } from "rxjs";
import { map } from "rxjs";
import { Message, CommandDefinition } from "./types";
import { CommandDefinition } from "./types";
export const getCommand = (commandDefinition: CommandDefinition<object> | string) => {
export const getCommand = (
commandDefinition: CommandDefinition<Record<string, unknown>> | string,
) => {
if (typeof commandDefinition === "string") {
return commandDefinition;
} else {
@ -16,8 +18,8 @@ export const isExternalMessage = (message: Record<PropertyKey, unknown>) => {
return message?.[EXTERNAL_SOURCE_TAG] === true;
};
export const tagAsExternal: MonoTypeOperatorFunction<Message<object>> = map(
(message: Message<object>) => {
export const tagAsExternal = <T extends Record<PropertyKey, unknown>>() => {
return map((message: T) => {
return Object.assign(message, { [EXTERNAL_SOURCE_TAG]: true });
},
);
});
};

View File

@ -11,7 +11,7 @@ import { Message, CommandDefinition } from "./types";
* or vault data changes and those observables should be preferred over messaging.
*/
export class MessageListener {
constructor(private readonly messageStream: Observable<Message<object>>) {}
constructor(private readonly messageStream: Observable<Message<Record<string, unknown>>>) {}
/**
* A stream of all messages sent through the application. It does not contain type information for the
@ -28,7 +28,9 @@ export class MessageListener {
*
* @param commandDefinition The CommandDefinition containing the information about the message type you care about.
*/
messages$<T extends object>(commandDefinition: CommandDefinition<T>): Observable<T> {
messages$<T extends Record<string, unknown>>(
commandDefinition: CommandDefinition<T>,
): Observable<T> {
return this.allMessages$.pipe(
filter((msg) => msg?.command === commandDefinition.command),
) as Observable<T>;

View File

@ -3,9 +3,9 @@ import { CommandDefinition } from "./types";
class MultiMessageSender implements MessageSender {
constructor(private readonly innerMessageSenders: MessageSender[]) {}
send<T extends object>(
send<T extends Record<string, unknown>>(
commandDefinition: string | CommandDefinition<T>,
payload: object | T = {},
payload: Record<string, unknown> | T = {},
): void {
for (const messageSender of this.innerMessageSenders) {
messageSender.send(commandDefinition, payload);
@ -26,7 +26,10 @@ export abstract class MessageSender {
* @param commandDefinition
* @param payload
*/
abstract send<T extends object>(commandDefinition: CommandDefinition<T>, payload: T): void;
abstract send<T extends Record<string, unknown>>(
commandDefinition: CommandDefinition<T>,
payload: T,
): void;
/**
* A legacy method for sending messages in a non-type safe way.
@ -38,12 +41,12 @@ export abstract class MessageSender {
* @param payload Extra contextual information regarding the message. Be aware that this payload may
* be serialized and lose all prototype information.
*/
abstract send(command: string, payload?: object): void;
abstract send(command: string, payload?: Record<string, unknown>): void;
/** Implementation of the other two overloads, read their docs instead. */
abstract send<T extends object>(
abstract send<T extends Record<string, unknown>>(
commandDefinition: CommandDefinition<T> | string,
payload: T | object,
payload: T | Record<string, unknown>,
): void;
/**

View File

@ -5,11 +5,11 @@ import { MessageSender } from "./message.sender";
import { Message, CommandDefinition } from "./types";
export class SubjectMessageSender implements MessageSender {
constructor(private readonly messagesSubject: Subject<Message<object>>) {}
constructor(private readonly messagesSubject: Subject<Message<Record<string, unknown>>>) {}
send<T extends object>(
send<T extends Record<string, unknown>>(
commandDefinition: string | CommandDefinition<T>,
payload: object | T = {},
payload: Record<string, unknown> | T = {},
): void {
const command = getCommand(commandDefinition);
this.messagesSubject.next(Object.assign(payload ?? {}, { command: command }));

View File

@ -5,9 +5,9 @@ declare const tag: unique symbol;
* alonside `MessageSender` and `MessageListener` for providing a type
* safe(-ish) way of sending and receiving messages.
*/
export class CommandDefinition<T extends object> {
export class CommandDefinition<T extends Record<string, unknown>> {
[tag]: T;
constructor(readonly command: string) {}
}
export type Message<T extends object> = { command: string } & T;
export type Message<T extends Record<string, unknown>> = { command: string } & T;

View File

@ -1,7 +1,7 @@
import { Subscription } from "rxjs";
import { BroadcasterService, MessageBase } from "../abstractions/broadcaster.service";
import { MessageListener, MessageSender } from "../messaging";
import { MessageListener } from "../messaging";
/**
* Temporary implementation that just delegates to the message sender and message listener
@ -10,14 +10,7 @@ import { MessageListener, MessageSender } from "../messaging";
export class DefaultBroadcasterService implements BroadcasterService {
subscriptions = new Map<string, Subscription>();
constructor(
private readonly messageSender: MessageSender,
private readonly messageListener: MessageListener,
) {}
send(message: MessageBase, id?: string) {
this.messageSender.send(message?.command, message);
}
constructor(private readonly messageListener: MessageListener) {}
subscribe(id: string, messageCallback: (message: MessageBase) => void) {
this.subscriptions.set(