121 lines
2.7 KiB
TypeScript
121 lines
2.7 KiB
TypeScript
import { ApiResponse, Client } from '@elastic/elasticsearch'
|
|
import { CONFIG } from '../initializers/constants'
|
|
import { logger } from './logger'
|
|
import { flatMap } from 'lodash'
|
|
import { IndexableDoc } from '../types/elastic-search.model'
|
|
|
|
// Shared Elasticsearch client for this module; the node URL is assembled from
// the hostname and port found in CONFIG.ELASTIC_SEARCH.
const elasticSearch = new Client({
  node: `http://${CONFIG.ELASTIC_SEARCH.HOSTNAME}:${CONFIG.ELASTIC_SEARCH.PORT}`
})
|
|
|
|
/**
 * Turn a sort parameter into an Elasticsearch `sort` clause.
 *
 * A leading `-` requests descending order (and is stripped from the field
 * name); any other value sorts ascending. The pseudo-field `match` is
 * translated to Elasticsearch's `_score`.
 *
 * @param value - sort parameter such as `-createdAt` or `match`
 * @returns a single-element array of the form `[ { <field>: { order } } ]`
 */
function buildSort (value: string) {
  const descending = value.charAt(0) === '-'

  const order: 'asc' | 'desc' = descending ? 'desc' : 'asc'
  const requestedField = descending ? value.slice(1) : value

  // "match" is the public alias for the relevance score
  const key = requestedField === 'match' ? '_score' : requestedField

  return [ { [key]: { order } } ]
}
|
|
|
|
/**
 * Create the Elasticsearch index `name` with one shard, one replica and the
 * given field mapping.
 *
 * If the index already exists (a `ResponseError` whose first root cause is
 * `resource_already_exists_exception`), the error is swallowed and the
 * returned promise resolves to `undefined`. Any other error is rethrown.
 *
 * @param name - name of the index to create
 * @param mapping - object used as `mappings.properties` of the new index
 * @returns the promise of the `indices.create` call, with the
 *          "already exists" failure filtered out
 */
function buildIndex (name: string, mapping: object) {
  logger.info('Initialize %s Elastic Search index.', name)

  return elasticSearch.indices.create({
    index: name,
    body: {
      settings: {
        number_of_shards: 1,
        number_of_replicas: 1
      },
      mappings: {
        properties: mapping
      }
    }
  }).catch(err => {
    // Optional-chain every level: an error body without `error.root_cause`
    // must not raise a TypeError here, which would hide the real failure.
    const rootCauseType = err?.meta?.body?.error?.root_cause?.[0]?.type

    if (err?.name === 'ResponseError' && rootCauseType === 'resource_already_exists_exception') return

    throw err
  })
}
|
|
|
|
/**
 * Send a list of documents to an Elasticsearch index in a single bulk request.
 *
 * @param options.objects - documents to send; each one is addressed by its
 *                          `elasticSearchId`
 * @param options.formatter - converts an object into the body stored in
 *                            Elasticsearch
 * @param options.replace - when `true` each document is written with the
 *                          `index` bulk action; when `false` the `update`
 *                          action is used with `doc_as_upsert: true`
 * @param options.index - name of the target index
 * @returns `{ created }`, the subset of `objects` whose bulk item result was
 *          `created`
 * @throws Error if the bulk response reports `errors === true`
 */
async function indexDocuments <T extends IndexableDoc> (options: {
  objects: T[]
  formatter: (o: T) => any
  replace: boolean
  index: string
}): Promise<{ created: T[] }> {
  const { objects, formatter, replace, index } = options

  // Nothing to write: skip the request, since a bulk call without any action
  // is rejected by Elasticsearch.
  if (objects.length === 0) return { created: [] }

  // Lookup table used to map the ids of the bulk response back to the input objects
  const elIdIndex: { [elId: string]: T } = {}

  for (const object of objects) {
    elIdIndex[object.elasticSearchId] = object
  }

  const method = replace ? 'index' : 'update'

  // The bulk body alternates an action line and its payload line for every document
  const body = flatMap(objects, v => {
    const doc = formatter(v)

    const payload = replace
      ? doc
      : { doc, doc_as_upsert: true }

    return [
      {
        [method]: {
          _id: v.elasticSearchId,
          _index: index
        }
      },
      payload
    ]
  })

  const result = await elasticSearch.bulk({
    index,
    body
  })

  const resultBody = result.body

  if (resultBody.errors === true) {
    const msg = 'Cannot insert data in elastic search.'
    logger.error({ err: resultBody }, msg)
    throw new Error(msg)
  }

  // Each response item is keyed by the action that produced it
  const created: T[] = resultBody.items
    .map(i => i[method])
    .filter(i => i.result === 'created')
    .map(i => elIdIndex[i._id])

  return { created }
}
|
|
|
|
/**
 * Reduce an Elasticsearch search response to `{ total, data }`.
 *
 * `total` is read from `body.hits.total.value`. `data` holds the `_source`
 * object of every hit, each one extended in place with a `score` property
 * copied from the hit's `_score`.
 *
 * @param result - response of a search request
 * @returns the total hit count and the scored source documents
 */
function extractQueryResult (result: ApiResponse<any, any>) {
  const { hits } = result.body

  const total = hits.total.value

  // Object.assign writes `score` onto the existing `_source` object and returns that same object
  const data = hits.hits.map(hit => Object.assign(hit._source, { score: hit._score }))

  return { total, data }
}
|
|
|
|
// Public surface of this module: the shared client plus the index/query helpers
export { buildIndex, buildSort, elasticSearch, extractQueryResult, indexDocuments }
|