Wait indexation

This commit is contained in:
Chocobozzz 2023-11-13 10:06:43 +01:00
parent 1c81808bdd
commit 1a2d9ea921
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
3 changed files with 22 additions and 16 deletions

View File

@ -14,7 +14,7 @@ export class PlaylistIndexer extends AbstractIndexer <IndexablePlaylist, DBPlayl
super(CONFIG.MEILISEARCH.INDEXES.PLAYLISTS, formatPlaylistForDB)
}
async indexSpecificElement (host: string, uuid: string) {
async indexSpecificElement (host: string, uuid: string): Promise<any> {
// We don't need to index a specific element yet, since we have all playlist information in the list endpoint
throw new Error('Not implemented')
}

View File

@ -1,4 +1,4 @@
import { AsyncQueue, queue } from 'async'
import { QueueObject, queue } from 'async'
import { inspect } from 'util'
import { logger } from '../../../helpers/logger'
import { INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants'
@ -10,7 +10,7 @@ import { buildInValuesArray } from '../../meilisearch/meilisearch-queries'
export type QueueParam = { host: string, identifier: string }
export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
protected readonly indexQueue: AsyncQueue<QueueParam>
protected readonly indexQueue: QueueObject<QueueParam>
protected abstract readonly primaryKey: keyof DB
protected abstract readonly filterableAttributes: string[]
@ -19,22 +19,25 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
protected readonly rankingRules: string[]
abstract indexSpecificElement (host: string, uuid: string): Promise<any>
abstract indexSpecificElement (host: string, uuid: string): Promise<{ taskUid: number }>
constructor (
protected readonly indexName: string,
protected readonly formatterFn: (o: T) => DB
) {
this.indexQueue = queue<QueueParam, Error>((task, cb) => {
this.indexSpecificElement(task.host, task.identifier)
.then(() => cb())
.catch(err => {
logger.error(
{ err: inspect(err) },
'Error in index specific element %s of %s in index %s.', task.identifier, task.host, this.indexName
)
cb()
})
this.indexQueue = queue<QueueParam, Error>(async (task, cb) => {
try {
const { taskUid } = await this.indexSpecificElement(task.host, task.identifier)
await client.index(this.indexName).waitForTask(taskUid)
return cb()
} catch (err) {
logger.error(
{ err: inspect(err) },
'Error in index specific element %s of %s in index %s.', task.identifier, task.host, this.indexName
)
cb()
}
}, INDEXER_QUEUE_CONCURRENCY)
}
@ -81,5 +84,7 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
const result = await client.index(this.indexName).updateDocuments(documents, { primaryKey: this.primaryKey.toString() })
logger.debug(result, 'Indexed ' + documents.length + ' documents in ' + this.indexName)
return result
}
}

View File

@ -1,13 +1,14 @@
import { AsyncQueue } from 'async'
import { QueueObject } from 'async'
import { logger } from '../../helpers/logger'
import { AbstractIndexer, QueueParam } from './shared'
import { CONFIG, SORTABLE_COLUMNS } from '../../initializers/constants'
import { formatVideoForDB } from '../meilisearch/meilisearch-videos'
import { getVideo } from '../requests/peertube-instance'
import { DBVideo, IndexableVideo } from '../../types/video.model'
import { client } from '../../helpers/meilisearch'
export class VideoIndexer extends AbstractIndexer <IndexableVideo, DBVideo> {
protected readonly indexQueue: AsyncQueue<QueueParam>
protected readonly indexQueue: QueueObject<QueueParam>
protected readonly primaryKey = 'uuid'
protected readonly filterableAttributes = [
'uuid',