Compare commits

...

5 Commits

Author SHA1 Message Date
Chocobozzz 65295a804a
Put some constants in config 2024-01-04 16:50:17 +01:00
Chocobozzz 491f936906
Increase bulk indexation time 2024-01-04 16:22:43 +01:00
Chocobozzz 9c43989b59
Fix bulk indexing 2024-01-04 15:37:00 +01:00
Chocobozzz 69e17e2777
Filter instance in bulk 2024-01-04 14:39:41 +01:00
Chocobozzz fde7007788
Try to have faster indexation 2024-01-04 14:20:20 +01:00
8 changed files with 91 additions and 24 deletions

View File

@ -54,3 +54,10 @@ api:
enabled: false
# Array of hosts
hosts: null
indexer:
# How many hosts to index in parallel
host_concurrency: 10
# How long to wait before bulk indexing data in Meilisearch
bulk_indexation_interval_ms: 10000

View File

@ -29,6 +29,7 @@ instances-index:
- 'replay.jres.org'
- 'tube.nah.re'
- 'video.passageenseine.fr'
- 'peertube.luga.at'
api:
blacklist:

View File

@ -45,6 +45,10 @@ const CONFIG = {
ENABLED: config.get<boolean>('api.blacklist.enabled'),
HOSTS: config.get<string[]>('api.blacklist.hosts')
}
},
INDEXER: {
HOST_CONCURRENCY: config.get<number>('indexer.host_concurrency'),
BULK_INDEXATION_INTERVAL_MS: config.get<number>('indexer.bulk_indexation_interval_ms')
}
}
@ -70,7 +74,6 @@ const SCHEDULER_INTERVALS_MS = {
const INDEXER_COUNT = 20
const INDEXER_LIMIT = 500000
const INDEXER_HOST_CONCURRENCY = 3
const INDEXER_QUEUE_CONCURRENCY = 3
const REQUESTS = {
@ -100,7 +103,6 @@ export {
SORTABLE_COLUMNS,
INDEXER_QUEUE_CONCURRENCY,
SCHEDULER_INTERVALS_MS,
INDEXER_HOST_CONCURRENCY,
INDEXER_COUNT,
INDEXER_LIMIT,
REQUESTS

View File

@ -17,10 +17,12 @@ export class ChannelIndexer extends AbstractIndexer <IndexableChannel, DBChannel
}
async indexSpecificElement (host: string, name: string) {
await this.waitForBulkIndexation()
const channel = await getChannel(host, name)
logger.info('Indexing specific channel %s@%s.', name, host)
return this.indexElements([ channel ])
this.addElementsToBulkIndex([ channel ])
}
}

View File

@ -1,10 +1,11 @@
import { QueueObject, queue } from 'async'
import { inspect } from 'util'
import { logger } from '../../../helpers/logger'
import { INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants'
import { CONFIG, INDEXER_QUEUE_CONCURRENCY } from '../../../initializers/constants'
import { IndexableDoc } from '../../../types/indexable-doc.model'
import { client } from '../../../helpers/meilisearch'
import { buildInValuesArray } from '../../meilisearch/meilisearch-queries'
import { EnqueuedTask } from 'meilisearch'
// identifier could be an uuid, an handle or a url for example
export type QueueParam = { host: string, identifier: string }
@ -19,7 +20,11 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
protected readonly rankingRules: string[]
abstract indexSpecificElement (host: string, uuid: string): Promise<{ taskUid: number }>
private elementsToBulkIndex: T[] = []
private bulkIndexationTimer: any
private bulkProcessEnqueuedTask: EnqueuedTask
abstract indexSpecificElement (host: string, uuid: string): Promise<void>
constructor (
protected readonly indexName: string,
@ -27,8 +32,7 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
) {
this.indexQueue = queue<QueueParam, Error>(async (task, cb) => {
try {
const { taskUid } = await this.indexSpecificElement(task.host, task.identifier)
await client.index(this.indexName).waitForTask(taskUid, { timeOutMs: 1000 * 60 * 5 }) // 5 minutes timeout
await this.indexSpecificElement(task.host, task.identifier)
return cb()
} catch (err) {
@ -62,10 +66,7 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
}
}
scheduleIndexation (host: string, identifier: string) {
this.indexQueue.push({ identifier, host })
.catch(err => logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host))
}
// ---------------------------------------------------------------------------
removeNotExisting (host: string, existingPrimaryKeys: Set<string>) {
return client.index(this.indexName).deleteDocuments({
@ -79,7 +80,47 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
})
}
async indexElements (elements: T[]) {
// ---------------------------------------------------------------------------
// Enqueue a single element for indexation on the parallel queue.
// Errors are logged but never propagated to the caller (fire-and-forget).
scheduleParallelIndexation (host: string, identifier: string) {
  const task: QueueParam = { host, identifier }

  this.indexQueue.push(task)
    .catch(err => {
      logger.error({ err: inspect(err) }, 'Cannot schedule indexation of %s for %s', identifier, host)
    })
}
// Block until the previously enqueued Meilisearch bulk task has completed,
// then forget it. No-op when no bulk task is pending.
async waitForBulkIndexation () {
  const pendingTask = this.bulkProcessEnqueuedTask
  if (!pendingTask) return

  // Poll every second (default interval of the client is coarser)
  await this.waitForTask(pendingTask.taskUid, 1000)

  this.bulkProcessEnqueuedTask = undefined
}
// Buffer elements for the next bulk indexation pass and make sure
// a bulk indexation run is scheduled to pick them up.
addElementsToBulkIndex (elements: T[]) {
  this.elementsToBulkIndex = [ ...this.elementsToBulkIndex, ...elements ]

  this.scheduleBulkIndexationProcess()
}
// Debounce bulk indexation: start a timer (if none is pending) that will flush
// the buffered elements to Meilisearch after BULK_INDEXATION_INTERVAL_MS.
private scheduleBulkIndexationProcess () {
  // A timer is already pending: the buffered elements will be picked up by it
  if (this.bulkIndexationTimer) return

  this.bulkIndexationTimer = setTimeout(async () => {
    // Reset the timer reference *before* doing any async work.
    // Otherwise, if indexElements() throws, the stale reference would prevent
    // any future bulk indexation from being scheduled; it also lets elements
    // added while we are indexing schedule the next batch immediately.
    this.bulkIndexationTimer = undefined

    const elements = this.elementsToBulkIndex
    this.elementsToBulkIndex = []

    try {
      logger.info(`Bulk indexing ${elements.length} elements in ${this.indexName}`)

      // Remember the enqueued Meilisearch task so waitForBulkIndexation() can await it
      this.bulkProcessEnqueuedTask = await this.indexElements(elements)
    } catch (err) {
      logger.error({ err }, 'Cannot schedule bulk indexation')
    }
  }, CONFIG.INDEXER.BULK_INDEXATION_INTERVAL_MS)
}
private async indexElements (elements: T[]) {
const documents = elements.map(e => this.formatterFn(e))
const result = await client.index(this.indexName).updateDocuments(documents, { primaryKey: this.primaryKey.toString() })
@ -87,4 +128,10 @@ export abstract class AbstractIndexer <T extends IndexableDoc, DB> {
return result
}
// ---------------------------------------------------------------------------
// Wait for a Meilisearch task on this index to reach a terminal state.
// intervalMs optionally overrides the client's polling interval.
private waitForTask (taskId: number, intervalMs?: number) {
  const timeOutMs = 1000 * 60 * 5 // 5 minutes timeout

  return client.index(this.indexName).waitForTask(taskId, { timeOutMs, intervalMs })
}
}

View File

@ -52,10 +52,12 @@ export class VideoIndexer extends AbstractIndexer <IndexableVideo, DBVideo> {
}
async indexSpecificElement (host: string, uuid: string) {
await this.waitForBulkIndexation()
const video = await getVideo(host, uuid)
logger.info('Indexing specific video %s of %s.', uuid, host)
return this.indexElements([ video ])
this.addElementsToBulkIndex([ video ])
}
}

View File

@ -2,6 +2,7 @@ import { CONFIG } from '../../initializers/constants'
import { listIndexInstancesHost, getMajorInstanceVersion } from '../requests/instances-index'
import { client } from '../../helpers/meilisearch'
import { logger } from '../../helpers/logger'
import Bluebird from 'bluebird'
async function buildInstanceHosts () {
let indexHosts = await listIndexInstancesHost()
@ -14,15 +15,17 @@ async function buildInstanceHosts () {
indexHosts = indexHosts.filter(h => whitelistHosts.includes(h))
}
for (const indexHost of indexHosts) {
indexHosts = await Bluebird.filter(indexHosts, async indexHost => {
const instanceVersion = await getMajorInstanceVersion(indexHost)
if (instanceVersion < 4) {
logger.info(`Do not index ${indexHost} because the major version is too low (v${instanceVersion} < v4)`)
indexHosts = indexHosts.filter(h => h !== indexHost)
return false
}
}
return true
}, { concurrency: 10 })
const dbHosts = await listDBInstances()
const removedHosts = getRemovedHosts(dbHosts, indexHosts)

View File

@ -1,7 +1,7 @@
import Bluebird from 'bluebird'
import { inspect } from 'util'
import { logger } from '../../helpers/logger'
import { INDEXER_HOST_CONCURRENCY, INDEXER_COUNT, INDEXER_LIMIT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { INDEXER_COUNT, INDEXER_LIMIT, SCHEDULER_INTERVALS_MS, CONFIG } from '../../initializers/constants'
import { IndexableVideo } from '../../types/video.model'
import { buildInstanceHosts } from '../meilisearch/meilisearch-instances'
import { ChannelIndexer } from '../indexers/channel-indexer'
@ -72,7 +72,7 @@ export class IndexationScheduler extends AbstractScheduler {
console.error(inspect(err, { depth: 10 }))
logger.warn({ err: inspect(err) }, 'Cannot index videos from %s.', host)
}
}, { concurrency: INDEXER_HOST_CONCURRENCY })
}, { concurrency: CONFIG.INDEXER.HOST_CONCURRENCY })
logger.info('Indexer ended.')
}
@ -88,6 +88,8 @@ export class IndexationScheduler extends AbstractScheduler {
logger.info('Adding video data from %s.', host)
do {
await this.videoIndexer.waitForBulkIndexation()
logger.debug('Getting video results from %s (from = %d).', host, start)
videos = await getVideos(host, start)
@ -98,16 +100,15 @@ export class IndexationScheduler extends AbstractScheduler {
if (videos.length !== 0) {
const videosFromDB = await getVideosUpdatedAt(videos.map(v => v.uuid))
await this.videoIndexer.indexElements(videos)
logger.debug('Indexed %d videos from %s.', videos.length, host)
logger.debug('Indexing %d videos from %s.', videos.length, host)
this.videoIndexer.addElementsToBulkIndex(videos)
// Fetch complete video foreach created video (to get tags) if needed
for (const video of videos) {
const videoDB = videosFromDB.find(v => v.uuid === video.uuid)
if (!videoDB || videoDB.updatedAt !== new Date(video.updatedAt).getTime()) {
this.videoIndexer.scheduleIndexation(host, video.uuid)
this.videoIndexer.scheduleParallelIndexation(host, video.uuid)
}
}
}
@ -123,7 +124,7 @@ export class IndexationScheduler extends AbstractScheduler {
logger.info('Added video data from %s.', host)
for (const c of channelsToSync) {
this.channelIndexer.scheduleIndexation(host, c)
this.channelIndexer.scheduleParallelIndexation(host, c)
}
logger.info('Removing non-existing channels and videos from ' + host)
@ -143,6 +144,8 @@ export class IndexationScheduler extends AbstractScheduler {
let start = 0
do {
await this.playlistIndexer.waitForBulkIndexation()
logger.debug('Getting playlist results from %s (from = %d, channelHandle = %s).', host, start, channelHandle)
playlists = await getPlaylistsOf(host, channelHandle, start)
@ -151,7 +154,7 @@ export class IndexationScheduler extends AbstractScheduler {
start += playlists.length
if (playlists.length !== 0) {
await this.playlistIndexer.indexElements(playlists)
this.playlistIndexer.addElementsToBulkIndex(playlists)
logger.debug('Indexed %d playlists from %s.', playlists.length, host)
}