perf(MySQL): prevent memory leak on large dump import

This commit is contained in:
Fabio Di Stasio 2022-03-10 10:23:22 +01:00
parent 1c4d5b05b3
commit f3759b6541
5 changed files with 39 additions and 32 deletions

3
.gitignore vendored
View File

@ -5,4 +5,5 @@ node_modules
thumbs.db thumbs.db
NOTES.md NOTES.md
*.txt *.txt
package-lock.json package-lock.json
*.heapsnapshot

View File

@ -110,7 +110,6 @@
"@turf/helpers": "^6.5.0", "@turf/helpers": "^6.5.0",
"@vscode/vscode-languagedetection": "^1.0.21", "@vscode/vscode-languagedetection": "^1.0.21",
"ace-builds": "^1.4.13", "ace-builds": "^1.4.13",
"async": "^3.2.3",
"better-sqlite3": "^7.4.4", "better-sqlite3": "^7.4.4",
"electron-log": "^4.4.1", "electron-log": "^4.4.1",
"electron-store": "^8.0.1", "electron-store": "^8.0.1",

View File

@ -292,8 +292,10 @@ export default connections => {
event.sender.send('query-error', payload); event.sender.send('query-error', payload);
break; break;
case 'end': case 'end':
importer.kill(); setTimeout(() => { // Ensures that writing process has finished
importer = null; importer?.kill();
importer = null;
}, 2000);
resolve({ status: 'success', response: payload }); resolve({ status: 'success', response: payload });
break; break;
case 'cancel': case 'cancel':
@ -308,11 +310,6 @@ export default connections => {
break; break;
} }
}); });
importer.on('exit', code => {
importer = null;
resolve({ status: 'error', response: `Operation ended with code: ${code}` });
});
})(); })();
}); });
}); });

View File

@ -1,5 +1,4 @@
import fs from 'fs/promises'; import fs from 'fs/promises';
import { queue } from 'async';
import SqlParser from '../../../../common/libs/sqlParser'; import SqlParser from '../../../../common/libs/sqlParser';
import { BaseImporter } from '../BaseImporter'; import { BaseImporter } from '../BaseImporter';
@ -10,17 +9,11 @@ export default class MysqlImporter extends BaseImporter {
} }
async import () { async import () {
console.time('import');
try { try {
const { size: totalFileSize } = await fs.stat(this._options.file); const { size: totalFileSize } = await fs.stat(this._options.file);
const parser = new SqlParser(); const parser = new SqlParser();
let readPosition = 0; let readPosition = 0;
let queryCount = 0; let queryCount = 0;
const q = queue(async (query) => await this._client.raw(query));
q.error((error, query) => {
this.emit('query-error', { sql: query, message: error.sqlMessage, sqlSnippet: error.sql, time: new Date().getTime() });
});
this.emitUpdate({ this.emitUpdate({
fileSize: totalFileSize, fileSize: totalFileSize,
@ -29,8 +22,6 @@ export default class MysqlImporter extends BaseImporter {
queryCount: 0 queryCount: 0
}); });
await this._client.use(this._options.schema);
// 1. detect file encoding // 1. detect file encoding
// 2. set fh encoding // 2. set fh encoding
// 3. detect sql mode // 3. detect sql mode
@ -41,25 +32,44 @@ export default class MysqlImporter extends BaseImporter {
parser.on('error', reject); parser.on('error', reject);
parser.on('finish', async () => { parser.on('close', async () => {
console.log('TOTAL QUERIES', queryCount); console.log('TOTAL QUERIES', queryCount);
console.log('import end'); console.log('import end');
await q.drain(); // not sure of this
resolve(); resolve();
}); });
parser.on('data', async (query) => { parser.on('data', async (query) => {
q.push(query);
queryCount++; queryCount++;
}); parser.pause();
try {
await this._client.query(query);
}
catch (error) {
this.emit('query-error', {
sql: query,
message: error.sqlMessage,
sqlSnippet: error.sql,
time: new Date().getTime()
});
}
this._fileHandler.on('data', (chunk) => {
readPosition += chunk.length;
this.emitUpdate({ this.emitUpdate({
queryCount, queryCount,
readPosition, readPosition,
percentage: readPosition / totalFileSize * 100 percentage: readPosition / totalFileSize * 100
}); });
this._fileHandler.pipe(parser);
parser.resume();
});
parser.on('pause', () => {
this._fileHandler.unpipe(parser);
this._fileHandler.readableFlowing = false;
});
this._fileHandler.on('data', (chunk) => {
readPosition += chunk.length;
}); });
this._fileHandler.on('error', (err) => { this._fileHandler.on('error', (err) => {

View File

@ -1,5 +1,4 @@
import { ClientsFactory } from '../libs/ClientsFactory'; import { ClientsFactory } from '../libs/ClientsFactory';
import fs from 'fs';
import MysqlImporter from '../libs/importers/sql/MysqlImporter'; import MysqlImporter from '../libs/importers/sql/MysqlImporter';
let importer; let importer;
@ -7,17 +6,20 @@ process.on('message', async ({ type, dbConfig, options }) => {
if (type === 'init') { if (type === 'init') {
const connection = await ClientsFactory.getConnection({ const connection = await ClientsFactory.getConnection({
client: options.type, client: options.type,
params: dbConfig, params: {
poolSize: 1, ...dbConfig,
logger: () => null schema: options.schema
},
poolSize: 1
}); });
await connection.connect();
const pool = await connection.getConnectionPool();
// TODO: importer factory class // TODO: importer factory class
switch (options.type) { switch (options.type) {
case 'mysql': case 'mysql':
case 'maria': case 'maria':
importer = new MysqlImporter(connection, options); importer = new MysqlImporter(pool, options);
break; break;
default: default:
process.send({ process.send({
@ -40,11 +42,9 @@ process.on('message', async ({ type, dbConfig, options }) => {
type: 'end', type: 'end',
payload: { cancelled: importer.isCancelled } payload: { cancelled: importer.isCancelled }
}); });
connection.destroy();
}); });
importer.once('cancel', () => { importer.once('cancel', () => {
fs.unlinkSync(importer.outputFile);
process.send({ type: 'cancel' }); process.send({ type: 'cancel' });
}); });