Some fixes and speedup indexation
This commit is contained in:
parent
199745e161
commit
e8a13222b9
|
@ -1,4 +1,9 @@
|
|||
import { isTestInstance } from './server/helpers/core-utils'
|
||||
|
||||
if (isTestInstance()) {
|
||||
require('source-map-support').install()
|
||||
}
|
||||
|
||||
import * as bodyParser from 'body-parser'
|
||||
import * as express from 'express'
|
||||
import * as cors from 'cors'
|
||||
|
@ -10,10 +15,6 @@ import { VideosIndexer } from './server/lib/schedulers/videos-indexer'
|
|||
import { initVideosIndex } from './server/lib/elastic-search-videos'
|
||||
import { initChannelsIndex } from './server/lib/elastic-search-channels'
|
||||
|
||||
if (isTestInstance()) {
|
||||
require('source-map-support').install()
|
||||
}
|
||||
|
||||
const app = express()
|
||||
|
||||
app.use(morgan('combined', {
|
||||
|
|
|
@ -2,8 +2,15 @@ function isTestInstance () {
|
|||
return process.env.NODE_ENV === 'test'
|
||||
}
|
||||
|
||||
function waitMs (ms: number) {
|
||||
return new Promise(res => {
|
||||
setTimeout(() => res(), ms)
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export {
|
||||
isTestInstance
|
||||
isTestInstance,
|
||||
waitMs
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import * as Bluebird from 'bluebird'
|
||||
import * as request from 'request'
|
||||
import { waitMs } from './core-utils'
|
||||
|
||||
function doRequest <T> (
|
||||
requestOptions: request.CoreOptions & request.UriOptions
|
||||
|
@ -12,6 +13,27 @@ function doRequest <T> (
|
|||
})
|
||||
}
|
||||
|
||||
export {
|
||||
doRequest
|
||||
async function doRequestWithRetries<T> (
|
||||
requestOptions: request.CoreOptions & request.UriOptions,
|
||||
maxRetries: number,
|
||||
msToWait: number,
|
||||
currentRetry = 0
|
||||
): Promise<{ response: request.RequestResponse, body: T }> {
|
||||
const res = await doRequest<T>(requestOptions)
|
||||
|
||||
if (res.response.statusCode === 429) {
|
||||
if (currentRetry < maxRetries) {
|
||||
await waitMs(msToWait)
|
||||
return doRequestWithRetries(requestOptions, maxRetries, msToWait, currentRetry + 1)
|
||||
}
|
||||
|
||||
throw new Error('Exceeded max retries for request ' + requestOptions.uri)
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
export {
|
||||
doRequest,
|
||||
doRequestWithRetries
|
||||
}
|
||||
|
|
|
@ -38,8 +38,17 @@ const INDEXER_COUNT = {
|
|||
VIDEOS: 10
|
||||
}
|
||||
|
||||
const INDEXER_CONCURRENCY = 3
|
||||
|
||||
const INDEXER_QUEUE_CONCURRENCY = 3
|
||||
|
||||
const REQUESTS = {
|
||||
MAX_RETRIES: 10,
|
||||
WAIT: 10000 // 10 seconds
|
||||
}
|
||||
|
||||
if (isTestInstance()) {
|
||||
SCHEDULER_INTERVALS_MS.videosIndexer = 10000
|
||||
SCHEDULER_INTERVALS_MS.videosIndexer = 30000
|
||||
}
|
||||
|
||||
export {
|
||||
|
@ -47,6 +56,9 @@ export {
|
|||
API_VERSION,
|
||||
PAGINATION_COUNT_DEFAULT,
|
||||
SORTABLE_COLUMNS,
|
||||
INDEXER_QUEUE_CONCURRENCY,
|
||||
SCHEDULER_INTERVALS_MS,
|
||||
INDEXER_COUNT
|
||||
INDEXER_CONCURRENCY,
|
||||
INDEXER_COUNT,
|
||||
REQUESTS
|
||||
}
|
||||
|
|
|
@ -21,7 +21,17 @@ async function listIndexInstances () {
|
|||
}
|
||||
|
||||
async function buildInstanceHosts () {
|
||||
const indexHosts = (await listIndexInstancesHost()).filter(h => h === 'peertube.cpy.re')
|
||||
const whitelist = [
|
||||
'peertube.cpy.re',
|
||||
'peertube2.cpy.re',
|
||||
'peertube3.cpy.re',
|
||||
'framatube.org',
|
||||
'aperi.tube',
|
||||
'peertube.datagueule.tv',
|
||||
'thinkerview.video'
|
||||
]
|
||||
|
||||
const indexHosts = (await listIndexInstancesHost()).filter(h => whitelist.includes(h))
|
||||
|
||||
const dbHosts = await listIndexInstances()
|
||||
const removedHosts = getRemovedHosts(dbHosts, indexHosts)
|
||||
|
|
|
@ -6,7 +6,6 @@ import { VideosSearchQuery } from '../types/video-search.model'
|
|||
import { logger } from '../helpers/logger'
|
||||
import { buildAvatarMapping, formatAvatarForAPI, formatAvatarForDB } from './elastic-search-avatar'
|
||||
import { difference } from 'lodash'
|
||||
import { getChannelIdsOf } from './elastic-search-channels'
|
||||
|
||||
function initVideosIndex () {
|
||||
return buildIndex(CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, buildVideosMapping())
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
import { CONFIG } from '../initializers/constants'
|
||||
import { doRequest } from '../helpers/requests'
|
||||
|
||||
async function listIndexInstancesHost () {
|
||||
async function listIndexInstancesHost (): Promise<string[]> {
|
||||
const uri = CONFIG.INSTANCES_INDEX.URL + '/api/v1/instances/hosts'
|
||||
|
||||
const qs = { count: 5000 }
|
||||
|
||||
const { body } = await doRequest({ uri, qs, json: true })
|
||||
const { body } = await doRequest<any>({ uri, qs, json: true })
|
||||
|
||||
return body.data.map(o => o.host as string)
|
||||
}
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
import { IndexableVideo } from '../types/video.model'
|
||||
import { doRequest } from '../helpers/requests'
|
||||
import { doRequest, doRequestWithRetries } from '../helpers/requests'
|
||||
import { ResultList, Video, VideoChannel, VideoDetails } from '@shared/models'
|
||||
import { IndexableChannel } from '../types/channel.model'
|
||||
import { INDEXER_COUNT } from '../initializers/constants'
|
||||
import { INDEXER_COUNT, REQUESTS } from '../initializers/constants'
|
||||
import { IndexableDoc } from '../types/elastic-search.model'
|
||||
|
||||
async function getVideo (host: string, uuid: string): Promise<IndexableVideo> {
|
||||
const url = 'https://' + host + '/api/v1/videos/' + uuid
|
||||
|
||||
const res = await doRequest<VideoDetails>({
|
||||
const res = await doRequestWithRetries<VideoDetails>({
|
||||
uri: url,
|
||||
json: true
|
||||
})
|
||||
}, REQUESTS.MAX_RETRIES, REQUESTS.WAIT)
|
||||
|
||||
return prepareVideoForDB(res.body, host)
|
||||
}
|
||||
|
@ -19,10 +19,10 @@ async function getVideo (host: string, uuid: string): Promise<IndexableVideo> {
|
|||
async function getChannel (host: string, name: string): Promise<IndexableChannel> {
|
||||
const url = 'https://' + host + '/api/v1/video-channels/' + name
|
||||
|
||||
const res = await doRequest<VideoChannel>({
|
||||
const res = await doRequestWithRetries<VideoChannel>({
|
||||
uri: url,
|
||||
json: true
|
||||
})
|
||||
}, REQUESTS.MAX_RETRIES, REQUESTS.WAIT)
|
||||
|
||||
return prepareChannelForDB(res.body, host)
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ async function getChannel (host: string, name: string): Promise<IndexableChannel
|
|||
async function getVideos (host: string, start: number): Promise<IndexableVideo[]> {
|
||||
const url = 'https://' + host + '/api/v1/videos'
|
||||
|
||||
const res = await doRequest<ResultList<Video>>({
|
||||
const res = await doRequestWithRetries<ResultList<Video>>({
|
||||
uri: url,
|
||||
qs: {
|
||||
start,
|
||||
|
@ -39,7 +39,7 @@ async function getVideos (host: string, start: number): Promise<IndexableVideo[]
|
|||
count: INDEXER_COUNT.VIDEOS
|
||||
},
|
||||
json: true
|
||||
})
|
||||
}, REQUESTS.MAX_RETRIES, REQUESTS.WAIT)
|
||||
|
||||
return res.body.data.map(v => prepareVideoForDB(v, host))
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import { AbstractScheduler } from './abstract-scheduler'
|
||||
import { INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
|
||||
import { INDEXER_CONCURRENCY, INDEXER_COUNT, INDEXER_QUEUE_CONCURRENCY, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
|
||||
import { logger } from '../../helpers/logger'
|
||||
import { indexVideos, refreshVideosIndex, removeNotExistingVideos, removeVideosFromHosts } from '../elastic-search-videos'
|
||||
import { IndexableVideo } from '../../types/video.model'
|
||||
|
@ -8,6 +8,7 @@ import { AsyncQueue, queue } from 'async'
|
|||
import { buildInstanceHosts } from '../elastic-search-instances'
|
||||
import { getChannel, getVideo, getVideos } from '../peertube-instance'
|
||||
import { indexChannels, refreshChannelsIndex, removeNotExistingChannels } from '../elastic-search-channels'
|
||||
import * as Bluebird from 'bluebird'
|
||||
|
||||
type GetVideoQueueParam = { host: string, uuid: string }
|
||||
type GetChannelQueueParam = { host: string, name: string }
|
||||
|
@ -28,19 +29,19 @@ export class VideosIndexer extends AbstractScheduler {
|
|||
this.indexSpecificVideo(task.host, task.uuid)
|
||||
.then(() => cb())
|
||||
.catch(err => {
|
||||
logger.error('Error in index specific video.', { err: inspect(err) })
|
||||
logger.error('Error in index specific video %s of %s.', task.uuid, task.host, { err: inspect(err) })
|
||||
cb()
|
||||
})
|
||||
})
|
||||
}, INDEXER_QUEUE_CONCURRENCY)
|
||||
|
||||
this.indexChannelQueue = queue<GetChannelQueueParam, Error>((task, cb) => {
|
||||
this.indexSpecificChannel(task.host, task.name)
|
||||
.then(() => cb())
|
||||
.catch(err => {
|
||||
logger.error('Error in index specific channel.', { err: inspect(err) })
|
||||
logger.error('Error in index specific channel %s@%s.', task.name, task.host, { err: inspect(err) })
|
||||
cb()
|
||||
})
|
||||
})
|
||||
}, INDEXER_QUEUE_CONCURRENCY)
|
||||
|
||||
this.indexChannelQueue.drain(async () => {
|
||||
logger.info('Refresh channels index.')
|
||||
|
@ -61,15 +62,29 @@ export class VideosIndexer extends AbstractScheduler {
|
|||
}
|
||||
|
||||
private async runVideosIndexer () {
|
||||
logger.info('Running videos indexer.')
|
||||
|
||||
const { indexHosts, removedHosts } = await buildInstanceHosts()
|
||||
await removeVideosFromHosts(removedHosts)
|
||||
|
||||
await Bluebird.map(indexHosts, async host => {
|
||||
try {
|
||||
await this.indexHost(host)
|
||||
} catch (err) {
|
||||
console.error(inspect(err, { depth: 10 }))
|
||||
logger.warn('Cannot index videos from %s.', host, { err })
|
||||
}
|
||||
}, { concurrency: INDEXER_CONCURRENCY })
|
||||
|
||||
await refreshChannelsIndex()
|
||||
await refreshVideosIndex()
|
||||
}
|
||||
|
||||
private async indexHost (host: string) {
|
||||
const channelsToSync = new Set<string>()
|
||||
const channelsId = new Set<number>()
|
||||
const videosId = new Set<number>()
|
||||
|
||||
await removeVideosFromHosts(removedHosts)
|
||||
|
||||
for (const host of indexHosts) {
|
||||
try {
|
||||
let videos: IndexableVideo[] = []
|
||||
let start = 0
|
||||
|
||||
|
@ -103,14 +118,6 @@ export class VideosIndexer extends AbstractScheduler {
|
|||
|
||||
await removeNotExistingChannels(host, channelsId)
|
||||
await removeNotExistingVideos(host, videosId)
|
||||
} catch (err) {
|
||||
console.error(inspect(err, { depth: 10 }))
|
||||
logger.warn('Cannot index videos from %s.', host, { err })
|
||||
}
|
||||
}
|
||||
|
||||
await refreshChannelsIndex()
|
||||
await refreshVideosIndex()
|
||||
}
|
||||
|
||||
private async indexSpecificVideo (host: string, uuid: string) {
|
||||
|
|
Loading…
Reference in New Issue