sepia-search-motore-di-rice.../server/lib/indexers/shared/abstract-indexer.ts

66 lines
2.3 KiB
TypeScript
Raw Normal View History

2021-06-24 15:18:54 +02:00
import { AsyncQueue, queue } from 'async'
import { inspect } from 'util'
2022-06-03 10:54:30 +02:00
import { MappingProperty, PropertyName } from '@elastic/elasticsearch/lib/api/types'
2021-06-24 15:18:54 +02:00
import { logger } from '../../../helpers/logger'
import { INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants'
import { buildIndex, indexDocuments, refreshIndex } from '../../../lib/elastic-search/elastic-search-index'
import { removeFromHosts, removeNotExistingIdsFromHost } from '../../../lib/elastic-search/elastic-search-queries'
import { IndexableDoc } from '../../../types/indexable-doc.model'
/**
 * Payload of one task pushed on an indexer queue.
 *
 * `identifier` could be a UUID, a handle or a URL for example.
 */
export type QueueParam = {
  host: string
  identifier: string
}
/**
 * Base class for indexers that feed one Elasticsearch index.
 *
 * `T` is the type of the elements handed to the indexer and `DB` the type
 * produced by `formatterFn` before the documents are sent to the index.
 *
 * Single-element indexations are funnelled through an `async` queue limited to
 * `INDEXER_QUEUE_CONCURRENCY` parallel workers. Failures are logged and never
 * propagated, so one failing element does not stop the queue.
 */
export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
  /** Queue of pending single-element indexations. */
  protected readonly indexQueue: AsyncQueue<QueueParam>

  /**
   * Index one specific element of `host`. Implemented by subclasses.
   *
   * @param host host the element belongs to
   * @param identifier identifier of the element on that host; this is the
   *   `identifier` of the queued task, so it is not necessarily a UUID
   */
  abstract indexSpecificElement (host: string, identifier: string): Promise<any>

  /** Build the property mapping passed to `buildIndex` by `initIndex()`. */
  abstract buildMapping (): Record<PropertyName, MappingProperty>

  /**
   * @param indexName name of the index every operation of this indexer targets
   * @param formatterFn converts an element into the document handed to `indexDocuments`
   */
  constructor (
    protected readonly indexName: string,
    protected readonly formatterFn: (o: T) => DB
  ) {
    this.indexQueue = queue<QueueParam, Error>((task, cb) => {
      // Start the call inside a promise chain so that a synchronous throw from the
      // subclass is handled like a rejection instead of escaping the worker without
      // ever calling `cb`.
      Promise.resolve()
        .then(() => this.indexSpecificElement(task.host, task.identifier))
        .then(() => cb())
        .catch(err => {
          logger.error(
            { err: inspect(err) },
            'Error in index specific element %s of %s in index %s.', task.identifier, task.host, this.indexName
          )

          // Deliberately complete the task without an error: the failure has been logged
          // and the queue must keep processing the remaining tasks.
          cb()
        })
    }, INDEXER_QUEUE_CONCURRENCY)
  }

  /** Call `buildIndex` for this indexer's index with the mapping from `buildMapping()`. */
  initIndex () {
    return buildIndex(this.indexName, this.buildMapping())
  }

  /**
   * Queue the indexation of a single element. Returns immediately; a failure to
   * schedule is logged instead of being thrown.
   *
   * @param host host the element belongs to
   * @param identifier identifier of the element on that host
   */
  scheduleIndexation (host: string, identifier: string): void {
    // Fire-and-forget: `void` makes explicit that the promise is intentionally not awaited.
    void this.indexQueue.push({ identifier, host })
      .catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host))
  }

  /** Call the `refreshIndex` helper for this indexer's index. */
  refreshIndex () {
    return refreshIndex(this.indexName)
  }

  /**
   * Delegate to `removeNotExistingIdsFromHost` for this index.
   * NOTE(review): presumably deletes the documents of `host` whose id is not in
   * `existingIds` - confirm against the helper's implementation.
   */
  removeNotExisting (host: string, existingIds: Set<number>) {
    return removeNotExistingIdsFromHost(this.indexName, host, existingIds)
  }

  /**
   * Delegate to the `removeFromHosts` query helper for this index.
   * NOTE(review): presumably deletes every document belonging to one of `hosts` - confirm.
   */
  removeFromHosts (hosts: string[]) {
    return removeFromHosts(this.indexName, hosts)
  }

  /**
   * Send `elements` to `indexDocuments`, each one converted with `formatterFn`.
   *
   * @param elements elements to index
   * @param replace forwarded as-is to `indexDocuments` (defaults to `false`)
   */
  indexElements (elements: T[], replace = false) {
    return indexDocuments({
      objects: elements,
      formatter: v => this.formatterFn(v),
      replace,
      index: this.indexName
    })
  }
}