mirror of https://github.com/Fabio286/antares.git
fix: sql parser hangs during import
This commit is contained in:
parent
251795e2d2
commit
7a6bd8bdbd
|
@ -1,4 +1,4 @@
|
||||||
import { Duplex } from 'stream';
|
import { Transform } from 'stream';
|
||||||
|
|
||||||
const chars = {
|
const chars = {
|
||||||
NEWLINE: 0x0A,
|
NEWLINE: 0x0A,
|
||||||
|
@ -8,7 +8,7 @@ const chars = {
|
||||||
BACKSLASH: 0x5C
|
BACKSLASH: 0x5C
|
||||||
};
|
};
|
||||||
|
|
||||||
export default class SqlParser extends Duplex {
|
export default class SqlParser extends Transform {
|
||||||
constructor (opts) {
|
constructor (opts) {
|
||||||
opts = {
|
opts = {
|
||||||
delimiter: ';',
|
delimiter: ';',
|
||||||
|
@ -27,7 +27,7 @@ export default class SqlParser extends Duplex {
|
||||||
this.isDelimiter = false;
|
this.isDelimiter = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
_write (chunk, encoding, next) {
|
_transform (chunk, encoding, next) {
|
||||||
for (const char of chunk) {
|
for (const char of chunk) {
|
||||||
this.checkEscape();
|
this.checkEscape();
|
||||||
this._buffer = Buffer.concat([this._buffer, Buffer.from([char])]);
|
this._buffer = Buffer.concat([this._buffer, Buffer.from([char])]);
|
||||||
|
@ -66,7 +66,7 @@ export default class SqlParser extends Duplex {
|
||||||
}
|
}
|
||||||
|
|
||||||
checkQuote (char) {
|
checkQuote (char) {
|
||||||
const isQuote = !this.isEscape && [chars.QUOTE, chars.DOUBLE_QUOTE].includes(char);
|
const isQuote = !this.isEscape && (chars.QUOTE === char || chars.DOUBLE_QUOTE === char);
|
||||||
if (isQuote && this.currentQuote === char)
|
if (isQuote && this.currentQuote === char)
|
||||||
this.currentQuote = null;
|
this.currentQuote = null;
|
||||||
|
|
||||||
|
@ -84,8 +84,8 @@ export default class SqlParser extends Duplex {
|
||||||
demiliterFound = this._buffer.slice(-this.delimiter.length).toString(this.encoding) === this.delimiter;
|
demiliterFound = this._buffer.slice(-this.delimiter.length).toString(this.encoding) === this.delimiter;
|
||||||
|
|
||||||
if (demiliterFound) {
|
if (demiliterFound) {
|
||||||
this._buffer = this._buffer.slice(0, this._buffer.length - 1);
|
const str = this.parsedStr;
|
||||||
query = this.parsedStr;
|
query = str.slice(0, str.length - this.delimiter.length);
|
||||||
this._buffer = Buffer.from([]);
|
this._buffer = Buffer.from([]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,8 +95,4 @@ export default class SqlParser extends Duplex {
|
||||||
get parsedStr () {
|
get parsedStr () {
|
||||||
return this._buffer.toString(this.encoding).trim();
|
return this._buffer.toString(this.encoding).trim();
|
||||||
}
|
}
|
||||||
|
|
||||||
_read (size) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ export default class MysqlImporter extends BaseImporter {
|
||||||
this._fileHandler.pipe(parser);
|
this._fileHandler.pipe(parser);
|
||||||
|
|
||||||
parser.on('error', (err) => {
|
parser.on('error', (err) => {
|
||||||
console.log(err);
|
console.log('err', err);
|
||||||
reject(err);
|
reject(err);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -43,9 +43,8 @@ export default class MysqlImporter extends BaseImporter {
|
||||||
});
|
});
|
||||||
|
|
||||||
parser.on('data', async (query) => {
|
parser.on('data', async (query) => {
|
||||||
console.log('query: ', query);
|
|
||||||
parser.pause();
|
parser.pause();
|
||||||
await this._client.raw(query);
|
await this._client.raw(query).catch(_ => false);
|
||||||
parser.resume();
|
parser.resume();
|
||||||
queryCount++;
|
queryCount++;
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue