fix(MySQL): export crash with large databases

This commit is contained in:
Giulio Ganci 2021-11-04 22:36:45 +01:00
parent 409ed54608
commit 8cf738bac8
2 changed files with 77 additions and 65 deletions

View File

@ -49,10 +49,9 @@ ${footer}
return `DROP TABLE IF EXISTS \`${tableName}\`;`;
}
async getTableInsert (tableName) {
async * getTableInsert (tableName) {
let rowCount = 0;
let sqlStr = '';
const pageSize = 1000;
const countResults = await this._client.raw(
`SELECT COUNT(1) as count FROM \`${this.schemaName}\`.\`${tableName}\``
@ -60,9 +59,9 @@ ${footer}
if (countResults.rows.length === 1) rowCount = countResults.rows[0].count;
if (rowCount > 0) {
const totalPages = Math.ceil(rowCount / pageSize);
let queryLength = 0;
let rowsWritten = 0;
let rowIndex = 0;
const { sqlInsertDivider, sqlInsertAfter } = this._options;
const columns = await this._client.getTableColumns({
table: tableName,
@ -76,69 +75,70 @@ ${footer}
sqlStr += `LOCK TABLES \`${tableName}\` WRITE;\n`;
sqlStr += `/*!40000 ALTER TABLE \`${tableName}\` DISABLE KEYS */;`;
sqlStr += '\n\n';
yield sqlStr;
for (let pageNumber = 0; pageNumber < totalPages; pageNumber++) {
const tableResult = await this._client.raw(
`SELECT ${columnNames.join(', ')} FROM \`${
this.schemaName
}\`.\`${tableName}\`
LIMIT ${pageSize} OFFSET ${pageSize * pageNumber}`
);
yield insertStmt;
sqlStr += insertStmt;
const stream = await this._queryStream(
`SELECT ${columnNames.join(', ')} FROM \`${this.schemaName}\`.\`${tableName}\``
);
for (const rowIndex in tableResult.rows) {
const row = tableResult.rows[rowIndex];
let sqlInsertString = '';
if (
(sqlInsertDivider === 'bytes' &&
queryLength >= sqlInsertAfter * 1024) ||
(sqlInsertDivider === 'rows' && rowsWritten === sqlInsertAfter)
) {
sqlInsertString += `;\n${insertStmt}\n\t(`;
queryLength = 0;
rowsWritten = 0;
}
else if (parseInt(rowIndex) === 0) sqlInsertString += '\n\t(';
else sqlInsertString += ',\n\t(';
for (const i in columns) {
const column = columns[i];
const val = row[column.name];
if (val === null) sqlInsertString += 'NULL';
else if (BIT.includes(column.type)) {
sqlInsertString += `b'${hexToBinary(
Buffer.from(val).toString('hex')
)}'`;
}
else if (BLOB.includes(column.type))
sqlInsertString += `X'${val.toString('hex').toUpperCase()}'`;
else if (val === '') sqlInsertString += '\'\'';
else {
sqlInsertString +=
typeof val === 'string' ? this.escapeAndQuote(val) : val;
}
if (parseInt(i) !== columns.length - 1) sqlInsertString += ', ';
}
sqlInsertString += ')';
sqlStr += sqlInsertString;
queryLength += sqlInsertString.length;
rowsWritten++;
for await (const row of stream) {
if (this.isCancelled) {
stream.destroy();
yield null;
return;
}
sqlStr += ';\n\n';
let sqlInsertString = '';
if (
(sqlInsertDivider === 'bytes' && queryLength >= sqlInsertAfter * 1024) ||
(sqlInsertDivider === 'rows' && rowsWritten === sqlInsertAfter)
) {
sqlInsertString += `;\n${insertStmt}\n\t(`;
queryLength = 0;
rowsWritten = 0;
}
else if (parseInt(rowIndex) === 0) sqlInsertString += '\n\t(';
else sqlInsertString += ',\n\t(';
for (const i in columns) {
const column = columns[i];
const val = row[column.name];
if (val === null) sqlInsertString += 'NULL';
else if (BIT.includes(column.type))
sqlInsertString += `b'${hexToBinary(Buffer.from(val).toString('hex'))}'`;
else if (BLOB.includes(column.type))
sqlInsertString += `X'${val.toString('hex').toUpperCase()}'`;
else if (val === '') sqlInsertString += '\'\'';
else {
sqlInsertString += typeof val === 'string'
? this.escapeAndQuote(val)
: val;
}
if (parseInt(i) !== columns.length - 1)
sqlInsertString += ', ';
}
sqlInsertString += ')';
queryLength += sqlInsertString.length;
rowsWritten++;
rowIndex++;
yield sqlInsertString;
}
sqlStr = ';\n\n';
sqlStr += `/*!40000 ALTER TABLE \`${tableName}\` ENABLE KEYS */;\n`;
sqlStr += 'UNLOCK TABLES;';
}
return sqlStr;
yield sqlStr;
}
}
async getViews () {
@ -301,6 +301,19 @@ ${footer}
return sqlString;
}
async _queryStream (sql) {
console.log(sql);
const isPool = typeof this._client._connection.getConnection === 'function';
const connection = isPool ? await this._client._connection.getConnection() : this._client._connection;
const stream = connection.connection.query(sql).stream();
const dispose = () => connection.destroy();
stream.on('end', dispose);
stream.on('error', dispose);
stream.on('close', dispose);
return stream;
}
getEscapedDefiner (definer) {
return definer
.split('@')

View File

@ -55,7 +55,7 @@ export class SqlExporter extends BaseExporter {
exportState.currentItemIndex++;
exportState.currentItem = item.table;
exportState.op = 'PROCESSING';
exportState.op = 'FETCH';
this.emitUpdate(exportState);
@ -78,13 +78,13 @@ export class SqlExporter extends BaseExporter {
}
if (item.includeContent) {
exportState.op = 'FETCH';
this.emitUpdate(exportState);
const tableInsertSyntax = await this.getTableInsert(item.table);
exportState.op = 'WRITE';
this.emitUpdate(exportState);
this.writeString(tableInsertSyntax);
for await (const sqlStr of this.getTableInsert(item.table)) {
if (this.isCancelled) return;
this.writeString(sqlStr);
}
this.writeString('\n\n');
}
@ -92,8 +92,7 @@ export class SqlExporter extends BaseExporter {
}
for (const item of extraItems) {
const processingMethod = `get${item.charAt(0).toUpperCase() +
item.slice(1)}`;
const processingMethod = `get${item.charAt(0).toUpperCase() + item.slice(1)}`;
exportState.currentItemIndex++;
exportState.currentItem = item;
exportState.op = 'PROCESSING';