Change endpoint from persons to people

This commit is contained in:
xfarrow
2025-03-23 21:00:08 +01:00
parent 4ae263662c
commit d005193f63
7158 changed files with 700476 additions and 735 deletions

View File

@ -0,0 +1,51 @@
const chunk = require('lodash/chunk');
const flatten = require('lodash/flatten');
const delay = require('./internal/delay');
const { isNumber } = require('../util/is');
function batchInsert(client, tableName, batch, chunkSize = 1000) {
let returning = undefined;
let transaction = null;
if (!isNumber(chunkSize) || chunkSize < 1) {
throw new TypeError(`Invalid chunkSize: ${chunkSize}`);
}
if (!Array.isArray(batch)) {
throw new TypeError(`Invalid batch: Expected array, got ${typeof batch}`);
}
const chunks = chunk(batch, chunkSize);
const runInTransaction = (cb) => {
if (transaction) {
return cb(transaction);
}
return client.transaction(cb);
};
return Object.assign(
Promise.resolve().then(async () => {
//Next tick to ensure wrapper functions are called if needed
await delay(1);
return runInTransaction(async (tr) => {
const chunksResults = [];
for (const items of chunks) {
chunksResults.push(await tr(tableName).insert(items, returning));
}
return flatten(chunksResults);
});
}),
{
returning(columns) {
returning = columns;
return this;
},
transacting(tr) {
transaction = tr;
return this;
},
}
);
}
module.exports = batchInsert;

View File

@ -0,0 +1,6 @@
/**
* @param {number} delay
* @returns {Promise<void>}
*/
module.exports = (delay) =>
new Promise((resolve) => setTimeout(resolve, delay));

View File

@ -0,0 +1,41 @@
function ensureConnectionCallback(runner) {
runner.client.emit('start', runner.builder);
runner.builder.emit('start', runner.builder);
const sql = runner.builder.toSQL();
if (runner.builder._debug) {
runner.client.logger.debug(sql);
}
if (Array.isArray(sql)) {
return runner.queryArray(sql);
}
return runner.query(sql);
}
function ensureConnectionStreamCallback(runner, params) {
try {
const sql = runner.builder.toSQL();
if (Array.isArray(sql) && params.hasHandler) {
throw new Error(
'The stream may only be used with a single query statement.'
);
}
return runner.client.stream(
runner.connection,
sql,
params.stream,
params.options
);
} catch (e) {
params.stream.emit('error', e);
throw e;
}
}
module.exports = {
ensureConnectionCallback,
ensureConnectionStreamCallback,
};

View File

@ -0,0 +1,62 @@
const _debugQuery = require('debug')('knex:query');
const debugBindings = require('debug')('knex:bindings');
const debugQuery = (sql, txId) => _debugQuery(sql.replace(/%/g, '%%'), txId);
const { isString } = require('../../util/is');
function formatQuery(sql, bindings, timeZone, client) {
bindings = bindings == null ? [] : [].concat(bindings);
let index = 0;
return sql.replace(/\\?\?/g, (match) => {
if (match === '\\?') {
return '?';
}
if (index === bindings.length) {
return match;
}
const value = bindings[index++];
return client._escapeBinding(value, { timeZone });
});
}
function enrichQueryObject(connection, queryParam, client) {
const queryObject = isString(queryParam) ? { sql: queryParam } : queryParam;
queryObject.bindings = client.prepBindings(queryObject.bindings);
queryObject.sql = client.positionBindings(queryObject.sql);
const { __knexUid, __knexTxId } = connection;
client.emit('query', Object.assign({ __knexUid, __knexTxId }, queryObject));
debugQuery(queryObject.sql, __knexTxId);
debugBindings(queryObject.bindings, __knexTxId);
return queryObject;
}
function executeQuery(connection, queryObject, client) {
return client._query(connection, queryObject).catch((err) => {
if (client.config && client.config.compileSqlOnError === false) {
err.message = queryObject.sql + ' - ' + err.message;
} else {
err.message =
formatQuery(queryObject.sql, queryObject.bindings, undefined, client) +
' - ' +
err.message;
}
client.emit(
'query-error',
err,
Object.assign(
{ __knexUid: connection.__knexUid, __knexTxId: connection.__knexUid },
queryObject
)
);
throw err;
});
}
module.exports = {
enrichQueryObject,
executeQuery,
formatQuery,
};

View File

@ -0,0 +1,325 @@
const { KnexTimeoutError } = require('../util/timeout');
const { timeout } = require('../util/timeout');
const {
ensureConnectionCallback,
ensureConnectionStreamCallback,
} = require('./internal/ensure-connection-callback');
let Transform;
// The "Runner" constructor takes a "builder" (query, schema, or raw)
// and runs through each of the query statements, calling any additional
// "output" method provided alongside the query and bindings.
class Runner {
constructor(client, builder) {
this.client = client;
this.builder = builder;
this.queries = [];
// The "connection" object is set on the runner when
// "run" is called.
this.connection = undefined;
}
// "Run" the target, calling "toSQL" on the builder, returning
// an object or array of queries to run, each of which are run on
// a single connection.
async run() {
const runner = this;
try {
const res = await this.ensureConnection(ensureConnectionCallback);
// Fire a single "end" event on the builder when
// all queries have successfully completed.
runner.builder.emit('end');
return res;
// If there are any "error" listeners, we fire an error event
// and then re-throw the error to be eventually handled by
// the promise chain. Useful if you're wrapping in a custom `Promise`.
} catch (err) {
if (runner.builder._events && runner.builder._events.error) {
runner.builder.emit('error', err);
}
throw err;
}
}
// Stream the result set, by passing through to the dialect's streaming
// capabilities. If the options are
stream(optionsOrHandler, handlerOrNil) {
const firstOptionIsHandler =
typeof optionsOrHandler === 'function' && arguments.length === 1;
const options = firstOptionIsHandler ? {} : optionsOrHandler;
const handler = firstOptionIsHandler ? optionsOrHandler : handlerOrNil;
// Determines whether we emit an error or throw here.
const hasHandler = typeof handler === 'function';
// Lazy-load the "Transform" dependency.
Transform = Transform || require('stream').Transform;
const queryContext = this.builder.queryContext();
const stream = new Transform({
objectMode: true,
transform: (chunk, _, callback) => {
callback(null, this.client.postProcessResponse(chunk, queryContext));
},
});
stream.on('close', () => {
this.client.releaseConnection(this.connection);
});
// If the stream is manually destroyed, the close event is not
// propagated to the top of the pipe chain. We need to manually verify
// that the source stream is closed and if not, manually destroy it.
stream.on('pipe', (sourceStream) => {
const cleanSourceStream = () => {
if (!sourceStream.closed) {
sourceStream.destroy();
}
};
// Stream already closed, cleanup immediately
if (stream.closed) {
cleanSourceStream();
} else {
stream.on('close', cleanSourceStream);
}
});
const connectionAcquirePromise = this.ensureConnection(
ensureConnectionStreamCallback,
{
options,
hasHandler,
stream,
}
)
// Emit errors on the stream if the error occurred before a connection
// could be acquired.
// If the connection was acquired, assume the error occurred in the client
// code and has already been emitted on the stream. Don't emit it twice.
.catch((err) => {
if (!this.connection) {
stream.emit('error', err);
}
});
// If a function is passed to handle the stream, send the stream
// there and return the promise, otherwise just return the stream
// and the promise will take care of itself.
if (hasHandler) {
handler(stream);
return connectionAcquirePromise;
}
return stream;
}
// Allow you to pipe the stream to a writable stream.
pipe(writable, options) {
return this.stream(options).pipe(writable);
}
// "Runs" a query, returning a promise. All queries specified by the builder are guaranteed
// to run in sequence, and on the same connection, especially helpful when schema building
// and dealing with foreign key constraints, etc.
async query(obj) {
const { __knexUid, __knexTxId } = this.connection;
this.builder.emit('query', Object.assign({ __knexUid, __knexTxId }, obj));
const runner = this;
const queryContext = this.builder.queryContext();
// query-error events are emitted before the queryPromise continuations.
// pass queryContext into client.query so it can be raised properly.
if (obj !== null && typeof obj === 'object') {
obj.queryContext = queryContext;
}
let queryPromise = this.client.query(this.connection, obj);
if (obj.timeout) {
queryPromise = timeout(queryPromise, obj.timeout);
}
// Await the return value of client.processResponse; in the case of sqlite3's
// dropColumn()/renameColumn(), it will be a Promise for the transaction
// containing the complete rename procedure.
return queryPromise
.then((resp) => this.client.processResponse(resp, runner))
.then((processedResponse) => {
const postProcessedResponse = this.client.postProcessResponse(
processedResponse,
queryContext
);
this.builder.emit(
'query-response',
postProcessedResponse,
Object.assign({ __knexUid, __knexTxId }, obj),
this.builder
);
this.client.emit(
'query-response',
postProcessedResponse,
Object.assign({ __knexUid, __knexTxId }, obj),
this.builder
);
return postProcessedResponse;
})
.catch((error) => {
if (!(error instanceof KnexTimeoutError)) {
return Promise.reject(error);
}
const { timeout, sql, bindings } = obj;
let cancelQuery;
if (obj.cancelOnTimeout) {
cancelQuery = this.client.cancelQuery(this.connection);
} else {
// If we don't cancel the query, we need to mark the connection as disposed so that
// it gets destroyed by the pool and is never used again. If we don't do this and
// return the connection to the pool, it will be useless until the current operation
// that timed out, finally finishes.
this.connection.__knex__disposed = error;
cancelQuery = Promise.resolve();
}
return cancelQuery
.catch((cancelError) => {
// If the cancellation failed, we need to mark the connection as disposed so that
// it gets destroyed by the pool and is never used again. If we don't do this and
// return the connection to the pool, it will be useless until the current operation
// that timed out, finally finishes.
this.connection.__knex__disposed = error;
// cancellation failed
throw Object.assign(cancelError, {
message: `After query timeout of ${timeout}ms exceeded, cancelling of query failed.`,
sql,
bindings,
timeout,
});
})
.then(() => {
// cancellation succeeded, rethrow timeout error
throw Object.assign(error, {
message: `Defined query timeout of ${timeout}ms exceeded when running query.`,
sql,
bindings,
timeout,
});
});
})
.catch((error) => {
this.builder.emit(
'query-error',
error,
Object.assign({ __knexUid, __knexTxId, queryContext }, obj)
);
throw error;
});
}
// In the case of the "schema builder" we call `queryArray`, which runs each
// of the queries in sequence.
async queryArray(queries) {
if (queries.length === 1) {
const query = queries[0];
if (!query.statementsProducer) {
return this.query(query);
}
const statements = await query.statementsProducer(
undefined,
this.connection
);
const sqlQueryObjects = statements.sql.map((statement) => ({
sql: statement,
bindings: query.bindings,
}));
const preQueryObjects = statements.pre.map((statement) => ({
sql: statement,
bindings: query.bindings,
}));
const postQueryObjects = statements.post.map((statement) => ({
sql: statement,
bindings: query.bindings,
}));
let results = [];
await this.queryArray(preQueryObjects);
try {
await this.client.transaction(
async (trx) => {
const transactionRunner = new Runner(trx.client, this.builder);
transactionRunner.connection = this.connection;
results = await transactionRunner.queryArray(sqlQueryObjects);
if (statements.check) {
const foreignViolations = await trx.raw(statements.check);
if (foreignViolations.length > 0) {
throw new Error('FOREIGN KEY constraint failed');
}
}
},
{ connection: this.connection }
);
} finally {
await this.queryArray(postQueryObjects);
}
return results;
}
const results = [];
for (const query of queries) {
results.push(await this.queryArray([query]));
}
return results;
}
// Check whether there's a transaction flag, and that it has a connection.
async ensureConnection(cb, cbParams) {
// Use override from a builder if passed
if (this.builder._connection) {
this.connection = this.builder._connection;
}
if (this.connection) {
return cb(this, cbParams);
}
let acquiredConnection;
try {
acquiredConnection = await this.client.acquireConnection();
} catch (error) {
if (!(error instanceof KnexTimeoutError)) {
return Promise.reject(error);
}
if (this.builder) {
error.sql = this.builder.sql;
error.bindings = this.builder.bindings;
}
throw error;
}
try {
this.connection = acquiredConnection;
return await cb(this, cbParams);
} finally {
await this.client.releaseConnection(acquiredConnection);
}
}
}
module.exports = Runner;

View File

@ -0,0 +1,413 @@
// Transaction
// -------
const { EventEmitter } = require('events');
const Debug = require('debug');
const uniqueId = require('lodash/uniqueId');
const { callbackify } = require('util');
const makeKnex = require('../knex-builder/make-knex');
const { timeout, KnexTimeoutError } = require('../util/timeout');
const finallyMixin = require('../util/finally-mixin');
const debug = Debug('knex:tx');
// FYI: This is defined as a function instead of a constant so that
// each Transactor can have its own copy of the default config.
// This will minimize the impact of bugs that might be introduced
// if a Transactor ever mutates its config.
function DEFAULT_CONFIG() {
return {
userParams: {},
doNotRejectOnRollback: true,
};
}
// These aren't supported in sqlite3 which is serialized already so it's as
// safe as reasonable, except for a special read_uncommitted pragma
const validIsolationLevels = [
// Doesn't really work in postgres, it treats it as read committed
'read uncommitted',
'read committed',
'snapshot',
// snapshot and repeatable read are basically the same, most "repeatable
// read" implementations are actually "snapshot" also known as Multi Version
// Concurrency Control (MVCC). Mssql's repeatable read doesn't stop
// repeated reads for inserts as it uses a pessimistic locking system so
// you should probably use 'snapshot' to stop read skew.
'repeatable read',
// mysql pretends to have serializable, but it is not
'serializable',
];
// Acts as a facade for a Promise, keeping the internal state
// and managing any child transactions.
class Transaction extends EventEmitter {
constructor(client, container, config = DEFAULT_CONFIG(), outerTx = null) {
super();
this.userParams = config.userParams;
this.doNotRejectOnRollback = config.doNotRejectOnRollback;
const txid = (this.txid = uniqueId('trx'));
this.client = client;
this.logger = client.logger;
this.outerTx = outerTx;
this.trxClient = undefined;
this._completed = false;
this._debug = client.config && client.config.debug;
this.readOnly = config.readOnly;
if (config.isolationLevel) {
this.setIsolationLevel(config.isolationLevel);
}
debug(
'%s: Starting %s transaction',
txid,
outerTx ? 'nested' : 'top level'
);
// `this` can potentially serve as an `outerTx` for another
// Transaction. So, go ahead and establish `_lastChild` now.
this._lastChild = Promise.resolve();
const _previousSibling = outerTx ? outerTx._lastChild : Promise.resolve();
// FYI: As you will see in a moment, this Promise will be used to construct
// 2 separate Promise Chains. This ensures that each Promise Chain
// can establish its error-handling semantics without interfering
// with the other Promise Chain.
const basePromise = _previousSibling.then(() =>
this._evaluateContainer(config, container)
);
// FYI: This is the Promise Chain for EXTERNAL use. It ensures that the
// caller must handle any exceptions that result from `basePromise`.
this._promise = basePromise.then((x) => x);
if (outerTx) {
// FYI: This is the Promise Chain for INTERNAL use. It serves as a signal
// for when the next sibling should begin its execution. Therefore,
// exceptions are caught and ignored.
outerTx._lastChild = basePromise.catch(() => {});
}
}
isCompleted() {
return (
this._completed || (this.outerTx && this.outerTx.isCompleted()) || false
);
}
begin(conn) {
const trxMode = [
this.isolationLevel ? `ISOLATION LEVEL ${this.isolationLevel}` : '',
this.readOnly ? 'READ ONLY' : '',
]
.join(' ')
.trim();
if (trxMode.length === 0) {
return this.query(conn, 'BEGIN;');
}
return this.query(conn, `SET TRANSACTION ${trxMode};`).then(() =>
this.query(conn, 'BEGIN;')
);
}
savepoint(conn) {
return this.query(conn, `SAVEPOINT ${this.txid};`);
}
commit(conn, value) {
return this.query(conn, 'COMMIT;', 1, value);
}
release(conn, value) {
return this.query(conn, `RELEASE SAVEPOINT ${this.txid};`, 1, value);
}
setIsolationLevel(isolationLevel) {
if (!validIsolationLevels.includes(isolationLevel)) {
throw new Error(
`Invalid isolationLevel, supported isolation levels are: ${JSON.stringify(
validIsolationLevels
)}`
);
}
this.isolationLevel = isolationLevel;
return this;
}
rollback(conn, error) {
return timeout(this.query(conn, 'ROLLBACK', 2, error), 5000).catch(
(err) => {
if (!(err instanceof KnexTimeoutError)) {
return Promise.reject(err);
}
this._rejecter(error);
}
);
}
rollbackTo(conn, error) {
return timeout(
this.query(conn, `ROLLBACK TO SAVEPOINT ${this.txid}`, 2, error),
5000
).catch((err) => {
if (!(err instanceof KnexTimeoutError)) {
return Promise.reject(err);
}
this._rejecter(error);
});
}
query(conn, sql, status, value) {
const q = this.trxClient
.query(conn, sql)
.catch((err) => {
status = 2;
value = err;
this._completed = true;
debug('%s error running transaction query', this.txid);
})
.then((res) => {
if (status === 1) {
this._resolver(value);
}
if (status === 2) {
if (value === undefined) {
if (this.doNotRejectOnRollback && /^ROLLBACK\b/i.test(sql)) {
this._resolver();
return;
}
value = new Error(`Transaction rejected with non-error: ${value}`);
}
this._rejecter(value);
}
return res;
});
if (status === 1 || status === 2) {
this._completed = true;
}
return q;
}
debug(enabled) {
this._debug = arguments.length ? enabled : true;
return this;
}
async _evaluateContainer(config, container) {
return this.acquireConnection(config, (connection) => {
const trxClient = (this.trxClient = makeTxClient(
this,
this.client,
connection
));
const init = this.client.transacting
? this.savepoint(connection)
: this.begin(connection);
const executionPromise = new Promise((resolver, rejecter) => {
this._resolver = resolver;
this._rejecter = rejecter;
});
init
.then(() => {
return makeTransactor(this, connection, trxClient);
})
.then((transactor) => {
this.transactor = transactor;
if (this.outerTx) {
transactor.parentTransaction = this.outerTx.transactor;
}
transactor.executionPromise = executionPromise;
// If we've returned a "thenable" from the transaction container, assume
// the rollback and commit are chained to this object's success / failure.
// Directly thrown errors are treated as automatic rollbacks.
let result;
try {
result = container(transactor);
} catch (err) {
result = Promise.reject(err);
}
if (result && result.then && typeof result.then === 'function') {
result
.then((val) => {
return transactor.commit(val);
})
.catch((err) => {
return transactor.rollback(err);
});
}
return null;
})
.catch((e) => {
return this._rejecter(e);
});
return executionPromise;
});
}
// Acquire a connection and create a disposer - either using the one passed
// via config or getting one off the client. The disposer will be called once
// the original promise is marked completed.
async acquireConnection(config, cb) {
const configConnection = config && config.connection;
const connection =
configConnection || (await this.client.acquireConnection());
try {
connection.__knexTxId = this.txid;
return await cb(connection);
} finally {
if (!configConnection) {
debug('%s: releasing connection', this.txid);
this.client.releaseConnection(connection);
} else {
debug('%s: not releasing external connection', this.txid);
}
}
}
then(onResolve, onReject) {
return this._promise.then(onResolve, onReject);
}
catch(...args) {
return this._promise.catch(...args);
}
asCallback(cb) {
callbackify(() => this._promise)(cb);
return this._promise;
}
}
finallyMixin(Transaction.prototype);
// The transactor is a full featured knex object, with a "commit", a "rollback"
// and a "savepoint" function. The "savepoint" is just sugar for creating a new
// transaction. If the rollback is run inside a savepoint, it rolls back to the
// last savepoint - otherwise it rolls back the transaction.
function makeTransactor(trx, connection, trxClient) {
const transactor = makeKnex(trxClient);
transactor.context.withUserParams = () => {
throw new Error(
'Cannot set user params on a transaction - it can only inherit params from main knex instance'
);
};
transactor.isTransaction = true;
transactor.userParams = trx.userParams || {};
transactor.context.transaction = function (container, options) {
if (!options) {
options = { doNotRejectOnRollback: true };
} else if (options.doNotRejectOnRollback === undefined) {
options.doNotRejectOnRollback = true;
}
return this._transaction(container, options, trx);
};
transactor.savepoint = function (container, options) {
return transactor.transaction(container, options);
};
if (trx.client.transacting) {
transactor.commit = (value) => trx.release(connection, value);
transactor.rollback = (error) => trx.rollbackTo(connection, error);
} else {
transactor.commit = (value) => trx.commit(connection, value);
transactor.rollback = (error) => trx.rollback(connection, error);
}
transactor.isCompleted = () => trx.isCompleted();
return transactor;
}
// We need to make a client object which always acquires the same
// connection and does not release back into the pool.
function makeTxClient(trx, client, connection) {
const trxClient = Object.create(client.constructor.prototype);
trxClient.version = client.version;
trxClient.config = client.config;
trxClient.driver = client.driver;
trxClient.connectionSettings = client.connectionSettings;
trxClient.transacting = true;
trxClient.valueForUndefined = client.valueForUndefined;
trxClient.logger = client.logger;
trxClient.on('start', function (arg) {
trx.emit('start', arg);
client.emit('start', arg);
});
trxClient.on('query', function (arg) {
trx.emit('query', arg);
client.emit('query', arg);
});
trxClient.on('query-error', function (err, obj) {
trx.emit('query-error', err, obj);
client.emit('query-error', err, obj);
});
trxClient.on('query-response', function (response, obj, builder) {
trx.emit('query-response', response, obj, builder);
client.emit('query-response', response, obj, builder);
});
const _query = trxClient.query;
trxClient.query = function (conn, obj) {
const completed = trx.isCompleted();
return new Promise(function (resolve, reject) {
try {
if (conn !== connection)
throw new Error('Invalid connection for transaction query.');
if (completed) completedError(trx, obj);
resolve(_query.call(trxClient, conn, obj));
} catch (e) {
reject(e);
}
});
};
const _stream = trxClient.stream;
trxClient.stream = function (conn, obj, stream, options) {
const completed = trx.isCompleted();
return new Promise(function (resolve, reject) {
try {
if (conn !== connection)
throw new Error('Invalid connection for transaction query.');
if (completed) completedError(trx, obj);
resolve(_stream.call(trxClient, conn, obj, stream, options));
} catch (e) {
reject(e);
}
});
};
trxClient.acquireConnection = function () {
return Promise.resolve(connection);
};
trxClient.releaseConnection = function () {
return Promise.resolve();
};
return trxClient;
}
function completedError(trx, obj) {
const sql = typeof obj === 'string' ? obj : obj && obj.sql;
debug('%s: Transaction completed: %s', trx.txid, sql);
throw new Error(
'Transaction query already complete, run with DEBUG=knex:tx for more info'
);
}
module.exports = Transaction;