refactor: db import ts refactor

This commit is contained in:
Fabio Di Stasio 2022-04-17 12:01:07 +02:00
parent ce0f278caf
commit 79f32ca442
9 changed files with 92 additions and 29 deletions

View File

@ -2,7 +2,7 @@ import * as mysql from 'mysql2/promise';
import * as pg from 'pg'; import * as pg from 'pg';
import MysqlExporter from 'src/main/libs/exporters/sql/MysqlExporter'; import MysqlExporter from 'src/main/libs/exporters/sql/MysqlExporter';
import PostgreSQLExporter from 'src/main/libs/exporters/sql/PostgreSQLExporter'; import PostgreSQLExporter from 'src/main/libs/exporters/sql/PostgreSQLExporter';
import MySQLImporter from 'src/main/libs/importers/sql/MysqlImporter'; import MySQLImporter from 'src/main/libs/importers/sql/MySQLlImporter';
import PostgreSQLImporter from 'src/main/libs/importers/sql/PostgreSQLImporter'; import PostgreSQLImporter from 'src/main/libs/importers/sql/PostgreSQLImporter';
import SSHConfig from 'ssh2-promise/lib/sshConfig'; import SSHConfig from 'ssh2-promise/lib/sshConfig';
import { MySQLClient } from '../../main/libs/clients/MySQLClient'; import { MySQLClient } from '../../main/libs/clients/MySQLClient';

View File

@ -0,0 +1,16 @@
import * as antares from './antares';
export interface ImportOptions {
uid: string;
schema: string;
type: antares.ClientCode;
file: string;
}
export interface ImportState {
fileSize?: number;
readPosition?: number;
percentage?: number;
queryCount?: number;
op?: string;
}

View File

View File

@ -1,8 +1,14 @@
import fs from 'fs'; import * as importer from 'common/interfaces/importer';
import EventEmitter from 'events'; import * as fs from 'fs';
import * as EventEmitter from 'events';
export class BaseImporter extends EventEmitter { export class BaseImporter extends EventEmitter {
constructor (options) { protected _options;
protected _isCancelled;
protected _fileHandler;
protected _state;
constructor (options: importer.ImportOptions) {
super(); super();
this._options = options; this._options = options;
this._isCancelled = false; this._isCancelled = false;
@ -43,7 +49,7 @@ export class BaseImporter extends EventEmitter {
this.emitUpdate({ op: 'cancelling' }); this.emitUpdate({ op: 'cancelling' });
} }
emitUpdate (state) { emitUpdate (state: importer.ImportState) {
this.emit('progress', { ...this._state, ...state }); this.emit('progress', { ...this._state, ...state });
} }

View File

@ -1,14 +1,18 @@
import fs from 'fs/promises'; import * as mysql from 'mysql2';
import * as importer from 'common/interfaces/importer';
import * as fs from 'fs/promises';
import MySQLParser from '../../parsers/MySQLParser'; import MySQLParser from '../../parsers/MySQLParser';
import { BaseImporter } from '../BaseImporter'; import { BaseImporter } from '../BaseImporter';
export default class MySQLImporter extends BaseImporter { export default class MySQLImporter extends BaseImporter {
constructor (client, options) { protected _client: mysql.Pool
constructor (client: mysql.Pool, options: importer.ImportOptions) {
super(options); super(options);
this._client = client; this._client = client;
} }
async import () { async import (): Promise<void> {
try { try {
const { size: totalFileSize } = await fs.stat(this._options.file); const { size: totalFileSize } = await fs.stat(this._options.file);
const parser = new MySQLParser(); const parser = new MySQLParser();
@ -60,7 +64,8 @@ export default class MySQLImporter extends BaseImporter {
parser.on('pause', () => { parser.on('pause', () => {
this._fileHandler.unpipe(parser); this._fileHandler.unpipe(parser);
this._fileHandler.readableFlowing = false; // eslint-disable-next-line @typescript-eslint/no-explicit-any
(this._fileHandler as any).readableFlowing = false;
}); });
this._fileHandler.on('data', (chunk) => { this._fileHandler.on('data', (chunk) => {

View File

@ -1,14 +1,18 @@
import * as pg from 'pg';
import * as importer from 'common/interfaces/importer';
import fs from 'fs/promises'; import fs from 'fs/promises';
import PostgreSQLParser from '../../parsers/PostgreSQLParser'; import PostgreSQLParser from '../../parsers/PostgreSQLParser';
import { BaseImporter } from '../BaseImporter'; import { BaseImporter } from '../BaseImporter';
export default class PostgreSQLImporter extends BaseImporter { export default class PostgreSQLImporter extends BaseImporter {
constructor (client, options) { protected _client: pg.PoolClient;
constructor (client: pg.PoolClient, options: importer.ImportOptions) {
super(options); super(options);
this._client = client; this._client = client;
} }
async import () { async import (): Promise<void> {
try { try {
const { size: totalFileSize } = await fs.stat(this._options.file); const { size: totalFileSize } = await fs.stat(this._options.file);
const parser = new PostgreSQLParser(); const parser = new PostgreSQLParser();
@ -60,7 +64,8 @@ export default class PostgreSQLImporter extends BaseImporter {
parser.on('pause', () => { parser.on('pause', () => {
this._fileHandler.unpipe(parser); this._fileHandler.unpipe(parser);
this._fileHandler.readableFlowing = false; // eslint-disable-next-line @typescript-eslint/no-explicit-any
(this._fileHandler as any).readableFlowing = false;
}); });
this._fileHandler.on('data', (chunk) => { this._fileHandler.on('data', (chunk) => {

View File

@ -1,7 +1,17 @@
import { Transform } from 'stream'; import { Transform, TransformCallback, TransformOptions } from 'stream';
export default class MySQLParser extends Transform { export default class MySQLParser extends Transform {
constructor (opts) { private _buffer: string;
private _lastChar: string;
private _last9Chars: string;
encoding: BufferEncoding;
delimiter: string;
isEscape: boolean;
currentQuote: string;
isDelimiter: boolean;
constructor (opts?: TransformOptions & { delimiter: string }) {
opts = { opts = {
delimiter: ';', delimiter: ';',
encoding: 'utf8', encoding: 'utf8',
@ -21,7 +31,7 @@ export default class MySQLParser extends Transform {
this.isDelimiter = false; this.isDelimiter = false;
} }
_transform (chunk, encoding, next) { _transform (chunk: Buffer, encoding: BufferEncoding, next: TransformCallback) {
for (const char of chunk.toString(this.encoding)) { for (const char of chunk.toString(this.encoding)) {
this.checkEscape(); this.checkEscape();
this._buffer += char; this._buffer += char;
@ -49,7 +59,7 @@ export default class MySQLParser extends Transform {
} }
} }
checkNewDelimiter (char) { checkNewDelimiter (char: string) {
if (this.currentQuote === null && this._last9Chars === 'delimiter') { if (this.currentQuote === null && this._last9Chars === 'delimiter') {
this.isDelimiter = true; this.isDelimiter = true;
this._buffer = ''; this._buffer = '';
@ -64,7 +74,7 @@ export default class MySQLParser extends Transform {
} }
} }
checkQuote (char) { checkQuote (char: string) {
const isQuote = !this.isEscape && (char === '\'' || char === '"'); const isQuote = !this.isEscape && (char === '\'' || char === '"');
if (isQuote && this.currentQuote === char) if (isQuote && this.currentQuote === char)
this.currentQuote = null; this.currentQuote = null;
@ -77,7 +87,7 @@ export default class MySQLParser extends Transform {
if (this.isDelimiter) if (this.isDelimiter)
return false; return false;
let query = false; let query: false | string = false;
let demiliterFound = false; let demiliterFound = false;
if (this.currentQuote === null && this._buffer.length >= this.delimiter.length) if (this.currentQuote === null && this._buffer.length >= this.delimiter.length)
demiliterFound = this._last9Chars.slice(-this.delimiter.length) === this.delimiter; demiliterFound = this._last9Chars.slice(-this.delimiter.length) === this.delimiter;

View File

@ -1,7 +1,23 @@
import { Transform } from 'stream'; import { Transform, TransformCallback, TransformOptions } from 'stream';
export default class PostgreSQLParser extends Transform { export default class PostgreSQLParser extends Transform {
constructor (opts) { private _buffer: string;
private _lastChar: string;
private _lastChars: string;
private _bodyWrapper: string;
private _bodyWrapperBuffer: string;
private _firstDollarFound: boolean;
private _isBody: boolean;
private _isSingleLineComment: boolean;
private _isMultiLineComment: boolean;
encoding: BufferEncoding;
delimiter: string;
isEscape: boolean;
currentQuote: string;
isDelimiter: boolean;
constructor (opts?: TransformOptions & { delimiter: string }) {
opts = { opts = {
delimiter: ';', delimiter: ';',
encoding: 'utf8', encoding: 'utf8',
@ -30,7 +46,7 @@ export default class PostgreSQLParser extends Transform {
return this._isSingleLineComment || this._isMultiLineComment; return this._isSingleLineComment || this._isMultiLineComment;
} }
_transform (chunk, encoding, next) { _transform (chunk: Buffer, encoding: BufferEncoding, next: TransformCallback) {
for (const char of chunk.toString(this.encoding)) { for (const char of chunk.toString(this.encoding)) {
this.checkEscape(); this.checkEscape();
this._buffer += char; this._buffer += char;
@ -82,7 +98,7 @@ export default class PostgreSQLParser extends Transform {
} }
} }
checkBodyWrapper (char) { checkBodyWrapper (char: string) {
if (this._isBody) if (this._isBody)
this._isBody = this._lastChars !== this._bodyWrapper; this._isBody = this._lastChars !== this._bodyWrapper;
@ -111,7 +127,7 @@ export default class PostgreSQLParser extends Transform {
} }
} }
checkQuote (char) { checkQuote (char: string) {
const isQuote = !this.isEscape && (char === '\'' || char === '"'); const isQuote = !this.isEscape && (char === '\'' || char === '"');
if (isQuote && this.currentQuote === char) if (isQuote && this.currentQuote === char)
this.currentQuote = null; this.currentQuote = null;
@ -124,7 +140,7 @@ export default class PostgreSQLParser extends Transform {
if (this._isBody || this._isComment) if (this._isBody || this._isComment)
return false; return false;
let query = false; let query: false | string = false;
let demiliterFound = false; let demiliterFound = false;
if (this.currentQuote === null && this._buffer.length >= this.delimiter.length) if (this.currentQuote === null && this._buffer.length >= this.delimiter.length)

View File

@ -1,7 +1,12 @@
import * as antares from 'common/interfaces/antares';
import * as pg from 'pg';
import * as mysql from 'mysql2';
import { MySQLClient } from '../libs/clients/MySQLClient';
import { PostgreSQLClient } from '../libs/clients/PostgreSQLClient';
import { ClientsFactory } from '../libs/ClientsFactory'; import { ClientsFactory } from '../libs/ClientsFactory';
import MySQLImporter from '../libs/importers/sql/MysqlImporter'; import MySQLImporter from '../libs/importers/sql/MySQLlImporter';
import PostgreSQLImporter from '../libs/importers/sql/PostgreSQLImporter'; import PostgreSQLImporter from '../libs/importers/sql/PostgreSQLImporter';
let importer; let importer: antares.Importer;
process.on('message', async ({ type, dbConfig, options }) => { process.on('message', async ({ type, dbConfig, options }) => {
if (type === 'init') { if (type === 'init') {
@ -12,17 +17,17 @@ process.on('message', async ({ type, dbConfig, options }) => {
schema: options.schema schema: options.schema
}, },
poolSize: 1 poolSize: 1
}); }) as MySQLClient | PostgreSQLClient;
const pool = await connection.getConnectionPool(); const pool = await connection.getConnectionPool();
switch (options.type) { switch (options.type) {
case 'mysql': case 'mysql':
case 'maria': case 'maria':
importer = new MySQLImporter(pool, options); importer = new MySQLImporter(pool as unknown as mysql.Pool, options);
break; break;
case 'pg': case 'pg':
importer = new PostgreSQLImporter(pool, options); importer = new PostgreSQLImporter(pool as unknown as pg.PoolClient, options);
break; break;
default: default:
process.send({ process.send({