Try to have faster indexation

This commit is contained in:
Chocobozzz 2024-01-04 14:20:20 +01:00
parent c9f901d6a3
commit fde7007788
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
5 changed files with 70 additions and 17 deletions

View File

@ -73,6 +73,8 @@ const INDEXER_LIMIT = 500000
const INDEXER_HOST_CONCURRENCY = 3 const INDEXER_HOST_CONCURRENCY = 3
const INDEXER_QUEUE_CONCURRENCY = 3 const INDEXER_QUEUE_CONCURRENCY = 3
const INDEXER_BULK_INDEXATION_MS = 5000
const REQUESTS = { const REQUESTS = {
MAX_RETRIES: 10, MAX_RETRIES: 10,
WAIT: 10000 // 10 seconds WAIT: 10000 // 10 seconds
@ -103,5 +105,6 @@ export {
INDEXER_HOST_CONCURRENCY, INDEXER_HOST_CONCURRENCY,
INDEXER_COUNT, INDEXER_COUNT,
INDEXER_LIMIT, INDEXER_LIMIT,
INDEXER_BULK_INDEXATION_MS,
REQUESTS REQUESTS
} }

View File

@ -17,10 +17,12 @@ export class ChannelIndexer extends AbstractIndexer <IndexableChannel, DBChannel
} }
async indexSpecificElement (host: string, name: string) { async indexSpecificElement (host: string, name: string) {
await this.waitForBulkIndexation()
const channel = await getChannel(host, name) const channel = await getChannel(host, name)
logger.info('Indexing specific channel %s@%s.', name, host) logger.info('Indexing specific channel %s@%s.', name, host)
return this.indexElements([ channel ]) this.addElementsToBulkIndex([ channel ])
} }
} }

View File

@ -1,10 +1,11 @@
import { QueueObject, queue } from 'async' import { QueueObject, queue } from 'async'
import { inspect } from 'util' import { inspect } from 'util'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants' import { INDEXER_BULK_INDEXATION_MS, INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants'
import { IndexableDoc } from '../../../types/indexable-doc.model' import { IndexableDoc } from '../../../types/indexable-doc.model'
import { client } from '../../../helpers/meilisearch' import { client } from '../../../helpers/meilisearch'
import { buildInValuesArray } from '../../meilisearch/meilisearch-queries' import { buildInValuesArray } from '../../meilisearch/meilisearch-queries'
import { EnqueuedTask } from 'meilisearch'
// identifier could be an uuid, an handle or a url for example // identifier could be an uuid, an handle or a url for example
export type QueueParam = { host: string, identifier: string } export type QueueParam = { host: string, identifier: string }
@ -19,7 +20,11 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
protected readonly rankingRules: string[] protected readonly rankingRules: string[]
abstract indexSpecificElement (host: string, uuid: string): Promise<{ taskUid: number }> private elementsToBulkIndex: T[] = []
private bulkIndexationTimer: any
private bulkProcessEnqueuedTask: EnqueuedTask
abstract indexSpecificElement (host: string, uuid: string): Promise<void>
constructor ( constructor (
protected readonly indexName: string, protected readonly indexName: string,
@ -27,8 +32,7 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
) { ) {
this.indexQueue = queue<QueueParam, Error>(async (task, cb) => { this.indexQueue = queue<QueueParam, Error>(async (task, cb) => {
try { try {
const { taskUid } = await this.indexSpecificElement(task.host, task.identifier) await this.indexSpecificElement(task.host, task.identifier)
await client.index(this.indexName).waitForTask(taskUid, { timeOutMs: 1000 * 60 * 5 }) // 5 minutes timeout
return cb() return cb()
} catch (err) { } catch (err) {
@ -62,10 +66,7 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
} }
} }
scheduleIndexation (host: string, identifier: string) { // ---------------------------------------------------------------------------
this.indexQueue.push({ identifier, host })
.catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host))
}
removeNotExisting (host: string, existingPrimaryKeys: Set<string>) { removeNotExisting (host: string, existingPrimaryKeys: Set<string>) {
return client.index(this.indexName).deleteDocuments({ return client.index(this.indexName).deleteDocuments({
@ -79,7 +80,43 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
}) })
} }
async indexElements (elements: T[]) { // ---------------------------------------------------------------------------
scheduleParallelIndexation (host: string, identifier: string) {
this.indexQueue.push({ identifier, host })
.catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host))
}
async waitForBulkIndexation () {
if (!this.bulkProcessEnqueuedTask) return
await this.waitForTask(this.bulkProcessEnqueuedTask.taskUid, 1000)
this.bulkProcessEnqueuedTask = undefined
}
addElementsToBulkIndex (elements: T[]) {
this.elementsToBulkIndex = this.elementsToBulkIndex.concat(elements)
this.scheduleBulkIndexationProcess()
}
private scheduleBulkIndexationProcess () {
if (this.bulkIndexationTimer) return
this.bulkIndexationTimer = setTimeout(async () => {
try {
const elements = this.elementsToBulkIndex
this.elementsToBulkIndex = []
this.bulkProcessEnqueuedTask = await this.indexElements(elements)
} catch (err) {
logger.error({ err }, 'Cannot schedule bulk indexation')
}
}, INDEXER_BULK_INDEXATION_MS)
}
private async indexElements (elements: T[]) {
const documents = elements.map(e => this.formatterFn(e)) const documents = elements.map(e => this.formatterFn(e))
const result = await client.index(this.indexName).updateDocuments(documents, { primaryKey: this.primaryKey.toString() }) const result = await client.index(this.indexName).updateDocuments(documents, { primaryKey: this.primaryKey.toString() })
@ -87,4 +124,10 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
return result return result
} }
// ---------------------------------------------------------------------------
private waitForTask (taskId: number, intervalMs?: number) {
return client.index(this.indexName).waitForTask(taskId, { timeOutMs: 1000 * 60 * 5, intervalMs })
}
} }

View File

@ -52,10 +52,12 @@ export class VideoIndexer extends AbstractIndexer <IndexableVideo, DBVideo> {
} }
async indexSpecificElement (host: string, uuid: string) { async indexSpecificElement (host: string, uuid: string) {
await this.waitForBulkIndexation()
const video = await getVideo(host, uuid) const video = await getVideo(host, uuid)
logger.info('Indexing specific video %s of %s.', uuid, host) logger.info('Indexing specific video %s of %s.', uuid, host)
return this.indexElements([ video ]) this.addElementsToBulkIndex([ video ])
} }
} }

View File

@ -88,6 +88,8 @@ export class IndexationScheduler extends AbstractScheduler {
logger.info('Adding video data from %s.', host) logger.info('Adding video data from %s.', host)
do { do {
await this.videoIndexer.waitForBulkIndexation()
logger.debug('Getting video results from %s (from = %d).', host, start) logger.debug('Getting video results from %s (from = %d).', host, start)
videos = await getVideos(host, start) videos = await getVideos(host, start)
@ -98,16 +100,15 @@ export class IndexationScheduler extends AbstractScheduler {
if (videos.length !== 0) { if (videos.length !== 0) {
const videosFromDB = await getVideosUpdatedAt(videos.map(v => v.uuid)) const videosFromDB = await getVideosUpdatedAt(videos.map(v => v.uuid))
await this.videoIndexer.indexElements(videos) logger.debug('Indexing %d videos from %s.', videos.length, host)
this.videoIndexer.addElementsToBulkIndex(videos)
logger.debug('Indexed %d videos from %s.', videos.length, host)
// Fetch complete video foreach created video (to get tags) if needed // Fetch complete video foreach created video (to get tags) if needed
for (const video of videos) { for (const video of videos) {
const videoDB = videosFromDB.find(v => v.uuid === video.uuid) const videoDB = videosFromDB.find(v => v.uuid === video.uuid)
if (!videoDB || videoDB.updatedAt !== new Date(video.updatedAt).getTime()) { if (!videoDB || videoDB.updatedAt !== new Date(video.updatedAt).getTime()) {
this.videoIndexer.scheduleIndexation(host, video.uuid) this.videoIndexer.scheduleParallelIndexation(host, video.uuid)
} }
} }
} }
@ -123,7 +124,7 @@ export class IndexationScheduler extends AbstractScheduler {
logger.info('Added video data from %s.', host) logger.info('Added video data from %s.', host)
for (const c of channelsToSync) { for (const c of channelsToSync) {
this.channelIndexer.scheduleIndexation(host, c) this.channelIndexer.scheduleParallelIndexation(host, c)
} }
logger.info('Removing non-existing channels and videos from ' + host) logger.info('Removing non-existing channels and videos from ' + host)
@ -143,6 +144,8 @@ export class IndexationScheduler extends AbstractScheduler {
let start = 0 let start = 0
do { do {
await this.playlistIndexer.waitForBulkIndexation()
logger.debug('Getting playlist results from %s (from = %d, channelHandle = %s).', host, start, channelHandle) logger.debug('Getting playlist results from %s (from = %d, channelHandle = %s).', host, start, channelHandle)
playlists = await getPlaylistsOf(host, channelHandle, start) playlists = await getPlaylistsOf(host, channelHandle, start)
@ -151,7 +154,7 @@ export class IndexationScheduler extends AbstractScheduler {
start += playlists.length start += playlists.length
if (playlists.length !== 0) { if (playlists.length !== 0) {
await this.playlistIndexer.indexElements(playlists) this.playlistIndexer.addElementsToBulkIndex(playlists)
logger.debug('Indexed %d playlists from %s.', playlists.length, host) logger.debug('Indexed %d playlists from %s.', playlists.length, host)
} }