import { AbstractScheduler } from './abstract-scheduler'
import { INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { logger } from '../../helpers/logger'
import { indexVideos, refreshVideosIndex, removeNotExistingVideos, removeVideosFromHosts } from '../elastic-search-videos'
import { IndexableVideo } from '../../types/video.model'
import { inspect } from 'util'
import { AsyncQueue, queue } from 'async'
import { buildInstanceHosts } from '../elastic-search-instances'
import { getChannel, getVideo, getVideos } from '../peertube-instance'
import { indexChannels, refreshChannelsIndex, removeNotExistingChannels } from '../elastic-search-channels'

/** Task handled by the video queue: the video `uuid` to fetch from `host`. */
type GetVideoQueueParam = { host: string, uuid: string }

/** Task handled by the channel queue: the channel `name` to fetch from `host`. */
type GetChannelQueueParam = { host: string, name: string }

/**
 * Scheduler that lists the videos of every indexable host, indexes them, and
 * removes the videos/channels that are no longer listed.
 *
 * Two queues are owned by the instance: one re-fetches single videos, the other
 * re-fetches single channels. Use `VideosIndexer.Instance` to get the singleton.
 */
export class VideosIndexer extends AbstractScheduler {

  // Typed as VideosIndexer (not the base class) so that callers of `Instance`
  // can reach scheduleVideoIndexation()/scheduleChannelIndexation().
  private static instance: VideosIndexer

  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosIndexer

  private readonly indexVideoQueue: AsyncQueue<GetVideoQueueParam>
  private readonly indexChannelQueue: AsyncQueue<GetChannelQueueParam>

  private constructor () {
    super()

    // A failing task is logged and then acknowledged without an error, so one
    // broken video does not affect the other queued tasks.
    this.indexVideoQueue = queue<GetVideoQueueParam, Error>((task, cb) => {
      this.indexSpecificVideo(task.host, task.uuid)
        .then(() => cb())
        .catch(err => {
          logger.error('Error in index specific video.', { err: inspect(err) })
          cb()
        })
    })

    // Same error policy as the video queue.
    this.indexChannelQueue = queue<GetChannelQueueParam, Error>((task, cb) => {
      this.indexSpecificChannel(task.host, task.name)
        .then(() => cb())
        .catch(err => {
          logger.error('Error in index specific channel.', { err: inspect(err) })
          cb()
        })
    })

    // Refresh the channels index once the channel queue has been emptied.
    this.indexChannelQueue.drain(async () => {
      logger.info('Refresh channels index.')

      try {
        await refreshChannelsIndex()
      } catch (err) {
        // Nothing awaits this handler: catch here to avoid an unhandled rejection.
        logger.error('Cannot refresh channels index.', { err: inspect(err) })
      }
    })
  }

  /**
   * Queue the (re)indexation of a single video.
   *
   * @param host - instance host to fetch the video from
   * @param uuid - UUID of the video on that host
   */
  scheduleVideoIndexation (host: string, uuid: string) {
    this.indexVideoQueue.push({ uuid, host })
  }

  /**
   * Queue the (re)indexation of a single channel.
   *
   * @param host - instance host to fetch the channel from
   * @param name - name of the channel on that host
   */
  scheduleChannelIndexation (host: string, name: string) {
    this.indexChannelQueue.push({ name, host })
  }

  /** Scheduler entry point: runs one full indexation pass. */
  protected async internalExecute () {
    return this.runVideosIndexer()
  }

  /**
   * Run one indexation pass:
   *  - remove the videos of the hosts reported as removed,
   *  - for each indexable host: page through its videos, index them, schedule a
   *    full fetch of the created videos and of the channels seen, then remove
   *    the channels/videos of that host that were not listed,
   *  - refresh the channels and videos indexes.
   *
   * A failure on one host is logged and does not stop the other hosts.
   */
  private async runVideosIndexer () {
    const { indexHosts, removedHosts } = await buildInstanceHosts()

    await removeVideosFromHosts(removedHosts)

    for (const host of indexHosts) {
      // These sets must be per host: they are used to decide what is scheduled
      // on, and what is removed from, this host only.
      const channelsToSync = new Set<string>()
      const channelsId = new Set<IndexableVideo['channel']['id']>()
      const videosId = new Set<IndexableVideo['id']>()

      try {
        let videos: IndexableVideo[] = []
        let start = 0

        do {
          videos = await getVideos(host, start)

          // Log the offset that was actually requested, before advancing it.
          logger.debug('Getting %d results from %s (from = %d).', videos.length, host, start)
          start += videos.length

          if (videos.length !== 0) {
            const { created } = await indexVideos(videos)

            // Fetch the complete video for each created video (to get tags)
            for (const c of created) {
              this.scheduleVideoIndexation(host, c.uuid)
            }
          }

          for (const video of videos) {
            channelsToSync.add(video.channel.name)
            channelsId.add(video.channel.id)
            videosId.add(video.id)
          }
          // Continue while pages come back full.
          // NOTE(review): the offset is also capped at 500 — confirm this limit is intended.
        } while (videos.length === INDEXER_COUNT.VIDEOS && start < 500)

        logger.info('Added video data from %s.', host)

        for (const c of channelsToSync) {
          this.scheduleChannelIndexation(host, c)
        }

        await removeNotExistingChannels(host, channelsId)
        await removeNotExistingVideos(host, videosId)
      } catch (err) {
        console.error(inspect(err, { depth: 10 }))
        logger.warn('Cannot index videos from %s.', host, { err })
      }
    }

    await refreshChannelsIndex()
    await refreshVideosIndex()
  }

  /**
   * Fetch one video from its host and index it.
   * NOTE(review): the meaning of the `true` flag passed to indexVideos() is not visible here.
   */
  private async indexSpecificVideo (host: string, uuid: string) {
    const video = await getVideo(host, uuid)

    logger.info('Indexing specific video %s of %s.', uuid, host)

    await indexVideos([ video ], true)
  }

  /**
   * Fetch one channel from its host and index it.
   * NOTE(review): the meaning of the `true` flag passed to indexChannels() is not visible here.
   */
  private async indexSpecificChannel (host: string, name: string) {
    const channel = await getChannel(host, name)

    logger.info('Indexing specific channel %s@%s.', name, host)

    await indexChannels([ channel ], true)
  }

  /** Lazily created singleton. */
  static get Instance (): VideosIndexer {
    return this.instance || (this.instance = new this())
  }
}