mirror of
https://github.com/Fabio286/antares.git
synced 2025-02-16 19:50:37 +01:00
perf(MySQL): improvements in connection handling
This commit is contained in:
parent
da56905572
commit
876d5ea481
@ -12,6 +12,7 @@ export class MySQLClient extends BaseClient {
|
||||
private _connectionsToCommit: Map<string, mysql.Connection | mysql.PoolConnection>;
|
||||
private _keepaliveTimer: NodeJS.Timer;
|
||||
private _keepaliveMs: number;
|
||||
private sqlMode?: string[];
|
||||
_connection?: mysql.Connection | mysql.Pool;
|
||||
_params: mysql.ConnectionOptions & {schema: string; ssl?: mysql.SslOptions; ssh?: SSHConfig; readonly: boolean};
|
||||
|
||||
@ -58,6 +59,10 @@ export class MySQLClient extends BaseClient {
|
||||
this._keepaliveMs = 10*60*1000;
|
||||
}
|
||||
|
||||
private get isPool () {
|
||||
return 'getConnection' in this._connection;
|
||||
}
|
||||
|
||||
private _getType (field: mysql.FieldPacket & { columnType?: number; columnLength?: number }) {
|
||||
let name = this.types[field.columnType];
|
||||
let length = field.columnLength;
|
||||
@ -181,9 +186,32 @@ export class MySQLClient extends BaseClient {
|
||||
|
||||
async connect () {
|
||||
if (!this._poolSize)
|
||||
this._connection = await this.getConnection();
|
||||
this._connection = await this.getSingleConnection();
|
||||
else
|
||||
this._connection = await this.getConnectionPool();
|
||||
|
||||
// ANSI_QUOTES check
|
||||
const [response] = await this._connection.query<mysql.RowDataPacket[]>('SHOW GLOBAL VARIABLES LIKE \'%sql_mode%\'');
|
||||
this.sqlMode = response[0]?.Value?.split(',');
|
||||
const hasAnsiQuotes = this.sqlMode.includes('ANSI') || this.sqlMode.includes('ANSI_QUOTES');
|
||||
|
||||
if (hasAnsiQuotes)
|
||||
await this._connection.query(`SET SESSION sql_mode = '${this.sqlMode.filter((m: string) => !['ANSI', 'ANSI_QUOTES'].includes(m)).join(',')}'`);
|
||||
|
||||
if (this._params.readonly)
|
||||
await this._connection.query('SET SESSION TRANSACTION READ ONLY');
|
||||
|
||||
if (this._poolSize) {
|
||||
const hasAnsiQuotes = this.sqlMode.includes('ANSI') || this.sqlMode.includes('ANSI_QUOTES');
|
||||
|
||||
this._connection.on('connection', conn => {
|
||||
if (this._params.readonly)
|
||||
conn.query('SET SESSION TRANSACTION READ ONLY');
|
||||
|
||||
if (hasAnsiQuotes)
|
||||
conn.query(`SET SESSION sql_mode = '${this.sqlMode.filter((m: string) => !['ANSI', 'ANSI_QUOTES'].includes(m)).join(',')}'`);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
destroy () {
|
||||
@ -196,7 +224,7 @@ export class MySQLClient extends BaseClient {
|
||||
}
|
||||
}
|
||||
|
||||
async getConnection () {
|
||||
async getSingleConnection () {
|
||||
const dbConfig = await this.getDbConfig();
|
||||
const connection = await mysql.createConnection({
|
||||
...dbConfig,
|
||||
@ -208,17 +236,6 @@ export class MySQLClient extends BaseClient {
|
||||
}
|
||||
});
|
||||
|
||||
// ANSI_QUOTES check
|
||||
const [response] = await connection.query<mysql.RowDataPacket[]>('SHOW GLOBAL VARIABLES LIKE \'%sql_mode%\'');
|
||||
const sqlMode: string[] = response[0]?.Value?.split(',');
|
||||
const hasAnsiQuotes = sqlMode.includes('ANSI') || sqlMode.includes('ANSI_QUOTES');
|
||||
|
||||
if (this._params.readonly)
|
||||
await connection.query('SET SESSION TRANSACTION READ ONLY');
|
||||
|
||||
if (hasAnsiQuotes)
|
||||
await connection.query(`SET SESSION sql_mode = '${sqlMode.filter((m: string) => !['ANSI', 'ANSI_QUOTES'].includes(m)).join(',')}'`);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
@ -227,6 +244,7 @@ export class MySQLClient extends BaseClient {
|
||||
const connection = mysql.createPool({
|
||||
...dbConfig,
|
||||
connectionLimit: this._poolSize,
|
||||
enableKeepAlive: true,
|
||||
typeCast: (field, next) => {
|
||||
if (field.type === 'DATETIME')
|
||||
return field.string();
|
||||
@ -235,25 +253,6 @@ export class MySQLClient extends BaseClient {
|
||||
}
|
||||
});
|
||||
|
||||
// ANSI_QUOTES check
|
||||
const [res] = await connection.query<mysql.RowDataPacket[]>('SHOW GLOBAL VARIABLES LIKE \'%sql_mode%\'');
|
||||
const sqlMode: string[] = res[0]?.Value?.split(',');
|
||||
const hasAnsiQuotes = sqlMode.includes('ANSI') || sqlMode.includes('ANSI_QUOTES');
|
||||
|
||||
if (hasAnsiQuotes)
|
||||
await connection.query(`SET SESSION sql_mode = '${sqlMode.filter((m: string) => !['ANSI', 'ANSI_QUOTES'].includes(m)).join(',')}'`);
|
||||
|
||||
if (this._params.readonly)
|
||||
await connection.query('SET SESSION TRANSACTION READ ONLY');
|
||||
|
||||
connection.on('connection', conn => {
|
||||
if (this._params.readonly)
|
||||
conn.query('SET SESSION TRANSACTION READ ONLY');
|
||||
|
||||
if (hasAnsiQuotes)
|
||||
conn.query(`SET SESSION sql_mode = '${sqlMode.filter((m: string) => !['ANSI', 'ANSI_QUOTES'].includes(m)).join(',')}'`);
|
||||
});
|
||||
|
||||
this._keepaliveTimer = setInterval(async () => {
|
||||
await this.keepAlive();
|
||||
}, this._keepaliveMs);
|
||||
@ -261,6 +260,43 @@ export class MySQLClient extends BaseClient {
|
||||
return connection;
|
||||
}
|
||||
|
||||
private async getConnection (args: antares.QueryParams, retry?: boolean): Promise<mysql.Pool | mysql.PoolConnection | mysql.Connection> {
|
||||
let connection;
|
||||
|
||||
try {
|
||||
if (!args.autocommit && args.tabUid) { // autocommit OFF
|
||||
if (this._connectionsToCommit.has(args.tabUid))
|
||||
connection = this._connectionsToCommit.get(args.tabUid);
|
||||
else {
|
||||
connection = await this.getSingleConnection();
|
||||
await connection.query('SET SESSION autocommit=0');
|
||||
this._connectionsToCommit.set(args.tabUid, connection);
|
||||
}
|
||||
}
|
||||
else// autocommit ON
|
||||
connection = this.isPool ? await (this._connection as mysql.Pool).getConnection() : this._connection;
|
||||
|
||||
if (args.tabUid && this.isPool) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
this._runningConnections.set(args.tabUid, (connection as any).connection.connectionId);
|
||||
}
|
||||
|
||||
if (args.schema)
|
||||
await connection.query(`USE \`${args.schema}\``);
|
||||
|
||||
return connection;
|
||||
}
|
||||
catch (error) {
|
||||
if (error.code === 'ECONNRESET' && !retry) {
|
||||
this.destroy();
|
||||
await this.connect();
|
||||
return this.getConnection(args, true);
|
||||
}
|
||||
else
|
||||
throw new Error(error.message);
|
||||
}
|
||||
}
|
||||
|
||||
private async keepAlive () {
|
||||
try {
|
||||
const connection = await (this._connection as mysql.Pool).getConnection();
|
||||
@ -1648,28 +1684,7 @@ export class MySQLClient extends BaseClient {
|
||||
.map(q => q.trim())
|
||||
: [sql];
|
||||
|
||||
let connection: mysql.Connection | mysql.Pool | mysql.PoolConnection;
|
||||
const isPool = 'getConnection' in this._connection;
|
||||
|
||||
if (!args.autocommit && args.tabUid) { // autocommit OFF
|
||||
if (this._connectionsToCommit.has(args.tabUid))
|
||||
connection = this._connectionsToCommit.get(args.tabUid);
|
||||
else {
|
||||
connection = await this.getConnection();
|
||||
await connection.query('SET SESSION autocommit=0');
|
||||
this._connectionsToCommit.set(args.tabUid, connection);
|
||||
}
|
||||
}
|
||||
else// autocommit ON
|
||||
connection = isPool ? await (this._connection as mysql.Pool).getConnection() : this._connection;
|
||||
|
||||
if (args.tabUid && isPool) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
this._runningConnections.set(args.tabUid, (connection as any).connection.connectionId);
|
||||
}
|
||||
|
||||
if (args.schema)
|
||||
await connection.query(`USE \`${args.schema}\``);
|
||||
const connection = await this.getConnection(args);
|
||||
|
||||
for (const query of queries) {
|
||||
if (!query) continue;
|
||||
@ -1729,7 +1744,7 @@ export class MySQLClient extends BaseClient {
|
||||
});
|
||||
}
|
||||
catch (err) {
|
||||
if (isPool && args.autocommit) {
|
||||
if (this.isPool && args.autocommit) {
|
||||
(connection as mysql.PoolConnection).release();
|
||||
this._runningConnections.delete(args.tabUid);
|
||||
}
|
||||
@ -1741,7 +1756,7 @@ export class MySQLClient extends BaseClient {
|
||||
keysArr = keysArr ? [...keysArr, ...response] : response;
|
||||
}
|
||||
catch (err) {
|
||||
if (isPool && args.autocommit) {
|
||||
if (this.isPool && args.autocommit) {
|
||||
(connection as mysql.PoolConnection).release();
|
||||
this._runningConnections.delete(args.tabUid);
|
||||
}
|
||||
@ -1759,7 +1774,7 @@ export class MySQLClient extends BaseClient {
|
||||
keys: keysArr
|
||||
});
|
||||
}).catch((err) => {
|
||||
if (isPool && args.autocommit) {
|
||||
if (this.isPool && args.autocommit) {
|
||||
(connection as mysql.PoolConnection).release();
|
||||
this._runningConnections.delete(args.tabUid);
|
||||
}
|
||||
@ -1776,7 +1791,7 @@ export class MySQLClient extends BaseClient {
|
||||
});
|
||||
}
|
||||
|
||||
if (isPool && args.autocommit) {
|
||||
if (this.isPool && args.autocommit) {
|
||||
(connection as mysql.PoolConnection).release();
|
||||
this._runningConnections.delete(args.tabUid);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user