import { AsyncQueue, queue } from 'async'
import * as Bluebird from 'bluebird'
import { inspect } from 'util'
import { logger } from '../../helpers/logger'
import { INDEXER_CONCURRENCY, INDEXER_COUNT, INDEXER_QUEUE_CONCURRENCY, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { IndexableVideo } from '../../types/video.model'
import { indexChannels, refreshChannelsIndex, removeChannelsFromHosts, removeNotExistingChannels } from '../elastic-search-channels'
import { buildInstanceHosts } from '../elastic-search-instances'
import { indexVideos, refreshVideosIndex, removeNotExistingVideos, removeVideosFromHosts } from '../elastic-search-videos'
import { getChannel, getVideo, getVideos } from '../peertube-instance'
import { AbstractScheduler } from './abstract-scheduler'

type GetVideoQueueParam = { host: string, uuid: string }
type GetChannelQueueParam = { host: string, name: string }

/**
 * Scheduler that walks every host returned by `buildInstanceHosts()`, indexes
 * its videos and channels through the elastic-search-* helpers, and removes
 * entries belonging to removed hosts or no longer returned by a host.
 *
 * Per-video and per-channel "full" indexation is delegated to two bounded
 * work queues so the main crawl is not blocked by individual fetches.
 *
 * Singleton: obtain it through `VideosIndexer.Instance`.
 */
export class VideosIndexer extends AbstractScheduler {

  private static instance: VideosIndexer

  // Upper bound on the pagination offset for a single host, so an instance
  // that keeps returning full pages cannot make one crawl run forever.
  private static readonly MAX_VIDEOS_OFFSET = 500000

  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosIndexer

  private indexedHosts: string[] = []

  private readonly indexVideoQueue: AsyncQueue<GetVideoQueueParam>
  private readonly indexChannelQueue: AsyncQueue<GetChannelQueueParam>

  private constructor () {
    super()

    // Both workers are best-effort: a failing task is logged and the queue
    // continues with the next one (cb() is always called without an error).
    this.indexVideoQueue = queue((task: GetVideoQueueParam, cb) => {
      this.indexSpecificVideo(task.host, task.uuid)
        .then(() => cb())
        .catch(err => {
          logger.error({ err: inspect(err) }, 'Error in index specific video %s of %s.', task.uuid, task.host)
          cb()
        })
    }, INDEXER_QUEUE_CONCURRENCY)

    this.indexChannelQueue = queue((task: GetChannelQueueParam, cb) => {
      this.indexSpecificChannel(task.host, task.name)
        .then(() => cb())
        .catch(err => {
          logger.error({ err: inspect(err) }, 'Error in index specific channel %s@%s.', task.name, task.host)
          cb()
        })
    }, INDEXER_QUEUE_CONCURRENCY)

    // The promise returned by this handler is not awaited by the queue, so a
    // failure must be caught here or it surfaces as an unhandled rejection.
    this.indexChannelQueue.drain(async () => {
      logger.info('Refresh channels index.')

      try {
        await refreshChannelsIndex()
      } catch (err) {
        logger.error({ err: inspect(err) }, 'Cannot refresh channels index.')
      }
    })
  }

  /**
   * Queue a full (re)indexation of one video.
   *
   * @param host - instance host the video belongs to
   * @param uuid - video UUID on that host
   */
  scheduleVideoIndexation (host: string, uuid: string) {
    this.indexVideoQueue.push({ uuid, host })
  }

  /**
   * Queue a full (re)indexation of one channel.
   *
   * @param host - instance host the channel belongs to
   * @param name - channel name on that host
   */
  scheduleChannelIndexation (host: string, name: string) {
    this.indexChannelQueue.push({ name, host })
  }

  /** Hosts selected for indexing by the most recent run. */
  getIndexedHosts () {
    return this.indexedHosts
  }

  protected async internalExecute () {
    return this.runVideosIndexer()
  }

  /**
   * One full indexing pass: drop data from removed hosts, crawl every indexed
   * host (with bounded concurrency), then refresh both indexes.
   */
  private async runVideosIndexer (): Promise<void> {
    logger.info('Running videos indexer.')

    const { indexHosts, removedHosts } = await buildInstanceHosts()
    this.indexedHosts = indexHosts

    await removeVideosFromHosts(removedHosts)
    await removeChannelsFromHosts(removedHosts)

    await Bluebird.map(indexHosts, async host => {
      try {
        await this.indexHost(host)
      } catch (err) {
        // A single failing host must not abort the whole pass.
        logger.warn({ err: inspect(err, { depth: 10 }) }, 'Cannot index videos from %s.', host)
      }
    }, { concurrency: INDEXER_CONCURRENCY })

    await refreshChannelsIndex()
    await refreshVideosIndex()

    logger.info('Videos indexer ended.')
  }

  /**
   * Page through all videos of `host`, index them, schedule full indexation
   * of newly created videos and of every seen channel, then remove channels
   * and videos of this host that were not seen during the crawl.
   *
   * @param host - instance host to crawl
   */
  private async indexHost (host: string): Promise<void> {
    const channelsToSync = new Set<string>()
    const channelsId = new Set<IndexableVideo['channel']['id']>()
    const videosId = new Set<IndexableVideo['id']>()

    let videos: IndexableVideo[] = []
    let start = 0

    logger.info('Adding video data from %s.', host)

    do {
      logger.debug('Getting results from %s (from = %d).', host, start)

      videos = await getVideos(host, start)
      // Log the offset that was requested, before advancing it.
      logger.debug('Got %d results from %s (from = %d).', videos.length, host, start)
      start += videos.length

      if (videos.length !== 0) {
        const { created } = await indexVideos(videos)

        logger.debug('Indexed %d videos from %s.', videos.length, host)

        // Fetch complete video foreach created video (to get tags)
        for (const c of created) {
          this.scheduleVideoIndexation(host, c.uuid)
        }
      }

      for (const video of videos) {
        channelsToSync.add(video.channel.name)
        channelsId.add(video.channel.id)
        videosId.add(video.id)
      }
      // A short page means the host has no more results.
    } while (videos.length === INDEXER_COUNT.VIDEOS && start < VideosIndexer.MAX_VIDEOS_OFFSET)

    logger.info('Added video data from %s.', host)

    for (const c of channelsToSync) {
      this.scheduleChannelIndexation(host, c)
    }

    await removeNotExistingChannels(host, channelsId)
    await removeNotExistingVideos(host, videosId)
  }

  /** Fetch one video from its host and index it (second `indexVideos` argument set to `true`). */
  private async indexSpecificVideo (host: string, uuid: string): Promise<void> {
    const video = await getVideo(host, uuid)

    logger.info('Indexing specific video %s of %s.', uuid, host)

    await indexVideos([ video ], true)
  }

  /** Fetch one channel from its host and index it (second `indexChannels` argument set to `true`). */
  private async indexSpecificChannel (host: string, name: string): Promise<void> {
    const channel = await getChannel(host, name)

    logger.info('Indexing specific channel %s@%s.', name, host)

    await indexChannels([ channel ], true)
  }

  /** Lazily-created singleton instance. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}