diff --git a/server/lib/elastic-search-channels.ts b/server/lib/elastic-search-channels.ts index fd37174..ea927cf 100644 --- a/server/lib/elastic-search-channels.ts +++ b/server/lib/elastic-search-channels.ts @@ -5,6 +5,7 @@ import { logger } from '../helpers/logger' import { DBChannel, IndexableChannel } from '../types/channel.model' import { ChannelsSearchQuery } from '../types/channel-search.model' import { buildAvatarMapping, formatAvatarForAPI, formatAvatarForDB } from './elastic-search-avatar' +import { difference } from 'lodash' function initChannelsIndex () { return buildIndex(CONFIG.ELASTIC_SEARCH.INDEXES.CHANNELS, buildChannelsMapping()) @@ -23,6 +24,36 @@ function refreshChannelsIndex () { return elasticSearch.indices.refresh({ index: CONFIG.ELASTIC_SEARCH.INDEXES.CHANNELS }) } +async function removeNotExistingChannels (host: string, existingChannels: Set) { + const idsFromDB = await getChannelIdsOf(host) + + const idsToRemove = difference(idsFromDB, Array.from(existingChannels)) + + logger.info('Will remove %d channels from %s.', idsToRemove.length, host, { idsToRemove }) + + return elasticSearch.delete_by_query({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.CHANNELS, + body: { + query: { + bool: { + filter: [ + { + terms: { + id: idsToRemove + } + }, + { + term: { + host + } + } + ] + } + } + } + }) +} + async function queryChannels (search: ChannelsSearchQuery) { const bool: any = {} @@ -57,12 +88,43 @@ async function queryChannels (search: ChannelsSearchQuery) { return extractQueryResult(res) } +async function getChannelIdsOf (host: string) { + const res = await elasticSearch.search({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.CHANNELS, + body: { + size: 0, + aggs: { + ids: { + terms: { + field: 'id' + } + } + }, + query: { + bool: { + filter: [ + { + term: { + host + } + } + ] + } + } + } + }) + + return res.body.aggregations.ids.buckets.map(b => b.key) +} + export { initChannelsIndex, indexChannels, refreshChannelsIndex, formatChannelForAPI, - queryChannels + queryChannels, + getChannelIdsOf, + removeNotExistingChannels } // ############################################################################ diff --git a/server/lib/elastic-search-videos.ts b/server/lib/elastic-search-videos.ts index 88c0b57..9da49ce 100644 --- a/server/lib/elastic-search-videos.ts +++ b/server/lib/elastic-search-videos.ts @@ -5,6 +5,8 @@ import { buildIndex, buildSort, elasticSearch, extractQueryResult, indexDocument import { VideosSearchQuery } from '../types/video-search.model' import { logger } from '../helpers/logger' import { buildAvatarMapping, formatAvatarForAPI, formatAvatarForDB } from './elastic-search-avatar' +import { difference } from 'lodash' +import { getChannelIdsOf } from './elastic-search-channels' function initVideosIndex () { return buildIndex(CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, buildVideosMapping()) @@ -44,6 +46,65 @@ function removeVideosFromHosts (hosts: string[]) { }) } +async function removeNotExistingVideos (host: string, existingVideos: Set) { + const idsFromDB = await getVideoIdsOf(host) + + const idsToRemove = difference(idsFromDB, Array.from(existingVideos)) + + logger.info('Will remove %d videos from %s.', idsToRemove.length, host, { idsToRemove }) + + return elasticSearch.delete_by_query({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, + body: { + query: { + bool: { + filter: [ + { + terms: { + id: idsToRemove + } + }, + { + term: { + host + } + } + ] + } + } + } + }) +} + +async function getVideoIdsOf (host: string) { + const res = await elasticSearch.search({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, + body: { + size: 0, + aggs: { + ids: { + terms: { + field: 'id' + } + } + }, + query: { + bool: { + filter: [ + { + term: { + host + } + } + ] + } + } + } + }) + + return res.body.aggregations.ids.buckets.map(b => b.key) +} + async function queryVideos (search: VideosSearchQuery) { const bool: any = {} const filter: any[] = [] @@ -193,6 +254,7 @@ async function queryVideos (search: VideosSearchQuery) { export { indexVideos, + removeNotExistingVideos, queryVideos, refreshVideosIndex, removeVideosFromHosts, diff --git a/server/lib/schedulers/videos-indexer.ts b/server/lib/schedulers/videos-indexer.ts index 14c5b61..67151cb 100644 --- a/server/lib/schedulers/videos-indexer.ts +++ b/server/lib/schedulers/videos-indexer.ts @@ -1,13 +1,13 @@ import { AbstractScheduler } from './abstract-scheduler' import { INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' import { logger } from '../../helpers/logger' -import { indexVideos, refreshVideosIndex, removeVideosFromHosts } from '../elastic-search-videos' +import { indexVideos, refreshVideosIndex, removeNotExistingVideos, removeVideosFromHosts } from '../elastic-search-videos' import { IndexableVideo } from '../../types/video.model' import { inspect } from 'util' import { AsyncQueue, queue } from 'async' import { buildInstanceHosts } from '../elastic-search-instances' import { getChannel, getVideo, getVideos } from '../peertube-instance' -import { indexChannels, refreshChannelsIndex } from '../elastic-search-channels' +import { indexChannels, refreshChannelsIndex, removeNotExistingChannels } from '../elastic-search-channels' type GetVideoQueueParam = { host: string, uuid: string } type GetChannelQueueParam = { host: string, name: string } @@ -63,6 +63,8 @@ export class VideosIndexer extends AbstractScheduler { private async runVideosIndexer () { const { indexHosts, removedHosts } = await buildInstanceHosts() const channelsToSync = new Set() + const channelsId = new Set() + const videosId = new Set() await removeVideosFromHosts(removedHosts) @@ -84,20 +86,30 @@ export class VideosIndexer extends AbstractScheduler { for (const c of created) { this.scheduleVideoIndexation(host, c.uuid) } + } - videos.forEach(v => channelsToSync.add(v.channel.name)) + for (const video of videos) { + channelsToSync.add(video.channel.name) + channelsId.add(video.channel.id) + videosId.add(video.id) } } while (videos.length === INDEXER_COUNT.VIDEOS && start < 500) logger.info('Added video data from %s.', host) - channelsToSync.forEach(c => this.scheduleChannelIndexation(host, c)) + for (const c of channelsToSync) { + this.scheduleChannelIndexation(host, c) + } + + await removeNotExistingChannels(host, channelsId) + await removeNotExistingVideos(host, videosId) } catch (err) { console.error(inspect(err, { depth: 10 })) logger.warn('Cannot index videos from %s.', host, { err }) } } + await refreshChannelsIndex() await refreshVideosIndex() }