diff --git a/src/main/libs/importers/sql/MysqlImporter.js b/src/main/libs/importers/sql/MysqlImporter.js index d3b033d9..77ac529e 100644 --- a/src/main/libs/importers/sql/MysqlImporter.js +++ b/src/main/libs/importers/sql/MysqlImporter.js @@ -1,4 +1,5 @@ import fs from 'fs/promises'; +import { queue } from 'async'; import SqlParser from '../../../../common/libs/sqlParser'; import { BaseImporter } from '../BaseImporter'; @@ -6,7 +7,6 @@ export default class MysqlImporter extends BaseImporter { constructor (client, options) { super(options); this._client = client; - this._queries = []; } async import () { @@ -16,11 +16,17 @@ export default class MysqlImporter extends BaseImporter { const parser = new SqlParser(); let readPosition = 0; let queryCount = 0; + const q = queue(async (query) => await this._client.raw(query)); + + q.error((error, query) => { + this.emit('query-error', { sql: query, message: error.sqlMessage, sqlSnippet: error.sql, time: new Date().getTime() }); + }); this.emitUpdate({ fileSize: totalFileSize, readPosition: 0, - percentage: 0 + percentage: 0, + queryCount: 0 }); await this._client.use(this._options.schema); @@ -35,25 +41,22 @@ export default class MysqlImporter extends BaseImporter { parser.on('error', reject); - parser.on('finish', () => { - Promise.all(this._queries) - .then(() => { - console.timeEnd('import'); - console.log('TOTAL QUERIES', queryCount); - console.log('import end'); - resolve(); - }) - .catch(reject); + parser.on('finish', async () => { + console.log('TOTAL QUERIES', queryCount); + console.log('import end'); + await q.drain(); // not sure of this + resolve(); }); - parser.on('data', async query => { - this._queries.push(this._client.raw(query, { split: false })); + parser.on('data', async (query) => { + q.push(query); queryCount++; }); this._fileHandler.on('data', (chunk) => { readPosition += chunk.length; this.emitUpdate({ + queryCount, readPosition, percentage: readPosition / totalFileSize * 100 });