import { QueueObject, queue } from 'async'
import { inspect } from 'util'
import { logger } from '../../../helpers/logger'
import { CONFIG, INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants'
import { IndexableDoc } from '../../../types/indexable-doc.model'
import { client } from '../../../helpers/meilisearch'
import { buildInValuesArray } from '../../meilisearch/meilisearch-queries'
import { EnqueuedTask } from 'meilisearch'

// The identifier can be, for example, a UUID, a handle or a URL
export type QueueParam = { host: string, identifier: string }

/**
 * Base class for indexers that write documents into a single Meilisearch index.
 *
 * Two indexing paths are provided:
 *  - a concurrency-limited queue that calls `indexSpecificElement` for one
 *    `(host, identifier)` pair at a time (`scheduleParallelIndexation`)
 *  - a timer-driven bulk path that buffers elements and sends them in one
 *    `updateDocuments` call (`addElementsToBulkIndex`)
 *
 * @typeParam T - element type accepted by the bulk path and by `formatterFn`
 * @typeParam DB - document type produced by `formatterFn` and stored in the index
 */
export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
  protected readonly indexQueue: QueueObject<QueueParam>

  protected abstract readonly primaryKey: keyof DB
  protected abstract readonly filterableAttributes: string[]
  protected abstract readonly sortableAttributes: string[]
  protected abstract readonly searchableAttributes: string[]

  // Only pushed to the index by initIndex() when a subclass sets it
  protected readonly rankingRules: string[]

  // Elements buffered until the next bulk indexation run
  private elementsToBulkIndex: T[] = []
  // Set while a bulk run is pending or in flight, undefined otherwise
  private bulkIndexationTimer: ReturnType<typeof setTimeout> | undefined
  // Task returned by the last successful bulk run that has not been waited for yet
  private bulkProcessEnqueuedTask: EnqueuedTask | undefined

  /**
   * Indexes the single element designated by `uuid` on `host`.
   *
   * NOTE(review): the type argument of the returned Promise is not visible here,
   * `unknown` accepts any subclass return type - confirm against implementations.
   */
  abstract indexSpecificElement (host: string, uuid: string): Promise<unknown>

  /**
   * @param indexName - uid of the Meilisearch index this indexer writes to
   * @param formatterFn - converts an element into the document stored in the index
   */
  constructor (
    protected readonly indexName: string,
    protected readonly formatterFn: (o: T) => DB
  ) {
    // NOTE(review): the worker relies on receiving a callback from the "async" library.
    // The library documents that native async functions do not get one, so this
    // presumably depends on the build target downleveling async/await - confirm.
    this.indexQueue = queue<QueueParam, Error>(async (task, cb) => {
      try {
        await this.indexSpecificElement(task.host, task.identifier)

        return cb()
      } catch (err) {
        logger.error(
          { err: inspect(err) },
          'Error in index specific element %s of %s in index %s.', task.identifier, task.host, this.indexName
        )

        // The failure is logged only: the callback is called without error
        cb()
      }
    }, INDEXER_QUEUE_CONCURRENCY)
  }

  /**
   * Pushes the index settings (searchable, filterable and sortable attributes,
   * faceting, pagination and optional ranking rules) to Meilisearch.
   *
   * Does nothing when the index already exists, unless
   * `CONFIG.MEILISEARCH.FORCE_SETTINGS_UPDATE_AT_STARTUP` is set.
   */
  async initIndex (): Promise<void> {
    const { results } = await client.getIndexes()

    if (!CONFIG.MEILISEARCH.FORCE_SETTINGS_UPDATE_AT_STARTUP && results.some(r => r.uid === this.indexName)) {
      logger.info(this.indexName + ' already exists, skipping configuration')
      return
    }

    logger.info(`Creating/updating "${this.indexName}" index settings`)

    const index = client.index(this.indexName)

    await index.updateSearchableAttributes(this.searchableAttributes)
    await index.updateFilterableAttributes(this.filterableAttributes)
    await index.updateSortableAttributes(this.sortableAttributes)
    await index.updateFaceting({ maxValuesPerFacet: 10_000 })
    await index.updatePagination({ maxTotalHits: 10_000 })

    if (this.rankingRules) {
      await index.updateRankingRules(this.rankingRules)
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Deletes the documents of `host` whose primary key is not in `existingPrimaryKeys`.
   *
   * NOTE(review): `host` is interpolated unquoted in the filter expression -
   * confirm every possible host value is valid there without quoting.
   */
  removeNotExisting (host: string, existingPrimaryKeys: Set<string>) {
    return client.index(this.indexName).deleteDocuments({
      filter: `${this.primaryKey.toString()} NOT IN ${buildInValuesArray(Array.from(existingPrimaryKeys))} AND host = ${host}`
    })
  }

  /**
   * Deletes the documents whose host is not in `existingHosts`.
   */
  removeFromHosts (existingHosts: string[]) {
    return client.index(this.indexName).deleteDocuments({
      filter: 'host NOT IN ' + buildInValuesArray(Array.from(existingHosts))
    })
  }

  // ---------------------------------------------------------------------------

  /**
   * Queues the indexation of one element. Does not wait for it: a failure to
   * push the task is logged and not propagated to the caller.
   */
  scheduleParallelIndexation (host: string, identifier: string) {
    this.indexQueue.push({ identifier, host })
      .catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host))
  }

  /**
   * Waits for the Meilisearch task of the last bulk run, if there is one.
   */
  async waitForBulkIndexation (): Promise<void> {
    const pendingTask = this.bulkProcessEnqueuedTask
    if (!pendingTask) return

    await this.waitForTask(pendingTask.taskUid, 1000)

    // A bulk run may have stored a newer task while we were waiting: keep that one
    if (this.bulkProcessEnqueuedTask === pendingTask) {
      this.bulkProcessEnqueuedTask = undefined
    }
  }

  /**
   * Buffers `elements` and makes sure a bulk indexation run is scheduled.
   */
  addElementsToBulkIndex (elements: T[]) {
    this.elementsToBulkIndex = this.elementsToBulkIndex.concat(elements)

    this.scheduleBulkIndexationProcess()
  }

  /**
   * Arms the bulk timer unless a run is already pending or in flight.
   *
   * When the timer fires, the buffered elements are sent in one batch. A failed
   * batch is logged and its elements are not retried.
   */
  private scheduleBulkIndexationProcess () {
    if (this.bulkIndexationTimer) return

    this.bulkIndexationTimer = setTimeout(async () => {
      try {
        // Swap the buffer first so elements added during the await go to the next run
        const elements = this.elementsToBulkIndex
        this.elementsToBulkIndex = []

        logger.info(`Bulk indexing ${elements.length} elements in ${this.indexName}`)
        this.bulkProcessEnqueuedTask = await this.indexElements(elements)
      } catch (err) {
        logger.error({ err }, 'Cannot schedule bulk indexation')
      } finally {
        // Always release the timer, otherwise one failure would block every future run
        this.bulkIndexationTimer = undefined

        // Elements added while this run was in flight could not arm the timer themselves
        if (this.elementsToBulkIndex.length !== 0) this.scheduleBulkIndexationProcess()
      }
    }, CONFIG.INDEXER.BULK_INDEXATION_INTERVAL_MS)
  }

  /**
   * Formats `elements` and sends them to the index with `updateDocuments`.
   *
   * @returns the value resolved by `updateDocuments`
   */
  private async indexElements (elements: T[]) {
    const documents = elements.map(e => this.formatterFn(e))

    const result = await client.index(this.indexName).updateDocuments(documents, { primaryKey: this.primaryKey.toString() })
    logger.debug(result, 'Indexed ' + documents.length + ' documents in ' + this.indexName)

    return result
  }

  // ---------------------------------------------------------------------------

  /**
   * Waits for a Meilisearch task with a 5 minute timeout.
   *
   * @param taskId - uid of the task to wait for
   * @param intervalMs - forwarded as the `intervalMs` option of the client
   */
  private waitForTask (taskId: number, intervalMs?: number) {
    return client.index(this.indexName).waitForTask(taskId, { timeOutMs: 1000 * 60 * 5, intervalMs })
  }
}