From 9dc427259d17ff3fe4d94a08e357e34206b5e6a4 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 14 Feb 2020 14:09:31 +0100 Subject: [PATCH] Index instances --- .eslintrc.json | 3 +- config/default.yaml | 1 - config/test.yaml | 1 - server.ts | 2 +- server/helpers/elastic-search.ts | 285 +----------------- server/initializers/constants.ts | 4 +- server/lib/elastic-search-videos.ts | 310 ++++++++++++++++++++ server/lib/instances-index.ts | 21 ++ server/lib/schedulers/abstract-scheduler.ts | 3 +- server/lib/schedulers/videos-indexer.ts | 52 +++- 10 files changed, 383 insertions(+), 299 deletions(-) create mode 100644 server/lib/elastic-search-videos.ts create mode 100644 server/lib/instances-index.ts diff --git a/.eslintrc.json b/.eslintrc.json index 4cf4405..0d14c82 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -82,8 +82,7 @@ ], "parserOptions": { "project": [ - "./tsconfig.json", - "./server/tools/tsconfig.json" + "./tsconfig.json" ] } } diff --git a/config/default.yaml b/config/default.yaml index de58a74..215041a 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -12,7 +12,6 @@ elastic_search: port: 9200 indexes: videos: 'peertube-index-videos' - instances: 'peertube-index-instances' log: level: 'debug' # debug/info/warning/error diff --git a/config/test.yaml b/config/test.yaml index 50281b6..0141630 100644 --- a/config/test.yaml +++ b/config/test.yaml @@ -1,4 +1,3 @@ elastic_search: indexes: videos: 'peertube-index-videos-test1' - instances: 'peertube-index-instances-test1' diff --git a/server.ts b/server.ts index 8f8a9fe..e200145 100644 --- a/server.ts +++ b/server.ts @@ -11,7 +11,7 @@ import { apiRouter } from './server/controllers/api' import { logger } from './server/helpers/logger' import { API_VERSION, CONFIG } from './server/initializers/constants' import { VideosIndexer } from './server/lib/schedulers/videos-indexer' -import { initVideosIndex } from './server/helpers/elastic-search' +import { initVideosIndex } from './server/lib/elastic-search-videos' const app = express() diff --git a/server/helpers/elastic-search.ts b/server/helpers/elastic-search.ts index 44dacd6..b17c913 100644 --- a/server/helpers/elastic-search.ts +++ b/server/helpers/elastic-search.ts @@ -1,289 +1,8 @@ import { Client } from '@elastic/elasticsearch' import { CONFIG } from '../initializers/constants' -import { IndexableVideo } from '../types/video.model' -import { Avatar } from '@shared/models/avatars/avatar.model' -import { flatMap } from 'lodash' -const client = new Client({ node: 'http://' + CONFIG.ELASTIC_SEARCH.HOSTNAME + ':' + CONFIG.ELASTIC_SEARCH.PORT }) - -function initVideosIndex () { - return client.indices.create({ - index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, - body: { - settings: { - number_of_shards: 1, - number_of_replicas: 1 - }, - mappings: { - properties: buildVideosMapping() - } - } - }).catch(err => { - if (err.name === 'ResponseError' && err.meta?.body?.error.root_cause[0]?.type === 'resource_already_exists_exception') return - - throw err - }) -} - -function indexVideos (videos: IndexableVideo[]) { - const body = flatMap(videos, v => { - return [ - { - update: { - _id: v.elasticSearchId, - _index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS - } - }, - { - doc: formatVideo(v), - doc_as_upsert: true - } - ] - }) - - return client.bulk({ - index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, - body - }) -} - -function refreshVideosIndex () { - return client.indices.refresh({ index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS }) -} - -async function queryVideos (query: any) { - const res = await client.search({ - index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, - body: { - query - } - }) - - return res.body.hits.hits -} +const elasticSearch = new Client({ node: 'http://' + CONFIG.ELASTIC_SEARCH.HOSTNAME + ':' + CONFIG.ELASTIC_SEARCH.PORT }) export { - indexVideos, - queryVideos, - refreshVideosIndex, - initVideosIndex -} - -// ############################################################################ - -function formatVideo (v: IndexableVideo) { - return { - uuid: v.uuid, - - createdAt: v.createdAt, - updatedAt: v.updatedAt, - publishedAt: v.publishedAt, - originallyPublishedAt: v.originallyPublishedAt, - - category: { - id: v.category.id, - label: v.category.label - }, - licence: { - id: v.licence.id, - label: v.licence.label - }, - language: { - id: v.language.id, - label: v.language.label - }, - privacy: { - id: v.privacy.id, - label: v.privacy.label - }, - - name: v.name, - description: v.description, - duration: v.duration, - thumbnailPath: v.thumbnailPath, - previewPath: v.previewPath, - embedPath: v.embedPath, - - views: v.views, - likes: v.likes, - dislikes: v.dislikes, - - nsfw: v.nsfw, - host: v.host, - - account: { - name: v.account.name, - displayName: v.account.displayName, - url: v.account.url, - host: v.account.host, - - avatar: formatAvatar(v.account) - }, - - channel: { - name: v.channel.name, - displayName: v.channel.displayName, - url: v.channel.url, - host: v.channel.host, - - avatar: formatAvatar(v.channel) - } - } -} - -function formatAvatar (obj: { avatar?: Avatar }) { - if (!obj.avatar) return null - - return { - path: obj.avatar.path, - createdAt: obj.avatar.createdAt, - updatedAt: obj.avatar.updatedAt - } -} - -function buildChannelOrAccountMapping () { - return { - name: { - type: 'text', - fields: { - raw: { - type: 'keyword' - } - } - }, - displayName: { - type: 'text' - }, - url: { - type: 'keyword' - }, - host: { - type: 'keyword' - }, - - avatar: { - properties: { - path: { - type: 'keyword' - }, - createdAt: { - type: 'date' - }, - updatedAt: { - type: 'date' - } - } - } - } -} - -function buildVideosMapping () { - return { - uuid: { - type: 'keyword' - }, - createdAt: { - type: 'date' - }, - updatedAt: { - type: 'date' - }, - publishedAt: { - type: 'date' - }, - originallyPublishedAt: { - type: 'date' - }, - - category: { - properties: { - id: { - type: 'keyword' - }, - label: { - type: 'text' - } - } - }, - - licence: { - properties: { - id: { - type: 'keyword' - }, - label: { - type: 'text' - } - } - }, - - language: { - properties: { - id: { - type: 'keyword' - }, - label: { - type: 'text' - } - } - }, - - privacy: { - properties: { - id: { - type: 'keyword' - }, - label: { - type: 'text' - } - } - }, - - name: { - type: 'text' - }, - - description: { - type: 'text' - }, - - duration: { - type: 'long' - }, - - thumbnailPath: { - type: 'keyword' - }, - previewPath: { - type: 'keyword' - }, - embedPath: { - type: 'keyword' - }, - - views: { - type: 'long' - }, - likes: { - type: 'long' - }, - dislikes: { - type: 'long' - }, - nsfw: { - type: 'boolean' - }, - - host: { - type: 'keyword' - }, - - account: { - properties: buildChannelOrAccountMapping() - }, - - channel: { - properties: buildChannelOrAccountMapping() - } - } + elasticSearch } diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index b707ab5..53a3322 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -17,7 +17,7 @@ const CONFIG = { LOG: { LEVEL: config.get('log.level') }, - INSTANCES_URL: { + INSTANCES_INDEX: { URL: config.get('instances-index.url') } } @@ -32,7 +32,7 @@ const SCHEDULER_INTERVALS_MS = { } const INDEXER_COUNT = { - VIDEOS: 1 + VIDEOS: 10 } if (isTestInstance()) { diff --git a/server/lib/elastic-search-videos.ts b/server/lib/elastic-search-videos.ts new file mode 100644 index 0000000..17b1d25 --- /dev/null +++ b/server/lib/elastic-search-videos.ts @@ -0,0 +1,310 @@ +import { CONFIG } from '../initializers/constants' +import { IndexableVideo } from '../types/video.model' +import { flatMap } from 'lodash' +import { Avatar } from '@shared/models' +import { elasticSearch } from '../helpers/elastic-search' + +function initVideosIndex () { + return elasticSearch.indices.create({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, + body: { + settings: { + number_of_shards: 1, + number_of_replicas: 1 + }, + mappings: { + properties: buildVideosMapping() + } + } + }).catch(err => { + if (err.name === 'ResponseError' && err.meta?.body?.error.root_cause[0]?.type === 'resource_already_exists_exception') return + + throw err + }) +} + +function indexVideos (videos: IndexableVideo[]) { + const body = flatMap(videos, v => { + return [ + { + update: { + _id: v.elasticSearchId, + _index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS + } + }, + { + doc: formatVideo(v), + doc_as_upsert: true + } + ] + }) + + return elasticSearch.bulk({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, + body + }) +} + +function refreshVideosIndex () { + return elasticSearch.indices.refresh({ index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS }) +} + +async function listIndexInstances () { + const res = await elasticSearch.search({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, + body: { + size: 0, + aggs: { + hosts: { + terms: { + field: 'host' + } + } + } + } + }) + + return res.body.aggregations.hosts.buckets.map(b => b.key) +} + +async function queryVideos (query: any) { + const res = await elasticSearch.search({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, + body: { + query + } + }) + + return res.body.hits.hits +} + +export { + indexVideos, + queryVideos, + refreshVideosIndex, + initVideosIndex, + listIndexInstances +} + +// ############################################################################ + +function formatVideo (v: IndexableVideo) { + return { + uuid: v.uuid, + + indexedAt: new Date(), + createdAt: v.createdAt, + updatedAt: v.updatedAt, + publishedAt: v.publishedAt, + originallyPublishedAt: v.originallyPublishedAt, + + category: { + id: v.category.id, + label: v.category.label + }, + licence: { + id: v.licence.id, + label: v.licence.label + }, + language: { + id: v.language.id, + label: v.language.label + }, + privacy: { + id: v.privacy.id, + label: v.privacy.label + }, + + name: v.name, + description: v.description, + duration: v.duration, + thumbnailPath: v.thumbnailPath, + previewPath: v.previewPath, + embedPath: v.embedPath, + + views: v.views, + likes: v.likes, + dislikes: v.dislikes, + + nsfw: v.nsfw, + host: v.host, + + account: { + name: v.account.name, + displayName: v.account.displayName, + url: v.account.url, + host: v.account.host, + + avatar: formatAvatar(v.account) + }, + + channel: { + name: v.channel.name, + displayName: v.channel.displayName, + url: v.channel.url, + host: v.channel.host, + + avatar: formatAvatar(v.channel) + } + } +} + +function formatAvatar (obj: { avatar?: Avatar }) { + if (!obj.avatar) return null + + return { + path: obj.avatar.path, + createdAt: obj.avatar.createdAt, + updatedAt: obj.avatar.updatedAt + } +} + +function buildChannelOrAccountMapping () { + return { + name: { + type: 'text', + fields: { + raw: { + type: 'keyword' + } + } + }, + displayName: { + type: 'text' + }, + url: { + type: 'keyword' + }, + host: { + type: 'keyword' + }, + + avatar: { + properties: { + path: { + type: 'keyword' + }, + createdAt: { + type: 'date' + }, + updatedAt: { + type: 'date' + } + } + } + } +} + +function buildVideosMapping () { + return { + uuid: { + type: 'keyword' + }, + createdAt: { + type: 'date' + }, + updatedAt: { + type: 'date' + }, + publishedAt: { + type: 'date' + }, + originallyPublishedAt: { + type: 'date' + }, + indexedAt: { + type: 'date' + }, + + category: { + properties: { + id: { + type: 'keyword' + }, + label: { + type: 'text' + } + } + }, + + licence: { + properties: { + id: { + type: 'keyword' + }, + label: { + type: 'text' + } + } + }, + + language: { + properties: { + id: { + type: 'keyword' + }, + label: { + type: 'text' + } + } + }, + + privacy: { + properties: { + id: { + type: 'keyword' + }, + label: { + type: 'text' + } + } + }, + + name: { + type: 'text' + }, + + description: { + type: 'text' + }, + + duration: { + type: 'long' + }, + + thumbnailPath: { + type: 'keyword' + }, + previewPath: { + type: 'keyword' + }, + embedPath: { + type: 'keyword' + }, + + views: { + type: 'long' + }, + likes: { + type: 'long' + }, + dislikes: { + type: 'long' + }, + nsfw: { + type: 'boolean' + }, + + host: { + type: 'keyword' + }, + + account: { + properties: buildChannelOrAccountMapping() + }, + + channel: { + properties: buildChannelOrAccountMapping() + } + } +} diff --git a/server/lib/instances-index.ts b/server/lib/instances-index.ts new file mode 100644 index 0000000..34d3155 --- /dev/null +++ b/server/lib/instances-index.ts @@ -0,0 +1,21 @@ +import { CONFIG } from '../initializers/constants' +import { doRequest } from '../helpers/requests' + +async function listIndexInstancesHost () { + const uri = CONFIG.INSTANCES_INDEX.URL + '/api/v1/instances/hosts' + + const qs = { count: 5000 } + + const { body } = await doRequest({ uri, qs, json: true }) + + return body.data.map(o => o.host as string) +} + +function getRemovedHosts (dbHosts: string[], indexHosts: string[]) { + return dbHosts.filter(dbHost => indexHosts.includes(dbHost) === false) +} + +export { + getRemovedHosts, + listIndexInstancesHost +} diff --git a/server/lib/schedulers/abstract-scheduler.ts b/server/lib/schedulers/abstract-scheduler.ts index 0e60889..c7fb5f0 100644 --- a/server/lib/schedulers/abstract-scheduler.ts +++ b/server/lib/schedulers/abstract-scheduler.ts @@ -1,5 +1,6 @@ import { logger } from '../../helpers/logger' import * as Bluebird from 'bluebird' +import { inspect } from 'util' export abstract class AbstractScheduler { @@ -25,7 +26,7 @@ export abstract class AbstractScheduler { try { await this.internalExecute() } catch (err) { - logger.error('Cannot execute %s scheduler.', this.constructor.name, { err }) + logger.error('Cannot execute %s scheduler.', this.constructor.name, { err: inspect(err) }) } finally { this.isRunning = false } diff --git a/server/lib/schedulers/videos-indexer.ts b/server/lib/schedulers/videos-indexer.ts index 436ff60..0ec4d86 100644 --- a/server/lib/schedulers/videos-indexer.ts +++ b/server/lib/schedulers/videos-indexer.ts @@ -1,12 +1,14 @@ import { AbstractScheduler } from './abstract-scheduler' -import { INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' +import { CONFIG, INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' import { doRequest } from '../../helpers/requests' import { logger } from '../../helpers/logger' import { ResultList } from '../../../PeerTube/shared/models/result-list.model' import { Video } from '../../../PeerTube/shared/models/videos/video.model' -import { indexVideos, refreshVideosIndex } from '../../helpers/elastic-search' +import { indexVideos, listIndexInstances, refreshVideosIndex } from '../elastic-search-videos' import { IndexableVideo } from '../../types/video.model' import { inspect } from 'util' +import { getRemovedHosts, listIndexInstancesHost } from '../instances-index' +import { elasticSearch } from '../../helpers/elastic-search' export class VideosIndexer extends AbstractScheduler { @@ -23,13 +25,27 @@ export class VideosIndexer extends AbstractScheduler { } private async indexVideos () { - const instances = [ 'peertube.cpy.re' ] + const dbHosts = await listIndexInstances() + const indexHosts = (await listIndexInstancesHost()).filter(h => h === 'peertube.cpy.re') - for (const instance of instances) { + const hostsToRemove = getRemovedHosts(dbHosts, indexHosts) + await this.removeVideosFromHosts(hostsToRemove) + + for (const instance of indexHosts) { try { - const videos = await this.getVideos(instance) + let videos: IndexableVideo[] = [] + let start = 0 - await indexVideos(videos) + do { + videos = await this.getVideos(instance, start) + start += videos.length + + logger.debug('Getting %d results from %s (from = %d).', videos.length, instance, start) + + if (videos.length !== 0) { + await indexVideos(videos) + } + } while (videos.length === INDEXER_COUNT.VIDEOS && start < 500) logger.info('Added video data from %s.', instance) } catch (err) { @@ -41,14 +57,15 @@ export class VideosIndexer extends AbstractScheduler { await refreshVideosIndex() } - private async getVideos (host: string): Promise { + private async getVideos (host: string, start: number): Promise { const url = 'https://' + host + '/api/v1/videos' const res = await doRequest>({ uri: url, qs: { - start: 0, + start, filter: 'local', + skipCount: true, count: INDEXER_COUNT.VIDEOS }, json: true @@ -57,6 +74,25 @@ export class VideosIndexer extends AbstractScheduler { return res.body.data.map(v => Object.assign(v, { elasticSearchId: host + v.id, host })) } + private removeVideosFromHosts (hosts: string[]) { + if (hosts.length === 0) return + + logger.info('Will remove videos from hosts.', { hosts }) + + const should = hosts.map(host => ({ term: { host } })) + + return elasticSearch.delete_by_query({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, + body: { + query: { + bool: { + should + } + } + } + }) + } + static get Instance () { return this.instance || (this.instance = new this()) }