From 8cf738bac85698fddd0504eef7844279e8c11f44 Mon Sep 17 00:00:00 2001 From: Giulio Ganci Date: Thu, 4 Nov 2021 22:36:45 +0100 Subject: [PATCH] fix(MySQL): export crash with large databases --- src/main/libs/exporters/sql/MysqlExporter.js | 127 ++++++++++--------- src/main/libs/exporters/sql/SqlExporter.js | 15 +-- 2 files changed, 77 insertions(+), 65 deletions(-) diff --git a/src/main/libs/exporters/sql/MysqlExporter.js b/src/main/libs/exporters/sql/MysqlExporter.js index 421b1370..d9a14ba0 100644 --- a/src/main/libs/exporters/sql/MysqlExporter.js +++ b/src/main/libs/exporters/sql/MysqlExporter.js @@ -49,10 +49,9 @@ ${footer} return `DROP TABLE IF EXISTS \`${tableName}\`;`; } - async getTableInsert (tableName) { + async * getTableInsert (tableName) { let rowCount = 0; let sqlStr = ''; - const pageSize = 1000; const countResults = await this._client.raw( `SELECT COUNT(1) as count FROM \`${this.schemaName}\`.\`${tableName}\`` @@ -60,9 +59,9 @@ ${footer} if (countResults.rows.length === 1) rowCount = countResults.rows[0].count; if (rowCount > 0) { - const totalPages = Math.ceil(rowCount / pageSize); let queryLength = 0; let rowsWritten = 0; + let rowIndex = 0; const { sqlInsertDivider, sqlInsertAfter } = this._options; const columns = await this._client.getTableColumns({ table: tableName, @@ -76,69 +75,70 @@ ${footer} sqlStr += `LOCK TABLES \`${tableName}\` WRITE;\n`; sqlStr += `/*!40000 ALTER TABLE \`${tableName}\` DISABLE KEYS */;`; sqlStr += '\n\n'; + yield sqlStr; - for (let pageNumber = 0; pageNumber < totalPages; pageNumber++) { - const tableResult = await this._client.raw( - `SELECT ${columnNames.join(', ')} FROM \`${ - this.schemaName - }\`.\`${tableName}\` - LIMIT ${pageSize} OFFSET ${pageSize * pageNumber}` - ); + yield insertStmt; - sqlStr += insertStmt; + const stream = await this._queryStream( + `SELECT ${columnNames.join(', ')} FROM \`${this.schemaName}\`.\`${tableName}\`` + ); - for (const rowIndex in tableResult.rows) { - const row = tableResult.rows[rowIndex]; - let sqlInsertString = ''; - - if ( - (sqlInsertDivider === 'bytes' && - queryLength >= sqlInsertAfter * 1024) || - (sqlInsertDivider === 'rows' && rowsWritten === sqlInsertAfter) - ) { - sqlInsertString += `;\n${insertStmt}\n\t(`; - - queryLength = 0; - rowsWritten = 0; - } - else if (parseInt(rowIndex) === 0) sqlInsertString += '\n\t('; - else sqlInsertString += ',\n\t('; - - for (const i in columns) { - const column = columns[i]; - const val = row[column.name]; - - if (val === null) sqlInsertString += 'NULL'; - else if (BIT.includes(column.type)) { - sqlInsertString += `b'${hexToBinary( - Buffer.from(val).toString('hex') - )}'`; - } - else if (BLOB.includes(column.type)) - sqlInsertString += `X'${val.toString('hex').toUpperCase()}'`; - else if (val === '') sqlInsertString += '\'\''; - else { - sqlInsertString += - typeof val === 'string' ? this.escapeAndQuote(val) : val; - } - - if (parseInt(i) !== columns.length - 1) sqlInsertString += ', '; - } - - sqlInsertString += ')'; - sqlStr += sqlInsertString; - - queryLength += sqlInsertString.length; - rowsWritten++; + for await (const row of stream) { + if (this.isCancelled) { + stream.destroy(); + yield null; + return; } - sqlStr += ';\n\n'; + + let sqlInsertString = ''; + + if ( + (sqlInsertDivider === 'bytes' && queryLength >= sqlInsertAfter * 1024) || + (sqlInsertDivider === 'rows' && rowsWritten === sqlInsertAfter) + ) { + sqlInsertString += `;\n${insertStmt}\n\t(`; + queryLength = 0; + rowsWritten = 0; + } + else if (parseInt(rowIndex) === 0) sqlInsertString += '\n\t('; + else sqlInsertString += ',\n\t('; + + for (const i in columns) { + const column = columns[i]; + const val = row[column.name]; + + if (val === null) sqlInsertString += 'NULL'; + else if (BIT.includes(column.type)) + sqlInsertString += `b'${hexToBinary(Buffer.from(val).toString('hex'))}'`; + + else if (BLOB.includes(column.type)) + sqlInsertString += `X'${val.toString('hex').toUpperCase()}'`; + + else if (val === '') sqlInsertString += '\'\''; + else { + sqlInsertString += typeof val === 'string' + ? this.escapeAndQuote(val) + : val; + } + + if (parseInt(i) !== columns.length - 1) + sqlInsertString += ', '; + } + + sqlInsertString += ')'; + + queryLength += sqlInsertString.length; + rowsWritten++; + rowIndex++; + yield sqlInsertString; } + sqlStr = ';\n\n'; sqlStr += `/*!40000 ALTER TABLE \`${tableName}\` ENABLE KEYS */;\n`; sqlStr += 'UNLOCK TABLES;'; - } - return sqlStr; + yield sqlStr; + } } async getViews () { @@ -301,6 +301,19 @@ ${footer} return sqlString; } + async _queryStream (sql) { + console.log(sql); + const isPool = typeof this._client._connection.getConnection === 'function'; + const connection = isPool ? await this._client._connection.getConnection() : this._client._connection; + const stream = connection.connection.query(sql).stream(); + const dispose = () => connection.destroy(); + + stream.on('end', dispose); + stream.on('error', dispose); + stream.on('close', dispose); + return stream; + } + getEscapedDefiner (definer) { return definer .split('@') diff --git a/src/main/libs/exporters/sql/SqlExporter.js b/src/main/libs/exporters/sql/SqlExporter.js index e8aed7fd..0d11e000 100644 --- a/src/main/libs/exporters/sql/SqlExporter.js +++ b/src/main/libs/exporters/sql/SqlExporter.js @@ -55,7 +55,7 @@ export class SqlExporter extends BaseExporter { exportState.currentItemIndex++; exportState.currentItem = item.table; - exportState.op = 'PROCESSING'; + exportState.op = 'FETCH'; this.emitUpdate(exportState); @@ -78,13 +78,13 @@ export class SqlExporter extends BaseExporter { } if (item.includeContent) { - exportState.op = 'FETCH'; - this.emitUpdate(exportState); - const tableInsertSyntax = await this.getTableInsert(item.table); - exportState.op = 'WRITE'; this.emitUpdate(exportState); - this.writeString(tableInsertSyntax); + for await (const sqlStr of this.getTableInsert(item.table)) { + if (this.isCancelled) return; + this.writeString(sqlStr); + } + this.writeString('\n\n'); } @@ -92,8 +92,7 @@ export class SqlExporter extends BaseExporter { } for (const item of extraItems) { - const processingMethod = `get${item.charAt(0).toUpperCase() + - item.slice(1)}`; + const processingMethod = `get${item.charAt(0).toUpperCase() + item.slice(1)}`; exportState.currentItemIndex++; exportState.currentItem = item; exportState.op = 'PROCESSING';