From e8a13222b9be5ae4cf339512e40711aeeb5dedd8 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 4 Mar 2020 15:32:39 +0100 Subject: [PATCH] Some fixes and speedup indexation --- server.ts | 9 +-- server/helpers/core-utils.ts | 9 ++- server/helpers/requests.ts | 26 ++++++- server/initializers/constants.ts | 16 ++++- server/lib/elastic-search-instances.ts | 12 +++- server/lib/elastic-search-videos.ts | 1 - server/lib/instances-index.ts | 4 +- server/lib/peertube-instance.ts | 16 ++--- server/lib/schedulers/videos-indexer.ts | 95 +++++++++++++------------ 9 files changed, 123 insertions(+), 65 deletions(-) diff --git a/server.ts b/server.ts index 3680f49..1d6ab78 100644 --- a/server.ts +++ b/server.ts @@ -1,4 +1,9 @@ import { isTestInstance } from './server/helpers/core-utils' + +if (isTestInstance()) { + require('source-map-support').install() +} + import * as bodyParser from 'body-parser' import * as express from 'express' import * as cors from 'cors' @@ -10,10 +15,6 @@ import { VideosIndexer } from './server/lib/schedulers/videos-indexer' import { initVideosIndex } from './server/lib/elastic-search-videos' import { initChannelsIndex } from './server/lib/elastic-search-channels' -if (isTestInstance()) { - require('source-map-support').install() -} - const app = express() app.use(morgan('combined', { diff --git a/server/helpers/core-utils.ts b/server/helpers/core-utils.ts index 8a679ea..5cb21b9 100644 --- a/server/helpers/core-utils.ts +++ b/server/helpers/core-utils.ts @@ -2,8 +2,15 @@ function isTestInstance () { return process.env.NODE_ENV === 'test' } +function waitMs (ms: number) { + return new Promise(res => { + setTimeout(() => res(), ms) + }) +} + // --------------------------------------------------------------------------- export { - isTestInstance + isTestInstance, + waitMs } diff --git a/server/helpers/requests.ts b/server/helpers/requests.ts index 461b40c..eb81caa 100644 --- a/server/helpers/requests.ts +++ b/server/helpers/requests.ts @@ -1,5 +1,6 @@ import * as Bluebird from 'bluebird' import * as request from 'request' +import { waitMs } from './core-utils' function doRequest ( requestOptions: request.CoreOptions & request.UriOptions @@ -12,6 +13,27 @@ function doRequest ( }) } -export { - doRequest +async function doRequestWithRetries ( + requestOptions: request.CoreOptions & request.UriOptions, + maxRetries: number, + msToWait: number, + currentRetry = 0 +): Promise<{ response: request.RequestResponse, body: T }> { + const res = await doRequest(requestOptions) + + if (res.response.statusCode === 429) { + if (currentRetry < maxRetries) { + await waitMs(msToWait) + return doRequestWithRetries(requestOptions, maxRetries, msToWait, currentRetry + 1) + } + + throw new Error('Exceeded max retries for request ' + requestOptions.uri) + } + + return res +} + +export { + doRequest, + doRequestWithRetries } diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index c848960..6ad4ef5 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -38,8 +38,17 @@ const INDEXER_COUNT = { VIDEOS: 10 } +const INDEXER_CONCURRENCY = 3 + +const INDEXER_QUEUE_CONCURRENCY = 3 + +const REQUESTS = { + MAX_RETRIES: 10, + WAIT: 10000 // 10 seconds +} + if (isTestInstance()) { - SCHEDULER_INTERVALS_MS.videosIndexer = 10000 + SCHEDULER_INTERVALS_MS.videosIndexer = 30000 } export { @@ -47,6 +56,9 @@ export { API_VERSION, PAGINATION_COUNT_DEFAULT, SORTABLE_COLUMNS, + INDEXER_QUEUE_CONCURRENCY, SCHEDULER_INTERVALS_MS, - INDEXER_COUNT + INDEXER_CONCURRENCY, + INDEXER_COUNT, + REQUESTS } diff --git a/server/lib/elastic-search-instances.ts b/server/lib/elastic-search-instances.ts index d768519..55169a5 100644 --- a/server/lib/elastic-search-instances.ts +++ b/server/lib/elastic-search-instances.ts @@ -21,7 +21,17 @@ async function listIndexInstances () { } async function buildInstanceHosts () { - const indexHosts = (await listIndexInstancesHost()).filter(h => h === 'peertube.cpy.re') + const whitelist = [ + 'peertube.cpy.re', + 'peertube2.cpy.re', + 'peertube3.cpy.re', + 'framatube.org', + 'aperi.tube', + 'peertube.datagueule.tv', + 'thinkerview.video' + ] + + const indexHosts = (await listIndexInstancesHost()).filter(h => whitelist.includes(h)) const dbHosts = await listIndexInstances() const removedHosts = getRemovedHosts(dbHosts, indexHosts) diff --git a/server/lib/elastic-search-videos.ts b/server/lib/elastic-search-videos.ts index 9da49ce..978b9ba 100644 --- a/server/lib/elastic-search-videos.ts +++ b/server/lib/elastic-search-videos.ts @@ -6,7 +6,6 @@ import { VideosSearchQuery } from '../types/video-search.model' import { logger } from '../helpers/logger' import { buildAvatarMapping, formatAvatarForAPI, formatAvatarForDB } from './elastic-search-avatar' import { difference } from 'lodash' -import { getChannelIdsOf } from './elastic-search-channels' function initVideosIndex () { return buildIndex(CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, buildVideosMapping()) diff --git a/server/lib/instances-index.ts b/server/lib/instances-index.ts index 34d3155..0a2abf5 100644 --- a/server/lib/instances-index.ts +++ b/server/lib/instances-index.ts @@ -1,12 +1,12 @@ import { CONFIG } from '../initializers/constants' import { doRequest } from '../helpers/requests' -async function listIndexInstancesHost () { +async function listIndexInstancesHost (): Promise { const uri = CONFIG.INSTANCES_INDEX.URL + '/api/v1/instances/hosts' const qs = { count: 5000 } - const { body } = await doRequest({ uri, qs, json: true }) + const { body } = await doRequest({ uri, qs, json: true }) return body.data.map(o => o.host as string) } diff --git a/server/lib/peertube-instance.ts b/server/lib/peertube-instance.ts index a431b32..6ab6085 100644 --- a/server/lib/peertube-instance.ts +++ b/server/lib/peertube-instance.ts @@ -1,17 +1,17 @@ import { IndexableVideo } from '../types/video.model' -import { doRequest } from '../helpers/requests' +import { doRequest, doRequestWithRetries } from '../helpers/requests' import { ResultList, Video, VideoChannel, VideoDetails } from '@shared/models' import { IndexableChannel } from '../types/channel.model' -import { INDEXER_COUNT } from '../initializers/constants' +import { INDEXER_COUNT, REQUESTS } from '../initializers/constants' import { IndexableDoc } from '../types/elastic-search.model' async function getVideo (host: string, uuid: string): Promise { const url = 'https://' + host + '/api/v1/videos/' + uuid - const res = await doRequest({ + const res = await doRequestWithRetries({ uri: url, json: true - }) + }, REQUESTS.MAX_RETRIES, REQUESTS.WAIT) return prepareVideoForDB(res.body, host) } @@ -19,10 +19,10 @@ async function getVideo (host: string, uuid: string): Promise { async function getChannel (host: string, name: string): Promise { const url = 'https://' + host + '/api/v1/video-channels/' + name - const res = await doRequest({ + const res = await doRequestWithRetries({ uri: url, json: true - }) + }, REQUESTS.MAX_RETRIES, REQUESTS.WAIT) return prepareChannelForDB(res.body, host) } @@ -30,7 +30,7 @@ async function getChannel (host: string, name: string): Promise { const url = 'https://' + host + '/api/v1/videos' - const res = await doRequest>({ + const res = await doRequestWithRetries>({ uri: url, qs: { start, @@ -39,7 +39,7 @@ async function getVideos (host: string, start: number): Promise prepareVideoForDB(v, host)) } diff --git a/server/lib/schedulers/videos-indexer.ts b/server/lib/schedulers/videos-indexer.ts index 67151cb..0422ece 100644 --- a/server/lib/schedulers/videos-indexer.ts +++ b/server/lib/schedulers/videos-indexer.ts @@ -1,5 +1,5 @@ import { AbstractScheduler } from './abstract-scheduler' -import { INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' +import { INDEXER_CONCURRENCY, INDEXER_COUNT, INDEXER_QUEUE_CONCURRENCY, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' import { logger } from '../../helpers/logger' import { indexVideos, refreshVideosIndex, removeNotExistingVideos, removeVideosFromHosts } from '../elastic-search-videos' import { IndexableVideo } from '../../types/video.model' @@ -8,6 +8,7 @@ import { AsyncQueue, queue } from 'async' import { buildInstanceHosts } from '../elastic-search-instances' import { getChannel, getVideo, getVideos } from '../peertube-instance' import { indexChannels, refreshChannelsIndex, removeNotExistingChannels } from '../elastic-search-channels' +import * as Bluebird from 'bluebird' type GetVideoQueueParam = { host: string, uuid: string } type GetChannelQueueParam = { host: string, name: string } @@ -28,19 +29,19 @@ export class VideosIndexer extends AbstractScheduler { this.indexSpecificVideo(task.host, task.uuid) .then(() => cb()) .catch(err => { - logger.error('Error in index specific video.', { err: inspect(err) }) + logger.error('Error in index specific video %s of %s.', task.uuid, task.host, { err: inspect(err) }) cb() }) - }) + }, INDEXER_QUEUE_CONCURRENCY) this.indexChannelQueue = queue((task, cb) => { this.indexSpecificChannel(task.host, task.name) .then(() => cb()) .catch(err => { - logger.error('Error in index specific channel.', { err: inspect(err) }) + logger.error('Error in index specific channel %s@%s.', task.name, task.host, { err: inspect(err) }) cb() }) - }) + }, INDEXER_QUEUE_CONCURRENCY) this.indexChannelQueue.drain(async () => { logger.info('Refresh channels index.') @@ -61,58 +62,64 @@ export class VideosIndexer extends AbstractScheduler { } private async runVideosIndexer () { - const { indexHosts, removedHosts } = await buildInstanceHosts() - const channelsToSync = new Set() - const channelsId = new Set() - const videosId = new Set() + logger.info('Running videos indexer.') + const { indexHosts, removedHosts } = await buildInstanceHosts() await removeVideosFromHosts(removedHosts) - for (const host of indexHosts) { + await Bluebird.map(indexHosts, async host => { try { - let videos: IndexableVideo[] = [] - let start = 0 - - do { - videos = await getVideos(host, start) - start += videos.length - - logger.debug('Getting %d results from %s (from = %d).', videos.length, host, start) - - if (videos.length !== 0) { - const { created } = await indexVideos(videos) - - // Fetch complete video foreach created video (to get tags) - for (const c of created) { - this.scheduleVideoIndexation(host, c.uuid) - } - } - - for (const video of videos) { - channelsToSync.add(video.channel.name) - channelsId.add(video.channel.id) - videosId.add(video.id) - } - } while (videos.length === INDEXER_COUNT.VIDEOS && start < 500) - - logger.info('Added video data from %s.', host) - - for (const c of channelsToSync) { - this.scheduleChannelIndexation(host, c) - } - - await removeNotExistingChannels(host, channelsId) - await removeNotExistingVideos(host, videosId) + await this.indexHost(host) } catch (err) { console.error(inspect(err, { depth: 10 })) logger.warn('Cannot index videos from %s.', host, { err }) } - } + }, { concurrency: INDEXER_CONCURRENCY }) await refreshChannelsIndex() await refreshVideosIndex() } + private async indexHost (host: string) { + const channelsToSync = new Set() + const channelsId = new Set() + const videosId = new Set() + + let videos: IndexableVideo[] = [] + let start = 0 + + do { + videos = await getVideos(host, start) + start += videos.length + + logger.debug('Getting %d results from %s (from = %d).', videos.length, host, start) + + if (videos.length !== 0) { + const { created } = await indexVideos(videos) + + // Fetch complete video foreach created video (to get tags) + for (const c of created) { + this.scheduleVideoIndexation(host, c.uuid) + } + } + + for (const video of videos) { + channelsToSync.add(video.channel.name) + channelsId.add(video.channel.id) + videosId.add(video.id) + } + } while (videos.length === INDEXER_COUNT.VIDEOS && start < 500) + + logger.info('Added video data from %s.', host) + + for (const c of channelsToSync) { + this.scheduleChannelIndexation(host, c) + } + + await removeNotExistingChannels(host, channelsId) + await removeNotExistingVideos(host, videosId) + } + private async indexSpecificVideo (host: string, uuid: string) { const video = await getVideo(host, uuid)