From 223c18c7c0473f97d084bc62b77eb2e7d1ef66ff Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 19 Feb 2020 15:39:35 +0100 Subject: [PATCH] Index and serve channels --- config/default.yaml | 1 + config/test.yaml | 1 + server.ts | 15 +- server/controllers/api/index.ts | 6 +- server/controllers/api/search-channels.ts | 34 ++++ server/controllers/api/search-videos.ts | 6 +- server/helpers/elastic-search.ts | 108 ++++++++++- server/initializers/constants.ts | 6 +- server/lib/elastic-search-avatar.ts | 43 +++++ server/lib/elastic-search-channels.ts | 213 ++++++++++++++++++++++ server/lib/elastic-search-instances.ts | 35 ++++ server/lib/elastic-search-videos.ts | 150 +++------------ server/lib/peertube-instance.ts | 60 ++++++ server/lib/schedulers/videos-indexer.ts | 114 +++++------- server/middlewares/validators/search.ts | 13 ++ server/middlewares/validators/sort.ts | 5 +- server/types/channel-search.model.ts | 5 + server/types/channel.model.ts | 16 ++ server/types/elastic-search.model.ts | 4 + server/types/video.model.ts | 6 +- 20 files changed, 623 insertions(+), 218 deletions(-) create mode 100644 server/controllers/api/search-channels.ts create mode 100644 server/lib/elastic-search-avatar.ts create mode 100644 server/lib/elastic-search-channels.ts create mode 100644 server/lib/elastic-search-instances.ts create mode 100644 server/lib/peertube-instance.ts create mode 100644 server/types/channel-search.model.ts create mode 100644 server/types/channel.model.ts create mode 100644 server/types/elastic-search.model.ts diff --git a/config/default.yaml b/config/default.yaml index 215041a..af09c55 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -12,6 +12,7 @@ elastic_search: port: 9200 indexes: videos: 'peertube-index-videos' + channels: 'peertube-index-channels' log: level: 'debug' # debug/info/warning/error diff --git a/config/test.yaml b/config/test.yaml index 0141630..18e0211 100644 --- a/config/test.yaml +++ b/config/test.yaml @@ -1,3 +1,4 @@ elastic_search: indexes: videos: 'peertube-index-videos-test1' + channels: 'peertube-index-channels-test1' diff --git a/server.ts b/server.ts index e200145..3680f49 100644 --- a/server.ts +++ b/server.ts @@ -1,8 +1,4 @@ import { isTestInstance } from './server/helpers/core-utils' -if (isTestInstance()) { - require('source-map-support').install() -} - import * as bodyParser from 'body-parser' import * as express from 'express' import * as cors from 'cors' @@ -12,6 +8,11 @@ import { logger } from './server/helpers/logger' import { API_VERSION, CONFIG } from './server/initializers/constants' import { VideosIndexer } from './server/lib/schedulers/videos-indexer' import { initVideosIndex } from './server/lib/elastic-search-videos' +import { initChannelsIndex } from './server/lib/elastic-search-channels' + +if (isTestInstance()) { + require('source-map-support').install() +} const app = express() @@ -57,7 +58,10 @@ app.listen(CONFIG.LISTEN.PORT, async () => { logger.info('Server listening on port %d', CONFIG.LISTEN.PORT) try { - await initVideosIndex() + await Promise.all([ + initVideosIndex(), + initChannelsIndex() + ]) } catch (err) { logger.error('Cannot init videos index.', { err }) process.exit(-1) @@ -65,4 +69,5 @@ app.listen(CONFIG.LISTEN.PORT, async () => { VideosIndexer.Instance.enable() VideosIndexer.Instance.execute() + .catch(err => logger.error('Cannot run video indexer', { err })) }) diff --git a/server/controllers/api/index.ts b/server/controllers/api/index.ts index 646b58e..a9a297f 100644 --- a/server/controllers/api/index.ts +++ b/server/controllers/api/index.ts @@ -1,10 +1,12 @@ import * as express from 'express' import { badRequest } from '../../helpers/utils' -import { searchRouter } from './search-videos' +import { searchVideosRouter } from './search-videos' +import { searchChannelsRouter } from './search-channels' const apiRouter = express.Router() -apiRouter.use('/', searchRouter) +apiRouter.use('/', searchVideosRouter) +apiRouter.use('/', searchChannelsRouter) apiRouter.use('/ping', pong) apiRouter.use('/*', badRequest) diff --git a/server/controllers/api/search-channels.ts b/server/controllers/api/search-channels.ts new file mode 100644 index 0000000..0867b5b --- /dev/null +++ b/server/controllers/api/search-channels.ts @@ -0,0 +1,34 @@ +import * as express from 'express' +import { paginationValidator } from '../../middlewares/validators/pagination' +import { setDefaultPagination } from '../../middlewares/pagination' +import { asyncMiddleware } from '../../middlewares/async' +import { channelsSearchSortValidator } from '../../middlewares/validators/sort' +import { videoChannelsSearchValidator } from '../../middlewares/validators/search' +import { setDefaultSearchSort } from '../../middlewares/sort' +import { formatChannelForAPI, queryChannels } from '../../lib/elastic-search-channels' + +const searchChannelsRouter = express.Router() + +searchChannelsRouter.get('/search/video-channels', + paginationValidator, + setDefaultPagination, + channelsSearchSortValidator, + setDefaultSearchSort, + videoChannelsSearchValidator, + asyncMiddleware(searchChannels) +) + +// --------------------------------------------------------------------------- + +export { searchChannelsRouter } + +// --------------------------------------------------------------------------- + +async function searchChannels (req: express.Request, res: express.Response) { + const resultList = await queryChannels(req.query) + + return res.json({ + total: resultList.total, + data: resultList.data.map(v => formatChannelForAPI(v, req.query.fromHost)) + }) +} diff --git a/server/controllers/api/search-videos.ts b/server/controllers/api/search-videos.ts index 52b36b3..292ed37 100644 --- a/server/controllers/api/search-videos.ts +++ b/server/controllers/api/search-videos.ts @@ -7,9 +7,9 @@ import { videosSearchSortValidator } from '../../middlewares/validators/sort' import { commonVideosFiltersValidator, videosSearchValidator } from '../../middlewares/validators/search' import { setDefaultSearchSort } from '../../middlewares/sort' -const searchRouter = express.Router() +const searchVideosRouter = express.Router() -searchRouter.get('/search/videos', +searchVideosRouter.get('/search/videos', paginationValidator, setDefaultPagination, videosSearchSortValidator, @@ -21,7 +21,7 @@ searchRouter.get('/search/videos', // --------------------------------------------------------------------------- -export { searchRouter } +export { searchVideosRouter } // --------------------------------------------------------------------------- diff --git a/server/helpers/elastic-search.ts b/server/helpers/elastic-search.ts index 24c9b38..b36d10b 100644 --- a/server/helpers/elastic-search.ts +++ b/server/helpers/elastic-search.ts @@ -1,24 +1,120 @@ -import { Client } from '@elastic/elasticsearch' +import { ApiResponse, Client } from '@elastic/elasticsearch' import { CONFIG } from '../initializers/constants' +import { logger } from './logger' +import { flatMap } from 'lodash' +import { IndexableDoc } from '../types/elastic-search.model' const elasticSearch = new Client({ node: 'http://' + CONFIG.ELASTIC_SEARCH.HOSTNAME + ':' + CONFIG.ELASTIC_SEARCH.PORT }) function buildSort (value: string) { - let field: string + let sortField: string let direction: 'asc' | 'desc' if (value.substring(0, 1) === '-') { direction = 'desc' - field = value.substring(1) + sortField = value.substring(1) } else { direction = 'asc' - field = value + sortField = value } - return { direction, field } + const field = sortField === 'match' + ? '_score' + : sortField + + return [ + { + [field]: { order: direction } + } + ] +} + +function buildIndex (name: string, mapping: object) { + logger.info('Initialize %s Elastic Search index.', name) + + return elasticSearch.indices.create({ + index: name, + body: { + settings: { + number_of_shards: 1, + number_of_replicas: 1 + }, + mappings: { + properties: mapping + } + } + }).catch(err => { + if (err.name === 'ResponseError' && err.meta?.body?.error.root_cause[0]?.type === 'resource_already_exists_exception') return + + throw err + }) +} + +async function indexDocuments (options: { + objects: T[] + formatter: (o: T) => any + replace: boolean + index: string +}) { + const { objects, formatter, replace, index } = options + + const elIdIndex: { [elId: string]: T } = {} + + for (const object of objects) { + elIdIndex[object.elasticSearchId] = object + } + + const method = replace ? 'index' : 'update' + + const body = flatMap(objects, v => { + const doc = formatter(v) + + const options = replace + ? doc + : { doc, doc_as_upsert: true } + + return [ + { + [method]: { + _id: v.elasticSearchId, + _index: index + } + }, + options + ] + }) + + const result = await elasticSearch.bulk({ + index, + body + }) + + const resultBody = result.body + + if (resultBody.errors === true) { + const msg = 'Cannot insert data in elastic search.' + logger.error(msg, { err: resultBody }) + throw new Error(msg) + } + + const created: T[] = result.body.items + .map(i => i[method]) + .filter(i => i.result === 'created') + .map(i => elIdIndex[i._id]) + + return { created } +} + +function extractQueryResult (result: ApiResponse) { + const hits = result.body.hits + + return { total: hits.total.value, data: hits.hits.map(h => h._source) } } export { elasticSearch, - buildSort + indexDocuments, + buildSort, + extractQueryResult, + buildIndex } diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index a2a0289..07bf574 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -11,7 +11,8 @@ const CONFIG = { HOSTNAME: config.get('elastic_search.hostname'), PORT: config.get('elastic_search.port'), INDEXES: { - VIDEOS: config.get('elastic_search.indexes.videos') + VIDEOS: config.get('elastic_search.indexes.videos'), + CHANNELS: config.get('elastic_search.indexes.channels') } }, LOG: { @@ -23,7 +24,8 @@ const CONFIG = { } const SORTABLE_COLUMNS = { - VIDEOS_SEARCH: [ 'name', 'duration', 'createdAt', 'publishedAt', 'views', 'likes', 'match' ] + VIDEOS_SEARCH: [ 'name', 'duration', 'createdAt', 'publishedAt', 'views', 'likes', 'match' ], + CHANNELS_SEARCH: [ 'match', 'displayName', 'createdAt' ] } const PAGINATION_COUNT_DEFAULT = 20 diff --git a/server/lib/elastic-search-avatar.ts b/server/lib/elastic-search-avatar.ts new file mode 100644 index 0000000..18d8c97 --- /dev/null +++ b/server/lib/elastic-search-avatar.ts @@ -0,0 +1,43 @@ +import { Avatar } from '@shared/models' + +function formatAvatarForAPI (obj: { avatar?: Avatar }) { + if (!obj.avatar) return null + + return { + path: obj.avatar.path, + createdAt: obj.avatar.createdAt, + updatedAt: obj.avatar.updatedAt + } +} + +function formatAvatarForDB (obj: { avatar?: Avatar }) { + if (!obj.avatar) return null + + return { + path: obj.avatar.path, + createdAt: obj.avatar.createdAt, + updatedAt: obj.avatar.updatedAt + } +} + +function buildAvatarMapping () { + return { + path: { + type: 'keyword' + }, + createdAt: { + type: 'date', + format: 'date_optional_time' + }, + updatedAt: { + type: 'date', + format: 'date_optional_time' + } + } +} + +export { + formatAvatarForAPI, + formatAvatarForDB, + buildAvatarMapping +} diff --git a/server/lib/elastic-search-channels.ts b/server/lib/elastic-search-channels.ts new file mode 100644 index 0000000..fd37174 --- /dev/null +++ b/server/lib/elastic-search-channels.ts @@ -0,0 +1,213 @@ +import { CONFIG } from '../initializers/constants' +import { VideoChannel } from '@shared/models' +import { buildIndex, buildSort, elasticSearch, extractQueryResult, indexDocuments } from '../helpers/elastic-search' +import { logger } from '../helpers/logger' +import { DBChannel, IndexableChannel } from '../types/channel.model' +import { ChannelsSearchQuery } from '../types/channel-search.model' +import { buildAvatarMapping, formatAvatarForAPI, formatAvatarForDB } from './elastic-search-avatar' + +function initChannelsIndex () { + return buildIndex(CONFIG.ELASTIC_SEARCH.INDEXES.CHANNELS, buildChannelsMapping()) +} + +async function indexChannels (channels: IndexableChannel[], replace = false) { + return indexDocuments({ + objects: channels, + formatter: c => formatChannelForDB(c), + replace, + index: CONFIG.ELASTIC_SEARCH.INDEXES.CHANNELS + }) +} + +function refreshChannelsIndex () { + return elasticSearch.indices.refresh({ index: CONFIG.ELASTIC_SEARCH.INDEXES.CHANNELS }) +} + +async function queryChannels (search: ChannelsSearchQuery) { + const bool: any = {} + + if (search.search) { + Object.assign(bool, { + must: [ + { + multi_match: { + query: search.search, + fields: [ 'name', 'displayName', 'description' ], + fuzziness: 'AUTO' + } + } + ] + }) + } + + const body = { + from: search.start, + size: search.count, + sort: buildSort(search.sort), + query: { bool } + } + + logger.debug('Will query Elastic Search for channels.', { body }) + + const res = await elasticSearch.search({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.CHANNELS, + body + }) + + return extractQueryResult(res) +} + +export { + initChannelsIndex, + indexChannels, + refreshChannelsIndex, + formatChannelForAPI, + queryChannels +} + +// ############################################################################ + +function formatChannelForDB (c: IndexableChannel): DBChannel { + return { + id: c.id, + + url: c.url, + name: c.name, + host: c.host, + + avatar: formatAvatarForDB(c), + + displayName: c.displayName, + + indexedAt: new Date(), + + followingCount: c.followingCount, + followersCount: c.followersCount, + createdAt: c.createdAt, + updatedAt: c.updatedAt, + + description: c.description, + support: c.support, + + ownerAccount: { + id: c.ownerAccount.id, + url: c.ownerAccount.url, + + displayName: c.ownerAccount.displayName, + description: c.ownerAccount.description, + name: c.ownerAccount.name, + host: c.ownerAccount.host, + followingCount: c.ownerAccount.followingCount, + followersCount: c.ownerAccount.followersCount, + createdAt: c.ownerAccount.createdAt, + updatedAt: c.ownerAccount.updatedAt, + + avatar: formatAvatarForDB(c.ownerAccount) + } + } +} + +function formatChannelForAPI (c: DBChannel, fromHost?: string): VideoChannel { + return { + id: c.id, + + url: c.url, + name: c.name, + host: c.host, + followingCount: c.followingCount, + followersCount: c.followersCount, + createdAt: c.createdAt, + updatedAt: c.updatedAt, + avatar: formatAvatarForAPI(c), + + displayName: c.displayName, + description: c.description, + support: c.support, + isLocal: fromHost === c.host, + + ownerAccount: { + id: c.ownerAccount.id, + url: c.ownerAccount.url, + + displayName: c.ownerAccount.displayName, + description: c.ownerAccount.description, + name: c.ownerAccount.name, + host: c.ownerAccount.host, + followingCount: c.ownerAccount.followingCount, + followersCount: c.ownerAccount.followersCount, + createdAt: c.ownerAccount.createdAt, + updatedAt: c.ownerAccount.updatedAt, + + avatar: formatAvatarForAPI(c.ownerAccount) + } + } +} + +function buildChannelOrAccountCommonMapping () { + return { + id: { + type: 'long' + }, + + url: { + type: 'keyword' + }, + + name: { + type: 'text', + fields: { + raw: { + type: 'keyword' + } + } + }, + + host: { + type: 'keyword' + }, + + displayName: { + type: 'text' + }, + + avatar: { + properties: buildAvatarMapping() + }, + + followingCount: { + type: 'long' + }, + followersCount: { + type: 'long' + }, + + createdAt: { + type: 'date', + format: 'date_optional_time' + }, + updatedAt: { + type: 'date', + format: 'date_optional_time' + }, + + description: { + type: 'text' + } + } +} + +function buildChannelsMapping () { + const base = buildChannelOrAccountCommonMapping() + + Object.assign(base, { + support: { + type: 'keyword' + }, + + ownerAccount: { + properties: buildChannelOrAccountCommonMapping() + } + }) + + return base +} diff --git a/server/lib/elastic-search-instances.ts b/server/lib/elastic-search-instances.ts new file mode 100644 index 0000000..d768519 --- /dev/null +++ b/server/lib/elastic-search-instances.ts @@ -0,0 +1,35 @@ +import { elasticSearch } from '../helpers/elastic-search' +import { CONFIG } from '../initializers/constants' +import { getRemovedHosts, listIndexInstancesHost } from './instances-index' + +async function listIndexInstances () { + const res = await elasticSearch.search({ + index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, + body: { + size: 0, + aggs: { + hosts: { + terms: { + field: 'host' + } + } + } + } + }) + + return res.body.aggregations.hosts.buckets.map(b => b.key) +} + +async function buildInstanceHosts () { + const indexHosts = (await listIndexInstancesHost()).filter(h => h === 'peertube.cpy.re') + + const dbHosts = await listIndexInstances() + const removedHosts = getRemovedHosts(dbHosts, indexHosts) + + return { indexHosts, removedHosts } +} + +export { + listIndexInstances, + buildInstanceHosts +} diff --git a/server/lib/elastic-search-videos.ts b/server/lib/elastic-search-videos.ts index b755744..88c0b57 100644 --- a/server/lib/elastic-search-videos.ts +++ b/server/lib/elastic-search-videos.ts @@ -1,100 +1,47 @@ import { CONFIG } from '../initializers/constants' import { DBVideo, DBVideoDetails, IndexableVideo, IndexableVideoDetails } from '../types/video.model' -import { flatMap } from 'lodash' -import { Avatar, Video } from '@shared/models' -import { buildSort, elasticSearch } from '../helpers/elastic-search' +import { Video } from '@shared/models' +import { buildIndex, buildSort, elasticSearch, extractQueryResult, indexDocuments } from '../helpers/elastic-search' import { VideosSearchQuery } from '../types/video-search.model' import { logger } from '../helpers/logger' +import { buildAvatarMapping, formatAvatarForAPI, formatAvatarForDB } from './elastic-search-avatar' function initVideosIndex () { - logger.info('Initialize %s Elastic Search index.', CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS) - - return elasticSearch.indices.create({ - index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, - body: { - settings: { - number_of_shards: 1, - number_of_replicas: 1 - }, - mappings: { - properties: buildVideosMapping() - } - } - }).catch(err => { - if (err.name === 'ResponseError' && err.meta?.body?.error.root_cause[0]?.type === 'resource_already_exists_exception') return - - throw err - }) + return buildIndex(CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, buildVideosMapping()) } async function indexVideos (videos: IndexableVideo[], replace = false) { - const elIdIndex: { [elId: string]: string } = {} - - for (const video of videos) { - elIdIndex[video.elasticSearchId] = video.uuid - } - - const method = replace ? 'index' : 'update' - - const body = flatMap(videos, v => { - const doc = formatVideoForDB(v) - - const options = replace - ? doc - : { doc, doc_as_upsert: true } - - return [ - { - [method]: { - _id: v.elasticSearchId, - _index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS - } - }, - options - ] + return indexDocuments({ + objects: videos, + formatter: v => formatVideoForDB(v), + replace, + index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS }) - - const result = await elasticSearch.bulk({ - index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, - body - }) - - const resultBody = result.body - - if (resultBody.errors === true) { - const msg = 'Cannot insert data in elastic search.' - logger.error(msg, { err: resultBody }) - throw new Error(msg) - } - - const created: string[] = result.body.items - .map(i => i[method]) - .filter(i => i.result === 'created') - .map(i => elIdIndex[i._id]) - - return { created } } function refreshVideosIndex () { return elasticSearch.indices.refresh({ index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS }) } -async function listIndexInstances () { - const res = await elasticSearch.search({ +function removeVideosFromHosts (hosts: string[]) { + if (hosts.length === 0) return + + logger.info('Will remove videos from hosts.', { hosts }) + + return elasticSearch.delete_by_query({ index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, body: { - size: 0, - aggs: { - hosts: { - terms: { - field: 'host' + query: { + bool: { + filter: { + terms: { + host: hosts + } } } } } }) - - return res.body.aggregations.hosts.buckets.map(b => b.key) } async function queryVideos (search: VideosSearchQuery) { @@ -230,7 +177,7 @@ async function queryVideos (search: VideosSearchQuery) { const body = { from: search.start, size: search.count, - sort: buildVideosSort(search.sort), + sort: buildSort(search.sort), query: { bool } } @@ -241,35 +188,20 @@ async function queryVideos (search: VideosSearchQuery) { body }) - const hits = res.body.hits - return { total: hits.total.value, data: hits.hits.map(h => h._source) } + return extractQueryResult(res) } export { indexVideos, queryVideos, refreshVideosIndex, + removeVideosFromHosts, initVideosIndex, - listIndexInstances, formatVideoForAPI } // ############################################################################ -function buildVideosSort (sort: string) { - const { direction, field: sortField } = buildSort(sort) - - const field = sortField === 'match' - ? '_score' - : sortField - - return [ - { - [field]: { order: direction } - } - ] -} - function formatVideoForDB (v: IndexableVideo | IndexableVideoDetails): DBVideo | DBVideoDetails { return { id: v.id, @@ -336,16 +268,6 @@ function formatVideoForDB (v: IndexableVideo | IndexableVideoDetails): DBVideo | } } -function formatAvatarForDB (obj: { avatar?: Avatar }) { - if (!obj.avatar) return null - - return { - path: obj.avatar.path, - createdAt: obj.avatar.createdAt, - updatedAt: obj.avatar.updatedAt - } -} - function formatVideoForAPI (v: DBVideo, fromHost?: string): Video { return { id: v.id, @@ -409,16 +331,6 @@ function formatVideoForAPI (v: DBVideo, fromHost?: string): Video { } } -function formatAvatarForAPI (obj: { avatar?: Avatar }) { - if (!obj.avatar) return null - - return { - path: obj.avatar.path, - createdAt: obj.avatar.createdAt, - updatedAt: obj.avatar.updatedAt - } -} - function buildChannelOrAccountMapping () { return { id: { @@ -444,19 +356,7 @@ function buildChannelOrAccountMapping () { }, avatar: { - properties: { - path: { - type: 'keyword' - }, - createdAt: { - type: 'date', - format: 'date_optional_time' - }, - updatedAt: { - type: 'date', - format: 'date_optional_time' - } - } + properties: buildAvatarMapping() } } } diff --git a/server/lib/peertube-instance.ts b/server/lib/peertube-instance.ts new file mode 100644 index 0000000..a431b32 --- /dev/null +++ b/server/lib/peertube-instance.ts @@ -0,0 +1,60 @@ +import { IndexableVideo } from '../types/video.model' +import { doRequest } from '../helpers/requests' +import { ResultList, Video, VideoChannel, VideoDetails } from '@shared/models' +import { IndexableChannel } from '../types/channel.model' +import { INDEXER_COUNT } from '../initializers/constants' +import { IndexableDoc } from '../types/elastic-search.model' + +async function getVideo (host: string, uuid: string): Promise { + const url = 'https://' + host + '/api/v1/videos/' + uuid + + const res = await doRequest({ + uri: url, + json: true + }) + + return prepareVideoForDB(res.body, host) +} + +async function getChannel (host: string, name: string): Promise { + const url = 'https://' + host + '/api/v1/video-channels/' + name + + const res = await doRequest({ + uri: url, + json: true + }) + + return prepareChannelForDB(res.body, host) +} + +async function getVideos (host: string, start: number): Promise { + const url = 'https://' + host + '/api/v1/videos' + + const res = await doRequest>({ + uri: url, + qs: { + start, + filter: 'local', + skipCount: true, + count: INDEXER_COUNT.VIDEOS + }, + json: true + }) + + return res.body.data.map(v => prepareVideoForDB(v, host)) +} + +function prepareVideoForDB (video: T, host: string): T & IndexableDoc { + return Object.assign(video, { elasticSearchId: host + video.id, host }) +} + +function prepareChannelForDB (channel: T, host: string): T & IndexableDoc { + return Object.assign(channel, { elasticSearchId: host + channel.id, host }) +} + +export { + getVideo, + getChannel, + getVideos, + prepareChannelForDB +} diff --git a/server/lib/schedulers/videos-indexer.ts b/server/lib/schedulers/videos-indexer.ts index 3d50949..14c5b61 100644 --- a/server/lib/schedulers/videos-indexer.ts +++ b/server/lib/schedulers/videos-indexer.ts @@ -1,17 +1,16 @@ import { AbstractScheduler } from './abstract-scheduler' -import { CONFIG, INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' -import { doRequest } from '../../helpers/requests' +import { INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' import { logger } from '../../helpers/logger' -import { ResultList } from '../../../PeerTube/shared/models/result-list.model' -import { Video, VideoDetails } from '../../../PeerTube/shared/models/videos/video.model' -import { indexVideos, listIndexInstances, refreshVideosIndex } from '../elastic-search-videos' -import { IndexableVideo, IndexableDoc } from '../../types/video.model' +import { indexVideos, refreshVideosIndex, removeVideosFromHosts } from '../elastic-search-videos' +import { IndexableVideo } from '../../types/video.model' import { inspect } from 'util' -import { getRemovedHosts, listIndexInstancesHost } from '../instances-index' -import { elasticSearch } from '../../helpers/elastic-search' import { AsyncQueue, queue } from 'async' +import { buildInstanceHosts } from '../elastic-search-instances' +import { getChannel, getVideo, getVideos } from '../peertube-instance' +import { indexChannels, refreshChannelsIndex } from '../elastic-search-channels' type GetVideoQueueParam = { host: string, uuid: string } +type GetChannelQueueParam = { host: string, name: string } export class VideosIndexer extends AbstractScheduler { @@ -19,12 +18,13 @@ export class VideosIndexer extends AbstractScheduler { protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosIndexer - private readonly getVideoQueue: AsyncQueue + private readonly indexVideoQueue: AsyncQueue + private readonly indexChannelQueue: AsyncQueue private constructor () { super() - this.getVideoQueue = queue((task, cb) => { + this.indexVideoQueue = queue((task, cb) => { this.indexSpecificVideo(task.host, task.uuid) .then(() => cb()) .catch(err => { @@ -32,10 +32,28 @@ export class VideosIndexer extends AbstractScheduler { cb() }) }) + + this.indexChannelQueue = queue((task, cb) => { + this.indexSpecificChannel(task.host, task.name) + .then(() => cb()) + .catch(err => { + logger.error('Error in index specific channel.', { err: inspect(err) }) + cb() + }) + }) + + this.indexChannelQueue.drain(async () => { + logger.info('Refresh channels index.') + await refreshChannelsIndex() + }) } scheduleVideoIndexation (host: string, uuid: string) { - this.getVideoQueue.push({ uuid, host }) + this.indexVideoQueue.push({ uuid, host }) + } + + scheduleChannelIndexation (host: string, name: string) { + this.indexChannelQueue.push({ name, host }) } protected async internalExecute () { @@ -43,11 +61,10 @@ export class VideosIndexer extends AbstractScheduler { } private async runVideosIndexer () { - const dbHosts = await listIndexInstances() - const indexHosts = (await listIndexInstancesHost()).filter(h => h === 'peertube.cpy.re') + const { indexHosts, removedHosts } = await buildInstanceHosts() + const channelsToSync = new Set() - const hostsToRemove = getRemovedHosts(dbHosts, indexHosts) - await this.removeVideosFromHosts(hostsToRemove) + await removeVideosFromHosts(removedHosts) for (const host of indexHosts) { try { @@ -55,7 +72,7 @@ export class VideosIndexer extends AbstractScheduler { let start = 0 do { - videos = await this.getVideos(host, start) + videos = await getVideos(host, start) start += videos.length logger.debug('Getting %d results from %s (from = %d).', videos.length, host, start) @@ -65,12 +82,16 @@ export class VideosIndexer extends AbstractScheduler { // Fetch complete video foreach created video (to get tags) for (const c of created) { - this.scheduleVideoIndexation(host, c) + this.scheduleVideoIndexation(host, c.uuid) } + + videos.forEach(v => channelsToSync.add(v.channel.name)) } } while (videos.length === INDEXER_COUNT.VIDEOS && start < 500) logger.info('Added video data from %s.', host) + + channelsToSync.forEach(c => this.scheduleChannelIndexation(host, c)) } catch (err) { console.error(inspect(err, { depth: 10 })) logger.warn('Cannot index videos from %s.', host, { err }) @@ -80,65 +101,20 @@ export class VideosIndexer extends AbstractScheduler { await refreshVideosIndex() } - private async getVideos (host: string, start: number): Promise { - const url = 'https://' + host + '/api/v1/videos' - - const res = await doRequest>({ - uri: url, - qs: { - start, - filter: 'local', - skipCount: true, - count: INDEXER_COUNT.VIDEOS - }, - json: true - }) - - return res.body.data.map(v => this.prepareVideoForDB(v, host)) - } - - private async getVideo (host: string, uuid: string): Promise { - const url = 'https://' + host + '/api/v1/videos/' + uuid - - const res = await doRequest({ - uri: url, - json: true - }) - - return this.prepareVideoForDB(res.body, host) - } - - private removeVideosFromHosts (hosts: string[]) { - if (hosts.length === 0) return - - logger.info('Will remove videos from hosts.', { hosts }) - - return elasticSearch.delete_by_query({ - index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, - body: { - query: { - bool: { - filter: { - terms: { - host: hosts - } - } - } - } - } - }) - } - private async indexSpecificVideo (host: string, uuid: string) { - const video = await this.getVideo(host, uuid) + const video = await getVideo(host, uuid) logger.info('Indexing specific video %s of %s.', uuid, host) await indexVideos([ video ], true) } - private prepareVideoForDB (video: T, host: string): T & IndexableDoc { - return Object.assign(video, { elasticSearchId: host + video.id, host }) + private async indexSpecificChannel (host: string, name: string) { + const channel = await getChannel(host, name) + + logger.info('Indexing specific channel %s@%s.', name, host) + + await indexChannels([ channel ], true) } static get Instance () { diff --git a/server/middlewares/validators/search.ts b/server/middlewares/validators/search.ts index e8bec74..190106a 100644 --- a/server/middlewares/validators/search.ts +++ b/server/middlewares/validators/search.ts @@ -60,9 +60,22 @@ const videosSearchValidator = [ } ] +const videoChannelsSearchValidator = [ + query('search').not().isEmpty().withMessage('Should have a valid search'), + + (req: express.Request, res: express.Response, next: express.NextFunction) => { + logger.debug('Checking video channels search query', { parameters: req.query }) + + if (areValidationErrors(req, res)) return + + return next() + } +] + // --------------------------------------------------------------------------- export { + videoChannelsSearchValidator, commonVideosFiltersValidator, videosSearchValidator } diff --git a/server/middlewares/validators/sort.ts b/server/middlewares/validators/sort.ts index 5fa00ef..c281a80 100644 --- a/server/middlewares/validators/sort.ts +++ b/server/middlewares/validators/sort.ts @@ -2,11 +2,14 @@ import { SORTABLE_COLUMNS } from '../../initializers/constants' import { checkSort, createSortableColumns } from './utils' const SORTABLE_VIDEOS_SEARCH_COLUMNS = createSortableColumns(SORTABLE_COLUMNS.VIDEOS_SEARCH) +const SORTABLE_CHANNELS_SEARCH_COLUMNS = createSortableColumns(SORTABLE_COLUMNS.CHANNELS_SEARCH) const videosSearchSortValidator = checkSort(SORTABLE_VIDEOS_SEARCH_COLUMNS) +const channelsSearchSortValidator = checkSort(SORTABLE_VIDEOS_SEARCH_COLUMNS) // --------------------------------------------------------------------------- export { - videosSearchSortValidator + videosSearchSortValidator, + channelsSearchSortValidator } diff --git a/server/types/channel-search.model.ts b/server/types/channel-search.model.ts new file mode 100644 index 0000000..1670da9 --- /dev/null +++ b/server/types/channel-search.model.ts @@ -0,0 +1,5 @@ +import { + VideoChannelsSearchQuery as PeerTubeChannelsSearchQuery +} from '../../PeerTube/shared/models/search/video-channels-search-query.model' + +export type ChannelsSearchQuery = PeerTubeChannelsSearchQuery diff --git a/server/types/channel.model.ts b/server/types/channel.model.ts new file mode 100644 index 0000000..b2ac2f6 --- /dev/null +++ b/server/types/channel.model.ts @@ -0,0 +1,16 @@ +import { IndexableDoc } from './elastic-search.model' +import { VideoChannel, VideoChannelSummary } from '@shared/models' + +export interface IndexableChannelSummary extends VideoChannelSummary, IndexableDoc { +} + +export interface IndexableChannel extends VideoChannel, IndexableDoc { +} + +export interface DBChannel extends Omit { + indexedAt: Date +} + +export interface DBChannelSummary extends VideoChannelSummary { + indexedAt: Date +} diff --git a/server/types/elastic-search.model.ts b/server/types/elastic-search.model.ts new file mode 100644 index 0000000..a252b26 --- /dev/null +++ b/server/types/elastic-search.model.ts @@ -0,0 +1,4 @@ +export interface IndexableDoc { + elasticSearchId: string + host: string +} diff --git a/server/types/video.model.ts b/server/types/video.model.ts index 2e5a861..8a501cf 100644 --- a/server/types/video.model.ts +++ b/server/types/video.model.ts @@ -1,9 +1,5 @@ import { Video, VideoDetails } from '@shared/models/videos/video.model' - -export interface IndexableDoc { - elasticSearchId: string - host: string -} +import { IndexableDoc } from './elastic-search.model' export interface IndexableVideo extends Video, IndexableDoc { }