feat(PostgreSQL): sql dump importer

This commit is contained in:
Fabio Di Stasio 2022-04-07 12:49:34 +02:00
parent 0f9c991f53
commit 6086ca4a80
6 changed files with 238 additions and 11 deletions

View File

@ -6,6 +6,12 @@ import moment from 'moment';
import { lineString, point, polygon } from '@turf/helpers';
export default class MysqlExporter extends SqlExporter {
constructor (...args) {
super(...args);
this._commentChar = '#';
}
async getSqlHeader () {
let dump = await super.getSqlHeader();
dump += `

View File

@ -1,8 +1,8 @@
import fs from 'fs/promises';
import SqlParser from '../../../../common/libs/sqlParser';
import MySQLParser from '../../parsers/MySQLParser';
import { BaseImporter } from '../BaseImporter';
export default class MysqlImporter extends BaseImporter {
export default class MySQLImporter extends BaseImporter {
constructor (client, options) {
super(options);
this._client = client;
@ -11,7 +11,7 @@ export default class MysqlImporter extends BaseImporter {
async import () {
try {
const { size: totalFileSize } = await fs.stat(this._options.file);
const parser = new SqlParser();
const parser = new MySQLParser();
let readPosition = 0;
let queryCount = 0;
@ -22,11 +22,6 @@ export default class MysqlImporter extends BaseImporter {
queryCount: 0
});
// 1. detect file encoding
// 2. set fh encoding
// 3. detect sql mode
// 4. restore sql mode in case of exception
return new Promise((resolve, reject) => {
this._fileHandler.pipe(parser);

View File

@ -0,0 +1,80 @@
import fs from 'fs/promises';
import PostgreSQLParser from '../../parsers/PostgreSQLParser';
import { BaseImporter } from '../BaseImporter';
export default class PostgreSQLImporter extends BaseImporter {
constructor (client, options) {
super(options);
this._client = client;
}
async import () {
try {
const { size: totalFileSize } = await fs.stat(this._options.file);
const parser = new PostgreSQLParser();
let readPosition = 0;
let queryCount = 0;
this.emitUpdate({
fileSize: totalFileSize,
readPosition: 0,
percentage: 0,
queryCount: 0
});
return new Promise((resolve, reject) => {
this._fileHandler.pipe(parser);
parser.on('error', reject);
parser.on('close', async () => {
console.log('TOTAL QUERIES', queryCount);
console.log('import end');
resolve();
});
parser.on('data', async (query) => {
queryCount++;
parser.pause();
try {
await this._client.query(query);
}
catch (error) {
this.emit('query-error', {
sql: query,
message: error.hint || error.toString(),
sqlSnippet: error.sql,
time: new Date().getTime()
});
}
this.emitUpdate({
queryCount,
readPosition,
percentage: readPosition / totalFileSize * 100
});
this._fileHandler.pipe(parser);
parser.resume();
});
parser.on('pause', () => {
this._fileHandler.unpipe(parser);
this._fileHandler.readableFlowing = false;
});
this._fileHandler.on('data', (chunk) => {
readPosition += chunk.length;
});
this._fileHandler.on('error', (err) => {
console.log(err);
reject(err);
});
});
}
catch (err) {
console.log(err);
}
}
}

View File

@ -1,6 +1,6 @@
import { Transform } from 'stream';
export default class SqlParser extends Transform {
export default class MySQLParser extends Transform {
constructor (opts) {
opts = {
delimiter: ';',

View File

@ -0,0 +1,142 @@
import { Transform } from 'stream';
export default class PostgreSQLParser extends Transform {
constructor (opts) {
opts = {
delimiter: ';',
encoding: 'utf8',
writableObjectMode: true,
readableObjectMode: true,
...opts
};
super(opts);
this._buffer = '';
this._lastChar = '';
this._lastChars = '';
this.encoding = opts.encoding;
this.delimiter = opts.delimiter;// ';'
this._bodyWrapper = '';
this._bodyWrapperBuffer = '';
this.isEscape = false;
this.currentQuote = null;
this._firstDollarFound = false;
this._isBody = false;
this._isSingleLineComment = false;
this._isMultiLineComment = false;
}
get _isComment () {
return this._isSingleLineComment || this._isMultiLineComment;
}
_transform (chunk, encoding, next) {
for (const char of chunk.toString(this.encoding)) {
this.checkEscape();
this._buffer += char;
this._lastChar = char;
this._lastChars += char;
if (this._lastChars.length > this._bodyWrapper.length)
this._lastChars = this._lastChars.slice(-(this._bodyWrapper.length || 2));
this.checkBodyWrapper(char);
this.checkQuote(char);
this.checkCommentRow();
const query = this.getQuery();
if (query)
this.push(query);
}
next();
}
checkEscape () {
if (this._buffer.length > 0) {
this.isEscape = this._lastChar === '\\'
? !this.isEscape
: false;
}
}
checkCommentRow () {
if (this._isBody) return;
if (!this._isComment) {
if (this.currentQuote === null && this._lastChars.includes('--'))
this._isSingleLineComment = true;
if (this.currentQuote === null && this._lastChars.includes('/*'))
this._isMultiLineComment = true;
}
else {
if (this._isSingleLineComment && (this._lastChar === '\n' || this._lastChar === '\r')) {
this._buffer = '';
this._isSingleLineComment = false;
}
if (this._isMultiLineComment && this._lastChars.includes('*/')) {
this._buffer = '';
this._isMultiLineComment = false;
}
}
}
checkBodyWrapper (char) {
if (this._isBody)
this._isBody = this._lastChars !== this._bodyWrapper;
if (this.currentQuote === null && char === '$' && !this._firstDollarFound && !this._bodyWrapper) {
this._firstDollarFound = true;
this._bodyWrapperBuffer += char;
this._isBody = true;
}
else if (this._firstDollarFound) {
if (char === '\n' || char === ' ') {
this._firstDollarFound = false;
this._bodyWrapperBuffer = '';
this._bodyWrapper = '';
this._isBody = false;
return;
}
this._bodyWrapperBuffer += char;
const isEndDollar = char === '$';
if (isEndDollar) {
this._firstDollarFound = false;
this._bodyWrapper = this._bodyWrapperBuffer;
this._bodyWrapperBuffer = '';
}
}
}
checkQuote (char) {
const isQuote = !this.isEscape && (char === '\'' || char === '"');
if (isQuote && this.currentQuote === char)
this.currentQuote = null;
else if (isQuote && this.currentQuote === null)
this.currentQuote = char;
}
getQuery () {
if (this._isBody || this._isComment)
return false;
let query = false;
let demiliterFound = false;
if (this.currentQuote === null && this._buffer.length >= this.delimiter.length)
demiliterFound = this._lastChars.slice(-this.delimiter.length) === this.delimiter;
if (demiliterFound) {
const parsedStr = this._buffer.trim();
query = parsedStr;
this._buffer = '';
this._bodyWrapper = '';
}
return query;
}
}

View File

@ -1,5 +1,6 @@
import { ClientsFactory } from '../libs/ClientsFactory';
import MysqlImporter from '../libs/importers/sql/MysqlImporter';
import MySQLImporter from '../libs/importers/sql/MysqlImporter';
import PostgreSQLImporter from '../libs/importers/sql/PostgreSQLImporter';
let importer;
process.on('message', async ({ type, dbConfig, options }) => {
@ -18,7 +19,10 @@ process.on('message', async ({ type, dbConfig, options }) => {
switch (options.type) {
case 'mysql':
case 'maria':
importer = new MysqlImporter(pool, options);
importer = new MySQLImporter(pool, options);
break;
case 'pg':
importer = new PostgreSQLImporter(pool, options);
break;
default:
process.send({