Prepare search endpoint

This commit is contained in:
Chocobozzz 2020-02-14 16:14:45 +01:00
parent 9dc427259d
commit 34dcbaa200
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
11 changed files with 435 additions and 33 deletions

View File

@ -0,0 +1,33 @@
import * as express from 'express'
import { paginationValidator } from '../../middlewares/validators/pagination'
import { setDefaultPagination } from '../../middlewares/pagination'
import { asyncMiddleware } from '../../middlewares/async'
import { getFormattedObjects } from '../../helpers/utils'
import { queryVideos } from '../../lib/elastic-search-videos'
import { videosSearchSortValidator } from '../../middlewares/validators/sort'
import { commonVideosFiltersValidator, videosSearchValidator } from '../../middlewares/validators/search'
import { setDefaultSearchSort } from '../../middlewares/sort'
const searchRouter = express.Router()
searchRouter.get('/videos',
paginationValidator,
setDefaultPagination,
videosSearchSortValidator,
setDefaultSearchSort,
commonVideosFiltersValidator,
videosSearchValidator,
asyncMiddleware(searchVideos)
)
// ---------------------------------------------------------------------------
export { searchRouter }
// ---------------------------------------------------------------------------
async function searchVideos (req: express.Request, res: express.Response) {
const resultList = await queryVideos(req.body)
return res.json(getFormattedObjects(resultList.data, resultList.total))
}

View File

@ -0,0 +1,21 @@
import { isArray } from './misc'
function isNumberArray (value: any) {
return isArray(value) && value.every(v => validator.isInt('' + v))
}
function isStringArray (value: any) {
return isArray(value) && value.every(v => typeof v === 'string')
}
function isNSFWQueryValid (value: any) {
return value === 'true' || value === 'false' || value === 'both'
}
// ---------------------------------------------------------------------------
export {
isNumberArray,
isStringArray,
isNSFWQueryValid
}

View File

@ -3,6 +3,22 @@ import { CONFIG } from '../initializers/constants'
const elasticSearch = new Client({ node: 'http://' + CONFIG.ELASTIC_SEARCH.HOSTNAME + ':' + CONFIG.ELASTIC_SEARCH.PORT })
export {
elasticSearch
function buildSort (value: string) {
let field: string
let direction: 'asc' | 'desc'
if (value.substring(0, 1) === '-') {
direction = 'desc'
field = value.substring(1)
} else {
direction = 'asc'
field = value
}
return { direction, field }
}
export {
elasticSearch,
buildSort
}

View File

@ -23,6 +23,7 @@ const CONFIG = {
}
const SORTABLE_COLUMNS = {
VIDEOS_SEARCH: [ 'name', 'duration', 'createdAt', 'publishedAt', 'views', 'likes', 'match' ]
}
const PAGINATION_COUNT_DEFAULT = 20

View File

@ -1,8 +1,10 @@
import { CONFIG } from '../initializers/constants'
import { IndexableVideo } from '../types/video.model'
import { DBVideo, DBVideoDetails, IndexableVideo, IndexableVideoDetails } from '../types/video.model'
import { flatMap } from 'lodash'
import { Avatar } from '@shared/models'
import { elasticSearch } from '../helpers/elastic-search'
import { buildSort, elasticSearch } from '../helpers/elastic-search'
import { VideosSearchQuery } from '../types/video-search.model'
import { inspect } from 'util'
function initVideosIndex () {
return elasticSearch.indices.create({
@ -23,7 +25,13 @@ function initVideosIndex () {
})
}
function indexVideos (videos: IndexableVideo[]) {
async function indexVideos (videos: IndexableVideo[]) {
const elIdIndex: { [elId: string]: string } = {}
for (const video of videos) {
elIdIndex[video.elasticSearchId] = video.uuid
}
const body = flatMap(videos, v => {
return [
{
@ -39,10 +47,17 @@ function indexVideos (videos: IndexableVideo[]) {
]
})
return elasticSearch.bulk({
const result = await elasticSearch.bulk({
index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
body
})
const created: string[] = result.body.items
.map(i => i.update)
.filter(i => i.result === 'created')
.map(i => elIdIndex[i._id])
return { created }
}
function refreshVideosIndex () {
@ -67,15 +82,148 @@ async function listIndexInstances () {
return res.body.aggregations.hosts.buckets.map(b => b.key)
}
async function queryVideos (query: any) {
async function queryVideos (search: VideosSearchQuery) {
const bool: any = {}
const filter: any[] = []
if (search.search) {
Object.assign(bool, {
must: [
{
multi_match: {
query: search.search,
fields: [ 'name', 'description' ]
}
}
]
})
}
if (search.startDate) {
filter.push({
range: {
publishedAt: {
gte: search.startDate
}
}
})
}
if (search.endDate) {
filter.push({
range: {
publishedAt: {
lte: search.endDate
}
}
})
}
if (search.originallyPublishedStartDate) {
filter.push({
range: {
originallyPublishedAt: {
gte: search.startDate
}
}
})
}
if (search.originallyPublishedEndDate) {
filter.push({
range: {
originallyPublishedAt: {
lte: search.endDate
}
}
})
}
if (search.nsfw) {
filter.push({
term: {
nsfw: search.nsfw
}
})
}
if (search.categoryOneOf) {
filter.push({
terms: {
category: search.categoryOneOf
}
})
}
if (search.licenceOneOf) {
filter.push({
terms: {
licence: search.licenceOneOf
}
})
}
if (search.languageOneOf) {
filter.push({
terms: {
language: search.languageOneOf
}
})
}
// FIXME: rework
if (search.tagsOneOf) {
filter.push({
terms: {
tags: search.tagsOneOf,
minimum_should_match: 1
}
})
}
if (search.tagsAllOf) {
filter.push({
terms: {
tags: search.tagsOneOf,
minimum_should_match: 1
}
})
}
if (search.durationMin) {
filter.push({
range: {
duration: {
gte: search.durationMin
}
}
})
}
if (search.durationMax) {
filter.push({
range: {
duration: {
lte: search.durationMax
}
}
})
}
Object.assign(bool, { filter })
const res = await elasticSearch.search({
index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
body: {
query
from: search.start,
size: search.count,
sort: buildVideosSort(search.sort),
query: { bool }
}
})
return res.body.hits.hits
const hits = res.body.hits
return { total: hits.total.value, data: hits.hits.map(h => h._source) }
}
export {
@ -88,8 +236,23 @@ export {
// ############################################################################
function formatVideo (v: IndexableVideo) {
function buildVideosSort (sort: string) {
const { direction, field: sortField } = buildSort(sort)
const field = sortField === 'match'
? '_score'
: sortField
return [
{
[field]: { order: direction }
}
]
}
function formatVideo (v: IndexableVideo | IndexableVideoDetails): DBVideo | DBVideoDetails {
return {
id: v.id,
uuid: v.uuid,
indexedAt: new Date(),
@ -129,7 +292,10 @@ function formatVideo (v: IndexableVideo) {
nsfw: v.nsfw,
host: v.host,
tags: (v as any).tags ? (v as any).tags : [],
account: {
id: v.account.id,
name: v.account.name,
displayName: v.account.displayName,
url: v.account.url,
@ -139,6 +305,7 @@ function formatVideo (v: IndexableVideo) {
},
channel: {
id: v.channel.id,
name: v.channel.name,
displayName: v.channel.displayName,
url: v.channel.url,
@ -161,6 +328,10 @@ function formatAvatar (obj: { avatar?: Avatar }) {
function buildChannelOrAccountMapping () {
return {
id: {
type: 'long'
},
name: {
type: 'text',
fields: {
@ -197,6 +368,10 @@ function buildChannelOrAccountMapping () {
function buildVideosMapping () {
return {
id: {
type: 'long'
},
uuid: {
type: 'keyword'
},
@ -268,6 +443,16 @@ function buildVideosMapping () {
type: 'text'
},
tags: {
type: 'text',
fields: {
raw: {
type: 'keyword'
}
}
},
duration: {
type: 'long'
},

View File

@ -3,12 +3,15 @@ import { CONFIG, INDEXER_COUNT, SCHEDULER_INTERVALS_MS } from '../../initializer
import { doRequest } from '../../helpers/requests'
import { logger } from '../../helpers/logger'
import { ResultList } from '../../../PeerTube/shared/models/result-list.model'
import { Video } from '../../../PeerTube/shared/models/videos/video.model'
import { Video, VideoDetails } from '../../../PeerTube/shared/models/videos/video.model'
import { indexVideos, listIndexInstances, refreshVideosIndex } from '../elastic-search-videos'
import { IndexableVideo } from '../../types/video.model'
import { IndexableVideo, IndexableDoc } from '../../types/video.model'
import { inspect } from 'util'
import { getRemovedHosts, listIndexInstancesHost } from '../instances-index'
import { elasticSearch } from '../../helpers/elastic-search'
import { AsyncQueue, queue } from 'async'
type GetVideoQueueParam = { host: string, uuid: string }
export class VideosIndexer extends AbstractScheduler {
@ -16,41 +19,57 @@ export class VideosIndexer extends AbstractScheduler {
protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosIndexer
private readonly getVideoQueue: AsyncQueue<GetVideoQueueParam>
private constructor () {
super()
this.getVideoQueue = queue<GetVideoQueueParam, Error>((task, cb) => {
this.indexSpecificVideo(task.host, task.uuid)
.then(() => cb())
.catch(err => {
logger.error('Error in index specific video.', { err: inspect(err) })
cb()
})
})
}
protected async internalExecute () {
return this.indexVideos()
return this.runVideosIndexer()
}
private async indexVideos () {
private async runVideosIndexer () {
const dbHosts = await listIndexInstances()
const indexHosts = (await listIndexInstancesHost()).filter(h => h === 'peertube.cpy.re')
const hostsToRemove = getRemovedHosts(dbHosts, indexHosts)
await this.removeVideosFromHosts(hostsToRemove)
for (const instance of indexHosts) {
for (const host of indexHosts) {
try {
let videos: IndexableVideo[] = []
let start = 0
do {
videos = await this.getVideos(instance, start)
videos = await this.getVideos(host, start)
start += videos.length
logger.debug('Getting %d results from %s (from = %d).', videos.length, instance, start)
logger.debug('Getting %d results from %s (from = %d).', videos.length, host, start)
if (videos.length !== 0) {
await indexVideos(videos)
const { created } = await indexVideos(videos)
// Fetch complete video foreach created video (to get tags)
for (const c of created) {
this.getVideoQueue.push({ uuid: c, host })
}
}
} while (videos.length === INDEXER_COUNT.VIDEOS && start < 500)
logger.info('Added video data from %s.', instance)
logger.info('Added video data from %s.', host)
} catch (err) {
console.error(inspect(err, { depth: 10 }))
logger.warn('Cannot index videos from %s.', instance, { err })
logger.warn('Cannot index videos from %s.', host, { err })
}
}
@ -71,7 +90,18 @@ export class VideosIndexer extends AbstractScheduler {
json: true
})
return res.body.data.map(v => Object.assign(v, { elasticSearchId: host + v.id, host }))
return res.body.data.map(v => this.prepareVideoForDB(v, host))
}
private async getVideo (host: string, uuid: string): Promise<IndexableVideo> {
const url = 'https://' + host + '/api/v1/videos/' + uuid
const res = await doRequest<VideoDetails>({
uri: url,
json: true
})
return this.prepareVideoForDB(res.body, host)
}
private removeVideosFromHosts (hosts: string[]) {
@ -79,20 +109,34 @@ export class VideosIndexer extends AbstractScheduler {
logger.info('Will remove videos from hosts.', { hosts })
const should = hosts.map(host => ({ term: { host } }))
return elasticSearch.delete_by_query({
index: CONFIG.ELASTIC_SEARCH.INDEXES.VIDEOS,
body: {
query: {
bool: {
should
filter: {
terms: {
host: hosts
}
}
}
}
}
})
}
private async indexSpecificVideo (host: string, uuid: string) {
const video = await this.getVideo(host, uuid)
logger.info('Indexing specific video %s of %s.', uuid, host)
await indexVideos([ video ])
}
private prepareVideoForDB <T extends Video> (video: T, host: string): T & IndexableDoc {
return Object.assign(video, { elasticSearchId: host + video.id, host })
}
static get Instance () {
return this.instance || (this.instance = new this())
}

View File

@ -1,14 +1,22 @@
import * as express from 'express'
import 'express-validator'
function setDefaultSort (req: express.Request, res: express.Response, next: express.NextFunction) {
if (!req.query.sort) req.query.sort = '-createdAt'
const setDefaultSort = setDefaultSortFactory('-createdAt')
return next()
}
const setDefaultSearchSort = setDefaultSortFactory('-match')
// ---------------------------------------------------------------------------
export {
setDefaultSort
setDefaultSort,
setDefaultSearchSort
}
// ---------------------------------------------------------------------------
function setDefaultSortFactory (sort: string) {
return (req: express.Request, res: express.Response, next: express.NextFunction) => {
if (!req.query.sort) req.query.sort = sort
return next()
}
}

View File

@ -0,0 +1,68 @@
import { query } from 'express-validator'
import * as express from 'express'
import { isDateValid, toArray } from '../../helpers/custom-validators/misc'
import { isNSFWQueryValid, isNumberArray, isStringArray } from '../../helpers/custom-validators/search-videos'
import { logger } from '../../helpers/logger'
import { areValidationErrors } from './utils'
const commonVideosFiltersValidator = [
query('categoryOneOf')
.optional()
.customSanitizer(toArray)
.custom(isNumberArray).withMessage('Should have a valid one of category array'),
query('licenceOneOf')
.optional()
.customSanitizer(toArray)
.custom(isNumberArray).withMessage('Should have a valid one of licence array'),
query('languageOneOf')
.optional()
.customSanitizer(toArray)
.custom(isStringArray).withMessage('Should have a valid one of language array'),
query('tagsOneOf')
.optional()
.customSanitizer(toArray)
.custom(isStringArray).withMessage('Should have a valid one of tags array'),
query('tagsAllOf')
.optional()
.customSanitizer(toArray)
.custom(isStringArray).withMessage('Should have a valid all of tags array'),
query('nsfw')
.optional()
.custom(isNSFWQueryValid).withMessage('Should have a valid NSFW attribute'),
(req: express.Request, res: express.Response, next: express.NextFunction) => {
logger.debug('Checking commons video filters query', { parameters: req.query })
if (areValidationErrors(req, res)) return
return next()
}
]
const videosSearchValidator = [
query('search').optional().not().isEmpty().withMessage('Should have a valid search'),
query('startDate').optional().custom(isDateValid).withMessage('Should have a valid start date'),
query('endDate').optional().custom(isDateValid).withMessage('Should have a valid end date'),
query('originallyPublishedStartDate').optional().custom(isDateValid).withMessage('Should have a valid published start date'),
query('originallyPublishedEndDate').optional().custom(isDateValid).withMessage('Should have a valid published end date'),
query('durationMin').optional().isInt().withMessage('Should have a valid min duration'),
query('durationMax').optional().isInt().withMessage('Should have a valid max duration'),
(req: express.Request, res: express.Response, next: express.NextFunction) => {
logger.debug('Checking videos search query', { parameters: req.query })
if (areValidationErrors(req, res)) return
return next()
}
]
// ---------------------------------------------------------------------------
export {
commonVideosFiltersValidator,
videosSearchValidator
}

View File

@ -1,5 +1,12 @@
import { SORTABLE_COLUMNS } from '../../initializers/constants'
import { checkSort, createSortableColumns } from './utils'
const SORTABLE_VIDEOS_SEARCH_COLUMNS = createSortableColumns(SORTABLE_COLUMNS.VIDEOS_SEARCH)
const videosSearchSortValidator = checkSort(SORTABLE_VIDEOS_SEARCH_COLUMNS)
// ---------------------------------------------------------------------------
export {
videosSearchSortValidator
}

View File

@ -0,0 +1,3 @@
import { VideosSearchQuery as PeerTubeVideosSearchQuery} from '../../PeerTube/shared/models/search/videos-search-query.model'
export type VideosSearchQuery = Omit<PeerTubeVideosSearchQuery, 'skipCount' | 'filter'>

View File

@ -1,6 +1,22 @@
import { Video } from '@shared/models/videos/video.model'
import { Video, VideoDetails } from '@shared/models/videos/video.model'
export interface IndexableVideo extends Video {
export interface IndexableDoc {
elasticSearchId: string
host: string
}
export interface IndexableVideo extends Video, IndexableDoc {
}
export interface IndexableVideoDetails extends VideoDetails, IndexableDoc {
}
export interface DBVideoDetails extends Omit<VideoDetails, 'isLocal'> {
indexedAt: Date
host: string
}
export interface DBVideo extends Omit<Video, 'isLocal'> {
indexedAt: Date
host: string
}