From fde7007788c041854add17f32a237db10439b2e3 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 4 Jan 2024 14:20:20 +0100 Subject: [PATCH] Try to have faster indexation --- server/initializers/constants.ts | 3 + server/lib/indexers/channel-indexer.ts | 4 +- .../lib/indexers/shared/abstract-indexer.ts | 61 ++++++++++++++++--- server/lib/indexers/video-indexer.ts | 4 +- server/lib/schedulers/indexation-scheduler.ts | 15 +++-- 5 files changed, 70 insertions(+), 17 deletions(-) diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index d69275c..631bb46 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -73,6 +73,8 @@ const INDEXER_LIMIT = 500000 const INDEXER_HOST_CONCURRENCY = 3 const INDEXER_QUEUE_CONCURRENCY = 3 +const INDEXER_BULK_INDEXATION_MS = 5000 + const REQUESTS = { MAX_RETRIES: 10, WAIT: 10000 // 10 seconds @@ -103,5 +105,6 @@ export { INDEXER_HOST_CONCURRENCY, INDEXER_COUNT, INDEXER_LIMIT, + INDEXER_BULK_INDEXATION_MS, REQUESTS } diff --git a/server/lib/indexers/channel-indexer.ts b/server/lib/indexers/channel-indexer.ts index 789a180..a230603 100644 --- a/server/lib/indexers/channel-indexer.ts +++ b/server/lib/indexers/channel-indexer.ts @@ -17,10 +17,12 @@ export class ChannelIndexer extends AbstractIndexer { protected readonly rankingRules: string[] - abstract indexSpecificElement (host: string, uuid: string): Promise<{ taskUid: number }> + private elementsToBulkIndex: T[] = [] + private bulkIndexationTimer: any + private bulkProcessEnqueuedTask: EnqueuedTask + + abstract indexSpecificElement (host: string, uuid: string): Promise constructor ( protected readonly indexName: string, @@ -27,8 +32,7 @@ export abstract class AbstractIndexer { ) { this.indexQueue = queue(async (task, cb) => { try { - const { taskUid } = await this.indexSpecificElement(task.host, task.identifier) - await client.index(this.indexName).waitForTask(taskUid, { timeOutMs: 1000 * 60 * 5 }) // 5 minutes timeout + await this.indexSpecificElement(task.host, task.identifier) return cb() } catch (err) { @@ -62,10 +66,7 @@ export abstract class AbstractIndexer { } } - scheduleIndexation (host: string, identifier: string) { - this.indexQueue.push({ identifier, host }) - .catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host)) - } + // --------------------------------------------------------------------------- removeNotExisting (host: string, existingPrimaryKeys: Set) { return client.index(this.indexName).deleteDocuments({ @@ -79,7 +80,43 @@ export abstract class AbstractIndexer { }) } - async indexElements (elements: T[]) { + // --------------------------------------------------------------------------- + + scheduleParallelIndexation (host: string, identifier: string) { + this.indexQueue.push({ identifier, host }) + .catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host)) + } + + async waitForBulkIndexation () { + if (!this.bulkProcessEnqueuedTask) return + + await this.waitForTask(this.bulkProcessEnqueuedTask.taskUid, 1000) + + this.bulkProcessEnqueuedTask = undefined + } + + addElementsToBulkIndex (elements: T[]) { + this.elementsToBulkIndex = this.elementsToBulkIndex.concat(elements) + + this.scheduleBulkIndexationProcess() + } + + private scheduleBulkIndexationProcess () { + if (this.bulkIndexationTimer) return + + this.bulkIndexationTimer = setTimeout(async () => { + try { + const elements = this.elementsToBulkIndex + this.elementsToBulkIndex = [] + + this.bulkProcessEnqueuedTask = await this.indexElements(elements) + } catch (err) { + logger.error({ err }, 'Cannot schedule bulk indexation') + } + }, INDEXER_BULK_INDEXATION_MS) + } + + private async indexElements (elements: T[]) { const documents = elements.map(e => this.formatterFn(e)) const result = await client.index(this.indexName).updateDocuments(documents, { primaryKey: this.primaryKey.toString() }) @@ -87,4 +124,10 @@ export abstract class AbstractIndexer { return result } + + // --------------------------------------------------------------------------- + + private waitForTask (taskId: number, intervalMs?: number) { + return client.index(this.indexName).waitForTask(taskId, { timeOutMs: 1000 * 60 * 5, intervalMs }) + } } diff --git a/server/lib/indexers/video-indexer.ts b/server/lib/indexers/video-indexer.ts index f2df9ec..80a23fa 100644 --- a/server/lib/indexers/video-indexer.ts +++ b/server/lib/indexers/video-indexer.ts @@ -52,10 +52,12 @@ export class VideoIndexer extends AbstractIndexer { } async indexSpecificElement (host: string, uuid: string) { + await this.waitForBulkIndexation() + const video = await getVideo(host, uuid) logger.info('Indexing specific video %s of %s.', uuid, host) - return this.indexElements([ video ]) + this.addElementsToBulkIndex([ video ]) } } diff --git a/server/lib/schedulers/indexation-scheduler.ts b/server/lib/schedulers/indexation-scheduler.ts index fe32790..eb34e0c 100644 --- a/server/lib/schedulers/indexation-scheduler.ts +++ b/server/lib/schedulers/indexation-scheduler.ts @@ -88,6 +88,8 @@ export class IndexationScheduler extends AbstractScheduler { logger.info('Adding video data from %s.', host) do { + await this.videoIndexer.waitForBulkIndexation() + logger.debug('Getting video results from %s (from = %d).', host, start) videos = await getVideos(host, start) @@ -98,16 +100,15 @@ export class IndexationScheduler extends AbstractScheduler { if (videos.length !== 0) { const videosFromDB = await getVideosUpdatedAt(videos.map(v => v.uuid)) - await this.videoIndexer.indexElements(videos) - - logger.debug('Indexed %d videos from %s.', videos.length, host) + logger.debug('Indexing %d videos from %s.', videos.length, host) + this.videoIndexer.addElementsToBulkIndex(videos) // Fetch complete video foreach created video (to get tags) if needed for (const video of videos) { const videoDB = videosFromDB.find(v => v.uuid === video.uuid) if (!videoDB || videoDB.updatedAt !== new Date(video.updatedAt).getTime()) { - this.videoIndexer.scheduleIndexation(host, video.uuid) + this.videoIndexer.scheduleParallelIndexation(host, video.uuid) } } } @@ -123,7 +124,7 @@ export class IndexationScheduler extends AbstractScheduler { logger.info('Added video data from %s.', host) for (const c of channelsToSync) { - this.channelIndexer.scheduleIndexation(host, c) + this.channelIndexer.scheduleParallelIndexation(host, c) } logger.info('Removing non-existing channels and videos from ' + host) @@ -143,6 +144,8 @@ export class IndexationScheduler extends AbstractScheduler { let start = 0 do { + await this.playlistIndexer.waitForBulkIndexation() + logger.debug('Getting playlist results from %s (from = %d, channelHandle = %s).', host, start, channelHandle) playlists = await getPlaylistsOf(host, channelHandle, start) @@ -151,7 +154,7 @@ export class IndexationScheduler extends AbstractScheduler { start += playlists.length if (playlists.length !== 0) { - await this.playlistIndexer.indexElements(playlists) + this.playlistIndexer.addElementsToBulkIndex(playlists) logger.debug('Indexed %d playlists from %s.', playlists.length, host) }