135 lines
3.1 KiB
TypeScript
135 lines
3.1 KiB
TypeScript
import { difference } from 'lodash'
|
|
import { estypes } from '@elastic/elasticsearch'
|
|
import { AggregationsBuckets, AggregationsStringTermsAggregate, AggregationsStringTermsBucket } from '@elastic/elasticsearch/lib/api/types'
|
|
import { elasticSearch } from '../../helpers/elastic-search'
|
|
import { logger } from '../../helpers/logger'
|
|
|
|
/**
 * Deletes, from `indexName`, the documents of `host` whose id is indexed
 * but absent from `existingIds`.
 *
 * At most 50000 ids are deleted per call; any remaining stale ids are left
 * in the index by this call.
 *
 * @param indexName - index to clean up
 * @param host - host whose documents are inspected
 * @param existingIds - ids that must be kept
 * @returns the promise returned by `elasticSearch.deleteByQuery`
 */
async function removeNotExistingIdsFromHost (indexName: string, host: string, existingIds: Set<number>) {
  // Elastic Search limits max terms amount
  const maxIdsPerDeletion = 50000

  const indexedIds = await getIdsOf(indexName, host)
  const staleIds = difference(indexedIds, Array.from(existingIds))
  const idsToRemove = staleIds.slice(0, maxIdsPerDeletion)

  logger.info({ idsToRemove }, 'Will remove %d entries from %s of host %s.', idsToRemove.length, indexName, host)

  return elasticSearch.deleteByQuery({
    index: indexName,
    body: {
      query: {
        bool: {
          // Both conditions must hold: the id is stale AND the document belongs to this host
          filter: [
            { terms: { id: idsToRemove } },
            { term: { host } }
          ]
        }
      }
    }
  })
}
|
|
|
|
/**
 * Deletes every document of `indexName` whose `host` is one of `hosts`.
 *
 * @param indexName - index to delete from
 * @param hosts - hosts whose documents are removed
 * @returns `undefined` when `hosts` is empty (nothing is sent), otherwise
 * the promise returned by `elasticSearch.deleteByQuery`
 */
function removeFromHosts (indexName: string, hosts: string[]) {
  // Nothing to delete: skip the log line and the request entirely
  if (hosts.length < 1) return

  logger.info({ hosts }, 'Will remove entries of index %s from hosts.', indexName)

  return elasticSearch.deleteByQuery({
    index: indexName,
    body: {
      query: {
        bool: {
          filter: {
            terms: { host: hosts }
          }
        }
      }
    }
  })
}
|
|
|
|
/**
 * Lists the `id` values indexed in `indexName` for the given `host`.
 *
 * The ids are read from a terms aggregation on the `id` field (at most
 * 500000 buckets); no search hits are requested.
 *
 * @param indexName - index to query
 * @param host - host used to filter the documents
 * @returns the bucket keys of the `ids` aggregation
 */
async function getIdsOf (indexName: string, host: string) {
  const response = await elasticSearch.search<unknown, Record<'ids', AggregationsStringTermsAggregate>>({
    index: indexName,
    body: {
      // Only the aggregation matters, so do not fetch any document
      size: 0,
      aggs: {
        ids: {
          terms: { size: 500000, field: 'id' }
        }
      },
      query: {
        bool: {
          filter: [
            { term: { host } }
          ]
        }
      }
    }
  })

  const idBuckets = extractBucketsFromAggregation<number>(response.aggregations.ids.buckets)

  return idBuckets.map(bucket => bucket.key)
}
|
|
|
|
/**
 * Flattens an Elasticsearch search response into `{ total, data }`.
 *
 * @param result - raw search response
 * @returns `total`: the hit count, read either from `hits.total.value` or
 * from `hits.total` itself when it is a plain number;
 * `data`: one object per hit made of a copy of its `_source` fields plus a
 * `score` property holding the hit `_score`
 */
function extractSearchQueryResult (result: estypes.SearchResponse<any, any>) {
  const { total, hits } = result.hits

  return {
    // `hits.total` is either a number or a `{ value, relation }` object.
    // NOTE(review): a response without any total still throws here - confirm callers always track totals
    total: typeof total === 'number'
      ? total
      : (total as estypes.SearchTotalHits).value,

    // Build a new object instead of assigning onto `_source`, so the response passed in is left untouched
    // and a hit without `_source` yields `{ score }` instead of throwing
    data: hits.map(h => ({ ...h._source, score: h._score }))
  }
}
|
|
|
|
/**
 * Returns the buckets of an aggregation as an array of objects exposing a `key` typed as `T`.
 *
 * - An array of buckets is returned as is (same reference).
 * - A keyed container (object mapping a name to a bucket) is converted to an array; a bucket
 *   that has no `key` of its own receives the name it was stored under.
 * - A missing container yields an empty array.
 *
 * @param buckets - `buckets` value of an aggregation result
 * @returns the buckets, always as an array
 */
function extractBucketsFromAggregation <T extends string | number> (buckets: AggregationsBuckets<AggregationsStringTermsBucket>): { key: T }[] {
  // FIXME: key returned by elastic search can also be a number
  if (Array.isArray(buckets)) return buckets as unknown as { key: T }[]

  // Keyed form: the bucket name is the property name, so carry it over as `key` when the bucket lacks one
  return Object.entries(buckets ?? {})
    .map(([ name, bucket ]) => ({ ...bucket, key: bucket.key ?? name })) as unknown as { key: T }[]
}
|
|
|
|
/**
 * Converts a sort expression into an Elasticsearch sort clause.
 *
 * A leading `-` requests descending order, anything else ascending order.
 * The field name `match` is translated to `_score`.
 *
 * @param value - field name, optionally prefixed with `-`
 * @returns a single-element array `[ { <field>: { order } } ]`
 */
function buildSort (value: string) {
  const descending = value.startsWith('-')

  const direction: 'asc' | 'desc' = descending ? 'desc' : 'asc'
  const requestedField = descending ? value.slice(1) : value

  const field = requestedField === 'match' ? '_score' : requestedField

  return [
    { [field]: { order: direction } }
  ]
}
|
|
|
|
// Public API of this module: the Elasticsearch helper imported above plus the functions defined in this file
export {
  elasticSearch,

  // Index maintenance
  getIdsOf,
  removeFromHosts,
  removeNotExistingIdsFromHost,

  // Query building and response parsing
  buildSort,
  extractBucketsFromAggregation,
  extractSearchQueryResult
}
|