From 573ac6d42ef833f250d102e5b30ae6cf5877f330 Mon Sep 17 00:00:00 2001 From: Fabio Di Stasio Date: Thu, 24 Feb 2022 13:14:57 +0100 Subject: [PATCH] perf: use fork() for the import process --- src/common/libs/hexToBinary.js | 2 +- src/common/libs/sqlParser.js | 49 +++++----- src/main/ipc-handlers/schema.js | 87 ++++++++--------- src/main/libs/importers/sql/MysqlImporter.js | 93 +++++++++++-------- src/main/workers/exporter.js | 2 +- src/main/workers/importer.js | 61 ++++++++++++ src/renderer/components/ModalImportSchema.vue | 15 ++- webpack.workers.config.js | 3 +- 8 files changed, 197 insertions(+), 115 deletions(-) create mode 100644 src/main/workers/importer.js diff --git a/src/common/libs/hexToBinary.js b/src/common/libs/hexToBinary.js index 38c59e5d..305bcf7e 100644 --- a/src/common/libs/hexToBinary.js +++ b/src/common/libs/hexToBinary.js @@ -33,7 +33,7 @@ const lookup = { */ export default function hexToBinary (hex) { let binary = ''; - for (let i = 0, len = hex.length; i < len; i++) + for (let i = 0; i < hex.length; i++) binary += lookup[hex[i]]; return binary; diff --git a/src/common/libs/sqlParser.js b/src/common/libs/sqlParser.js index 4da17e7c..340c16ed 100644 --- a/src/common/libs/sqlParser.js +++ b/src/common/libs/sqlParser.js @@ -1,5 +1,13 @@ import { Duplex } from 'stream'; +const chars = { + NEWLINE: 0x0A, + CARRIAGE_RETURN: 0x0D, + DOUBLE_QUOTE: 0x22, + QUOTE: 0x27, + BACKSLASH: 0x5C +}; + export default class SqlParser extends Duplex { constructor (opts) { opts = { @@ -10,24 +18,21 @@ export default class SqlParser extends Duplex { ...opts }; super(opts); - this._buffer = []; + this._buffer = Buffer.from([]); this.encoding = opts.encoding; this.delimiter = opts.delimiter; this.isEscape = false; - this.currentQuote = ''; + this.currentQuote = null; this.isDelimiter = false; } _write (chunk, encoding, next) { - const str = chunk.toString(this.encoding); - - for (let i = 0; i < str.length; i++) { - const currentChar = str[i]; + for (const char of chunk) { this.checkEscape(); - this._buffer.push(currentChar); - // this.checkNewDelimiter(currentChar); - this.checkQuote(currentChar); + this._buffer = Buffer.concat([this._buffer, Buffer.from([char])]); + this.checkNewDelimiter(char); + this.checkQuote(char); const query = this.getQuery(); if (query) @@ -39,33 +44,33 @@ export default class SqlParser extends Duplex { checkEscape () { if (this._buffer.length > 0) { - this.isEscape = this._buffer[this._buffer.length - 1] === '\\' + this.isEscape = this._buffer[this._buffer.length - 1] === chars.BACKSLASH ? !this.isEscape : false; } } checkNewDelimiter (char) { - if (this.parsedStr.toLowerCase() === 'delimiter' && this.currentQuote === '') { + if (this._buffer.length === 9 && this.parsedStr.toLowerCase() === 'delimiter' && this.currentQuote === null) { this.isDelimiter = true; - this._buffer = []; + this._buffer = Buffer.from([]); } else { - const isNewLine = ['\n', '\r'].includes(char); + const isNewLine = [chars.NEWLINE, chars.CARRIAGE_RETURN].includes(char); if (isNewLine && this.isDelimiter) { this.isDelimiter = false; this.delimiter = this.parsedStr; - this._buffer = []; + this._buffer = Buffer.from([]); } } } checkQuote (char) { - const isQuote = !this.isEscape && ['"', '\''].includes(char); + const isQuote = !this.isEscape && [chars.QUOTE, chars.DOUBLE_QUOTE].includes(char); if (isQuote && this.currentQuote === char) - this.currentQuote = ''; + this.currentQuote = null; - else if (isQuote && this.currentQuote === '') + else if (isQuote && this.currentQuote === null) this.currentQuote = char; } @@ -75,20 +80,20 @@ export default class SqlParser extends Duplex { let query = false; let demiliterFound = false; - if (this.currentQuote === '' && this._buffer.length >= this.delimiter.length) - demiliterFound = this.parsedStr.slice(-this.delimiter.length) === this.delimiter; + if (this.currentQuote === null && this._buffer.length >= this.delimiter.length) + demiliterFound = this._buffer.slice(-this.delimiter.length).toString(this.encoding) === this.delimiter; if (demiliterFound) { - this._buffer.splice(-this.delimiter.length, this.delimiter.length); + this._buffer = this._buffer.slice(0, this._buffer.length - 1); query = this.parsedStr; - this._buffer = []; + this._buffer = Buffer.from([]); } return query; } get parsedStr () { - return this._buffer.join('').trim(); + return this._buffer.toString(this.encoding).trim(); } _read (size) { diff --git a/src/main/ipc-handlers/schema.js b/src/main/ipc-handlers/schema.js index b87a0262..f9032b93 100644 --- a/src/main/ipc-handlers/schema.js +++ b/src/main/ipc-handlers/schema.js @@ -1,10 +1,9 @@ import fs from 'fs'; import path from 'path'; import { fork } from 'child_process'; -import { ipcMain, dialog, Notification } from 'electron'; +import { ipcMain, dialog } from 'electron'; // @TODO: need some factories -import MysqlImporter from '../libs/importers/sql/MysqlImporter'; const isDevelopment = process.env.NODE_ENV !== 'production'; export default connections => { @@ -265,54 +264,48 @@ export default connections => { ipcMain.handle('import-sql', async (event, options) => { if (importer !== null) return; - switch (options.type) { - case 'mysql': - case 'maria': - importer = new MysqlImporter(connections[options.uid], options); - break; - default: - return { - status: 'error', - response: `${type} importer not aviable` - }; - } - return new Promise((resolve, reject) => { - importer.once('error', err => { - reject(err); - }); + (async () => { + const dbConfig = await connections[options.uid].getDbConfig(); - importer.once('end', () => { - resolve({ cancelled: importer.isCancelled }); - }); + // Init importer process + importer = fork(isDevelopment ? './dist/importer.js' : path.resolve(__dirname, './importer.js')); + importer.send({ + type: 'init', + dbConfig, + options + }); - importer.on('progress', state => { - event.sender.send('import-progress', state); - }); + // Importer message listener + importer.on('message', ({ type, payload }) => { + switch (type) { + case 'import-progress': + event.sender.send('import-progress', payload); + break; + case 'end': + importer.kill(); + importer = null; + resolve({ status: 'success', response: payload }); + break; + case 'cancel': + importer.kill(); + importer = null; + resolve({ status: 'error', response: 'Operation cancelled' }); + break; + case 'error': + importer.kill(); + importer = null; + resolve({ status: 'error', response: payload }); + break; + } + }); - importer.run(); - }) - .then(response => { - if (!response.cancelled) { - new Notification({ - title: 'Import finished', - body: `Finished importing ${path.basename(options.file)}` - }).show(); - } - return { status: 'success', response }; - }) - .catch(err => { - new Notification({ - title: 'Import error', - body: err.toString() - }).show(); - - return { status: 'error', response: err.toString() }; - }) - .finally(() => { - importer.removeAllListeners(); - importer = null; - }); + importer.on('exit', code => { + importer = null; + resolve({ status: 'error', response: `Operation ended with code: ${code}` }); + }); + })(); + }); }); ipcMain.handle('abort-import-sql', async event => { @@ -329,7 +322,7 @@ export default connections => { if (result.response === 1) { willAbort = true; - importer.cancel(); + importer.send({ type: 'cancel' }); } } diff --git a/src/main/libs/importers/sql/MysqlImporter.js b/src/main/libs/importers/sql/MysqlImporter.js index 9e946074..4c556c65 100644 --- a/src/main/libs/importers/sql/MysqlImporter.js +++ b/src/main/libs/importers/sql/MysqlImporter.js @@ -5,56 +5,67 @@ import { BaseImporter } from '../BaseImporter'; export default class MysqlImporter extends BaseImporter { constructor (client, options) { super(options); + this._client = client; } async import () { - const { size: totalFileSize } = await fs.stat(this._options.file); - const parser = new SqlParser(); - let readPosition = 0; - let queryCount = 0; + try { + const { size: totalFileSize } = await fs.stat(this._options.file); + const parser = new SqlParser(); + let readPosition = 0; + let queryCount = 0; - this.emitUpdate({ - fileSize: totalFileSize, - readPosition: 0, - percentage: 0 - }); - - // 1. detect file encoding - // 2. set fh encoding - // 3. detect sql mode - // 4. restore sql mode in case of exception - - return new Promise((resolve, reject) => { - this._fileHandler.pipe(parser); - - parser.on('error', (err) => { - console.log(err); - reject(err); + this.emitUpdate({ + fileSize: totalFileSize, + readPosition: 0, + percentage: 0 }); - parser.on('finish', () => { - console.log('TOTAL QUERIES', queryCount); - console.log('import end'); - resolve(); - }); + await this._client.use(this._options.schema); - parser.on('data', (q) => { - console.log('query: ', q); - queryCount++; - }); + // 1. detect file encoding + // 2. set fh encoding + // 3. detect sql mode + // 4. restore sql mode in case of exception - this._fileHandler.on('data', (chunk) => { - readPosition += chunk.length; - this.emitUpdate({ - readPosition, - percentage: readPosition / totalFileSize * 100 + return new Promise((resolve, reject) => { + this._fileHandler.pipe(parser); + + parser.on('error', (err) => { + console.log(err); + reject(err); + }); + + parser.on('finish', () => { + console.log('TOTAL QUERIES', queryCount); + console.log('import end'); + resolve(); + }); + + parser.on('data', async (query) => { + console.log('query: ', query); + parser.pause(); + await this._client.raw(query); + parser.resume(); + queryCount++; + }); + + this._fileHandler.on('data', (chunk) => { + readPosition += chunk.length; + this.emitUpdate({ + readPosition, + percentage: readPosition / totalFileSize * 100 + }); + }); + + this._fileHandler.on('error', (err) => { + console.log(err); + reject(err); }); }); - - this._fileHandler.on('error', (e) => { - console.log(e); - reject(err); - }); - }); + } + catch (err) { + console.log(err); + } } } diff --git a/src/main/workers/exporter.js b/src/main/workers/exporter.js index 19dea14c..3033967c 100644 --- a/src/main/workers/exporter.js +++ b/src/main/workers/exporter.js @@ -1,5 +1,4 @@ import { ClientsFactory } from '../libs/ClientsFactory'; -// TODO: exporter factory class import MysqlExporter from '../libs/exporters/sql/MysqlExporter.js'; import fs from 'fs'; let exporter; @@ -13,6 +12,7 @@ process.on('message', async ({ type, client, tables, options }) => { }); await connection.connect(); + // TODO: exporter factory class switch (client.name) { case 'mysql': case 'maria': diff --git a/src/main/workers/importer.js b/src/main/workers/importer.js new file mode 100644 index 00000000..14eb412a --- /dev/null +++ b/src/main/workers/importer.js @@ -0,0 +1,61 @@ +import { ClientsFactory } from '../libs/ClientsFactory'; +import fs from 'fs'; +import MysqlImporter from '../libs/importers/sql/MysqlImporter'; +let importer; + +process.on('message', async ({ type, dbConfig, options }) => { + if (type === 'init') { + const connection = await ClientsFactory.getConnection({ + client: options.type, + params: dbConfig, + poolSize: 1 + }); + await connection.connect(); + + // TODO: importer factory class + switch (options.type) { + case 'mysql': + case 'maria': + importer = new MysqlImporter(connection, options); + break; + default: + process.send({ + type: 'error', + payload: `"${options.type}" importer not aviable` + }); + return; + } + + importer.once('error', err => { + console.error(err); + process.send({ + type: 'error', + payload: err.toString() + }); + }); + + importer.once('end', () => { + process.send({ + type: 'end', + payload: { cancelled: importer.isCancelled } + }); + connection.destroy(); + }); + + importer.once('cancel', () => { + fs.unlinkSync(importer.outputFile); + process.send({ type: 'cancel' }); + }); + + importer.on('progress', state => { + process.send({ + type: 'import-progress', + payload: state + }); + }); + + importer.run(); + } + else if (type === 'cancel') + importer.cancel(); +}); diff --git a/src/renderer/components/ModalImportSchema.vue b/src/renderer/components/ModalImportSchema.vue index ca249a28..39c58130 100644 --- a/src/renderer/components/ModalImportSchema.vue +++ b/src/renderer/components/ModalImportSchema.vue @@ -86,11 +86,22 @@ export default { const params = { uid, type: client, + schema: this.selectedSchema, file: sqlFile }; - const result = await Schema.import(params); - console.log(result); + try { + const { status, response } = await Schema.import(params); + if (status === 'success') + this.progressStatus = response.cancelled ? this.$t('word.aborted') : this.$t('word.completed'); + else { + this.progressStatus = response; + this.addNotification({ status: 'error', message: response }); + } + } + catch (err) { + this.addNotification({ status: 'error', message: err.stack }); + } this.isImporting = false; }, diff --git a/webpack.workers.config.js b/webpack.workers.config.js index 9e457e75..0a71d2ef 100644 --- a/webpack.workers.config.js +++ b/webpack.workers.config.js @@ -13,7 +13,8 @@ const config = { mode: process.env.NODE_ENV, devtool: isDevMode ? 'eval-source-map' : false, entry: { - exporter: path.join(__dirname, './src/main/workers/exporter.js') + exporter: path.join(__dirname, './src/main/workers/exporter.js'), + importer: path.join(__dirname, './src/main/workers/importer.js') }, target: 'node', output: {