refactor: db import ts refactor

This commit is contained in:
Fabio Di Stasio 2022-04-17 12:01:07 +02:00
parent 5dbc127b51
commit 75a41769bf
9 changed files with 92 additions and 29 deletions

View File

@ -2,7 +2,7 @@ import * as mysql from 'mysql2/promise';
import * as pg from 'pg';
import MysqlExporter from 'src/main/libs/exporters/sql/MysqlExporter';
import PostgreSQLExporter from 'src/main/libs/exporters/sql/PostgreSQLExporter';
import MySQLImporter from 'src/main/libs/importers/sql/MysqlImporter';
import MySQLImporter from 'src/main/libs/importers/sql/MySQLlImporter';
import PostgreSQLImporter from 'src/main/libs/importers/sql/PostgreSQLImporter';
import SSHConfig from 'ssh2-promise/lib/sshConfig';
import { MySQLClient } from '../../main/libs/clients/MySQLClient';

View File

@ -0,0 +1,16 @@
import * as antares from './antares';
export interface ImportOptions {
uid: string;
schema: string;
type: antares.ClientCode;
file: string;
}
export interface ImportState {
fileSize?: number;
readPosition?: number;
percentage?: number;
queryCount?: number;
op?: string;
}

View File

View File

@ -1,8 +1,14 @@
import fs from 'fs';
import EventEmitter from 'events';
import * as importer from 'common/interfaces/importer';
import * as fs from 'fs';
import * as EventEmitter from 'events';
export class BaseImporter extends EventEmitter {
constructor (options) {
protected _options;
protected _isCancelled;
protected _fileHandler;
protected _state;
constructor (options: importer.ImportOptions) {
super();
this._options = options;
this._isCancelled = false;
@ -43,7 +49,7 @@ export class BaseImporter extends EventEmitter {
this.emitUpdate({ op: 'cancelling' });
}
emitUpdate (state) {
emitUpdate (state: importer.ImportState) {
this.emit('progress', { ...this._state, ...state });
}

View File

@ -1,14 +1,18 @@
import fs from 'fs/promises';
import * as mysql from 'mysql2';
import * as importer from 'common/interfaces/importer';
import * as fs from 'fs/promises';
import MySQLParser from '../../parsers/MySQLParser';
import { BaseImporter } from '../BaseImporter';
export default class MySQLImporter extends BaseImporter {
constructor (client, options) {
protected _client: mysql.Pool
constructor (client: mysql.Pool, options: importer.ImportOptions) {
super(options);
this._client = client;
}
async import () {
async import (): Promise<void> {
try {
const { size: totalFileSize } = await fs.stat(this._options.file);
const parser = new MySQLParser();
@ -60,7 +64,8 @@ export default class MySQLImporter extends BaseImporter {
parser.on('pause', () => {
this._fileHandler.unpipe(parser);
this._fileHandler.readableFlowing = false;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(this._fileHandler as any).readableFlowing = false;
});
this._fileHandler.on('data', (chunk) => {

View File

@ -1,14 +1,18 @@
import * as pg from 'pg';
import * as importer from 'common/interfaces/importer';
import fs from 'fs/promises';
import PostgreSQLParser from '../../parsers/PostgreSQLParser';
import { BaseImporter } from '../BaseImporter';
export default class PostgreSQLImporter extends BaseImporter {
constructor (client, options) {
protected _client: pg.PoolClient;
constructor (client: pg.PoolClient, options: importer.ImportOptions) {
super(options);
this._client = client;
}
async import () {
async import (): Promise<void> {
try {
const { size: totalFileSize } = await fs.stat(this._options.file);
const parser = new PostgreSQLParser();
@ -60,7 +64,8 @@ export default class PostgreSQLImporter extends BaseImporter {
parser.on('pause', () => {
this._fileHandler.unpipe(parser);
this._fileHandler.readableFlowing = false;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(this._fileHandler as any).readableFlowing = false;
});
this._fileHandler.on('data', (chunk) => {

View File

@ -1,7 +1,17 @@
import { Transform } from 'stream';
import { Transform, TransformCallback, TransformOptions } from 'stream';
export default class MySQLParser extends Transform {
constructor (opts) {
private _buffer: string;
private _lastChar: string;
private _last9Chars: string;
encoding: BufferEncoding;
delimiter: string;
isEscape: boolean;
currentQuote: string;
isDelimiter: boolean;
constructor (opts?: TransformOptions & { delimiter: string }) {
opts = {
delimiter: ';',
encoding: 'utf8',
@ -21,7 +31,7 @@ export default class MySQLParser extends Transform {
this.isDelimiter = false;
}
_transform (chunk, encoding, next) {
_transform (chunk: Buffer, encoding: BufferEncoding, next: TransformCallback) {
for (const char of chunk.toString(this.encoding)) {
this.checkEscape();
this._buffer += char;
@ -49,7 +59,7 @@ export default class MySQLParser extends Transform {
}
}
checkNewDelimiter (char) {
checkNewDelimiter (char: string) {
if (this.currentQuote === null && this._last9Chars === 'delimiter') {
this.isDelimiter = true;
this._buffer = '';
@ -64,7 +74,7 @@ export default class MySQLParser extends Transform {
}
}
checkQuote (char) {
checkQuote (char: string) {
const isQuote = !this.isEscape && (char === '\'' || char === '"');
if (isQuote && this.currentQuote === char)
this.currentQuote = null;
@ -77,7 +87,7 @@ export default class MySQLParser extends Transform {
if (this.isDelimiter)
return false;
let query = false;
let query: false | string = false;
let demiliterFound = false;
if (this.currentQuote === null && this._buffer.length >= this.delimiter.length)
demiliterFound = this._last9Chars.slice(-this.delimiter.length) === this.delimiter;

View File

@ -1,7 +1,23 @@
import { Transform } from 'stream';
import { Transform, TransformCallback, TransformOptions } from 'stream';
export default class PostgreSQLParser extends Transform {
constructor (opts) {
private _buffer: string;
private _lastChar: string;
private _lastChars: string;
private _bodyWrapper: string;
private _bodyWrapperBuffer: string;
private _firstDollarFound: boolean;
private _isBody: boolean;
private _isSingleLineComment: boolean;
private _isMultiLineComment: boolean;
encoding: BufferEncoding;
delimiter: string;
isEscape: boolean;
currentQuote: string;
isDelimiter: boolean;
constructor (opts?: TransformOptions & { delimiter: string }) {
opts = {
delimiter: ';',
encoding: 'utf8',
@ -30,7 +46,7 @@ export default class PostgreSQLParser extends Transform {
return this._isSingleLineComment || this._isMultiLineComment;
}
_transform (chunk, encoding, next) {
_transform (chunk: Buffer, encoding: BufferEncoding, next: TransformCallback) {
for (const char of chunk.toString(this.encoding)) {
this.checkEscape();
this._buffer += char;
@ -82,7 +98,7 @@ export default class PostgreSQLParser extends Transform {
}
}
checkBodyWrapper (char) {
checkBodyWrapper (char: string) {
if (this._isBody)
this._isBody = this._lastChars !== this._bodyWrapper;
@ -111,7 +127,7 @@ export default class PostgreSQLParser extends Transform {
}
}
checkQuote (char) {
checkQuote (char: string) {
const isQuote = !this.isEscape && (char === '\'' || char === '"');
if (isQuote && this.currentQuote === char)
this.currentQuote = null;
@ -124,7 +140,7 @@ export default class PostgreSQLParser extends Transform {
if (this._isBody || this._isComment)
return false;
let query = false;
let query: false | string = false;
let demiliterFound = false;
if (this.currentQuote === null && this._buffer.length >= this.delimiter.length)

View File

@ -1,7 +1,12 @@
import * as antares from 'common/interfaces/antares';
import * as pg from 'pg';
import * as mysql from 'mysql2';
import { MySQLClient } from '../libs/clients/MySQLClient';
import { PostgreSQLClient } from '../libs/clients/PostgreSQLClient';
import { ClientsFactory } from '../libs/ClientsFactory';
import MySQLImporter from '../libs/importers/sql/MysqlImporter';
import MySQLImporter from '../libs/importers/sql/MySQLlImporter';
import PostgreSQLImporter from '../libs/importers/sql/PostgreSQLImporter';
let importer;
let importer: antares.Importer;
process.on('message', async ({ type, dbConfig, options }) => {
if (type === 'init') {
@ -12,17 +17,17 @@ process.on('message', async ({ type, dbConfig, options }) => {
schema: options.schema
},
poolSize: 1
});
}) as MySQLClient | PostgreSQLClient;
const pool = await connection.getConnectionPool();
switch (options.type) {
case 'mysql':
case 'maria':
importer = new MySQLImporter(pool, options);
importer = new MySQLImporter(pool as unknown as mysql.Pool, options);
break;
case 'pg':
importer = new PostgreSQLImporter(pool, options);
importer = new PostgreSQLImporter(pool as unknown as pg.PoolClient, options);
break;
default:
process.send({