diff --git a/server/lib/indexers/playlist-indexer.ts b/server/lib/indexers/playlist-indexer.ts index 7eb2a6b..a91253c 100644 --- a/server/lib/indexers/playlist-indexer.ts +++ b/server/lib/indexers/playlist-indexer.ts @@ -14,7 +14,7 @@ export class PlaylistIndexer extends AbstractIndexer { // We don't need to index a specific element yet, since we have all playlist information in the list endpoint throw new Error('Not implemented') } diff --git a/server/lib/indexers/shared/abstract-indexer.ts b/server/lib/indexers/shared/abstract-indexer.ts index 8d2e032..0a0fa4a 100644 --- a/server/lib/indexers/shared/abstract-indexer.ts +++ b/server/lib/indexers/shared/abstract-indexer.ts @@ -1,4 +1,4 @@ -import { AsyncQueue, queue } from 'async' +import { QueueObject, queue } from 'async' import { inspect } from 'util' import { logger } from '../../../helpers/logger' import { INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants' @@ -10,7 +10,7 @@ import { buildInValuesArray } from '../../meilisearch/meilisearch-queries' export type QueueParam = { host: string, identifier: string } export abstract class AbstractIndexer { - protected readonly indexQueue: AsyncQueue + protected readonly indexQueue: QueueObject protected abstract readonly primaryKey: keyof DB protected abstract readonly filterableAttributes: string[] @@ -19,22 +19,25 @@ export abstract class AbstractIndexer { protected readonly rankingRules: string[] - abstract indexSpecificElement (host: string, uuid: string): Promise + abstract indexSpecificElement (host: string, uuid: string): Promise<{ taskUid: number }> constructor ( protected readonly indexName: string, protected readonly formatterFn: (o: T) => DB ) { - this.indexQueue = queue((task, cb) => { - this.indexSpecificElement(task.host, task.identifier) - .then(() => cb()) - .catch(err => { - logger.error( - { err: inspect(err) }, - 'Error in index specific element %s of %s in index %s.', task.identifier, task.host, this.indexName - ) - cb() - }) + this.indexQueue = queue(async (task, cb) => { + try { + const { taskUid } = await this.indexSpecificElement(task.host, task.identifier) + await client.index(this.indexName).waitForTask(taskUid) + + return cb() + } catch (err) { + logger.error( + { err: inspect(err) }, + 'Error in index specific element %s of %s in index %s.', task.identifier, task.host, this.indexName + ) + cb() + } }, INDEXER_QUEUE_CONCURRENCY) } @@ -81,5 +84,7 @@ export abstract class AbstractIndexer { const result = await client.index(this.indexName).updateDocuments(documents, { primaryKey: this.primaryKey.toString() }) logger.debug(result, 'Indexed ' + documents.length + ' documents in ' + this.indexName) + + return result } } diff --git a/server/lib/indexers/video-indexer.ts b/server/lib/indexers/video-indexer.ts index a9740ca..0e10d7e 100644 --- a/server/lib/indexers/video-indexer.ts +++ b/server/lib/indexers/video-indexer.ts @@ -1,13 +1,14 @@ -import { AsyncQueue } from 'async' +import { QueueObject } from 'async' import { logger } from '../../helpers/logger' import { AbstractIndexer, QueueParam } from './shared' import { CONFIG, SORTABLE_COLUMNS } from '../../initializers/constants' import { formatVideoForDB } from '../meilisearch/meilisearch-videos' import { getVideo } from '../requests/peertube-instance' import { DBVideo, IndexableVideo } from '../../types/video.model' +import { client } from '../../helpers/meilisearch' export class VideoIndexer extends AbstractIndexer { - protected readonly indexQueue: AsyncQueue + protected readonly indexQueue: QueueObject protected readonly primaryKey = 'uuid' protected readonly filterableAttributes = [ 'uuid',