feat(PostgreSQL): manual commit mode

This commit is contained in:
Fabio Di Stasio 2022-02-14 18:00:26 +01:00
parent 5bfff649e9
commit d81e0911ab
2 changed files with 99 additions and 25 deletions

View File

@ -104,6 +104,7 @@ export class MySQLClient extends AntaresCore {
/** /**
* *
* @returns dbConfig * @returns dbConfig
* @memberof MySQLClient
*/ */
async getDbConfig () { async getDbConfig () {
delete this._params.application_name; delete this._params.application_name;
@ -159,7 +160,15 @@ export class MySQLClient extends AntaresCore {
async getConnection () { async getConnection () {
const dbConfig = await this.getDbConfig(); const dbConfig = await this.getDbConfig();
const connection = await mysql.createConnection(dbConfig); const connection = await mysql.createConnection({
...dbConfig,
typeCast: (field, next) => {
if (field.type === 'DATETIME')
return field.string();
else
return next();
}
});
// ANSI_QUOTES check // ANSI_QUOTES check
const [res] = await connection.query('SHOW GLOBAL VARIABLES LIKE \'%sql_mode%\''); const [res] = await connection.query('SHOW GLOBAL VARIABLES LIKE \'%sql_mode%\'');
@ -196,12 +205,12 @@ export class MySQLClient extends AntaresCore {
if (hasAnsiQuotes) if (hasAnsiQuotes)
await connection.query(`SET SESSION sql_mode = "${sqlMode.filter(m => m !== 'ANSI_QUOTES').join(',')}"`); await connection.query(`SET SESSION sql_mode = "${sqlMode.filter(m => m !== 'ANSI_QUOTES').join(',')}"`);
connection.on('connection', connection => { connection.on('connection', conn => {
if (this._params.readonly) if (this._params.readonly)
connection.query('SET SESSION TRANSACTION READ ONLY'); conn.query('SET SESSION TRANSACTION READ ONLY');
if (hasAnsiQuotes) if (hasAnsiQuotes)
connection.query(`SET SESSION sql_mode = "${sqlMode.filter(m => m !== 'ANSI_QUOTES').join(',')}"`); conn.query(`SET SESSION sql_mode = "${sqlMode.filter(m => m !== 'ANSI_QUOTES').join(',')}"`);
}); });
return connection; return connection;

View File

@ -22,6 +22,7 @@ export class PostgreSQLClient extends AntaresCore {
this._schema = null; this._schema = null;
this._runningConnections = new Map(); this._runningConnections = new Map();
this._connectionsToCommit = new Map();
this.types = {}; this.types = {};
for (const key in types.builtins) for (const key in types.builtins)
@ -71,9 +72,11 @@ export class PostgreSQLClient extends AntaresCore {
} }
/** /**
*
* @returns dbConfig
* @memberof PostgreSQLClient * @memberof PostgreSQLClient
*/ */
async connect () { async getDbConfig () {
const dbConfig = { const dbConfig = {
host: this._params.host, host: this._params.host,
port: this._params.port, port: this._params.port,
@ -102,24 +105,43 @@ export class PostgreSQLClient extends AntaresCore {
} }
} }
if (!this._poolSize) { return dbConfig;
}
/**
* @memberof PostgreSQLClient
*/
async connect () {
if (!this._poolSize)
this._connection = await this.getConnection();
else
this._connection = await this.getConnectionPool();
}
async getConnection () {
const dbConfig = await this.getDbConfig();
const client = new Client(dbConfig); const client = new Client(dbConfig);
await client.connect(); await client.connect();
this._connection = client; const connection = client;
if (this._params.readonly) if (this._params.readonly)
await this.raw('SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY'); await connection.query('SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY');
return connection;
} }
else {
async getConnectionPool () {
const dbConfig = await this.getDbConfig();
const pool = new Pool({ ...dbConfig, max: this._poolSize }); const pool = new Pool({ ...dbConfig, max: this._poolSize });
this._connection = pool; const connection = pool;
if (this._params.readonly) { if (this._params.readonly) {
this._connection.on('connect', connection => { connection.on('connect', conn => {
connection.query('SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY'); conn.query('SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY');
}); });
} }
}
return connection;
} }
/** /**
@ -1118,6 +1140,36 @@ export class PostgreSQLClient extends AntaresCore {
return await this.raw(`SELECT pg_cancel_backend(${id})`); return await this.raw(`SELECT pg_cancel_backend(${id})`);
} }
/**
*
* @param {string} tabUid
* @returns {Promise<null>}
*/
async commitTab (tabUid) {
const connection = this._connectionsToCommit.get(tabUid);
if (connection)
return await connection.query('COMMIT');
}
/**
*
* @param {string} tabUid
* @returns {Promise<null>}
*/
async rollbackTab (tabUid) {
const connection = this._connectionsToCommit.get(tabUid);
if (connection)
return await connection.query('ROLLBACK');
}
destroyConnectionToCommit (tabUid) {
const connection = this._connectionsToCommit.get(tabUid);
if (connection) {
connection.destroy();
this._connectionsToCommit.delete(tabUid);
}
}
/** /**
* CREATE TABLE * CREATE TABLE
* *
@ -1429,6 +1481,7 @@ export class PostgreSQLClient extends AntaresCore {
details: false, details: false,
split: true, split: true,
comments: true, comments: true,
autocommit: true,
...args ...args
}; };
@ -1443,8 +1496,20 @@ export class PostgreSQLClient extends AntaresCore {
.map(q => q.trim()) .map(q => q.trim())
: [sql]; : [sql];
let connection;
const isPool = this._connection instanceof Pool; const isPool = this._connection instanceof Pool;
const connection = isPool ? await this._connection.connect() : this._connection;
if (!args.autocommit && args.tabUid) { // autocommit OFF
if (this._connectionsToCommit.has(args.tabUid))
connection = this._connectionsToCommit.get(args.tabUid);
else {
connection = await this.getConnection();
await connection.query('START TRANSACTION');
this._connectionsToCommit.set(args.tabUid, connection);
}
}
else// autocommit ON
connection = isPool ? await this._connection.connect() : this._connection;
if (args.tabUid && isPool) if (args.tabUid && isPool)
this._runningConnections.set(args.tabUid, connection.processID); this._runningConnections.set(args.tabUid, connection.processID);
@ -1554,7 +1619,7 @@ export class PostgreSQLClient extends AntaresCore {
}); });
} }
catch (err) { catch (err) {
if (isPool) { if (isPool && args.autocommit) {
connection.release(); connection.release();
this._runningConnections.delete(args.tabUid); this._runningConnections.delete(args.tabUid);
} }
@ -1566,7 +1631,7 @@ export class PostgreSQLClient extends AntaresCore {
keysArr = keysArr ? [...keysArr, ...response] : response; keysArr = keysArr ? [...keysArr, ...response] : response;
} }
catch (err) { catch (err) {
if (isPool) { if (isPool && args.autocommit) {
connection.release(); connection.release();
this._runningConnections.delete(args.tabUid); this._runningConnections.delete(args.tabUid);
} }
@ -1585,7 +1650,7 @@ export class PostgreSQLClient extends AntaresCore {
}); });
} }
catch (err) { catch (err) {
if (isPool) { if (isPool && args.autocommit) {
connection.release(); connection.release();
this._runningConnections.delete(args.tabUid); this._runningConnections.delete(args.tabUid);
} }
@ -1597,7 +1662,7 @@ export class PostgreSQLClient extends AntaresCore {
resultsArr.push({ rows, report, fields, keys, duration }); resultsArr.push({ rows, report, fields, keys, duration });
} }
if (isPool) { if (isPool && args.autocommit) {
connection.release(); connection.release();
this._runningConnections.delete(args.tabUid); this._runningConnections.delete(args.tabUid);
} }