176 lines
5.7 KiB
TypeScript
176 lines
5.7 KiB
TypeScript
import Bluebird from 'bluebird'
|
|
import { inspect } from 'util'
|
|
import { logger } from '../../helpers/logger'
|
|
import { INDEXER_COUNT, INDEXER_LIMIT, SCHEDULER_INTERVALS_MS, CONFIG } from '../../initializers/constants'
|
|
import { IndexableVideo } from '../../types/video.model'
|
|
import { buildInstanceHosts } from '../meilisearch/meilisearch-instances'
|
|
import { ChannelIndexer } from '../indexers/channel-indexer'
|
|
import { PlaylistIndexer } from '../indexers/playlist-indexer'
|
|
import { VideoIndexer } from '../indexers/video-indexer'
|
|
import { getPlaylistsOf, getVideos } from '../requests/peertube-instance'
|
|
import { AbstractScheduler } from './abstract-scheduler'
|
|
import { IndexablePlaylist } from '../../types/playlist.model'
|
|
import { buildDBChannelPrimaryKey } from '../meilisearch/meilisearch-channels'
|
|
import { getVideosUpdatedAt } from '../meilisearch/meilisearch-videos'
|
|
|
|
export class IndexationScheduler extends AbstractScheduler {
|
|
|
|
private static instance: IndexationScheduler
|
|
|
|
protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.indexation
|
|
|
|
private indexedHosts: string[] = []
|
|
|
|
private readonly channelIndexer: ChannelIndexer
|
|
private readonly videoIndexer: VideoIndexer
|
|
private readonly playlistIndexer: PlaylistIndexer
|
|
|
|
private readonly indexers: [ ChannelIndexer, VideoIndexer, PlaylistIndexer ]
|
|
|
|
private constructor () {
|
|
super()
|
|
|
|
this.channelIndexer = new ChannelIndexer()
|
|
this.videoIndexer = new VideoIndexer()
|
|
this.playlistIndexer = new PlaylistIndexer()
|
|
|
|
this.indexers = [
|
|
this.channelIndexer,
|
|
this.videoIndexer,
|
|
this.playlistIndexer
|
|
]
|
|
}
|
|
|
|
async initIndexes () {
|
|
return Promise.all(this.indexers.map(i => i.initIndex()))
|
|
}
|
|
|
|
getIndexedHosts () {
|
|
return this.indexedHosts
|
|
}
|
|
|
|
protected async internalExecute () {
|
|
return this.runIndexer()
|
|
}
|
|
|
|
private async runIndexer () {
|
|
logger.info('Running indexer.')
|
|
|
|
this.indexedHosts = await buildInstanceHosts()
|
|
|
|
logger.info({ indexHosts: this.indexedHosts }, `Will index ${this.indexedHosts.length} hosts and remove non existing hosts`)
|
|
|
|
for (const o of this.indexers) {
|
|
await o.removeFromHosts(this.indexedHosts)
|
|
}
|
|
|
|
await Bluebird.map(this.indexedHosts, async host => {
|
|
try {
|
|
await this.indexHost(host)
|
|
} catch (err) {
|
|
console.error(inspect(err, { depth: 10 }))
|
|
logger.warn({ err: inspect(err) }, 'Cannot index videos from %s.', host)
|
|
}
|
|
}, { concurrency: CONFIG.INDEXER.HOST_CONCURRENCY })
|
|
|
|
logger.info('Indexer ended.')
|
|
}
|
|
|
|
private async indexHost (host: string) {
|
|
const channelsToSync = new Set<string>()
|
|
const existingChannelsId = new Set<string>()
|
|
const existingVideosId = new Set<string>()
|
|
|
|
let videos: IndexableVideo[] = []
|
|
let start = 0
|
|
|
|
logger.info('Adding video data from %s.', host)
|
|
|
|
do {
|
|
await this.videoIndexer.waitForBulkIndexation()
|
|
|
|
logger.debug('Getting video results from %s (from = %d).', host, start)
|
|
|
|
videos = await getVideos(host, start)
|
|
logger.debug('Got %d video results from %s (from = %d).', videos.length, host, start)
|
|
|
|
start += videos.length
|
|
|
|
if (videos.length !== 0) {
|
|
const videosFromDB = await getVideosUpdatedAt(videos.map(v => v.uuid))
|
|
|
|
logger.debug('Indexing %d videos from %s.', videos.length, host)
|
|
this.videoIndexer.addElementsToBulkIndex(videos)
|
|
|
|
// Fetch complete video foreach created video (to get tags) if needed
|
|
for (const video of videos) {
|
|
const videoDB = videosFromDB.find(v => v.uuid === video.uuid)
|
|
|
|
if (!videoDB || videoDB.updatedAt !== new Date(video.updatedAt).getTime()) {
|
|
this.videoIndexer.scheduleParallelIndexation(host, video.uuid)
|
|
}
|
|
}
|
|
}
|
|
|
|
for (const video of videos) {
|
|
channelsToSync.add(video.channel.name)
|
|
|
|
existingChannelsId.add(buildDBChannelPrimaryKey(video.channel))
|
|
existingVideosId.add(video.uuid)
|
|
}
|
|
} while (videos.length === INDEXER_COUNT && start < INDEXER_LIMIT)
|
|
|
|
logger.info('Added video data from %s.', host)
|
|
|
|
for (const c of channelsToSync) {
|
|
this.channelIndexer.scheduleParallelIndexation(host, c)
|
|
}
|
|
|
|
logger.info('Removing non-existing channels and videos from ' + host)
|
|
await this.channelIndexer.removeNotExisting(host, existingChannelsId)
|
|
await this.videoIndexer.removeNotExisting(host, existingVideosId)
|
|
|
|
await this.indexPlaylists(host, Array.from(channelsToSync))
|
|
}
|
|
|
|
private async indexPlaylists (host: string, channelHandles: string[]) {
|
|
const existingPlaylistsId = new Set<string>()
|
|
|
|
logger.info('Adding playlist data from %s.', host)
|
|
|
|
for (const channelHandle of channelHandles) {
|
|
let playlists: IndexablePlaylist[] = []
|
|
let start = 0
|
|
|
|
do {
|
|
await this.playlistIndexer.waitForBulkIndexation()
|
|
|
|
logger.debug('Getting playlist results from %s (from = %d, channelHandle = %s).', host, start, channelHandle)
|
|
|
|
playlists = await getPlaylistsOf(host, channelHandle, start)
|
|
logger.debug('Got %d playlist results from %s (from = %d, channelHandle = %s).', playlists.length, host, start, channelHandle)
|
|
|
|
start += playlists.length
|
|
|
|
if (playlists.length !== 0) {
|
|
this.playlistIndexer.addElementsToBulkIndex(playlists)
|
|
|
|
logger.debug('Indexed %d playlists from %s.', playlists.length, host)
|
|
}
|
|
|
|
for (const playlist of playlists) {
|
|
existingPlaylistsId.add(playlist.uuid)
|
|
}
|
|
} while (playlists.length === INDEXER_COUNT && start < INDEXER_LIMIT)
|
|
}
|
|
|
|
logger.info('Added playlist data from %s.', host)
|
|
|
|
await this.playlistIndexer.removeNotExisting(host, existingPlaylistsId)
|
|
}
|
|
|
|
static get Instance () {
|
|
return this.instance || (this.instance = new this())
|
|
}
|
|
}
|