import { AsyncQueue, queue } from 'async' import { inspect } from 'util' import { logger } from '../../../helpers/logger' import { INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants' import { IndexableDoc } from '../../../types/indexable-doc.model' import { client } from '../../../helpers/meilisearch' import { buildInValuesArray } from '../../meilisearch/meilisearch-queries' // identifier could be an uuid, an handle or a url for example export type QueueParam = { host: string, identifier: string } export abstract class AbstractIndexer { protected readonly indexQueue: AsyncQueue protected abstract readonly primaryKey: keyof DB protected abstract readonly filterableAttributes: string[] protected abstract readonly sortableAttributes: string[] protected abstract readonly searchableAttributes: string[] protected readonly rankingRules: string[] abstract indexSpecificElement (host: string, uuid: string): Promise constructor ( protected readonly indexName: string, protected readonly formatterFn: (o: T) => DB ) { this.indexQueue = queue((task, cb) => { this.indexSpecificElement(task.host, task.identifier) .then(() => cb()) .catch(err => { logger.error( { err: inspect(err) }, 'Error in index specific element %s of %s in index %s.', task.identifier, task.host, this.indexName ) cb() }) }, INDEXER_QUEUE_CONCURRENCY) } async initIndex () { const { results } = await client.getIndexes() if (results.some(r => r.uid === this.indexName)) { logger.info(this.indexName + ' already exists, skipping configuration') return } logger.info('Creating and configuring index ' + this.indexName) await client.index(this.indexName).updateSearchableAttributes(this.searchableAttributes) await client.index(this.indexName).updateFilterableAttributes(this.filterableAttributes) await client.index(this.indexName).updateSortableAttributes(this.sortableAttributes) await client.index(this.indexName).updateFaceting({ maxValuesPerFacet: 10_000 }) await client.index(this.indexName).updatePagination({ maxTotalHits: 10_000 }) if (this.rankingRules) { await client.index(this.indexName).updateRankingRules(this.rankingRules) } } scheduleIndexation (host: string, identifier: string) { this.indexQueue.push({ identifier, host }) .catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host)) } removeNotExisting (host: string, existingPrimaryKeys: Set) { return client.index(this.indexName).deleteDocuments({ filter: `${this.primaryKey.toString()} NOT IN ${buildInValuesArray(Array.from(existingPrimaryKeys))} AND host = ${host}` }) } removeFromHosts (hosts: string[]) { return client.index(this.indexName).deleteDocuments({ filter: 'host IN ' + buildInValuesArray(Array.from(hosts)) }) } async indexElements (elements: T[]) { const documents = elements.map(e => this.formatterFn(e)) const result = await client.index(this.indexName).updateDocuments(documents, { primaryKey: this.primaryKey.toString() }) logger.debug(result, 'Indexed ' + documents.length + ' documents in ' + this.indexName) } }