mirror of
https://github.com/Fabio286/antares.git
synced 2025-02-17 12:10:39 +01:00
perf(MySQL): import performance improvement
This commit is contained in:
parent
b4af645941
commit
f444746f46
@ -1,13 +1,5 @@
|
|||||||
import { Transform } from 'stream';
|
import { Transform } from 'stream';
|
||||||
|
|
||||||
const chars = {
|
|
||||||
NEWLINE: 0x0A,
|
|
||||||
CARRIAGE_RETURN: 0x0D,
|
|
||||||
DOUBLE_QUOTE: 0x22,
|
|
||||||
QUOTE: 0x27,
|
|
||||||
BACKSLASH: 0x5C
|
|
||||||
};
|
|
||||||
|
|
||||||
export default class SqlParser extends Transform {
|
export default class SqlParser extends Transform {
|
||||||
constructor (opts) {
|
constructor (opts) {
|
||||||
opts = {
|
opts = {
|
||||||
@ -18,7 +10,9 @@ export default class SqlParser extends Transform {
|
|||||||
...opts
|
...opts
|
||||||
};
|
};
|
||||||
super(opts);
|
super(opts);
|
||||||
this._buffer = Buffer.from([]);
|
this._buffer = '';
|
||||||
|
this._lastChar = '';
|
||||||
|
this._last9Chars = '';
|
||||||
this.encoding = opts.encoding;
|
this.encoding = opts.encoding;
|
||||||
this.delimiter = opts.delimiter;
|
this.delimiter = opts.delimiter;
|
||||||
|
|
||||||
@ -28,9 +22,15 @@ export default class SqlParser extends Transform {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_transform (chunk, encoding, next) {
|
_transform (chunk, encoding, next) {
|
||||||
for (const char of chunk) {
|
for (const char of chunk.toString(this.encoding)) {
|
||||||
this.checkEscape();
|
this.checkEscape();
|
||||||
this._buffer = Buffer.concat([this._buffer, Buffer.from([char])]);
|
this._buffer += char;
|
||||||
|
this._lastChar = char;
|
||||||
|
this._last9Chars += char.trim().toLocaleLowerCase();
|
||||||
|
|
||||||
|
if (this._last9Chars.length > 9)
|
||||||
|
this._last9Chars = this._last9Chars.slice(-9);
|
||||||
|
|
||||||
this.checkNewDelimiter(char);
|
this.checkNewDelimiter(char);
|
||||||
this.checkQuote(char);
|
this.checkQuote(char);
|
||||||
const query = this.getQuery();
|
const query = this.getQuery();
|
||||||
@ -38,35 +38,34 @@ export default class SqlParser extends Transform {
|
|||||||
if (query)
|
if (query)
|
||||||
this.push(query);
|
this.push(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
next();
|
next();
|
||||||
}
|
}
|
||||||
|
|
||||||
checkEscape () {
|
checkEscape () {
|
||||||
if (this._buffer.length > 0) {
|
if (this._buffer.length > 0) {
|
||||||
this.isEscape = this._buffer[this._buffer.length - 1] === chars.BACKSLASH
|
this.isEscape = this._lastChar === '\\'
|
||||||
? !this.isEscape
|
? !this.isEscape
|
||||||
: false;
|
: false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
checkNewDelimiter (char) {
|
checkNewDelimiter (char) {
|
||||||
if (this._buffer.length === 9 && this.parsedStr.toLowerCase() === 'delimiter' && this.currentQuote === null) {
|
if (this.currentQuote === null && this._last9Chars === 'delimiter') {
|
||||||
this.isDelimiter = true;
|
this.isDelimiter = true;
|
||||||
this._buffer = Buffer.from([]);
|
this._buffer = '';
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
const isNewLine = [chars.NEWLINE, chars.CARRIAGE_RETURN].includes(char);
|
const isNewLine = char === '\n' || char === '\r';
|
||||||
if (isNewLine && this.isDelimiter) {
|
if (isNewLine && this.isDelimiter) {
|
||||||
this.isDelimiter = false;
|
this.isDelimiter = false;
|
||||||
this.delimiter = this.parsedStr;
|
this.delimiter = this._buffer.trim();
|
||||||
this._buffer = Buffer.from([]);
|
this._buffer = '';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
checkQuote (char) {
|
checkQuote (char) {
|
||||||
const isQuote = !this.isEscape && (chars.QUOTE === char || chars.DOUBLE_QUOTE === char);
|
const isQuote = !this.isEscape && (char === '\'' || char === '"');
|
||||||
if (isQuote && this.currentQuote === char)
|
if (isQuote && this.currentQuote === char)
|
||||||
this.currentQuote = null;
|
this.currentQuote = null;
|
||||||
|
|
||||||
@ -81,18 +80,14 @@ export default class SqlParser extends Transform {
|
|||||||
let query = false;
|
let query = false;
|
||||||
let demiliterFound = false;
|
let demiliterFound = false;
|
||||||
if (this.currentQuote === null && this._buffer.length >= this.delimiter.length)
|
if (this.currentQuote === null && this._buffer.length >= this.delimiter.length)
|
||||||
demiliterFound = this._buffer.slice(-this.delimiter.length).toString(this.encoding) === this.delimiter;
|
demiliterFound = this._last9Chars.slice(-this.delimiter.length) === this.delimiter;
|
||||||
|
|
||||||
if (demiliterFound) {
|
if (demiliterFound) {
|
||||||
const str = this.parsedStr;
|
const parsedStr = this._buffer.trim();
|
||||||
query = str.slice(0, str.length - this.delimiter.length);
|
query = parsedStr.slice(0, parsedStr.length - this.delimiter.length);
|
||||||
this._buffer = Buffer.from([]);
|
this._buffer = '';
|
||||||
}
|
}
|
||||||
|
|
||||||
return query;
|
return query;
|
||||||
}
|
}
|
||||||
|
|
||||||
get parsedStr () {
|
|
||||||
return this._buffer.toString(this.encoding).trim();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -217,8 +217,10 @@ export default connections => {
|
|||||||
event.sender.send('export-progress', payload);
|
event.sender.send('export-progress', payload);
|
||||||
break;
|
break;
|
||||||
case 'end':
|
case 'end':
|
||||||
exporter.kill();
|
setTimeout(() => { // Ensures that writing process has finished
|
||||||
exporter = null;
|
exporter.kill();
|
||||||
|
exporter = null;
|
||||||
|
}, 2000);
|
||||||
resolve({ status: 'success', response: payload });
|
resolve({ status: 'success', response: payload });
|
||||||
break;
|
break;
|
||||||
case 'cancel':
|
case 'cancel':
|
||||||
|
@ -1,4 +1,10 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
|
const queryLogger = sql => {
|
||||||
|
// Remove comments, newlines and multiple spaces
|
||||||
|
const escapedSql = sql.replace(/(\/\*(.|[\r\n])*?\*\/)|(--(.*|[\r\n]))/gm, '').replace(/\s\s+/g, ' ');
|
||||||
|
console.log(escapedSql);
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* As Simple As Possible Query Builder Core
|
* As Simple As Possible Query Builder Core
|
||||||
*
|
*
|
||||||
@ -17,7 +23,7 @@ export class AntaresCore {
|
|||||||
this._poolSize = args.poolSize || false;
|
this._poolSize = args.poolSize || false;
|
||||||
this._connection = null;
|
this._connection = null;
|
||||||
this._ssh = null;
|
this._ssh = null;
|
||||||
this._logger = args.logger || console.log;
|
this._logger = args.logger || queryLogger;
|
||||||
|
|
||||||
this._queryDefaults = {
|
this._queryDefaults = {
|
||||||
schema: '',
|
schema: '',
|
||||||
|
@ -2,13 +2,6 @@
|
|||||||
import { MySQLClient } from './clients/MySQLClient';
|
import { MySQLClient } from './clients/MySQLClient';
|
||||||
import { PostgreSQLClient } from './clients/PostgreSQLClient';
|
import { PostgreSQLClient } from './clients/PostgreSQLClient';
|
||||||
import { SQLiteClient } from './clients/SQLiteClient';
|
import { SQLiteClient } from './clients/SQLiteClient';
|
||||||
|
|
||||||
const queryLogger = sql => {
|
|
||||||
// Remove comments, newlines and multiple spaces
|
|
||||||
const escapedSql = sql.replace(/(\/\*(.|[\r\n])*?\*\/)|(--(.*|[\r\n]))/gm, '').replace(/\s\s+/g, ' ');
|
|
||||||
console.log(escapedSql);
|
|
||||||
};
|
|
||||||
|
|
||||||
export class ClientsFactory {
|
export class ClientsFactory {
|
||||||
/**
|
/**
|
||||||
* Returns a database connection based on received args.
|
* Returns a database connection based on received args.
|
||||||
@ -30,8 +23,6 @@ export class ClientsFactory {
|
|||||||
* @memberof ClientsFactory
|
* @memberof ClientsFactory
|
||||||
*/
|
*/
|
||||||
static getConnection (args) {
|
static getConnection (args) {
|
||||||
args.logger = queryLogger;
|
|
||||||
|
|
||||||
switch (args.client) {
|
switch (args.client) {
|
||||||
case 'mysql':
|
case 'mysql':
|
||||||
case 'maria':
|
case 'maria':
|
||||||
|
@ -68,7 +68,6 @@ export class BaseExporter extends EventEmitter {
|
|||||||
const fileName = path.basename(this._options.outputFile);
|
const fileName = path.basename(this._options.outputFile);
|
||||||
this.emit('error', `The file ${fileName} is not accessible`);
|
this.emit('error', `The file ${fileName} is not accessible`);
|
||||||
}
|
}
|
||||||
|
|
||||||
this._outputStream.write(data);
|
this._outputStream.write(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,7 +7,8 @@ export class BaseImporter extends EventEmitter {
|
|||||||
this._options = options;
|
this._options = options;
|
||||||
this._isCancelled = false;
|
this._isCancelled = false;
|
||||||
this._fileHandler = fs.createReadStream(this._options.file, {
|
this._fileHandler = fs.createReadStream(this._options.file, {
|
||||||
flags: 'r'
|
flags: 'r',
|
||||||
|
highWaterMark: 4 * 1024
|
||||||
});
|
});
|
||||||
this._state = {};
|
this._state = {};
|
||||||
|
|
||||||
|
@ -6,9 +6,11 @@ export default class MysqlImporter extends BaseImporter {
|
|||||||
constructor (client, options) {
|
constructor (client, options) {
|
||||||
super(options);
|
super(options);
|
||||||
this._client = client;
|
this._client = client;
|
||||||
|
this._queries = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
async import () {
|
async import () {
|
||||||
|
console.time('import');
|
||||||
try {
|
try {
|
||||||
const { size: totalFileSize } = await fs.stat(this._options.file);
|
const { size: totalFileSize } = await fs.stat(this._options.file);
|
||||||
const parser = new SqlParser();
|
const parser = new SqlParser();
|
||||||
@ -31,21 +33,21 @@ export default class MysqlImporter extends BaseImporter {
|
|||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
this._fileHandler.pipe(parser);
|
this._fileHandler.pipe(parser);
|
||||||
|
|
||||||
parser.on('error', (err) => {
|
parser.on('error', reject);
|
||||||
console.log('err', err);
|
|
||||||
reject(err);
|
|
||||||
});
|
|
||||||
|
|
||||||
parser.on('finish', () => {
|
parser.on('finish', () => {
|
||||||
console.log('TOTAL QUERIES', queryCount);
|
Promise.all(this._queries)
|
||||||
console.log('import end');
|
.then(() => {
|
||||||
resolve();
|
console.timeEnd('import');
|
||||||
|
console.log('TOTAL QUERIES', queryCount);
|
||||||
|
console.log('import end');
|
||||||
|
resolve();
|
||||||
|
})
|
||||||
|
.catch(reject);
|
||||||
});
|
});
|
||||||
|
|
||||||
parser.on('data', async (query) => {
|
parser.on('data', async query => {
|
||||||
parser.pause();
|
this._queries.push(this._client.raw(query, { split: false }));
|
||||||
await this._client.raw(query).catch(_ => false);
|
|
||||||
parser.resume();
|
|
||||||
queryCount++;
|
queryCount++;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -8,7 +8,8 @@ process.on('message', async ({ type, dbConfig, options }) => {
|
|||||||
const connection = await ClientsFactory.getConnection({
|
const connection = await ClientsFactory.getConnection({
|
||||||
client: options.type,
|
client: options.type,
|
||||||
params: dbConfig,
|
params: dbConfig,
|
||||||
poolSize: 1
|
poolSize: 1,
|
||||||
|
logger: () => null
|
||||||
});
|
});
|
||||||
await connection.connect();
|
await connection.connect();
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user