sepia-search-motore-di-rice.../server/lib/schedulers/videos-indexer.ts

124 lines
3.9 KiB
TypeScript

import { AbstractScheduler } from './abstract-scheduler'
import { INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { logger } from '../../helpers/logger'
import { indexVideos, refreshVideosIndex, removeVideosFromHosts } from '../elastic-search-videos'
import { IndexableVideo } from '../../types/video.model'
import { inspect } from 'util'
import { AsyncQueue, queue } from 'async'
import { buildInstanceHosts } from '../elastic-search-instances'
import { getChannel, getVideo, getVideos } from '../peertube-instance'
import { indexChannels, refreshChannelsIndex } from '../elastic-search-channels'
/** Payload of a video queue task: the video UUID and the instance host it is fetched from. */
type GetVideoQueueParam = {
  host: string
  uuid: string
}

/** Payload of a channel queue task: the channel name and the instance host it is fetched from. */
type GetChannelQueueParam = {
  host: string
  name: string
}
/**
 * Scheduler that walks every indexable instance host, pushes its videos to the
 * videos index and queues follow-up work:
 *  - one task per newly created video, to fetch and index the complete video
 *  - one task per channel seen on the host, to fetch and index the channel
 *
 * Follow-up tasks are processed through two `async` queues owned by the singleton.
 */
export class VideosIndexer extends AbstractScheduler {

  // Typed as the concrete class so that `Instance` exposes the public schedule* methods
  private static instance: VideosIndexer

  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosIndexer

  private readonly indexVideoQueue: AsyncQueue<GetVideoQueueParam>
  private readonly indexChannelQueue: AsyncQueue<GetChannelQueueParam>

  private constructor () {
    super()

    // A failing task is logged and then acknowledged without an error.
    // The callback is invoked exactly once, after the optional logging step.
    this.indexVideoQueue = queue<GetVideoQueueParam, Error>((task, cb) => {
      this.indexSpecificVideo(task.host, task.uuid)
        .catch(err => logger.error('Error in index specific video.', { err: inspect(err) }))
        .then(() => cb())
    })

    this.indexChannelQueue = queue<GetChannelQueueParam, Error>((task, cb) => {
      this.indexSpecificChannel(task.host, task.name)
        .catch(err => logger.error('Error in index specific channel.', { err: inspect(err) }))
        .then(() => cb())
    })

    // Once every queued channel has been processed, refresh the channels index
    this.indexChannelQueue.drain(async () => {
      logger.info('Refresh channels index.')

      try {
        await refreshChannelsIndex()
      } catch (err) {
        // Nobody awaits the drain handler: catch here to avoid an unhandled rejection
        logger.error('Cannot refresh channels index.', { err: inspect(err) })
      }
    })
  }

  /**
   * Queue the fetch + indexation of a single video.
   *
   * @param host instance host the video is fetched from
   * @param uuid UUID of the video
   */
  scheduleVideoIndexation (host: string, uuid: string) {
    this.indexVideoQueue.push({ uuid, host })
  }

  /**
   * Queue the fetch + indexation of a single channel.
   *
   * @param host instance host the channel is fetched from
   * @param name name of the channel
   */
  scheduleChannelIndexation (host: string, name: string) {
    this.indexChannelQueue.push({ name, host })
  }

  /** Scheduler entry point: runs one full indexation pass. */
  protected async internalExecute () {
    return this.runVideosIndexer()
  }

  /**
   * Run one indexation pass: remove videos of removed hosts, index the videos
   * of each indexable host in turn, then refresh the videos index.
   */
  private async runVideosIndexer () {
    const { indexHosts, removedHosts } = await buildInstanceHosts()

    await removeVideosFromHosts(removedHosts)

    for (const host of indexHosts) {
      await this.indexVideosOfHost(host)
    }

    await refreshVideosIndex()
  }

  /**
   * Page through the videos of one host, index them and schedule the follow-up
   * video/channel tasks for that host.
   *
   * Errors are logged and swallowed so that one failing host does not stop the others.
   *
   * @param host instance host to index
   */
  private async indexVideosOfHost (host: string) {
    // Channel names only make sense together with the host they were seen on,
    // so the set must not be shared between hosts
    const channelsToSync = new Set<string>()

    try {
      let videos: IndexableVideo[] = []
      let start = 0

      do {
        videos = await getVideos(host, start)

        // Log the offset that was actually requested, before moving it forward
        logger.debug('Getting %d results from %s (from = %d).', videos.length, host, start)
        start += videos.length

        if (videos.length !== 0) {
          const { created } = await indexVideos(videos)

          // Fetch complete video foreach created video (to get tags)
          for (const c of created) {
            this.scheduleVideoIndexation(host, c.uuid)
          }

          videos.forEach(v => channelsToSync.add(v.channel.name))
        }
        // Continue while pages come back full, and stop once the offset reaches 500 for this host
      } while (videos.length === INDEXER_COUNT.VIDEOS && start < 500)

      logger.info('Added video data from %s.', host)

      channelsToSync.forEach(name => this.scheduleChannelIndexation(host, name))
    } catch (err) {
      logger.warn('Cannot index videos from %s.', host, { err: inspect(err, { depth: 10 }) })
    }
  }

  /**
   * Fetch one video from its host and index it.
   * NOTE(review): the `true` flag is forwarded to `indexVideos`; its meaning is defined there — confirm.
   */
  private async indexSpecificVideo (host: string, uuid: string) {
    const video = await getVideo(host, uuid)

    logger.info('Indexing specific video %s of %s.', uuid, host)

    await indexVideos([ video ], true)
  }

  /**
   * Fetch one channel from its host and index it.
   * NOTE(review): the `true` flag is forwarded to `indexChannels`; its meaning is defined there — confirm.
   */
  private async indexSpecificChannel (host: string, name: string) {
    const channel = await getChannel(host, name)

    logger.info('Indexing specific channel %s@%s.', name, host)

    await indexChannels([ channel ], true)
  }

  /** Lazily created singleton. */
  static get Instance (): VideosIndexer {
    return this.instance || (this.instance = new this())
  }
}