Index instances

This commit is contained in:
Chocobozzz 2020-02-14 14:09:31 +01:00
parent c2af798632
commit 9dc427259d
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
10 changed files with 383 additions and 299 deletions

View File

@ -82,8 +82,7 @@
],
"parserOptions": {
"project": [
"./tsconfig.json",
"./server/tools/tsconfig.json"
"./tsconfig.json"
]
}
}

View File

@ -12,7 +12,6 @@ elastic_search:
port: 9200
indexes:
videos: 'peertube-index-videos'
instances: 'peertube-index-instances'
log:
level: 'debug' # debug/info/warning/error

View File

@ -1,4 +1,3 @@
elastic_search:
indexes:
videos: 'peertube-index-videos-test1'
instances: 'peertube-index-instances-test1'

View File

@ -11,7 +11,7 @@ import { apiRouter } from './server/controllers/api'
import { logger } from './server/helpers/logger'
import { API_VERSION, CONFIG } from './server/initializers/constants'
import { VideosIndexer } from './server/lib/schedulers/videos-indexer'
import { initVideosIndex } from './server/helpers/elastic-search'
import { initVideosIndex } from './server/lib/elastic-search-videos'
const app = express()

View File

@ -1,289 +1,8 @@
import { Client } from '@elastic/elasticsearch'
import { CONFIG } from '../initializers/constants'
import { IndexableVideo } from '../types/video.model'
import { Avatar } from '@shared/models/avatars/avatar.model'
import { flatMap } from 'lodash'
const client = new Client({ node: 'http://' + CONFIG.ELASTIC_SEARCH.HOSTNAME + ':' + CONFIG.ELASTIC_SEARCH.PORT })
/**
 * Create the Elasticsearch index that stores videos, using one shard, one
 * replica and the field mapping returned by `buildVideosMapping()`.
 *
 * @returns the promise of the client call. It resolves with `undefined`
 * instead of rejecting when Elasticsearch answers that the index already
 * exists; every other error is rethrown unchanged.
 */
function initVideosIndex () {
  return client.indices.create({
    index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
    body: {
      settings: {
        number_of_shards: 1,
        number_of_replicas: 1
      },
      mappings: {
        properties: buildVideosMapping()
      }
    }
  }).catch(err => {
    // Each step is optional-chained so that an error body with an unexpected
    // shape (no `error`, no `root_cause`, empty array) cannot raise a
    // TypeError here and hide the original error
    const rootCauseType = err?.meta?.body?.error?.root_cause?.[0]?.type

    if (err?.name === 'ResponseError' && rootCauseType === 'resource_already_exists_exception') return

    throw err
  })
}
/**
 * Send one bulk request that upserts every given video into the videos index.
 *
 * @param videos videos to index; `elasticSearchId` is used as document id
 * @returns the promise of the bulk call. The response is returned as is and
 * is not inspected here.
 */
function indexVideos (videos: IndexableVideo[]) {
  // Each video contributes two consecutive entries: the action, then its payload
  const operations = flatMap(videos, video => {
    const action = {
      update: {
        _id: video.elasticSearchId,
        _index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS
      }
    }
    const payload = { doc: formatVideo(video), doc_as_upsert: true }

    return [ action, payload ]
  })

  return client.bulk({ index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, body: operations })
}
/**
 * Trigger an Elasticsearch refresh of the videos index.
 *
 * @returns the promise of the `indices.refresh` call
 */
function refreshVideosIndex () {
  const target = { index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS }

  return client.indices.refresh(target)
}
/**
 * Run a search on the videos index.
 *
 * @param query value sent as is under the `query` key of the search body
 * @returns the `hits.hits` array of the response body
 */
async function queryVideos (query: any) {
  const { body } = await client.search({
    index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
    body: { query }
  })

  return body.hits.hits
}
const elasticSearch = new Client({ node: 'http://' + CONFIG.ELASTIC_SEARCH.HOSTNAME + ':' + CONFIG.ELASTIC_SEARCH.PORT })
export {
indexVideos,
queryVideos,
refreshVideosIndex,
initVideosIndex
}
// ############################################################################
/**
 * Build the document stored in the videos index for a video.
 *
 * Only the listed fields are copied; category, licence, language and privacy
 * are reduced to their `id`/`label` pair, account and channel to their name,
 * display name, url, host and formatted avatar.
 */
function formatVideo (v: IndexableVideo) {
  const idAndLabel = <T extends { id?: unknown, label?: unknown }>(constant: T) => {
    return { id: constant.id, label: constant.label }
  }

  const actorSummary = (actor: IndexableVideo['account'] | IndexableVideo['channel']) => {
    return {
      name: actor.name,
      displayName: actor.displayName,
      url: actor.url,
      host: actor.host,
      avatar: formatAvatar(actor)
    }
  }

  const {
    uuid, createdAt, updatedAt, publishedAt, originallyPublishedAt,
    name, description, duration,
    thumbnailPath, previewPath, embedPath,
    views, likes, dislikes, nsfw, host
  } = v

  // Key order is kept identical to the serialized document layout
  return {
    uuid,
    createdAt,
    updatedAt,
    publishedAt,
    originallyPublishedAt,

    category: idAndLabel(v.category),
    licence: idAndLabel(v.licence),
    language: idAndLabel(v.language),
    privacy: idAndLabel(v.privacy),

    name,
    description,
    duration,

    thumbnailPath,
    previewPath,
    embedPath,

    views,
    likes,
    dislikes,
    nsfw,
    host,

    account: actorSummary(v.account),
    channel: actorSummary(v.channel)
  }
}
/**
 * Extract the indexed avatar fields of an account or channel.
 *
 * @param obj object that may carry an `avatar`
 * @returns `null` when `obj.avatar` is falsy, otherwise a new object holding
 * only `path`, `createdAt` and `updatedAt`
 */
function formatAvatar (obj: { avatar?: Avatar }) {
  const { avatar } = obj
  if (!avatar) return null

  const { path, createdAt, updatedAt } = avatar

  return { path, createdAt, updatedAt }
}
/**
 * Build the mapping properties shared by the `account` and `channel` objects
 * of a video document.
 *
 * @returns a fresh mapping object on every call
 */
function buildChannelOrAccountMapping () {
  // Factories (not shared constants) so that no object is referenced twice
  const text = () => ({ type: 'text' })
  const keyword = () => ({ type: 'keyword' })
  const date = () => ({ type: 'date' })

  return {
    // `name` is a text field that also exposes a `raw` keyword sub-field
    name: { type: 'text', fields: { raw: keyword() } },
    displayName: text(),
    url: keyword(),
    host: keyword(),
    avatar: {
      properties: {
        path: keyword(),
        createdAt: date(),
        updatedAt: date()
      }
    }
  }
}
function buildVideosMapping () {
return {
uuid: {
type: 'keyword'
},
createdAt: {
type: 'date'
},
updatedAt: {
type: 'date'
},
publishedAt: {
type: 'date'
},
originallyPublishedAt: {
type: 'date'
},
category: {
properties: {
id: {
type: 'keyword'
},
label: {
type: 'text'
}
}
},
licence: {
properties: {
id: {
type: 'keyword'
},
label: {
type: 'text'
}
}
},
language: {
properties: {
id: {
type: 'keyword'
},
label: {
type: 'text'
}
}
},
privacy: {
properties: {
id: {
type: 'keyword'
},
label: {
type: 'text'
}
}
},
name: {
type: 'text'
},
description: {
type: 'text'
},
duration: {
type: 'long'
},
thumbnailPath: {
type: 'keyword'
},
previewPath: {
type: 'keyword'
},
embedPath: {
type: 'keyword'
},
views: {
type: 'long'
},
likes: {
type: 'long'
},
dislikes: {
type: 'long'
},
nsfw: {
type: 'boolean'
},
host: {
type: 'keyword'
},
account: {
properties: buildChannelOrAccountMapping()
},
channel: {
properties: buildChannelOrAccountMapping()
}
}
elasticSearch
}

View File

@ -17,7 +17,7 @@ const CONFIG = {
LOG: {
LEVEL: config.get<string>('log.level')
},
INSTANCES_URL: {
INSTANCES_INDEX: {
URL: config.get<string>('instances-index.url')
}
}
@ -32,7 +32,7 @@ const SCHEDULER_INTERVALS_MS = {
}
const INDEXER_COUNT = {
VIDEOS: 1
VIDEOS: 10
}
if (isTestInstance()) {

View File

@ -0,0 +1,310 @@
import { CONFIG } from '../initializers/constants'
import { IndexableVideo } from '../types/video.model'
import { flatMap } from 'lodash'
import { Avatar } from '@shared/models'
import { elasticSearch } from '../helpers/elastic-search'
/**
 * Create the Elasticsearch index that stores videos, using one shard, one
 * replica and the field mapping returned by `buildVideosMapping()`.
 *
 * @returns the promise of the client call. It resolves with `undefined`
 * instead of rejecting when Elasticsearch answers that the index already
 * exists; every other error is rethrown unchanged.
 */
function initVideosIndex () {
  return elasticSearch.indices.create({
    index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
    body: {
      settings: {
        number_of_shards: 1,
        number_of_replicas: 1
      },
      mappings: {
        properties: buildVideosMapping()
      }
    }
  }).catch(err => {
    // Each step is optional-chained so that an error body with an unexpected
    // shape (no `error`, no `root_cause`, empty array) cannot raise a
    // TypeError here and hide the original error
    const rootCauseType = err?.meta?.body?.error?.root_cause?.[0]?.type

    if (err?.name === 'ResponseError' && rootCauseType === 'resource_already_exists_exception') return

    throw err
  })
}
/**
 * Send one bulk request that upserts every given video into the videos index.
 *
 * @param videos videos to index; `elasticSearchId` is used as document id
 * @returns the promise of the bulk call. The response is returned as is and
 * is not inspected here.
 */
function indexVideos (videos: IndexableVideo[]) {
  // Each video contributes two consecutive entries: the action, then its payload
  const operations = flatMap(videos, video => {
    const action = {
      update: {
        _id: video.elasticSearchId,
        _index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS
      }
    }
    const payload = { doc: formatVideo(video), doc_as_upsert: true }

    return [ action, payload ]
  })

  return elasticSearch.bulk({ index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS, body: operations })
}
/**
 * Trigger an Elasticsearch refresh of the videos index.
 *
 * @returns the promise of the `indices.refresh` call
 */
function refreshVideosIndex () {
  const target = { index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS }

  return elasticSearch.indices.refresh(target)
}
/**
 * List the distinct `host` values of the documents stored in the videos index.
 *
 * @returns the bucket keys of a `terms` aggregation on the `host` field
 */
async function listIndexInstances () {
  // A terms aggregation only returns the 10 most frequent terms unless `size`
  // is set, which would hide every other host from the caller.
  // 5000 mirrors the `count` requested from the instances index in
  // `listIndexInstancesHost()`.
  const maxHosts = 5000

  const res = await elasticSearch.search({
    index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
    body: {
      // No document is needed, only the aggregation result
      size: 0,
      aggs: {
        hosts: {
          terms: {
            field: 'host',
            size: maxHosts
          }
        }
      }
    }
  })

  return res.body.aggregations.hosts.buckets.map(b => b.key)
}
/**
 * Run a search on the videos index.
 *
 * @param query value sent as is under the `query` key of the search body
 * @returns the `hits.hits` array of the response body
 */
async function queryVideos (query: any) {
  const { body } = await elasticSearch.search({
    index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
    body: { query }
  })

  return body.hits.hits
}
export {
indexVideos,
queryVideos,
refreshVideosIndex,
initVideosIndex,
listIndexInstances
}
// ############################################################################
/**
 * Build the document stored in the videos index for a video.
 *
 * Only the listed fields are copied; category, licence, language and privacy
 * are reduced to their `id`/`label` pair, account and channel to their name,
 * display name, url, host and formatted avatar. `indexedAt` is set to the
 * date at which this function runs.
 */
function formatVideo (v: IndexableVideo) {
  const idAndLabel = <T extends { id?: unknown, label?: unknown }>(constant: T) => {
    return { id: constant.id, label: constant.label }
  }

  const actorSummary = (actor: IndexableVideo['account'] | IndexableVideo['channel']) => {
    return {
      name: actor.name,
      displayName: actor.displayName,
      url: actor.url,
      host: actor.host,
      avatar: formatAvatar(actor)
    }
  }

  const {
    uuid, createdAt, updatedAt, publishedAt, originallyPublishedAt,
    name, description, duration,
    thumbnailPath, previewPath, embedPath,
    views, likes, dislikes, nsfw, host
  } = v

  // Key order is kept identical to the serialized document layout
  return {
    uuid,
    indexedAt: new Date(),
    createdAt,
    updatedAt,
    publishedAt,
    originallyPublishedAt,

    category: idAndLabel(v.category),
    licence: idAndLabel(v.licence),
    language: idAndLabel(v.language),
    privacy: idAndLabel(v.privacy),

    name,
    description,
    duration,

    thumbnailPath,
    previewPath,
    embedPath,

    views,
    likes,
    dislikes,
    nsfw,
    host,

    account: actorSummary(v.account),
    channel: actorSummary(v.channel)
  }
}
/**
 * Extract the indexed avatar fields of an account or channel.
 *
 * @param obj object that may carry an `avatar`
 * @returns `null` when `obj.avatar` is falsy, otherwise a new object holding
 * only `path`, `createdAt` and `updatedAt`
 */
function formatAvatar (obj: { avatar?: Avatar }) {
  const { avatar } = obj
  if (!avatar) return null

  const { path, createdAt, updatedAt } = avatar

  return { path, createdAt, updatedAt }
}
/**
 * Build the mapping properties shared by the `account` and `channel` objects
 * of a video document.
 *
 * @returns a fresh mapping object on every call
 */
function buildChannelOrAccountMapping () {
  // Factories (not shared constants) so that no object is referenced twice
  const text = () => ({ type: 'text' })
  const keyword = () => ({ type: 'keyword' })
  const date = () => ({ type: 'date' })

  return {
    // `name` is a text field that also exposes a `raw` keyword sub-field
    name: { type: 'text', fields: { raw: keyword() } },
    displayName: text(),
    url: keyword(),
    host: keyword(),
    avatar: {
      properties: {
        path: keyword(),
        createdAt: date(),
        updatedAt: date()
      }
    }
  }
}
/**
 * Build the mapping properties of a video document.
 *
 * Dates are mapped as `date`, identifiers and paths as `keyword`, free text as
 * `text`, counters and duration as `long`, `nsfw` as `boolean`. The `account`
 * and `channel` objects both use `buildChannelOrAccountMapping()`.
 *
 * @returns a fresh mapping object on every call
 */
function buildVideosMapping () {
  // Factories (not shared constants) so that no object is referenced twice
  const keyword = () => ({ type: 'keyword' })
  const text = () => ({ type: 'text' })
  const date = () => ({ type: 'date' })
  const long = () => ({ type: 'long' })

  // Shape shared by category, licence, language and privacy
  const idAndLabel = () => ({
    properties: {
      id: keyword(),
      label: text()
    }
  })

  return {
    uuid: keyword(),

    createdAt: date(),
    updatedAt: date(),
    publishedAt: date(),
    originallyPublishedAt: date(),
    indexedAt: date(),

    category: idAndLabel(),
    licence: idAndLabel(),
    language: idAndLabel(),
    privacy: idAndLabel(),

    name: text(),
    description: text(),
    duration: long(),

    thumbnailPath: keyword(),
    previewPath: keyword(),
    embedPath: keyword(),

    views: long(),
    likes: long(),
    dislikes: long(),
    nsfw: { type: 'boolean' },
    host: keyword(),

    account: { properties: buildChannelOrAccountMapping() },
    channel: { properties: buildChannelOrAccountMapping() }
  }
}

View File

@ -0,0 +1,21 @@
import { CONFIG } from '../initializers/constants'
import { doRequest } from '../helpers/requests'
/**
 * Fetch the hosts known by the instances index configured in
 * `CONFIG.INSTANCES_INDEX.URL`.
 *
 * A single request asking for 5000 entries is sent to
 * `/api/v1/instances/hosts`.
 *
 * @returns the `host` of every entry of the `data` array of the response
 */
async function listIndexInstancesHost () {
  const response = await doRequest({
    uri: CONFIG.INSTANCES_INDEX.URL + '/api/v1/instances/hosts',
    qs: { count: 5000 },
    json: true
  })

  return response.body.data.map(entry => entry.host as string)
}
/**
 * Compute the hosts that are present in our database but not listed anymore
 * by the instances index.
 *
 * @param dbHosts hosts currently stored on our side
 * @param indexHosts hosts currently listed by the instances index
 * @returns the entries of `dbHosts` missing from `indexHosts`, in their
 * original order (duplicates of `dbHosts` are kept)
 */
function getRemovedHosts (dbHosts: string[], indexHosts: string[]) {
  // Set lookup instead of scanning `indexHosts` once per database host
  const stillListed = new Set(indexHosts)

  return dbHosts.filter(dbHost => !stillListed.has(dbHost))
}
export {
getRemovedHosts,
listIndexInstancesHost
}

View File

@ -1,5 +1,6 @@
import { logger } from '../../helpers/logger'
import * as Bluebird from 'bluebird'
import { inspect } from 'util'
export abstract class AbstractScheduler {
@ -25,7 +26,7 @@ export abstract class AbstractScheduler {
try {
await this.internalExecute()
} catch (err) {
logger.error('Cannot execute %s scheduler.', this.constructor.name, { err })
logger.error('Cannot execute %s scheduler.', this.constructor.name, { err: inspect(err) })
} finally {
this.isRunning = false
}

View File

@ -1,12 +1,14 @@
import { AbstractScheduler } from './abstract-scheduler'
import { INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { CONFIG, INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { doRequest } from '../../helpers/requests'
import { logger } from '../../helpers/logger'
import { ResultList } from '../../../PeerTube/shared/models/result-list.model'
import { Video } from '../../../PeerTube/shared/models/videos/video.model'
import { indexVideos, refreshVideosIndex } from '../../helpers/elastic-search'
import { indexVideos, listIndexInstances, refreshVideosIndex } from '../elastic-search-videos'
import { IndexableVideo } from '../../types/video.model'
import { inspect } from 'util'
import { getRemovedHosts, listIndexInstancesHost } from '../instances-index'
import { elasticSearch } from '../../helpers/elastic-search'
export class VideosIndexer extends AbstractScheduler {
@ -23,13 +25,27 @@ export class VideosIndexer extends AbstractScheduler {
}
private async indexVideos () {
const instances = [ 'peertube.cpy.re' ]
const dbHosts = await listIndexInstances()
const indexHosts = (await listIndexInstancesHost()).filter(h => h === 'peertube.cpy.re')
for (const instance of instances) {
const hostsToRemove = getRemovedHosts(dbHosts, indexHosts)
await this.removeVideosFromHosts(hostsToRemove)
for (const instance of indexHosts) {
try {
const videos = await this.getVideos(instance)
let videos: IndexableVideo[] = []
let start = 0
await indexVideos(videos)
do {
videos = await this.getVideos(instance, start)
start += videos.length
logger.debug('Getting %d results from %s (from = %d).', videos.length, instance, start)
if (videos.length !== 0) {
await indexVideos(videos)
}
} while (videos.length === INDEXER_COUNT.VIDEOS && start < 500)
logger.info('Added video data from %s.', instance)
} catch (err) {
@ -41,14 +57,15 @@ export class VideosIndexer extends AbstractScheduler {
await refreshVideosIndex()
}
private async getVideos (host: string): Promise<IndexableVideo[]> {
private async getVideos (host: string, start: number): Promise<IndexableVideo[]> {
const url = 'https://' + host + '/api/v1/videos'
const res = await doRequest<ResultList<Video>>({
uri: url,
qs: {
start: 0,
start,
filter: 'local',
skipCount: true,
count: INDEXER_COUNT.VIDEOS
},
json: true
@ -57,6 +74,25 @@ export class VideosIndexer extends AbstractScheduler {
return res.body.data.map(v => Object.assign(v, { elasticSearchId: host + v.id, host }))
}
/**
 * Delete from the videos index the documents belonging to the given hosts.
 *
 * The delete-by-query request uses one `term` clause on `host` per entry,
 * combined under `bool.should`.
 *
 * @param hosts hosts whose videos must be removed
 * @returns `undefined` without sending any request when `hosts` is empty,
 * otherwise the promise of the `delete_by_query` call
 */
private removeVideosFromHosts (hosts: string[]) {
  const nothingToRemove = hosts.length === 0
  if (nothingToRemove) return

  logger.info('Will remove videos from hosts.', { hosts })

  const query = {
    bool: {
      should: hosts.map(h => {
        return { term: { host: h } }
      })
    }
  }

  return elasticSearch.delete_by_query({
    index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
    body: { query }
  })
}
static get Instance () {
return this.instance || (this.instance = new this())
}