sepia-search-motore-di-rice.../server/lib/indexers/shared/abstract-indexer.ts

import { QueueObject, queue } from 'async'
import { inspect } from 'util'
import { logger } from '../../../helpers/logger'
import { CONFIG, INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants'
import { IndexableDoc } from '../../../types/indexable-doc.model'
import { client } from '../../../helpers/meilisearch'
import { buildInValuesArray } from '../../meilisearch/meilisearch-queries'
import { EnqueuedTask } from 'meilisearch'

// The identifier could be a UUID, a handle or a URL, for example
export type QueueParam = { host: string, identifier: string }

export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
  protected readonly indexQueue: QueueObject<QueueParam>

  protected abstract readonly primaryKey: keyof DB
  protected abstract readonly filterableAttributes: string[]
  protected abstract readonly sortableAttributes: string[]
  protected abstract readonly searchableAttributes: string[]

  protected readonly rankingRules: string[]

  // Elements buffered for the next bulk indexation, plus the timer and Meilisearch task tracking that process
  private elementsToBulkIndex: T[] = []
  private bulkIndexationTimer: any
  private bulkProcessEnqueuedTask: EnqueuedTask

  abstract indexSpecificElement (host: string, uuid: string): Promise<void>

  constructor (
    protected readonly indexName: string,
    protected readonly formatterFn: (o: T) => DB
  ) {
    // Process "index specific element" tasks with limited concurrency,
    // logging indexation errors instead of propagating them
    this.indexQueue = queue<QueueParam, Error>(async (task, cb) => {
      try {
        await this.indexSpecificElement(task.host, task.identifier)

        return cb()
      } catch (err) {
        logger.error(
          { err: inspect(err) },
          'Error in index specific element %s of %s in index %s.', task.identifier, task.host, this.indexName
        )

        cb()
      }
    }, INDEXER_QUEUE_CONCURRENCY)
  }

  async initIndex () {
    const { results } = await client.getIndexes()

    if (results.some(r => r.uid === this.indexName)) {
      logger.info(this.indexName + ' already exists, skipping configuration')
      return
    }

    logger.info('Creating and configuring index ' + this.indexName)

    await client.index(this.indexName).updateSearchableAttributes(this.searchableAttributes)
    await client.index(this.indexName).updateFilterableAttributes(this.filterableAttributes)
    await client.index(this.indexName).updateSortableAttributes(this.sortableAttributes)
    await client.index(this.indexName).updateFaceting({ maxValuesPerFacet: 10_000 })
    await client.index(this.indexName).updatePagination({ maxTotalHits: 10_000 })

    if (this.rankingRules) {
      await client.index(this.indexName).updateRankingRules(this.rankingRules)
    }
  }

  // ---------------------------------------------------------------------------

  removeNotExisting (host: string, existingPrimaryKeys: Set<string>) {
    return client.index(this.indexName).deleteDocuments({
      filter: `${this.primaryKey.toString()} NOT IN ${buildInValuesArray(Array.from(existingPrimaryKeys))} AND host = ${host}`
    })
  }

  removeFromHosts (existingHosts: string[]) {
    return client.index(this.indexName).deleteDocuments({
      filter: 'host NOT IN ' + buildInValuesArray(Array.from(existingHosts))
    })
  }

  // ---------------------------------------------------------------------------

  scheduleParallelIndexation (host: string, identifier: string) {
    this.indexQueue.push({ identifier, host })
      .catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host))
  }

  async waitForBulkIndexation () {
    if (!this.bulkProcessEnqueuedTask) return

    await this.waitForTask(this.bulkProcessEnqueuedTask.taskUid, 1000)

    this.bulkProcessEnqueuedTask = undefined
  }

  addElementsToBulkIndex (elements: T[]) {
    this.elementsToBulkIndex = this.elementsToBulkIndex.concat(elements)

    this.scheduleBulkIndexationProcess()
  }

  // Batch buffered elements after a short delay so that multiple additions
  // are sent to Meilisearch in a single request
  private scheduleBulkIndexationProcess () {
    if (this.bulkIndexationTimer) return

    this.bulkIndexationTimer = setTimeout(async () => {
      try {
        const elements = this.elementsToBulkIndex
        this.elementsToBulkIndex = []

        logger.info(`Bulk indexing ${elements.length} elements in ${this.indexName}`)

        this.bulkProcessEnqueuedTask = await this.indexElements(elements)

        this.bulkIndexationTimer = undefined
      } catch (err) {
        logger.error({ err }, 'Cannot schedule bulk indexation')
      }
    }, CONFIG.INDEXER.BULK_INDEXATION_INTERVAL_MS)
  }

  private async indexElements (elements: T[]) {
    const documents = elements.map(e => this.formatterFn(e))

    const result = await client.index(this.indexName).updateDocuments(documents, { primaryKey: this.primaryKey.toString() })

    logger.debug(result, 'Indexed ' + documents.length + ' documents in ' + this.indexName)

    return result
  }

  // ---------------------------------------------------------------------------

  private waitForTask (taskId: number, intervalMs?: number) {
    return client.index(this.indexName).waitForTask(taskId, { timeOutMs: 1000 * 60 * 5, intervalMs })
  }
}
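
// ---------------------------------------------------------------------------
// Hypothetical usage sketch, not part of the original file: a concrete indexer
// only needs to describe its Meilisearch index (primary key, filterable,
// sortable and searchable attributes) and how to (re)index a single element.
// ExampleDoc, DBExample and fetchExample are illustrative assumptions, not
// types or helpers from this repository.

interface ExampleDoc extends IndexableDoc {
  uuid: string
  host: string
  name: string
}

interface DBExample {
  uuid: string
  host: string
  name: string
}

declare function fetchExample (host: string, uuid: string): Promise<ExampleDoc>

export class ExampleIndexer extends AbstractIndexer<ExampleDoc, DBExample> {
  protected readonly primaryKey = 'uuid' as const
  protected readonly filterableAttributes = [ 'host' ]
  protected readonly sortableAttributes = [ 'name' ]
  protected readonly searchableAttributes = [ 'name' ]

  constructor () {
    // The formatter maps an indexable element to the document stored in Meilisearch
    super('examples', doc => ({ uuid: doc.uuid, host: doc.host, name: doc.name }))
  }

  async indexSpecificElement (host: string, uuid: string) {
    const doc = await fetchExample(host, uuid)

    // Buffered, then flushed to Meilisearch by the bulk indexation timer
    this.addElementsToBulkIndex([ doc ])
  }
}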