From 03be777c2ab8926dcbbea15f1d6d31413d65984a Mon Sep 17 00:00:00 2001 From: Fabio286 Date: Sat, 2 Dec 2023 11:35:20 +0100 Subject: [PATCH] refactor: worker threads to import sql dump instead of process --- src/main/ipc-handlers/schema.ts | 32 +++++++++++--------------------- src/main/workers/importer.ts | 23 +++++++++++++---------- 2 files changed, 24 insertions(+), 31 deletions(-) diff --git a/src/main/ipc-handlers/schema.ts b/src/main/ipc-handlers/schema.ts index 3cc012c7..5b73227b 100644 --- a/src/main/ipc-handlers/schema.ts +++ b/src/main/ipc-handlers/schema.ts @@ -1,19 +1,14 @@ -import { ChildProcess, fork } from 'child_process'; import * as antares from 'common/interfaces/antares'; import * as workers from 'common/interfaces/workers'; import { dialog, ipcMain } from 'electron'; import * as fs from 'fs'; -import * as path from 'path'; import { Worker } from 'worker_threads'; import { validateSender } from '../libs/misc/validateSender'; -const isDevelopment = process.env.NODE_ENV !== 'production'; -const isFlatpak = process.platform === 'linux' && process.env.DISTRIBUTION === 'flatpak'; - export default (connections: {[key: string]: antares.Client}) => { let exporter: Worker = null; - let importer: ChildProcess = null; + let importer: Worker = null; ipcMain.handle('create-schema', async (event, params) => { if (!validateSender(event.senderFrame)) return { status: 'error', response: 'Unauthorized process' }; @@ -310,25 +305,20 @@ export default (connections: {[key: string]: antares.Client}) => { if (!validateSender(event.senderFrame)) return { status: 'error', response: 'Unauthorized process' }; if (importer !== null) { - importer.kill(); + importer.terminate(); return; } return new Promise((resolve/*, reject */) => { (async () => { - if (isFlatpak) { - resolve({ status: 'warning', response: 'Temporarily unavailable on Flatpak' }); - return; - } - const dbConfig = await connections[options.uid].getDbConfig(); - // Init importer process - importer = fork(isDevelopment ? './dist/importer.js' : path.resolve(__dirname, './importer.js'), [], { - execArgv: isDevelopment ? ['--inspect=9224'] : undefined - }); + // Init importer thread + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + importer = new Worker(new URL('../workers/importer', import.meta.url)); - importer.send({ + importer.postMessage({ type: 'init', dbConfig, options @@ -345,18 +335,18 @@ export default (connections: {[key: string]: antares.Client}) => { break; case 'end': setTimeout(() => { // Ensures that writing process has finished - importer?.kill(); + importer?.terminate(); importer = null; }, 2000); resolve({ status: 'success', response: payload }); break; case 'cancel': - importer.kill(); + importer.terminate(); importer = null; resolve({ status: 'error', response: 'Operation cancelled' }); break; case 'error': - importer.kill(); + importer.terminate(); importer = null; resolve({ status: 'error', response: payload }); break; @@ -387,7 +377,7 @@ export default (connections: {[key: string]: antares.Client}) => { if (result.response === 1) { willAbort = true; - importer.send({ type: 'cancel' }); + importer.postMessage({ type: 'cancel' }); } } diff --git a/src/main/workers/importer.ts b/src/main/workers/importer.ts index 2ef638f2..5f565988 100644 --- a/src/main/workers/importer.ts +++ b/src/main/workers/importer.ts @@ -4,6 +4,7 @@ import { ImportOptions } from 'common/interfaces/importer'; import * as log from 'electron-log/main'; import * as mysql from 'mysql2'; import * as pg from 'pg'; +import { parentPort } from 'worker_threads'; import { MySQLClient } from '../libs/clients/MySQLClient'; import { PostgreSQLClient } from '../libs/clients/PostgreSQLClient'; @@ -15,14 +16,14 @@ let importer: antares.Importer; log.transports.file.fileName = 'workers.log'; log.errorHandler.startCatching(); -// eslint-disable-next-line @typescript-eslint/no-explicit-any -process.on('message', async ({ type, dbConfig, options }: { +const importHandler = async (data: { type: string; dbConfig: mysql.ConnectionOptions & { schema: string; ssl?: mysql.SslOptions; ssh?: SSHConfig; readonly: boolean } | pg.ClientConfig & { schema: string; ssl?: mysql.SslOptions; ssh?: SSHConfig; readonly: boolean } | { databasePath: string; readonly: boolean }; options: ImportOptions; }) => { + const { type, dbConfig, options } = data; if (type === 'init') { try { const connection = await ClientsFactory.getClient({ @@ -45,7 +46,7 @@ process.on('message', async ({ type, dbConfig, options }: { importer = new PostgreSQLImporter(pool as unknown as pg.PoolClient, options); break; default: - process.send({ + parentPort.postMessage({ type: 'error', payload: `"${options.type}" importer not aviable` }); @@ -54,32 +55,32 @@ process.on('message', async ({ type, dbConfig, options }: { importer.once('error', err => { log.error(err.toString()); - process.send({ + parentPort.postMessage({ type: 'error', payload: err.toString() }); }); importer.once('end', () => { - process.send({ + parentPort.postMessage({ type: 'end', payload: { cancelled: importer.isCancelled } }); }); importer.once('cancel', () => { - process.send({ type: 'cancel' }); + parentPort.postMessage({ type: 'cancel' }); }); importer.on('progress', state => { - process.send({ + parentPort.postMessage({ type: 'import-progress', payload: state }); }); importer.on('query-error', state => { - process.send({ + parentPort.postMessage({ type: 'query-error', payload: state }); @@ -89,7 +90,7 @@ process.on('message', async ({ type, dbConfig, options }: { } catch (err) { log.error(err.toString()); - process.send({ + parentPort.postMessage({ type: 'error', payload: err.toString() }); @@ -97,4 +98,6 @@ process.on('message', async ({ type, dbConfig, options }: { } else if (type === 'cancel') importer.cancel(); -}); +}; + +parentPort.on('message', importHandler);