mirror of https://github.com/Fabio286/antares.git
perf: use fork() for the import process
This commit is contained in:
parent
1990d9a3d4
commit
573ac6d42e
|
@ -33,7 +33,7 @@ const lookup = {
|
||||||
*/
|
*/
|
||||||
export default function hexToBinary (hex) {
|
export default function hexToBinary (hex) {
|
||||||
let binary = '';
|
let binary = '';
|
||||||
for (let i = 0, len = hex.length; i < len; i++)
|
for (let i = 0; i < hex.length; i++)
|
||||||
binary += lookup[hex[i]];
|
binary += lookup[hex[i]];
|
||||||
|
|
||||||
return binary;
|
return binary;
|
||||||
|
|
|
@ -1,5 +1,13 @@
|
||||||
import { Duplex } from 'stream';
|
import { Duplex } from 'stream';
|
||||||
|
|
||||||
|
const chars = {
|
||||||
|
NEWLINE: 0x0A,
|
||||||
|
CARRIAGE_RETURN: 0x0D,
|
||||||
|
DOUBLE_QUOTE: 0x22,
|
||||||
|
QUOTE: 0x27,
|
||||||
|
BACKSLASH: 0x5C
|
||||||
|
};
|
||||||
|
|
||||||
export default class SqlParser extends Duplex {
|
export default class SqlParser extends Duplex {
|
||||||
constructor (opts) {
|
constructor (opts) {
|
||||||
opts = {
|
opts = {
|
||||||
|
@ -10,24 +18,21 @@ export default class SqlParser extends Duplex {
|
||||||
...opts
|
...opts
|
||||||
};
|
};
|
||||||
super(opts);
|
super(opts);
|
||||||
this._buffer = [];
|
this._buffer = Buffer.from([]);
|
||||||
this.encoding = opts.encoding;
|
this.encoding = opts.encoding;
|
||||||
this.delimiter = opts.delimiter;
|
this.delimiter = opts.delimiter;
|
||||||
|
|
||||||
this.isEscape = false;
|
this.isEscape = false;
|
||||||
this.currentQuote = '';
|
this.currentQuote = null;
|
||||||
this.isDelimiter = false;
|
this.isDelimiter = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
_write (chunk, encoding, next) {
|
_write (chunk, encoding, next) {
|
||||||
const str = chunk.toString(this.encoding);
|
for (const char of chunk) {
|
||||||
|
|
||||||
for (let i = 0; i < str.length; i++) {
|
|
||||||
const currentChar = str[i];
|
|
||||||
this.checkEscape();
|
this.checkEscape();
|
||||||
this._buffer.push(currentChar);
|
this._buffer = Buffer.concat([this._buffer, Buffer.from([char])]);
|
||||||
// this.checkNewDelimiter(currentChar);
|
this.checkNewDelimiter(char);
|
||||||
this.checkQuote(currentChar);
|
this.checkQuote(char);
|
||||||
const query = this.getQuery();
|
const query = this.getQuery();
|
||||||
|
|
||||||
if (query)
|
if (query)
|
||||||
|
@ -39,33 +44,33 @@ export default class SqlParser extends Duplex {
|
||||||
|
|
||||||
checkEscape () {
|
checkEscape () {
|
||||||
if (this._buffer.length > 0) {
|
if (this._buffer.length > 0) {
|
||||||
this.isEscape = this._buffer[this._buffer.length - 1] === '\\'
|
this.isEscape = this._buffer[this._buffer.length - 1] === chars.BACKSLASH
|
||||||
? !this.isEscape
|
? !this.isEscape
|
||||||
: false;
|
: false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
checkNewDelimiter (char) {
|
checkNewDelimiter (char) {
|
||||||
if (this.parsedStr.toLowerCase() === 'delimiter' && this.currentQuote === '') {
|
if (this._buffer.length === 9 && this.parsedStr.toLowerCase() === 'delimiter' && this.currentQuote === null) {
|
||||||
this.isDelimiter = true;
|
this.isDelimiter = true;
|
||||||
this._buffer = [];
|
this._buffer = Buffer.from([]);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
const isNewLine = ['\n', '\r'].includes(char);
|
const isNewLine = [chars.NEWLINE, chars.CARRIAGE_RETURN].includes(char);
|
||||||
if (isNewLine && this.isDelimiter) {
|
if (isNewLine && this.isDelimiter) {
|
||||||
this.isDelimiter = false;
|
this.isDelimiter = false;
|
||||||
this.delimiter = this.parsedStr;
|
this.delimiter = this.parsedStr;
|
||||||
this._buffer = [];
|
this._buffer = Buffer.from([]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
checkQuote (char) {
|
checkQuote (char) {
|
||||||
const isQuote = !this.isEscape && ['"', '\''].includes(char);
|
const isQuote = !this.isEscape && [chars.QUOTE, chars.DOUBLE_QUOTE].includes(char);
|
||||||
if (isQuote && this.currentQuote === char)
|
if (isQuote && this.currentQuote === char)
|
||||||
this.currentQuote = '';
|
this.currentQuote = null;
|
||||||
|
|
||||||
else if (isQuote && this.currentQuote === '')
|
else if (isQuote && this.currentQuote === null)
|
||||||
this.currentQuote = char;
|
this.currentQuote = char;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,20 +80,20 @@ export default class SqlParser extends Duplex {
|
||||||
|
|
||||||
let query = false;
|
let query = false;
|
||||||
let demiliterFound = false;
|
let demiliterFound = false;
|
||||||
if (this.currentQuote === '' && this._buffer.length >= this.delimiter.length)
|
if (this.currentQuote === null && this._buffer.length >= this.delimiter.length)
|
||||||
demiliterFound = this.parsedStr.slice(-this.delimiter.length) === this.delimiter;
|
demiliterFound = this._buffer.slice(-this.delimiter.length).toString(this.encoding) === this.delimiter;
|
||||||
|
|
||||||
if (demiliterFound) {
|
if (demiliterFound) {
|
||||||
this._buffer.splice(-this.delimiter.length, this.delimiter.length);
|
this._buffer = this._buffer.slice(0, this._buffer.length - 1);
|
||||||
query = this.parsedStr;
|
query = this.parsedStr;
|
||||||
this._buffer = [];
|
this._buffer = Buffer.from([]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return query;
|
return query;
|
||||||
}
|
}
|
||||||
|
|
||||||
get parsedStr () {
|
get parsedStr () {
|
||||||
return this._buffer.join('').trim();
|
return this._buffer.toString(this.encoding).trim();
|
||||||
}
|
}
|
||||||
|
|
||||||
_read (size) {
|
_read (size) {
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
import fs from 'fs';
|
import fs from 'fs';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
import { fork } from 'child_process';
|
import { fork } from 'child_process';
|
||||||
import { ipcMain, dialog, Notification } from 'electron';
|
import { ipcMain, dialog } from 'electron';
|
||||||
|
|
||||||
// @TODO: need some factories
|
// @TODO: need some factories
|
||||||
import MysqlImporter from '../libs/importers/sql/MysqlImporter';
|
|
||||||
const isDevelopment = process.env.NODE_ENV !== 'production';
|
const isDevelopment = process.env.NODE_ENV !== 'production';
|
||||||
|
|
||||||
export default connections => {
|
export default connections => {
|
||||||
|
@ -265,54 +264,48 @@ export default connections => {
|
||||||
ipcMain.handle('import-sql', async (event, options) => {
|
ipcMain.handle('import-sql', async (event, options) => {
|
||||||
if (importer !== null) return;
|
if (importer !== null) return;
|
||||||
|
|
||||||
switch (options.type) {
|
|
||||||
case 'mysql':
|
|
||||||
case 'maria':
|
|
||||||
importer = new MysqlImporter(connections[options.uid], options);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
return {
|
|
||||||
status: 'error',
|
|
||||||
response: `${type} importer not aviable`
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
importer.once('error', err => {
|
(async () => {
|
||||||
reject(err);
|
const dbConfig = await connections[options.uid].getDbConfig();
|
||||||
});
|
|
||||||
|
|
||||||
importer.once('end', () => {
|
// Init importer process
|
||||||
resolve({ cancelled: importer.isCancelled });
|
importer = fork(isDevelopment ? './dist/importer.js' : path.resolve(__dirname, './importer.js'));
|
||||||
});
|
importer.send({
|
||||||
|
type: 'init',
|
||||||
|
dbConfig,
|
||||||
|
options
|
||||||
|
});
|
||||||
|
|
||||||
importer.on('progress', state => {
|
// Importer message listener
|
||||||
event.sender.send('import-progress', state);
|
importer.on('message', ({ type, payload }) => {
|
||||||
});
|
switch (type) {
|
||||||
|
case 'import-progress':
|
||||||
|
event.sender.send('import-progress', payload);
|
||||||
|
break;
|
||||||
|
case 'end':
|
||||||
|
importer.kill();
|
||||||
|
importer = null;
|
||||||
|
resolve({ status: 'success', response: payload });
|
||||||
|
break;
|
||||||
|
case 'cancel':
|
||||||
|
importer.kill();
|
||||||
|
importer = null;
|
||||||
|
resolve({ status: 'error', response: 'Operation cancelled' });
|
||||||
|
break;
|
||||||
|
case 'error':
|
||||||
|
importer.kill();
|
||||||
|
importer = null;
|
||||||
|
resolve({ status: 'error', response: payload });
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
importer.run();
|
importer.on('exit', code => {
|
||||||
})
|
importer = null;
|
||||||
.then(response => {
|
resolve({ status: 'error', response: `Operation ended with code: ${code}` });
|
||||||
if (!response.cancelled) {
|
});
|
||||||
new Notification({
|
})();
|
||||||
title: 'Import finished',
|
});
|
||||||
body: `Finished importing ${path.basename(options.file)}`
|
|
||||||
}).show();
|
|
||||||
}
|
|
||||||
return { status: 'success', response };
|
|
||||||
})
|
|
||||||
.catch(err => {
|
|
||||||
new Notification({
|
|
||||||
title: 'Import error',
|
|
||||||
body: err.toString()
|
|
||||||
}).show();
|
|
||||||
|
|
||||||
return { status: 'error', response: err.toString() };
|
|
||||||
})
|
|
||||||
.finally(() => {
|
|
||||||
importer.removeAllListeners();
|
|
||||||
importer = null;
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|
||||||
ipcMain.handle('abort-import-sql', async event => {
|
ipcMain.handle('abort-import-sql', async event => {
|
||||||
|
@ -329,7 +322,7 @@ export default connections => {
|
||||||
|
|
||||||
if (result.response === 1) {
|
if (result.response === 1) {
|
||||||
willAbort = true;
|
willAbort = true;
|
||||||
importer.cancel();
|
importer.send({ type: 'cancel' });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,56 +5,67 @@ import { BaseImporter } from '../BaseImporter';
|
||||||
export default class MysqlImporter extends BaseImporter {
|
export default class MysqlImporter extends BaseImporter {
|
||||||
constructor (client, options) {
|
constructor (client, options) {
|
||||||
super(options);
|
super(options);
|
||||||
|
this._client = client;
|
||||||
}
|
}
|
||||||
|
|
||||||
async import () {
|
async import () {
|
||||||
const { size: totalFileSize } = await fs.stat(this._options.file);
|
try {
|
||||||
const parser = new SqlParser();
|
const { size: totalFileSize } = await fs.stat(this._options.file);
|
||||||
let readPosition = 0;
|
const parser = new SqlParser();
|
||||||
let queryCount = 0;
|
let readPosition = 0;
|
||||||
|
let queryCount = 0;
|
||||||
|
|
||||||
this.emitUpdate({
|
this.emitUpdate({
|
||||||
fileSize: totalFileSize,
|
fileSize: totalFileSize,
|
||||||
readPosition: 0,
|
readPosition: 0,
|
||||||
percentage: 0
|
percentage: 0
|
||||||
});
|
|
||||||
|
|
||||||
// 1. detect file encoding
|
|
||||||
// 2. set fh encoding
|
|
||||||
// 3. detect sql mode
|
|
||||||
// 4. restore sql mode in case of exception
|
|
||||||
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
this._fileHandler.pipe(parser);
|
|
||||||
|
|
||||||
parser.on('error', (err) => {
|
|
||||||
console.log(err);
|
|
||||||
reject(err);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
parser.on('finish', () => {
|
await this._client.use(this._options.schema);
|
||||||
console.log('TOTAL QUERIES', queryCount);
|
|
||||||
console.log('import end');
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
|
|
||||||
parser.on('data', (q) => {
|
// 1. detect file encoding
|
||||||
console.log('query: ', q);
|
// 2. set fh encoding
|
||||||
queryCount++;
|
// 3. detect sql mode
|
||||||
});
|
// 4. restore sql mode in case of exception
|
||||||
|
|
||||||
this._fileHandler.on('data', (chunk) => {
|
return new Promise((resolve, reject) => {
|
||||||
readPosition += chunk.length;
|
this._fileHandler.pipe(parser);
|
||||||
this.emitUpdate({
|
|
||||||
readPosition,
|
parser.on('error', (err) => {
|
||||||
percentage: readPosition / totalFileSize * 100
|
console.log(err);
|
||||||
|
reject(err);
|
||||||
|
});
|
||||||
|
|
||||||
|
parser.on('finish', () => {
|
||||||
|
console.log('TOTAL QUERIES', queryCount);
|
||||||
|
console.log('import end');
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
|
||||||
|
parser.on('data', async (query) => {
|
||||||
|
console.log('query: ', query);
|
||||||
|
parser.pause();
|
||||||
|
await this._client.raw(query);
|
||||||
|
parser.resume();
|
||||||
|
queryCount++;
|
||||||
|
});
|
||||||
|
|
||||||
|
this._fileHandler.on('data', (chunk) => {
|
||||||
|
readPosition += chunk.length;
|
||||||
|
this.emitUpdate({
|
||||||
|
readPosition,
|
||||||
|
percentage: readPosition / totalFileSize * 100
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
this._fileHandler.on('error', (err) => {
|
||||||
|
console.log(err);
|
||||||
|
reject(err);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
}
|
||||||
this._fileHandler.on('error', (e) => {
|
catch (err) {
|
||||||
console.log(e);
|
console.log(err);
|
||||||
reject(err);
|
}
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
import { ClientsFactory } from '../libs/ClientsFactory';
|
import { ClientsFactory } from '../libs/ClientsFactory';
|
||||||
// TODO: exporter factory class
|
|
||||||
import MysqlExporter from '../libs/exporters/sql/MysqlExporter.js';
|
import MysqlExporter from '../libs/exporters/sql/MysqlExporter.js';
|
||||||
import fs from 'fs';
|
import fs from 'fs';
|
||||||
let exporter;
|
let exporter;
|
||||||
|
@ -13,6 +12,7 @@ process.on('message', async ({ type, client, tables, options }) => {
|
||||||
});
|
});
|
||||||
await connection.connect();
|
await connection.connect();
|
||||||
|
|
||||||
|
// TODO: exporter factory class
|
||||||
switch (client.name) {
|
switch (client.name) {
|
||||||
case 'mysql':
|
case 'mysql':
|
||||||
case 'maria':
|
case 'maria':
|
||||||
|
|
|
@ -0,0 +1,61 @@
|
||||||
|
import { ClientsFactory } from '../libs/ClientsFactory';
|
||||||
|
import fs from 'fs';
|
||||||
|
import MysqlImporter from '../libs/importers/sql/MysqlImporter';
|
||||||
|
let importer;
|
||||||
|
|
||||||
|
process.on('message', async ({ type, dbConfig, options }) => {
|
||||||
|
if (type === 'init') {
|
||||||
|
const connection = await ClientsFactory.getConnection({
|
||||||
|
client: options.type,
|
||||||
|
params: dbConfig,
|
||||||
|
poolSize: 1
|
||||||
|
});
|
||||||
|
await connection.connect();
|
||||||
|
|
||||||
|
// TODO: importer factory class
|
||||||
|
switch (options.type) {
|
||||||
|
case 'mysql':
|
||||||
|
case 'maria':
|
||||||
|
importer = new MysqlImporter(connection, options);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
process.send({
|
||||||
|
type: 'error',
|
||||||
|
payload: `"${options.type}" importer not aviable`
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
importer.once('error', err => {
|
||||||
|
console.error(err);
|
||||||
|
process.send({
|
||||||
|
type: 'error',
|
||||||
|
payload: err.toString()
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
importer.once('end', () => {
|
||||||
|
process.send({
|
||||||
|
type: 'end',
|
||||||
|
payload: { cancelled: importer.isCancelled }
|
||||||
|
});
|
||||||
|
connection.destroy();
|
||||||
|
});
|
||||||
|
|
||||||
|
importer.once('cancel', () => {
|
||||||
|
fs.unlinkSync(importer.outputFile);
|
||||||
|
process.send({ type: 'cancel' });
|
||||||
|
});
|
||||||
|
|
||||||
|
importer.on('progress', state => {
|
||||||
|
process.send({
|
||||||
|
type: 'import-progress',
|
||||||
|
payload: state
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
importer.run();
|
||||||
|
}
|
||||||
|
else if (type === 'cancel')
|
||||||
|
importer.cancel();
|
||||||
|
});
|
|
@ -86,11 +86,22 @@ export default {
|
||||||
const params = {
|
const params = {
|
||||||
uid,
|
uid,
|
||||||
type: client,
|
type: client,
|
||||||
|
schema: this.selectedSchema,
|
||||||
file: sqlFile
|
file: sqlFile
|
||||||
};
|
};
|
||||||
|
|
||||||
const result = await Schema.import(params);
|
try {
|
||||||
console.log(result);
|
const { status, response } = await Schema.import(params);
|
||||||
|
if (status === 'success')
|
||||||
|
this.progressStatus = response.cancelled ? this.$t('word.aborted') : this.$t('word.completed');
|
||||||
|
else {
|
||||||
|
this.progressStatus = response;
|
||||||
|
this.addNotification({ status: 'error', message: response });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (err) {
|
||||||
|
this.addNotification({ status: 'error', message: err.stack });
|
||||||
|
}
|
||||||
|
|
||||||
this.isImporting = false;
|
this.isImporting = false;
|
||||||
},
|
},
|
||||||
|
|
|
@ -13,7 +13,8 @@ const config = {
|
||||||
mode: process.env.NODE_ENV,
|
mode: process.env.NODE_ENV,
|
||||||
devtool: isDevMode ? 'eval-source-map' : false,
|
devtool: isDevMode ? 'eval-source-map' : false,
|
||||||
entry: {
|
entry: {
|
||||||
exporter: path.join(__dirname, './src/main/workers/exporter.js')
|
exporter: path.join(__dirname, './src/main/workers/exporter.js'),
|
||||||
|
importer: path.join(__dirname, './src/main/workers/importer.js')
|
||||||
},
|
},
|
||||||
target: 'node',
|
target: 'node',
|
||||||
output: {
|
output: {
|
||||||
|
|
Loading…
Reference in New Issue