diff --git a/.gitignore b/.gitignore index 60bac061..6294b57c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ node_modules thumbs.db NOTES.md *.txt -package-lock.json \ No newline at end of file +package-lock.json +*.heapsnapshot \ No newline at end of file diff --git a/package.json b/package.json index 1631b72e..ef73fb35 100644 --- a/package.json +++ b/package.json @@ -110,7 +110,6 @@ "@turf/helpers": "^6.5.0", "@vscode/vscode-languagedetection": "^1.0.21", "ace-builds": "^1.4.13", - "async": "^3.2.3", "better-sqlite3": "^7.4.4", "electron-log": "^4.4.1", "electron-store": "^8.0.1", diff --git a/src/main/ipc-handlers/schema.js b/src/main/ipc-handlers/schema.js index bcb69094..b8149be2 100644 --- a/src/main/ipc-handlers/schema.js +++ b/src/main/ipc-handlers/schema.js @@ -292,8 +292,10 @@ export default connections => { event.sender.send('query-error', payload); break; case 'end': - importer.kill(); - importer = null; + setTimeout(() => { // Ensures that writing process has finished + importer?.kill(); + importer = null; + }, 2000); resolve({ status: 'success', response: payload }); break; case 'cancel': @@ -308,11 +310,6 @@ export default connections => { break; } }); - - importer.on('exit', code => { - importer = null; - resolve({ status: 'error', response: `Operation ended with code: ${code}` }); - }); })(); }); }); diff --git a/src/main/libs/importers/sql/MysqlImporter.js b/src/main/libs/importers/sql/MysqlImporter.js index 77ac529e..05c39941 100644 --- a/src/main/libs/importers/sql/MysqlImporter.js +++ b/src/main/libs/importers/sql/MysqlImporter.js @@ -1,5 +1,4 @@ import fs from 'fs/promises'; -import { queue } from 'async'; import SqlParser from '../../../../common/libs/sqlParser'; import { BaseImporter } from '../BaseImporter'; @@ -10,17 +9,11 @@ export default class MysqlImporter extends BaseImporter { } async import () { - console.time('import'); try { const { size: totalFileSize } = await fs.stat(this._options.file); const parser = new SqlParser(); let readPosition = 0; let queryCount = 0; - const q = queue(async (query) => await this._client.raw(query)); - - q.error((error, query) => { - this.emit('query-error', { sql: query, message: error.sqlMessage, sqlSnippet: error.sql, time: new Date().getTime() }); - }); this.emitUpdate({ fileSize: totalFileSize, @@ -29,8 +22,6 @@ export default class MysqlImporter extends BaseImporter { queryCount: 0 }); - await this._client.use(this._options.schema); - // 1. detect file encoding // 2. set fh encoding // 3. detect sql mode @@ -41,25 +32,44 @@ export default class MysqlImporter extends BaseImporter { parser.on('error', reject); - parser.on('finish', async () => { + parser.on('close', async () => { console.log('TOTAL QUERIES', queryCount); console.log('import end'); - await q.drain(); // not sure of this resolve(); }); parser.on('data', async (query) => { - q.push(query); queryCount++; - }); + parser.pause(); + + try { + await this._client.query(query); + } + catch (error) { + this.emit('query-error', { + sql: query, + message: error.sqlMessage, + sqlSnippet: error.sql, + time: new Date().getTime() + }); + } - this._fileHandler.on('data', (chunk) => { - readPosition += chunk.length; this.emitUpdate({ queryCount, readPosition, percentage: readPosition / totalFileSize * 100 }); + this._fileHandler.pipe(parser); + parser.resume(); + }); + + parser.on('pause', () => { + this._fileHandler.unpipe(parser); + this._fileHandler.readableFlowing = false; + }); + + this._fileHandler.on('data', (chunk) => { + readPosition += chunk.length; }); this._fileHandler.on('error', (err) => { diff --git a/src/main/workers/importer.js b/src/main/workers/importer.js index 840016d2..8b10f853 100644 --- a/src/main/workers/importer.js +++ b/src/main/workers/importer.js @@ -1,5 +1,4 @@ import { ClientsFactory } from '../libs/ClientsFactory'; -import fs from 'fs'; import MysqlImporter from '../libs/importers/sql/MysqlImporter'; let importer; @@ -7,17 +6,20 @@ process.on('message', async ({ type, dbConfig, options }) => { if (type === 'init') { const connection = await ClientsFactory.getConnection({ client: options.type, - params: dbConfig, - poolSize: 1, - logger: () => null + params: { + ...dbConfig, + schema: options.schema + }, + poolSize: 1 }); - await connection.connect(); + + const pool = await connection.getConnectionPool(); // TODO: importer factory class switch (options.type) { case 'mysql': case 'maria': - importer = new MysqlImporter(connection, options); + importer = new MysqlImporter(pool, options); break; default: process.send({ @@ -40,11 +42,9 @@ process.on('message', async ({ type, dbConfig, options }) => { type: 'end', payload: { cancelled: importer.isCancelled } }); - connection.destroy(); }); importer.once('cancel', () => { - fs.unlinkSync(importer.outputFile); process.send({ type: 'cancel' }); });