sepia-search-motore-di-rice.../server/lib/schedulers/videos-indexer.ts

144 lines
4.3 KiB
TypeScript
Raw Normal View History

2020-02-13 16:06:52 +01:00
import { AbstractScheduler } from './abstract-scheduler'
2020-02-14 14:09:31 +01:00
import { CONFIG, INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
2020-02-13 16:06:52 +01:00
import { doRequest } from '../../helpers/requests'
import { logger } from '../../helpers/logger'
import { ResultList } from '../../../PeerTube/shared/models/result-list.model'
2020-02-14 16:14:45 +01:00
import { Video, VideoDetails } from '../../../PeerTube/shared/models/videos/video.model'
2020-02-14 14:09:31 +01:00
import { indexVideos, listIndexInstances, refreshVideosIndex } from '../elastic-search-videos'
2020-02-14 16:14:45 +01:00
import { IndexableVideo, IndexableDoc } from '../../types/video.model'
2020-02-13 16:06:52 +01:00
import { inspect } from 'util'
2020-02-14 14:09:31 +01:00
import { getRemovedHosts, listIndexInstancesHost } from '../instances-index'
import { elasticSearch } from '../../helpers/elastic-search'
2020-02-14 16:14:45 +01:00
import { AsyncQueue, queue } from 'async'
/** Task payload of the per-video queue: one video, identified by its instance host and its UUID. */
type GetVideoQueueParam = {
  host: string
  uuid: string
}
2020-02-13 16:06:52 +01:00
/**
 * Scheduler that crawls the local videos of every listed instance and stores
 * them in the Elasticsearch videos index.
 *
 * Each run:
 *  1. deletes the indexed videos of hosts reported by `getRemovedHosts`,
 *  2. pages through `/api/v1/videos` of every listed host and indexes the results,
 *  3. queues a follow-up fetch of the full details of each newly created video,
 *  4. refreshes the videos index.
 *
 * Use `VideosIndexer.Instance` to get the singleton; the constructor is private.
 */
export class VideosIndexer extends AbstractScheduler {

  private static instance: VideosIndexer

  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosIndexer

  /** Queue of "fetch the complete video and re-index it" jobs. */
  private readonly getVideoQueue: AsyncQueue<GetVideoQueueParam>

  private constructor () {
    super()

    this.getVideoQueue = queue<GetVideoQueueParam, Error>((task, cb) => {
      this.indexSpecificVideo(task.host, task.uuid)
        .then(() => cb())
        .catch(err => {
          logger.error('Error in index specific video.', { err: inspect(err) })

          // Best effort: a failing video is logged and the job is reported as done
          cb()
        })
    })
  }

  /** Scheduler entry point: runs one full indexing pass. */
  protected async internalExecute () {
    return this.runVideosIndexer()
  }

  /**
   * Runs one indexing pass over every listed host.
   * A failure on one host is logged and does not prevent the other hosts from being processed.
   */
  private async runVideosIndexer () {
    // Stop paginating a host once the offset reaches this value.
    // NOTE(review): looks like a temporary crawl limit; confirm whether it should stay or be moved to the configuration.
    const maxStartOffset = 500

    const dbHosts = await listIndexInstances()
    const indexHosts = await listIndexInstancesHost()

    // NOTE(review): presumably the hosts that are in our videos index but not in the instances list anymore
    const hostsToRemove = getRemovedHosts(dbHosts, indexHosts)
    await this.removeVideosFromHosts(hostsToRemove)

    for (const host of indexHosts) {
      try {
        let videos: IndexableVideo[] = []
        let start = 0

        do {
          videos = await this.getVideos(host, start)

          // Log before moving the offset so "from" is the offset that was actually requested
          logger.debug('Getting %d results from %s (from = %d).', videos.length, host, start)

          start += videos.length

          if (videos.length !== 0) {
            const { created } = await indexVideos(videos)

            // Fetch complete video foreach created video (to get tags)
            for (const c of created) {
              this.getVideoQueue.push({ uuid: c, host })
            }
          }
          // A page smaller than the requested count means we reached the end of the list
        } while (videos.length === INDEXER_COUNT.VIDEOS && start < maxStartOffset)

        logger.info('Added video data from %s.', host)
      } catch (err) {
        logger.warn('Cannot index videos from %s.', host, { err })
      }
    }

    await refreshVideosIndex()
  }

  /**
   * Fetches one page of local videos from the REST API of an instance.
   *
   * @param host - instance host, used to build `https://<host>/api/v1/videos`
   * @param start - offset of the first video to fetch
   * @returns the videos of the page, prepared for indexing
   */
  private async getVideos (host: string, start: number): Promise<IndexableVideo[]> {
    const url = 'https://' + host + '/api/v1/videos'

    const res = await doRequest<ResultList<Video>>({
      uri: url,
      qs: {
        start,
        filter: 'local',
        skipCount: true,
        count: INDEXER_COUNT.VIDEOS
      },
      json: true
    })

    return res.body.data.map(v => this.prepareVideoForDB(v, host))
  }

  /**
   * Fetches the complete representation of one video from its instance.
   *
   * @param host - instance host
   * @param uuid - UUID of the video on that instance
   * @returns the video details, prepared for indexing
   */
  private async getVideo (host: string, uuid: string): Promise<IndexableVideo> {
    const url = 'https://' + host + '/api/v1/videos/' + uuid

    const res = await doRequest<VideoDetails>({
      uri: url,
      json: true
    })

    return this.prepareVideoForDB(res.body, host)
  }

  /**
   * Deletes from the videos index every document whose `host` is one of the given hosts.
   * Does nothing when the list is empty.
   */
  private async removeVideosFromHosts (hosts: string[]) {
    if (hosts.length === 0) return

    logger.info('Will remove videos from hosts.', { hosts })

    await elasticSearch.delete_by_query({
      index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
      body: {
        query: {
          bool: {
            filter: {
              terms: {
                host: hosts
              }
            }
          }
        }
      }
    })
  }

  /** Queue worker body: fetches the complete video and indexes it again. */
  private async indexSpecificVideo (host: string, uuid: string) {
    const video = await this.getVideo(host, uuid)

    logger.info('Indexing specific video %s of %s.', uuid, host)

    await indexVideos([ video ])
  }

  /**
   * Adds the indexing fields to a video fetched from an instance.
   * The given object is mutated and returned (no copy is made).
   *
   * @param video - video as returned by the instance API
   * @param host - host the video was fetched from
   * @returns the same object with `elasticSearchId` (host followed by the video id) and `host` set
   */
  private prepareVideoForDB <T extends Video> (video: T, host: string): T & IndexableDoc {
    return Object.assign(video, { elasticSearchId: host + video.id, host })
  }

  /** Lazily created singleton. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}