1
1
mirror of https://github.com/Fabio286/antares.git synced 2025-02-08 07:48:40 +01:00

perf(MySQL): import tasks managed with async queue

This commit is contained in:
Giulio Ganci 2022-03-05 18:54:08 +01:00
parent f444746f46
commit bbe13f27dc

View File

@ -1,4 +1,5 @@
import fs from 'fs/promises';
import { queue } from 'async';
import SqlParser from '../../../../common/libs/sqlParser';
import { BaseImporter } from '../BaseImporter';
@ -6,7 +7,6 @@ export default class MysqlImporter extends BaseImporter {
constructor (client, options) {
super(options);
this._client = client;
this._queries = [];
}
async import () {
@ -16,11 +16,17 @@ export default class MysqlImporter extends BaseImporter {
const parser = new SqlParser();
let readPosition = 0;
let queryCount = 0;
const q = queue(async (query) => await this._client.raw(query));
q.error((error, query) => {
this.emit('query-error', { sql: query, message: error.sqlMessage, sqlSnippet: error.sql, time: new Date().getTime() });
});
this.emitUpdate({
fileSize: totalFileSize,
readPosition: 0,
percentage: 0
percentage: 0,
queryCount: 0
});
await this._client.use(this._options.schema);
@ -35,25 +41,22 @@ export default class MysqlImporter extends BaseImporter {
parser.on('error', reject);
parser.on('finish', () => {
Promise.all(this._queries)
.then(() => {
console.timeEnd('import');
console.log('TOTAL QUERIES', queryCount);
console.log('import end');
resolve();
})
.catch(reject);
parser.on('finish', async () => {
console.log('TOTAL QUERIES', queryCount);
console.log('import end');
await q.drain(); // not sure of this
resolve();
});
parser.on('data', async query => {
this._queries.push(this._client.raw(query, { split: false }));
parser.on('data', async (query) => {
q.push(query);
queryCount++;
});
this._fileHandler.on('data', (chunk) => {
readPosition += chunk.length;
this.emitUpdate({
queryCount,
readPosition,
percentage: readPosition / totalFileSize * 100
});