From c2af79863206f2aa5c3714587c5ef54ff21a9a66 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 13 Feb 2020 16:06:52 +0100 Subject: [PATCH] Index videos --- config/default.yaml | 1 + config/test.yaml | 4 + package.json | 4 +- server.ts | 14 +- server/helpers/elastic-search.ts | 274 +++++++++++++++++++- server/helpers/requests.ts | 6 +- server/initializers/constants.ts | 13 +- server/lib/schedulers/abstract-scheduler.ts | 35 +++ server/lib/schedulers/videos-indexer.ts | 63 +++++ server/lib/videos-indexer.ts | 0 server/types/video.model.ts | 6 + tsconfig.json | 5 +- yarn.lock | 13 +- 13 files changed, 419 insertions(+), 19 deletions(-) create mode 100644 config/test.yaml create mode 100644 server/lib/schedulers/abstract-scheduler.ts create mode 100644 server/lib/schedulers/videos-indexer.ts delete mode 100644 server/lib/videos-indexer.ts create mode 100644 server/types/video.model.ts diff --git a/config/default.yaml b/config/default.yaml index 215041a..de58a74 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -12,6 +12,7 @@ elastic_search: port: 9200 indexes: videos: 'peertube-index-videos' + instances: 'peertube-index-instances' log: level: 'debug' # debug/info/warning/error diff --git a/config/test.yaml b/config/test.yaml new file mode 100644 index 0000000..50281b6 --- /dev/null +++ b/config/test.yaml @@ -0,0 +1,4 @@ +elastic_search: + indexes: + videos: 'peertube-index-videos-test1' + instances: 'peertube-index-instances-test1' diff --git a/package.json b/package.json index 79336a9..b9fd362 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "express": "^4.16.4", "express-validator": "^6.1.1", "js-yaml": "^3.12.1", + "lodash": "^4.17.15", "mkdirp": "^0.5.1", "morgan": "^1.9.1", "multer": "^1.4.1", @@ -47,6 +48,7 @@ "@types/body-parser": "^1.16.3", "@types/config": "^0.0.34", "@types/express": "^4.16.1", + "@types/lodash": "^4.14.149", "@types/mkdirp": "^0.5.1", "@types/morgan": "^1.7.32", "@types/multer": "^1.3.3", @@ -64,6 +66,6 @@ "eslint-plugin-promise": "^4.2.1", "eslint-plugin-standard": "^4.0.1", "tslint-config-standard": "^8.0.1", - "typescript": "^3.3.3" + "typescript": "^3.7.5" } } diff --git a/server.ts b/server.ts index dc81ffc..8f8a9fe 100644 --- a/server.ts +++ b/server.ts @@ -10,6 +10,8 @@ import * as morgan from 'morgan' import { apiRouter } from './server/controllers/api' import { logger } from './server/helpers/logger' import { API_VERSION, CONFIG } from './server/initializers/constants' +import { VideosIndexer } from './server/lib/schedulers/videos-indexer' +import { initVideosIndex } from './server/helpers/elastic-search' const app = express() @@ -51,6 +53,16 @@ app.use(function (err, req, res, next) { // ----------- Run ----------- -app.listen(CONFIG.LISTEN.PORT, () => { +app.listen(CONFIG.LISTEN.PORT, async () => { logger.info('Server listening on port %d', CONFIG.LISTEN.PORT) + + try { + await initVideosIndex() + } catch (err) { + logger.error('Cannot init videos index.', { err }) + process.exit(-1) + } + + VideosIndexer.Instance.enable() + VideosIndexer.Instance.execute() }) diff --git a/server/helpers/elastic-search.ts b/server/helpers/elastic-search.ts index f2e4042..44dacd6 100644 --- a/server/helpers/elastic-search.ts +++ b/server/helpers/elastic-search.ts @@ -1,15 +1,56 @@ import { Client } from '@elastic/elasticsearch' import { CONFIG } from '../initializers/constants' +import { IndexableVideo } from '../types/video.model' +import { Avatar } from '@shared/models/avatars/avatar.model' +import { flatMap } from 'lodash' -const client = new Client({ node: CONFIG.ELASTIC_SEARCH.HOSTNAME + ':' + CONFIG.ELASTIC_SEARCH.PORT }) +const client = new Client({ node: 'http://' + CONFIG.ELASTIC_SEARCH.HOSTNAME + ':' + CONFIG.ELASTIC_SEARCH.PORT }) -function indexVideo (body: any) { - return client.index({ +function initVideosIndex () { + return client.indices.create({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, + body: { + settings: { + number_of_shards: 1, + number_of_replicas: 1 + }, + mappings: { + properties: buildVideosMapping() + } + } + }).catch(err => { + if (err.name === 'ResponseError' && err.meta?.body?.error.root_cause[0]?.type === 'resource_already_exists_exception') return + + throw err + }) +} + +function indexVideos (videos: IndexableVideo[]) { + const body = flatMap(videos, v => { + return [ + { + update: { + _id: v.elasticSearchId, + _index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS + } + }, + { + doc: formatVideo(v), + doc_as_upsert: true + } + ] + }) + + return client.bulk({ index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, body }) } +function refreshVideosIndex () { + return client.indices.refresh({ index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS }) +} + async function queryVideos (query: any) { const res = await client.search({ index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, @@ -18,10 +59,231 @@ async function queryVideos (query: any) { } }) - return res.hits.hits + return res.body.hits.hits } export { - indexVideo, - queryVideos + indexVideos, + queryVideos, + refreshVideosIndex, + initVideosIndex +} + +// ############################################################################ + +function formatVideo (v: IndexableVideo) { + return { + uuid: v.uuid, + + createdAt: v.createdAt, + updatedAt: v.updatedAt, + publishedAt: v.publishedAt, + originallyPublishedAt: v.originallyPublishedAt, + + category: { + id: v.category.id, + label: v.category.label + }, + licence: { + id: v.licence.id, + label: v.licence.label + }, + language: { + id: v.language.id, + label: v.language.label + }, + privacy: { + id: v.privacy.id, + label: v.privacy.label + }, + + name: v.name, + description: v.description, + duration: v.duration, + thumbnailPath: v.thumbnailPath, + previewPath: v.previewPath, + embedPath: v.embedPath, + + views: v.views, + likes: v.likes, + dislikes: v.dislikes, + + nsfw: v.nsfw, + host: v.host, + + account: { + name: v.account.name, + displayName: v.account.displayName, + url: v.account.url, + host: v.account.host, + + avatar: formatAvatar(v.account) + }, + + channel: { + name: v.channel.name, + displayName: v.channel.displayName, + url: v.channel.url, + host: v.channel.host, + + avatar: formatAvatar(v.channel) + } + } +} + +function formatAvatar (obj: { avatar?: Avatar }) { + if (!obj.avatar) return null + + return { + path: obj.avatar.path, + createdAt: obj.avatar.createdAt, + updatedAt: obj.avatar.updatedAt + } +} + +function buildChannelOrAccountMapping () { + return { + name: { + type: 'text', + fields: { + raw: { + type: 'keyword' + } + } + }, + displayName: { + type: 'text' + }, + url: { + type: 'keyword' + }, + host: { + type: 'keyword' + }, + + avatar: { + properties: { + path: { + type: 'keyword' + }, + createdAt: { + type: 'date' + }, + updatedAt: { + type: 'date' + } + } + } + } +} + +function buildVideosMapping () { + return { + uuid: { + type: 'keyword' + }, + createdAt: { + type: 'date' + }, + updatedAt: { + type: 'date' + }, + publishedAt: { + type: 'date' + }, + originallyPublishedAt: { + type: 'date' + }, + + category: { + properties: { + id: { + type: 'keyword' + }, + label: { + type: 'text' + } + } + }, + + licence: { + properties: { + id: { + type: 'keyword' + }, + label: { + type: 'text' + } + } + }, + + language: { + properties: { + id: { + type: 'keyword' + }, + label: { + type: 'text' + } + } + }, + + privacy: { + properties: { + id: { + type: 'keyword' + }, + label: { + type: 'text' + } + } + }, + + name: { + type: 'text' + }, + + description: { + type: 'text' + }, + + duration: { + type: 'long' + }, + + thumbnailPath: { + type: 'keyword' + }, + previewPath: { + type: 'keyword' + }, + embedPath: { + type: 'keyword' + }, + + views: { + type: 'long' + }, + likes: { + type: 'long' + }, + dislikes: { + type: 'long' + }, + nsfw: { + type: 'boolean' + }, + + host: { + type: 'keyword' + }, + + account: { + properties: buildChannelOrAccountMapping() + }, + + channel: { + properties: buildChannelOrAccountMapping() + } + } } diff --git a/server/helpers/requests.ts b/server/helpers/requests.ts index 6e1e4a6..d920f62 100644 --- a/server/helpers/requests.ts +++ b/server/helpers/requests.ts @@ -1,10 +1,10 @@ import * as Bluebird from 'bluebird' import * as request from 'request' -function doRequest ( +function doRequest ( requestOptions: request.CoreOptions & request.UriOptions -): Bluebird<{ response: request.RequestResponse, body: any }> { - return new Bluebird<{ response: request.RequestResponse, body: any }>((res, rej) => { +): Bluebird<{ response: request.RequestResponse, body: T }> { + return new Bluebird<{ response: request.RequestResponse, body: T }>((res, rej) => { request(requestOptions, (err, response, body) => err ? rej(err) : res({ response, body })) }) } diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 13ec162..b707ab5 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -27,10 +27,16 @@ const SORTABLE_COLUMNS = { const PAGINATION_COUNT_DEFAULT = 20 -let SCHEDULER_INTERVAL = 60000 * 60 * 12 // 12 hours +const SCHEDULER_INTERVALS_MS = { + videosIndexer: 60000 * 60 * 12 // 12 hours +} + +const INDEXER_COUNT = { + VIDEOS: 1 +} if (isTestInstance()) { - SCHEDULER_INTERVAL = 10000 + SCHEDULER_INTERVALS_MS.videosIndexer = 10000 } export { @@ -38,5 +44,6 @@ export { API_VERSION, PAGINATION_COUNT_DEFAULT, SORTABLE_COLUMNS, - SCHEDULER_INTERVAL + SCHEDULER_INTERVALS_MS, + INDEXER_COUNT } diff --git a/server/lib/schedulers/abstract-scheduler.ts b/server/lib/schedulers/abstract-scheduler.ts new file mode 100644 index 0000000..0e60889 --- /dev/null +++ b/server/lib/schedulers/abstract-scheduler.ts @@ -0,0 +1,35 @@ +import { logger } from '../../helpers/logger' +import * as Bluebird from 'bluebird' + +export abstract class AbstractScheduler { + + protected abstract schedulerIntervalMs: number + + private interval: NodeJS.Timer + private isRunning = false + + enable () { + if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.') + + this.interval = setInterval(() => this.execute(), this.schedulerIntervalMs) + } + + disable () { + clearInterval(this.interval) + } + + async execute () { + if (this.isRunning === true) return + this.isRunning = true + + try { + await this.internalExecute() + } catch (err) { + logger.error('Cannot execute %s scheduler.', this.constructor.name, { err }) + } finally { + this.isRunning = false + } + } + + protected abstract internalExecute (): Promise | Bluebird +} diff --git a/server/lib/schedulers/videos-indexer.ts b/server/lib/schedulers/videos-indexer.ts new file mode 100644 index 0000000..436ff60 --- /dev/null +++ b/server/lib/schedulers/videos-indexer.ts @@ -0,0 +1,63 @@ +import { AbstractScheduler } from './abstract-scheduler' +import { INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' +import { doRequest } from '../../helpers/requests' +import { logger } from '../../helpers/logger' +import { ResultList } from '../../../PeerTube/shared/models/result-list.model' +import { Video } from '../../../PeerTube/shared/models/videos/video.model' +import { indexVideos, refreshVideosIndex } from '../../helpers/elastic-search' +import { IndexableVideo } from '../../types/video.model' +import { inspect } from 'util' + +export class VideosIndexer extends AbstractScheduler { + + private static instance: AbstractScheduler + + protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosIndexer + + private constructor () { + super() + } + + protected async internalExecute () { + return this.indexVideos() + } + + private async indexVideos () { + const instances = [ 'peertube.cpy.re' ] + + for (const instance of instances) { + try { + const videos = await this.getVideos(instance) + + await indexVideos(videos) + + logger.info('Added video data from %s.', instance) + } catch (err) { + console.error(inspect(err, { depth: 10 })) + logger.warn('Cannot index videos from %s.', instance, { err }) + } + } + + await refreshVideosIndex() + } + + private async getVideos (host: string): Promise { + const url = 'https://' + host + '/api/v1/videos' + + const res = await doRequest>({ + uri: url, + qs: { + start: 0, + filter: 'local', + count: INDEXER_COUNT.VIDEOS + }, + json: true + }) + + return res.body.data.map(v => Object.assign(v, { elasticSearchId: host + v.id, host })) + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} diff --git a/server/lib/videos-indexer.ts b/server/lib/videos-indexer.ts deleted file mode 100644 index e69de29..0000000 diff --git a/server/types/video.model.ts b/server/types/video.model.ts new file mode 100644 index 0000000..8e0c792 --- /dev/null +++ b/server/types/video.model.ts @@ -0,0 +1,6 @@ +import { Video } from '@shared/models/videos/video.model' + +export interface IndexableVideo extends Video { + elasticSearchId: string + host: string +} diff --git a/tsconfig.json b/tsconfig.json index fd5fe5d..edb9d4a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -16,7 +16,10 @@ "types": [ "node" ], - "baseUrl": "." + "baseUrl": ".", + "paths": { + "@shared/*": [ "PeerTube/shared/*" ] + } }, "exclude": [ "node_modules", diff --git a/yarn.lock b/yarn.lock index bf94620..b39a00b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -116,6 +116,11 @@ resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.136.tgz#413e85089046b865d960c9ff1d400e04c31ab60f" integrity sha512-0GJhzBdvsW2RUccNHOBkabI8HZVdOXmXbXhuKlDEd5Vv12P7oAVGfomGp3Ne21o5D/qu1WmthlNKFaoZJJeErA== +"@types/lodash@^4.14.149": + version "4.14.149" + resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.149.tgz#1342d63d948c6062838fbf961012f74d4e638440" + integrity sha512-ijGqzZt/b7BfzcK9vTrS6MFljQRPn5BFWOx8oE0GYxribu6uV+aA9zZuXI1zc/etK9E8nrgdoF2+LgUw7+9tJQ== + "@types/mime@*": version "2.0.1" resolved "https://registry.yarnpkg.com/@types/mime/-/mime-2.0.1.tgz#dc488842312a7f075149312905b5e3c0b054c79d" @@ -2667,10 +2672,10 @@ typedarray@^0.0.6: resolved "https://registry.yarnpkg.com/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777" integrity sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c= -typescript@^3.3.3: - version "3.5.3" - resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.5.3.tgz#c830f657f93f1ea846819e929092f5fe5983e977" - integrity sha512-ACzBtm/PhXBDId6a6sDJfroT2pOWt/oOnk4/dElG5G33ZL776N3Y6/6bKZJBFpd+b05F3Ct9qDjMeJmRWtE2/g== +typescript@^3.7.5: + version "3.7.5" + resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.7.5.tgz#0692e21f65fd4108b9330238aac11dd2e177a1ae" + integrity sha512-/P5lkRXkWHNAbcJIiHPfRoKqyd7bsyCma1hZNUGfn20qm64T6ZBlrzprymeu918H+mB/0rIg2gGK/BXkhhYgBw== unpipe@1.0.0, unpipe@~1.0.0: version "1.0.0"