import Bluebird from 'bluebird' import { inspect } from 'util' import { logger } from '../../helpers/logger' import { INDEXER_COUNT, INDEXER_LIMIT, SCHEDULER_INTERVALS_MS, CONFIG } from '../../initializers/constants' import { IndexableVideo } from '../../types/video.model' import { buildInstanceHosts } from '../meilisearch/meilisearch-instances' import { ChannelIndexer } from '../indexers/channel-indexer' import { PlaylistIndexer } from '../indexers/playlist-indexer' import { VideoIndexer } from '../indexers/video-indexer' import { getPlaylistsOf, getVideos } from '../requests/peertube-instance' import { AbstractScheduler } from './abstract-scheduler' import { IndexablePlaylist } from '../../types/playlist.model' import { buildDBChannelPrimaryKey } from '../meilisearch/meilisearch-channels' import { getVideosUpdatedAt } from '../meilisearch/meilisearch-videos' export class IndexationScheduler extends AbstractScheduler { private static instance: IndexationScheduler protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.indexation private indexedHosts: string[] = [] private readonly channelIndexer: ChannelIndexer private readonly videoIndexer: VideoIndexer private readonly playlistIndexer: PlaylistIndexer private readonly indexers: [ ChannelIndexer, VideoIndexer, PlaylistIndexer ] private constructor () { super() this.channelIndexer = new ChannelIndexer() this.videoIndexer = new VideoIndexer() this.playlistIndexer = new PlaylistIndexer() this.indexers = [ this.channelIndexer, this.videoIndexer, this.playlistIndexer ] } async initIndexes () { return Promise.all(this.indexers.map(i => i.initIndex())) } getIndexedHosts () { return this.indexedHosts } protected async internalExecute () { return this.runIndexer() } private async runIndexer () { logger.info('Running indexer.') const { indexHosts, removedHosts } = await buildInstanceHosts() this.indexedHosts = indexHosts logger.info({ removedHosts }, `Will remove ${removedHosts.length} hosts that do not exist anymore`) for (const o of this.indexers) { await o.removeFromHosts(removedHosts) } logger.info({ indexHosts }, `Will index ${indexHosts.length} hosts`) await Bluebird.map(indexHosts, async host => { try { await this.indexHost(host) } catch (err) { console.error(inspect(err, { depth: 10 })) logger.warn({ err: inspect(err) }, 'Cannot index videos from %s.', host) } }, { concurrency: CONFIG.INDEXER.HOST_CONCURRENCY }) logger.info('Indexer ended.') } private async indexHost (host: string) { const channelsToSync = new Set() const existingChannelsId = new Set() const existingVideosId = new Set() let videos: IndexableVideo[] = [] let start = 0 logger.info('Adding video data from %s.', host) do { await this.videoIndexer.waitForBulkIndexation() logger.debug('Getting video results from %s (from = %d).', host, start) videos = await getVideos(host, start) logger.debug('Got %d video results from %s (from = %d).', videos.length, host, start) start += videos.length if (videos.length !== 0) { const videosFromDB = await getVideosUpdatedAt(videos.map(v => v.uuid)) logger.debug('Indexing %d videos from %s.', videos.length, host) this.videoIndexer.addElementsToBulkIndex(videos) // Fetch complete video foreach created video (to get tags) if needed for (const video of videos) { const videoDB = videosFromDB.find(v => v.uuid === video.uuid) if (!videoDB || videoDB.updatedAt !== new Date(video.updatedAt).getTime()) { this.videoIndexer.scheduleParallelIndexation(host, video.uuid) } } } for (const video of videos) { channelsToSync.add(video.channel.name) existingChannelsId.add(buildDBChannelPrimaryKey(video.channel)) existingVideosId.add(video.uuid) } } while (videos.length === INDEXER_COUNT && start < INDEXER_LIMIT) logger.info('Added video data from %s.', host) for (const c of channelsToSync) { this.channelIndexer.scheduleParallelIndexation(host, c) } logger.info('Removing non-existing channels and videos from ' + host) await this.channelIndexer.removeNotExisting(host, existingChannelsId) await this.videoIndexer.removeNotExisting(host, existingVideosId) await this.indexPlaylists(host, Array.from(channelsToSync)) } private async indexPlaylists (host: string, channelHandles: string[]) { const existingPlaylistsId = new Set() logger.info('Adding playlist data from %s.', host) for (const channelHandle of channelHandles) { let playlists: IndexablePlaylist[] = [] let start = 0 do { await this.playlistIndexer.waitForBulkIndexation() logger.debug('Getting playlist results from %s (from = %d, channelHandle = %s).', host, start, channelHandle) playlists = await getPlaylistsOf(host, channelHandle, start) logger.debug('Got %d playlist results from %s (from = %d, channelHandle = %s).', playlists.length, host, start, channelHandle) start += playlists.length if (playlists.length !== 0) { this.playlistIndexer.addElementsToBulkIndex(playlists) logger.debug('Indexed %d playlists from %s.', playlists.length, host) } for (const playlist of playlists) { existingPlaylistsId.add(playlist.uuid) } } while (playlists.length === INDEXER_COUNT && start < INDEXER_LIMIT) } logger.info('Added playlist data from %s.', host) await this.playlistIndexer.removeNotExisting(host, existingPlaylistsId) } static get Instance () { return this.instance || (this.instance = new this()) } }