Index videos

This commit is contained in:
Chocobozzz 2020-02-13 16:06:52 +01:00
parent dbbf47bba3
commit c2af798632
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
13 changed files with 419 additions and 19 deletions

View File

@ -12,6 +12,7 @@ elastic_search:
port: 9200
indexes:
videos: 'peertube-index-videos'
instances: 'peertube-index-instances'
log:
level: 'debug' # debug/info/warning/error

4
config/test.yaml Normal file
View File

@ -0,0 +1,4 @@
elastic_search:
indexes:
videos: 'peertube-index-videos-test1'
instances: 'peertube-index-instances-test1'

View File

@ -29,6 +29,7 @@
"express": "^4.16.4",
"express-validator": "^6.1.1",
"js-yaml": "^3.12.1",
"lodash": "^4.17.15",
"mkdirp": "^0.5.1",
"morgan": "^1.9.1",
"multer": "^1.4.1",
@ -47,6 +48,7 @@
"@types/body-parser": "^1.16.3",
"@types/config": "^0.0.34",
"@types/express": "^4.16.1",
"@types/lodash": "^4.14.149",
"@types/mkdirp": "^0.5.1",
"@types/morgan": "^1.7.32",
"@types/multer": "^1.3.3",
@ -64,6 +66,6 @@
"eslint-plugin-promise": "^4.2.1",
"eslint-plugin-standard": "^4.0.1",
"tslint-config-standard": "^8.0.1",
"typescript": "^3.3.3"
"typescript": "^3.7.5"
}
}

View File

@ -10,6 +10,8 @@ import * as morgan from 'morgan'
import { apiRouter } from './server/controllers/api'
import { logger } from './server/helpers/logger'
import { API_VERSION, CONFIG } from './server/initializers/constants'
import { VideosIndexer } from './server/lib/schedulers/videos-indexer'
import { initVideosIndex } from './server/helpers/elastic-search'
const app = express()
@ -51,6 +53,16 @@ app.use(function (err, req, res, next) {
// ----------- Run -----------
app.listen(CONFIG.LISTEN.PORT, () => {
app.listen(CONFIG.LISTEN.PORT, async () => {
logger.info('Server listening on port %d', CONFIG.LISTEN.PORT)
try {
await initVideosIndex()
} catch (err) {
logger.error('Cannot init videos index.', { err })
process.exit(-1)
}
VideosIndexer.Instance.enable()
VideosIndexer.Instance.execute()
})

View File

@ -1,15 +1,56 @@
import { Client } from '@elastic/elasticsearch'
import { CONFIG } from '../initializers/constants'
import { IndexableVideo } from '../types/video.model'
import { Avatar } from '@shared/models/avatars/avatar.model'
import { flatMap } from 'lodash'
const client = new Client({ node: CONFIG.ELASTIC_SEARCH.HOSTNAME + ':' + CONFIG.ELASTIC_SEARCH.PORT })
const client = new Client({ node: 'http://' + CONFIG.ELASTIC_SEARCH.HOSTNAME + ':' + CONFIG.ELASTIC_SEARCH.PORT })
function indexVideo (body: any) {
return client.index({
/**
 * Creates the videos index with its settings and field mappings.
 *
 * The operation is idempotent: when Elasticsearch reports that the index
 * already exists, the error is swallowed and the promise resolves normally.
 * Every other error is rethrown to the caller.
 */
function initVideosIndex () {
  return client.indices.create({
    index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
    body: {
      settings: {
        number_of_shards: 1,
        number_of_replicas: 1
      },
      mappings: {
        properties: buildVideosMapping()
      }
    }
  }).catch(err => {
    // Every step of the lookup is optional: a response error whose body has no
    // `error` or `root_cause` must be rethrown as is, not replaced by a TypeError
    const rootCauseType = err?.meta?.body?.error?.root_cause?.[0]?.type

    if (err?.name === 'ResponseError' && rootCauseType === 'resource_already_exists_exception') return

    throw err
  })
}
/**
 * Upserts the given videos into the videos index using one bulk request.
 *
 * Each video produces two bulk entries: an `update` action targeting the
 * document identified by `elasticSearchId`, followed by the document itself
 * flagged with `doc_as_upsert` so that missing documents are created.
 *
 * @param videos videos to index; an empty list resolves immediately without
 *   contacting Elasticsearch
 * @returns the promise of the bulk request, or a promise resolved with
 *   `undefined` when there is nothing to index
 */
function indexVideos (videos: IndexableVideo[]) {
  // A bulk request needs at least one action: an empty body is rejected by Elasticsearch
  if (videos.length === 0) return Promise.resolve(undefined)

  const body = flatMap(videos, v => {
    return [
      {
        update: {
          _id: v.elasticSearchId,
          _index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS
        }
      },
      {
        doc: formatVideo(v),
        doc_as_upsert: true
      }
    ]
  })

  return client.bulk({
    index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
    body
  })
}
/**
 * Asks Elasticsearch to refresh the videos index.
 *
 * @returns the promise of the refresh request
 */
function refreshVideosIndex () {
  const index = CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS

  return client.indices.refresh({ index })
}
async function queryVideos (query: any) {
const res = await client.search({
index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
@ -18,10 +59,231 @@ async function queryVideos (query: any) {
}
})
return res.hits.hits
return res.body.hits.hits
}
export {
indexVideo,
queryVideos
indexVideos,
queryVideos,
refreshVideosIndex,
initVideosIndex
}
// ############################################################################
/**
 * Builds the plain document stored in the videos index from an indexable video.
 *
 * Only the fields listed here are copied; constants (category, licence,
 * language, privacy) are reduced to their `id` and `label`, and the account
 * and channel are reduced to a summary with a formatted avatar.
 */
function formatVideo (v: IndexableVideo) {
  // Keeps only the id/label pair of a video constant
  const toConstant = <T>(constant: { id: T, label: string }) => {
    const { id, label } = constant

    return { id, label }
  }

  // Shared shape of the embedded account and channel
  const toActor = (actor: typeof v.account | typeof v.channel) => {
    return {
      name: actor.name,
      displayName: actor.displayName,
      url: actor.url,
      host: actor.host,
      avatar: formatAvatar(actor)
    }
  }

  return {
    uuid: v.uuid,

    createdAt: v.createdAt,
    updatedAt: v.updatedAt,
    publishedAt: v.publishedAt,
    originallyPublishedAt: v.originallyPublishedAt,

    category: toConstant(v.category),
    licence: toConstant(v.licence),
    language: toConstant(v.language),
    privacy: toConstant(v.privacy),

    name: v.name,
    description: v.description,
    duration: v.duration,

    thumbnailPath: v.thumbnailPath,
    previewPath: v.previewPath,
    embedPath: v.embedPath,

    views: v.views,
    likes: v.likes,
    dislikes: v.dislikes,

    nsfw: v.nsfw,
    host: v.host,

    account: toActor(v.account),
    channel: toActor(v.channel)
  }
}
/**
 * Extracts the indexed avatar fields of an account or channel.
 *
 * @param obj object that may carry an `avatar`
 * @returns the avatar `path`, `createdAt` and `updatedAt`, or `null` when the
 *   object has no avatar
 */
function formatAvatar (obj: { avatar?: Avatar }) {
  const avatar = obj.avatar
  if (!avatar) return null

  const { path, createdAt, updatedAt } = avatar

  return { path, createdAt, updatedAt }
}
/**
 * Builds the Elasticsearch field mapping shared by the embedded `account`
 * and `channel` objects of a video document.
 *
 * A fresh object tree is returned on every call.
 */
function buildChannelOrAccountMapping () {
  // Factories, so that no mapping node is shared between two fields
  const keyword = () => ({ type: 'keyword' })
  const date = () => ({ type: 'date' })

  return {
    name: {
      type: 'text',
      // Exact-match sub field
      fields: { raw: keyword() }
    },

    displayName: { type: 'text' },

    url: keyword(),
    host: keyword(),

    avatar: {
      properties: {
        path: keyword(),
        createdAt: date(),
        updatedAt: date()
      }
    }
  }
}
/**
 * Builds the Elasticsearch field mapping of a video document, used as the
 * `properties` of the videos index when it is created.
 *
 * A fresh object tree is returned on every call.
 */
function buildVideosMapping () {
  // Factories, so that no mapping node is shared between two fields
  const ofType = (type: string) => ({ type })

  // id/label pair used by category, licence, language and privacy
  const constantMapping = () => {
    return {
      properties: {
        id: ofType('keyword'),
        label: ofType('text')
      }
    }
  }

  return {
    uuid: ofType('keyword'),

    createdAt: ofType('date'),
    updatedAt: ofType('date'),
    publishedAt: ofType('date'),
    originallyPublishedAt: ofType('date'),

    category: constantMapping(),
    licence: constantMapping(),
    language: constantMapping(),
    privacy: constantMapping(),

    name: ofType('text'),
    description: ofType('text'),
    duration: ofType('long'),

    thumbnailPath: ofType('keyword'),
    previewPath: ofType('keyword'),
    embedPath: ofType('keyword'),

    views: ofType('long'),
    likes: ofType('long'),
    dislikes: ofType('long'),

    nsfw: ofType('boolean'),
    host: ofType('keyword'),

    account: { properties: buildChannelOrAccountMapping() },
    channel: { properties: buildChannelOrAccountMapping() }
  }
}

View File

@ -1,10 +1,10 @@
import * as Bluebird from 'bluebird'
import * as request from 'request'
function doRequest (
function doRequest <T> (
requestOptions: request.CoreOptions & request.UriOptions
): Bluebird<{ response: request.RequestResponse, body: any }> {
return new Bluebird<{ response: request.RequestResponse, body: any }>((res, rej) => {
): Bluebird<{ response: request.RequestResponse, body: T }> {
return new Bluebird<{ response: request.RequestResponse, body: T }>((res, rej) => {
request(requestOptions, (err, response, body) => err ? rej(err) : res({ response, body }))
})
}

View File

@ -27,10 +27,16 @@ const SORTABLE_COLUMNS = {
const PAGINATION_COUNT_DEFAULT = 20
let SCHEDULER_INTERVAL = 60000 * 60 * 12 // 12 hours
// Delay between two automatic runs of each scheduler, in milliseconds, keyed by scheduler name.
// The videos indexer value is lowered further down when running as a test instance.
const SCHEDULER_INTERVALS_MS = {
videosIndexer: 60000 * 60 * 12 // 12 hours
}
// Number of elements requested per call when indexing; VIDEOS is sent by the videos
// indexer as the `count` query parameter of the remote videos listing.
// NOTE(review): 1 looks like a development value, confirm before production use
const INDEXER_COUNT = {
VIDEOS: 1
}
if (isTestInstance()) {
SCHEDULER_INTERVAL = 10000
SCHEDULER_INTERVALS_MS.videosIndexer = 10000
}
export {
@ -38,5 +44,6 @@ export {
API_VERSION,
PAGINATION_COUNT_DEFAULT,
SORTABLE_COLUMNS,
SCHEDULER_INTERVAL
SCHEDULER_INTERVALS_MS,
INDEXER_COUNT
}

View File

@ -0,0 +1,35 @@
import { logger } from '../../helpers/logger'
import * as Bluebird from 'bluebird'
/**
 * Base class of periodic background jobs.
 *
 * Subclasses provide the interval and the job itself (`internalExecute`).
 * `enable()` starts a timer that calls `execute()` on every tick, and
 * `execute()` never runs two jobs at the same time nor lets a job error escape.
 */
export abstract class AbstractScheduler {
  /** Delay between two automatic runs, in milliseconds; `enable()` throws when it is falsy. */
  protected abstract schedulerIntervalMs: number

  // Handle of the active timer, undefined while the scheduler is disabled
  private interval: NodeJS.Timer | undefined

  // True while a job is in progress, so overlapping runs are skipped
  private isRunning = false

  /**
   * Starts the periodic execution.
   * Calling it again restarts the timer instead of stacking a second one.
   *
   * @throws Error when the subclass did not set a usable interval
   */
  enable () {
    if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.')

    // Without this, a second enable() would overwrite the handle and leak the first timer
    this.disable()

    this.interval = setInterval(() => this.execute(), this.schedulerIntervalMs)
  }

  /** Stops the periodic execution; does nothing when the scheduler is not enabled. */
  disable () {
    if (this.interval === undefined) return

    clearInterval(this.interval)
    this.interval = undefined
  }

  /**
   * Runs the job once, unless a previous run is still in progress.
   * Errors thrown by the job are logged and not propagated.
   */
  async execute () {
    if (this.isRunning === true) return
    this.isRunning = true

    try {
      await this.internalExecute()
    } catch (err) {
      logger.error('Cannot execute %s scheduler.', this.constructor.name, { err })
    } finally {
      // Always release the guard so that the next tick can run
      this.isRunning = false
    }
  }

  /** The actual job, implemented by subclasses. */
  protected abstract internalExecute (): Promise<any> | Bluebird<any>
}

View File

@ -0,0 +1,63 @@
import { AbstractScheduler } from './abstract-scheduler'
import { INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { doRequest } from '../../helpers/requests'
import { logger } from '../../helpers/logger'
import { ResultList } from '../../../PeerTube/shared/models/result-list.model'
import { Video } from '../../../PeerTube/shared/models/videos/video.model'
import { indexVideos, refreshVideosIndex } from '../../helpers/elastic-search'
import { IndexableVideo } from '../../types/video.model'
import { inspect } from 'util'
/**
 * Scheduler that fetches videos from remote instances and pushes them into
 * the Elasticsearch videos index.
 *
 * Accessed as a singleton through `VideosIndexer.Instance`.
 */
export class VideosIndexer extends AbstractScheduler {

  private static instance: VideosIndexer

  protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosIndexer

  private constructor () {
    super()
  }

  protected async internalExecute () {
    return this.indexVideos()
  }

  /**
   * Indexes the videos of every known instance, then refreshes the index.
   * A failing instance is logged and skipped so that the others are still processed.
   */
  private async indexVideos () {
    const instances = [ 'peertube.cpy.re' ]

    for (const instance of instances) {
      try {
        const videos = await this.getVideos(instance)

        await indexVideos(videos)

        logger.info('Added video data from %s.', instance)
      } catch (err) {
        // Full dump of the error, nested objects included, in addition to the log entry
        console.error(inspect(err, { depth: 10 }))
        logger.warn('Cannot index videos from %s.', instance, { err })
      }
    }

    await refreshVideosIndex()
  }

  /**
   * Fetches local videos of the given host through its REST API.
   *
   * @param host host name of the remote instance, without scheme
   * @returns the videos, each extended with its `host` and `elasticSearchId`
   * @throws Error when the response does not carry a `data` array
   */
  private async getVideos (host: string): Promise<IndexableVideo[]> {
    const url = 'https://' + host + '/api/v1/videos'

    const res = await doRequest<ResultList<Video>>({
      uri: url,
      qs: {
        start: 0,
        filter: 'local',
        count: INDEXER_COUNT.VIDEOS
      },
      json: true
    })

    // An error response (or a non JSON one) has no `data` array: fail with an explicit
    // message instead of an opaque TypeError on `.map`
    const videos = res.body?.data
    if (!Array.isArray(videos)) {
      throw new Error('Invalid videos list received from ' + url + ' (HTTP status ' + res.response?.statusCode + ').')
    }

    return videos.map(v => Object.assign(v, { elasticSearchId: host + v.id, host }))
  }

  /** Lazily created singleton. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}

View File

@ -0,0 +1,6 @@
import { Video } from '@shared/models/videos/video.model'
export interface IndexableVideo extends Video {
elasticSearchId: string
host: string
}

View File

@ -16,7 +16,10 @@
"types": [
"node"
],
"baseUrl": "."
"baseUrl": ".",
"paths": {
"@shared/*": [ "PeerTube/shared/*" ]
}
},
"exclude": [
"node_modules",

View File

@ -116,6 +116,11 @@
resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.136.tgz#413e85089046b865d960c9ff1d400e04c31ab60f"
integrity sha512-0GJhzBdvsW2RUccNHOBkabI8HZVdOXmXbXhuKlDEd5Vv12P7oAVGfomGp3Ne21o5D/qu1WmthlNKFaoZJJeErA==
"@types/lodash@^4.14.149":
version "4.14.149"
resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.149.tgz#1342d63d948c6062838fbf961012f74d4e638440"
integrity sha512-ijGqzZt/b7BfzcK9vTrS6MFljQRPn5BFWOx8oE0GYxribu6uV+aA9zZuXI1zc/etK9E8nrgdoF2+LgUw7+9tJQ==
"@types/mime@*":
version "2.0.1"
resolved "https://registry.yarnpkg.com/@types/mime/-/mime-2.0.1.tgz#dc488842312a7f075149312905b5e3c0b054c79d"
@ -2667,10 +2672,10 @@ typedarray@^0.0.6:
resolved "https://registry.yarnpkg.com/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777"
integrity sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c=
typescript@^3.3.3:
version "3.5.3"
resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.5.3.tgz#c830f657f93f1ea846819e929092f5fe5983e977"
integrity sha512-ACzBtm/PhXBDId6a6sDJfroT2pOWt/oOnk4/dElG5G33ZL776N3Y6/6bKZJBFpd+b05F3Ct9qDjMeJmRWtE2/g==
typescript@^3.7.5:
version "3.7.5"
resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.7.5.tgz#0692e21f65fd4108b9330238aac11dd2e177a1ae"
integrity sha512-/P5lkRXkWHNAbcJIiHPfRoKqyd7bsyCma1hZNUGfn20qm64T6ZBlrzprymeu918H+mB/0rIg2gGK/BXkhhYgBw==
unpipe@1.0.0, unpipe@~1.0.0:
version "1.0.0"