2020-09-03 14:25:57 +02:00
|
|
|
import { AsyncQueue, queue } from 'async'
|
|
|
|
import * as Bluebird from 'bluebird'
|
|
|
|
import { inspect } from 'util'
|
2020-02-13 16:06:52 +01:00
|
|
|
import { logger } from '../../helpers/logger'
|
2020-09-03 14:25:57 +02:00
|
|
|
import { INDEXER_CONCURRENCY, INDEXER_COUNT, INDEXER_QUEUE_CONCURRENCY, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
|
2020-02-19 15:39:35 +01:00
|
|
|
import { IndexableVideo } from '../../types/video.model'
|
2020-09-03 14:25:57 +02:00
|
|
|
import { indexChannels, refreshChannelsIndex, removeChannelsFromHosts, removeNotExistingChannels } from '../elastic-search-channels'
|
2020-02-19 15:39:35 +01:00
|
|
|
import { buildInstanceHosts } from '../elastic-search-instances'
|
2020-09-03 14:25:57 +02:00
|
|
|
import { indexVideos, refreshVideosIndex, removeNotExistingVideos, removeVideosFromHosts } from '../elastic-search-videos'
|
2020-02-19 15:39:35 +01:00
|
|
|
import { getChannel, getVideo, getVideos } from '../peertube-instance'
|
2020-09-03 14:25:57 +02:00
|
|
|
import { AbstractScheduler } from './abstract-scheduler'
|
2020-02-14 16:14:45 +01:00
|
|
|
|
|
|
|
/** Task payload for the video indexation queue: one video (by uuid) on one host. */
type GetVideoQueueParam = {
  host: string
  uuid: string
}
|
2020-02-19 15:39:35 +01:00
|
|
|
/** Task payload for the channel indexation queue: one channel (by name) on one host. */
type GetChannelQueueParam = {
  host: string
  name: string
}
|
2020-02-13 16:06:52 +01:00
|
|
|
|
|
|
|
/**
 * Singleton scheduler that indexes the videos and channels of every host
 * returned by `buildInstanceHosts()`.
 *
 * Each run (see `runVideosIndexer`):
 *  - drops indexed videos/channels belonging to hosts reported as removed,
 *  - pages through each host's video list and indexes it,
 *  - queues a full re-fetch for newly created videos and for every channel seen,
 *  - removes indexed entries of that host that were not seen during the crawl.
 *
 * NOTE(review): the run cadence is presumably driven by `AbstractScheduler`
 * through `schedulerIntervalMs` — confirm against the base class.
 */
export class VideosIndexer extends AbstractScheduler {

  private static instance: VideosIndexer

  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosIndexer

  // Hosts handled by the most recent run; replaced wholesale on each run
  private indexedHosts: string[] = []

  private readonly indexVideoQueue: AsyncQueue<GetVideoQueueParam>
  private readonly indexChannelQueue: AsyncQueue<GetChannelQueueParam>

  private constructor () {
    super()

    this.indexVideoQueue = queue<GetVideoQueueParam, Error>((task, cb) => {
      this.indexSpecificVideo(task.host, task.uuid)
        .then(() => cb())
        .catch(err => {
          logger.error({ err: inspect(err) }, 'Error in index specific video %s of %s.', task.uuid, task.host)

          // Best effort: report the task as done so one failure does not stall the queue
          cb()
        })
    }, INDEXER_QUEUE_CONCURRENCY)

    this.indexChannelQueue = queue<GetChannelQueueParam, Error>((task, cb) => {
      this.indexSpecificChannel(task.host, task.name)
        .then(() => cb())
        .catch(err => {
          logger.error({ err: inspect(err) }, 'Error in index specific channel %s@%s.', task.name, task.host)

          // Best effort: report the task as done so one failure does not stall the queue
          cb()
        })
    }, INDEXER_QUEUE_CONCURRENCY)

    this.indexChannelQueue.drain(async () => {
      logger.info('Refresh channels index.')

      // The drain callback's promise is not awaited by the queue: catch here
      // so a failed refresh does not surface as an unhandled rejection
      try {
        await refreshChannelsIndex()
      } catch (err) {
        logger.error({ err: inspect(err) }, 'Cannot refresh channels index.')
      }
    })
  }

  /**
   * Queue a full (re)indexation of a single video.
   *
   * @param host host the video belongs to
   * @param uuid uuid of the video on that host
   */
  scheduleVideoIndexation (host: string, uuid: string) {
    this.indexVideoQueue.push({ uuid, host })
  }

  /**
   * Queue a full (re)indexation of a single channel.
   *
   * @param host host the channel belongs to
   * @param name channel name on that host
   */
  scheduleChannelIndexation (host: string, name: string) {
    this.indexChannelQueue.push({ name, host })
  }

  /**
   * @returns the hosts selected for indexation by the latest run
   * (empty until the first run has fetched them)
   */
  getIndexedHosts () {
    return this.indexedHosts
  }

  /** Scheduler entry point: performs one complete indexation run. */
  protected async internalExecute () {
    return this.runVideosIndexer()
  }

  /**
   * Perform one indexation run over every host to index.
   * A failure on one host is logged and does not abort the other hosts.
   */
  private async runVideosIndexer () {
    logger.info('Running videos indexer.')

    const { indexHosts, removedHosts } = await buildInstanceHosts()
    this.indexedHosts = indexHosts

    await removeVideosFromHosts(removedHosts)
    await removeChannelsFromHosts(removedHosts)

    await Bluebird.map(indexHosts, async host => {
      try {
        await this.indexHost(host)
      } catch (err) {
        // Deep inspect: the error details are needed to diagnose failing hosts
        logger.warn({ err: inspect(err, { depth: 10 }) }, 'Cannot index videos from %s.', host)
      }
    }, { concurrency: INDEXER_CONCURRENCY })

    await refreshChannelsIndex()
    await refreshVideosIndex()
  }

  /**
   * Crawl the video list of a host page by page, index it, then prune what
   * was not seen.
   *
   * @param host host to crawl
   */
  private async indexHost (host: string) {
    const channelsToSync = new Set<string>()
    const channelsId = new Set<number>()
    const videosId = new Set<number>()

    let videos: IndexableVideo[] = []
    let start = 0

    do {
      videos = await getVideos(host, start)

      // Log before advancing the offset so "from" is the offset actually requested
      logger.debug('Getting %d results from %s (from = %d).', videos.length, host, start)

      start += videos.length

      if (videos.length !== 0) {
        const { created } = await indexVideos(videos)

        // Fetch complete video foreach created video (to get tags)
        for (const c of created) {
          this.scheduleVideoIndexation(host, c.uuid)
        }
      }

      for (const video of videos) {
        channelsToSync.add(video.channel.name)
        channelsId.add(video.channel.id)
        videosId.add(video.id)
      }
      // A short page means the last page was reached; 500000 is a hard cap on the offset
    } while (videos.length === INDEXER_COUNT.VIDEOS && start < 500000)

    logger.info('Added video data from %s.', host)

    for (const c of channelsToSync) {
      this.scheduleChannelIndexation(host, c)
    }

    // Anything indexed for this host but not seen during the crawl is removed
    await removeNotExistingChannels(host, channelsId)
    await removeNotExistingVideos(host, videosId)
  }

  /**
   * Fetch one video from its host and index it.
   *
   * @param host host the video belongs to
   * @param uuid uuid of the video
   */
  private async indexSpecificVideo (host: string, uuid: string) {
    const video = await getVideo(host, uuid)

    logger.info('Indexing specific video %s of %s.', uuid, host)

    await indexVideos([ video ], true)
  }

  /**
   * Fetch one channel from its host and index it.
   *
   * @param host host the channel belongs to
   * @param name channel name
   */
  private async indexSpecificChannel (host: string, name: string) {
    const channel = await getChannel(host, name)

    logger.info('Indexing specific channel %s@%s.', name, host)

    await indexChannels([ channel ], true)
  }

  /** Lazily created singleton instance. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
|