antares/src/main/libs/importers/sql/MysqlImporter.js

76 lines
2.1 KiB
JavaScript
Raw Normal View History

2021-12-28 15:30:07 +01:00
import fs from 'fs/promises';
import { queue } from 'async';
2021-12-28 15:30:07 +01:00
import SqlParser from '../../../../common/libs/sqlParser';
import { BaseImporter } from '../BaseImporter';
/**
 * Imports a SQL dump file into a MySQL schema.
 *
 * The file stream (`this._fileHandler`) is piped through a `SqlParser`; every
 * statement the parser emits is pushed onto an `async` queue whose worker runs
 * it through `client.raw()`.
 *
 * NOTE(review): `_options`, `_fileHandler`, `emit` and `emitUpdate` are not
 * defined in this class; presumably they come from `BaseImporter` — confirm.
 */
export default class MysqlImporter extends BaseImporter {
   /**
    * @param {Object} client - Connection wrapper; this class calls its
    *    `use(schema)` and `raw(sql)` methods.
    * @param {Object} options - Forwarded to the `BaseImporter` constructor.
    *    `file` and `schema` are read back here through `this._options`.
    */
   constructor (client, options) {
      super(options);
      this._client = client;
   }

   /**
    * Streams the dump file through the parser and executes each statement.
    *
    * Progress is reported through `emitUpdate()` on every file chunk read.
    * A failing statement does not abort the import: it is reported with a
    * `query-error` event and the queue continues with the next statement.
    *
    * @returns {Promise<void>} Resolves once the parser has finished and every
    *    queued statement has been processed.
    * @throws Rejects with the underlying error when the file cannot be
    *    stat'ed, the schema cannot be selected, or the file/parser stream
    *    emits `error`.
    */
   async import () {
      console.time('import');

      try {
         const { size: totalFileSize } = await fs.stat(this._options.file);
         const parser = new SqlParser();
         let readPosition = 0;
         let queryCount = 0;

         // The worker must stay an `async` function: the `async` library
         // treats it as promise-returning only when it is declared that way
         // (see the library's queue/AsyncFunction documentation).
         const q = queue(async (query) => await this._client.raw(query));

         // Per-statement failures are reported, not thrown.
         // Presumably `sqlMessage`/`sql` are set by the MySQL driver — confirm.
         q.error((error, query) => {
            this.emit('query-error', { sql: query, message: error.sqlMessage, sqlSnippet: error.sql, time: new Date().getTime() });
         });

         this.emitUpdate({
            fileSize: totalFileSize,
            readPosition: 0,
            percentage: 0,
            queryCount: 0
         });

         await this._client.use(this._options.schema);

         // TODO: 1. detect file encoding
         // TODO: 2. set file handler encoding
         // TODO: 3. detect sql mode
         // TODO: 4. restore sql mode in case of exception

         await new Promise((resolve, reject) => {
            this._fileHandler.pipe(parser);

            parser.on('error', reject);

            parser.on('finish', () => {
               console.log('TOTAL QUERIES', queryCount);
               console.log('import end');

               // `q.drain()` only settles on the NEXT drain event. If the
               // queue is already idle (no statements, or all of them already
               // executed) that event never comes, so resolve right away.
               const queueDrained = q.idle() ? Promise.resolve() : q.drain();
               queueDrained.then(resolve, reject);
            });

            parser.on('data', (query) => {
               q.push(query);
               queryCount++;
            });

            this._fileHandler.on('data', (chunk) => {
               readPosition += chunk.length;
               this.emitUpdate({
                  queryCount,
                  readPosition,
                  percentage: readPosition / totalFileSize * 100
               });
            });

            // Logged once by the catch block below.
            this._fileHandler.on('error', reject);
         });
      }
      catch (err) {
         console.log(err);
         // Rethrow so the caller sees the failure instead of a promise that
         // resolves as if the import had succeeded.
         throw err;
      }
      finally {
         // Close the timer opened above so repeated imports do not warn about
         // an already existing label.
         console.timeEnd('import');
      }
   }
}