Compare commits
5 Commits
c9f901d6a3 ... 65295a804a
Author | SHA1 | Date
---|---|---
Chocobozzz | 65295a804a |
Chocobozzz | 491f936906 |
Chocobozzz | 9c43989b59 |
Chocobozzz | 69e17e2777 |
Chocobozzz | fde7007788 |

@@ -54,3 +54,10 @@ api:
     enabled: false
     # Array of hosts
     hosts: null
+
+indexer:
+  # How many hosts in parallel to index
+  host_concurrency: 10
+
+  # How much time to wait before bulk indexing in Meilisearch data
+  bulk_indexation_interval_ms: 10000

@@ -29,6 +29,7 @@ instances-index:
     - 'replay.jres.org'
     - 'tube.nah.re'
     - 'video.passageenseine.fr'
+    - 'peertube.luga.at'
 
 api:
   blacklist:

@@ -45,6 +45,10 @@ const CONFIG = {
       ENABLED: config.get<boolean>('api.blacklist.enabled'),
       HOSTS: config.get<string[]>('api.blacklist.hosts')
     }
   },
+  INDEXER: {
+    HOST_CONCURRENCY: config.get<number>('indexer.host_concurrency'),
+    BULK_INDEXATION_INTERVAL_MS: config.get<number>('indexer.bulk_indexation_interval_ms')
+  }
 }
 

@@ -70,7 +74,6 @@ const SCHEDULER_INTERVALS_MS = {
 const INDEXER_COUNT = 20
 const INDEXER_LIMIT = 500000
 
-const INDEXER_HOST_CONCURRENCY = 3
 const INDEXER_QUEUE_CONCURRENCY = 3
 
 const REQUESTS = {

@@ -100,7 +103,6 @@ export {
   SORTABLE_COLUMNS,
   INDEXER_QUEUE_CONCURRENCY,
   SCHEDULER_INTERVALS_MS,
-  INDEXER_HOST_CONCURRENCY,
   INDEXER_COUNT,
   INDEXER_LIMIT,
   REQUESTS

@@ -17,10 +17,12 @@ export class ChannelIndexer extends AbstractIndexer <IndexableChannel, DBChannel> {
   }
 
   async indexSpecificElement (host: string, name: string) {
+    await this.waitForBulkIndexation()
+
     const channel = await getChannel(host, name)
 
     logger.info('Indexing specific channel %s@%s.', name, host)
 
-    return this.indexElements([ channel ])
+    this.addElementsToBulkIndex([ channel ])
   }
 }

@@ -1,10 +1,11 @@
 import { QueueObject, queue } from 'async'
 import { inspect } from 'util'
 import { logger } from '../../../helpers/logger'
-import { INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants'
+import { CONFIG, INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants'
 import { IndexableDoc } from '../../../types/indexable-doc.model'
 import { client } from '../../../helpers/meilisearch'
 import { buildInValuesArray } from '../../meilisearch/meilisearch-queries'
+import { EnqueuedTask } from 'meilisearch'
 
 // identifier could be an uuid, an handle or a url for example
 export type QueueParam = { host: string, identifier: string }

@@ -19,7 +20,11 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
 
   protected readonly rankingRules: string[]
 
-  abstract indexSpecificElement (host: string, uuid: string): Promise<{ taskUid: number }>
+  private elementsToBulkIndex: T[] = []
+  private bulkIndexationTimer: any
+  private bulkProcessEnqueuedTask: EnqueuedTask
+
+  abstract indexSpecificElement (host: string, uuid: string): Promise<void>
 
   constructor (
     protected readonly indexName: string,

@@ -27,8 +32,7 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
   ) {
     this.indexQueue = queue<QueueParam, Error>(async (task, cb) => {
       try {
-        const { taskUid } = await this.indexSpecificElement(task.host, task.identifier)
-        await client.index(this.indexName).waitForTask(taskUid, { timeOutMs: 1000 * 60 * 5 }) // 5 minutes timeout
+        await this.indexSpecificElement(task.host, task.identifier)
 
         return cb()
       } catch (err) {

@@ -62,10 +66,7 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
     }
   }
 
-  scheduleIndexation (host: string, identifier: string) {
-    this.indexQueue.push({ identifier, host })
-      .catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host))
-  }
+  // ---------------------------------------------------------------------------
 
   removeNotExisting (host: string, existingPrimaryKeys: Set<string>) {
     return client.index(this.indexName).deleteDocuments({

@@ -79,7 +80,47 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
     })
   }
 
-  async indexElements (elements: T[]) {
+  // ---------------------------------------------------------------------------
+
+  scheduleParallelIndexation (host: string, identifier: string) {
+    this.indexQueue.push({ identifier, host })
+      .catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host))
+  }
+
+  async waitForBulkIndexation () {
+    if (!this.bulkProcessEnqueuedTask) return
+
+    await this.waitForTask(this.bulkProcessEnqueuedTask.taskUid, 1000)
+
+    this.bulkProcessEnqueuedTask = undefined
+  }
+
+  addElementsToBulkIndex (elements: T[]) {
+    this.elementsToBulkIndex = this.elementsToBulkIndex.concat(elements)
+
+    this.scheduleBulkIndexationProcess()
+  }
+
+  private scheduleBulkIndexationProcess () {
+    if (this.bulkIndexationTimer) return
+
+    this.bulkIndexationTimer = setTimeout(async () => {
+      try {
+        const elements = this.elementsToBulkIndex
+        this.elementsToBulkIndex = []
+
+        logger.info(`Bulk indexing ${elements.length} elements in ${this.indexName}`)
+
+        this.bulkProcessEnqueuedTask = await this.indexElements(elements)
+
+        this.bulkIndexationTimer = undefined
+      } catch (err) {
+        logger.error({ err }, 'Cannot schedule bulk indexation')
+      }
+    }, CONFIG.INDEXER.BULK_INDEXATION_INTERVAL_MS)
+  }
+
+  private async indexElements (elements: T[]) {
     const documents = elements.map(e => this.formatterFn(e))
 
     const result = await client.index(this.indexName).updateDocuments(documents, { primaryKey: this.primaryKey.toString() })

@@ -87,4 +128,10 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
 
     return result
   }
+
+  // ---------------------------------------------------------------------------
+
+  private waitForTask (taskId: number, intervalMs?: number) {
+    return client.index(this.indexName).waitForTask(taskId, { timeOutMs: 1000 * 60 * 5, intervalMs })
+  }
 }

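Taken together, these AbstractIndexer changes replace per-call indexElements() (now private) with a buffered flow: callers enqueue documents with addElementsToBulkIndex(), a timer flushes the buffer to Meilisearch after CONFIG.INDEXER.BULK_INDEXATION_INTERVAL_MS, and waitForBulkIndexation() blocks until the previously enqueued bulk task has been processed. Below is a minimal caller-side sketch of that flow; the BulkIndexer interface, IndexableVideo type and getVideos pager are local stand-ins for illustration, not the repository's actual exports.

```typescript
// Local stand-ins so the sketch is self-contained; the real classes live elsewhere in this repository.
interface BulkIndexer<T> {
  waitForBulkIndexation (): Promise<void>
  addElementsToBulkIndex (elements: T[]): void
  scheduleParallelIndexation (host: string, identifier: string): void
}

type IndexableVideo = { uuid: string }

// Stub pager standing in for the repository's paginated getVideos(host, start) helper
async function getVideos (host: string, start: number): Promise<IndexableVideo[]> {
  return [] // a real implementation would fetch a page of videos from the instance API
}

async function indexHost (videoIndexer: BulkIndexer<IndexableVideo>, host: string) {
  let start = 0
  let videos: IndexableVideo[] = []

  do {
    // Block until Meilisearch has processed the previously enqueued bulk task, if any
    await videoIndexer.waitForBulkIndexation()

    videos = await getVideos(host, start)
    start += videos.length

    if (videos.length !== 0) {
      // Buffered only: the documents are written when the bulk indexation timer fires
      videoIndexer.addElementsToBulkIndex(videos)
    }
  } while (videos.length !== 0)

  // Targeted single-element refreshes still go through the parallel queue
  videoIndexer.scheduleParallelIndexation(host, 'some-video-uuid') // hypothetical identifier
}
```

This mirrors what the indexation scheduler hunks further down do for videos and playlists.
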
@@ -52,10 +52,12 @@ export class VideoIndexer extends AbstractIndexer <IndexableVideo, DBVideo> {
   }
 
   async indexSpecificElement (host: string, uuid: string) {
+    await this.waitForBulkIndexation()
+
     const video = await getVideo(host, uuid)
 
     logger.info('Indexing specific video %s of %s.', uuid, host)
 
-    return this.indexElements([ video ])
+    this.addElementsToBulkIndex([ video ])
   }
 }

@@ -2,6 +2,7 @@ import { CONFIG } from '../../initializers/constants'
 import { listIndexInstancesHost, getMajorInstanceVersion } from '../requests/instances-index'
 import { client } from '../../helpers/meilisearch'
 import { logger } from '../../helpers/logger'
+import Bluebird from 'bluebird'
 
 async function buildInstanceHosts () {
   let indexHosts = await listIndexInstancesHost()

@@ -14,15 +15,17 @@ async function buildInstanceHosts () {
     indexHosts = indexHosts.filter(h => whitelistHosts.includes(h))
   }
 
-  for (const indexHost of indexHosts) {
+  indexHosts = await Bluebird.filter(indexHosts, async indexHost => {
     const instanceVersion = await getMajorInstanceVersion(indexHost)
 
     if (instanceVersion < 4) {
       logger.info(`Do not index ${indexHost} because the major version is too low (v${instanceVersion} < v4)`)
 
-      indexHosts = indexHosts.filter(h => h !== indexHost)
+      return false
     }
-  }
+
+    return true
+  }, { concurrency: 10 })
 
   const dbHosts = await listDBInstances()
   const removedHosts = getRemovedHosts(dbHosts, indexHosts)

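The sequential for loop over indexHosts is replaced here by Bluebird's filter with a concurrency option: the async version check runs on up to 10 hosts at a time, and hosts whose predicate resolves to false are dropped from the result. A standalone sketch of that pattern follows; getMajorVersion and the host names are made-up stand-ins for getMajorInstanceVersion and real instance hosts.

```typescript
import Bluebird from 'bluebird'

// Made-up stand-in for getMajorInstanceVersion(): the real function queries the remote instance
async function getMajorVersion (host: string): Promise<number> {
  return host.includes('old') ? 3 : 4
}

async function keepIndexableHosts (hosts: string[]): Promise<string[]> {
  // The async predicate runs on at most 10 hosts in parallel; hosts returning false are dropped
  return Bluebird.filter(hosts, async host => {
    const version = await getMajorVersion(host)
    return version >= 4
  }, { concurrency: 10 })
}

keepIndexableHosts([ 'tube.new.example', 'tube.old.example' ])
  .then(hosts => console.log(hosts)) // -> [ 'tube.new.example' ]
```

Note that the concurrency of 10 is hardcoded in this hunk, while the per-host indexing concurrency in the scheduler hunks below now comes from CONFIG.INDEXER.HOST_CONCURRENCY.
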
@@ -1,7 +1,7 @@
 import Bluebird from 'bluebird'
 import { inspect } from 'util'
 import { logger } from '../../helpers/logger'
-import { INDEXER_HOST_CONCURRENCY, INDEXER_COUNT, INDEXER_LIMIT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
+import { INDEXER_COUNT, INDEXER_LIMIT, SCHEDULER_INTERVALS_MS, CONFIG } from '../../initializers/constants'
 import { IndexableVideo } from '../../types/video.model'
 import { buildInstanceHosts } from '../meilisearch/meilisearch-instances'
 import { ChannelIndexer } from '../indexers/channel-indexer'

@@ -72,7 +72,7 @@ export class IndexationScheduler extends AbstractScheduler {
         console.error(inspect(err, { depth: 10 }))
         logger.warn({ err: inspect(err) }, 'Cannot index videos from %s.', host)
       }
-    }, { concurrency: INDEXER_HOST_CONCURRENCY })
+    }, { concurrency: CONFIG.INDEXER.HOST_CONCURRENCY })
 
     logger.info('Indexer ended.')
   }

@@ -88,6 +88,8 @@ export class IndexationScheduler extends AbstractScheduler {
     logger.info('Adding video data from %s.', host)
 
     do {
+      await this.videoIndexer.waitForBulkIndexation()
+
       logger.debug('Getting video results from %s (from = %d).', host, start)
 
       videos = await getVideos(host, start)

@@ -98,16 +100,15 @@ export class IndexationScheduler extends AbstractScheduler {
       if (videos.length !== 0) {
         const videosFromDB = await getVideosUpdatedAt(videos.map(v => v.uuid))
 
-        await this.videoIndexer.indexElements(videos)
-
-        logger.debug('Indexed %d videos from %s.', videos.length, host)
+        logger.debug('Indexing %d videos from %s.', videos.length, host)
+        this.videoIndexer.addElementsToBulkIndex(videos)
 
         // Fetch complete video foreach created video (to get tags) if needed
         for (const video of videos) {
           const videoDB = videosFromDB.find(v => v.uuid === video.uuid)
 
           if (!videoDB || videoDB.updatedAt !== new Date(video.updatedAt).getTime()) {
-            this.videoIndexer.scheduleIndexation(host, video.uuid)
+            this.videoIndexer.scheduleParallelIndexation(host, video.uuid)
           }
         }
       }

@@ -123,7 +124,7 @@ export class IndexationScheduler extends AbstractScheduler {
     logger.info('Added video data from %s.', host)
 
     for (const c of channelsToSync) {
-      this.channelIndexer.scheduleIndexation(host, c)
+      this.channelIndexer.scheduleParallelIndexation(host, c)
     }
 
     logger.info('Removing non-existing channels and videos from ' + host)

@@ -143,6 +144,8 @@ export class IndexationScheduler extends AbstractScheduler {
     let start = 0
 
     do {
+      await this.playlistIndexer.waitForBulkIndexation()
+
       logger.debug('Getting playlist results from %s (from = %d, channelHandle = %s).', host, start, channelHandle)
 
       playlists = await getPlaylistsOf(host, channelHandle, start)

@@ -151,7 +154,7 @@ export class IndexationScheduler extends AbstractScheduler {
       start += playlists.length
 
      if (playlists.length !== 0) {
-        await this.playlistIndexer.indexElements(playlists)
+        this.playlistIndexer.addElementsToBulkIndex(playlists)
 
         logger.debug('Indexed %d playlists from %s.', playlists.length, host)
       }