diff --git a/scripts/.gitignore b/scripts/.gitignore
index 8c7b543b8d..9becffee7d 100644
--- a/scripts/.gitignore
+++ b/scripts/.gitignore
@@ -1,2 +1,2 @@
-logs/
-database/
\ No newline at end of file
+/logs/
+/database/
\ No newline at end of file
diff --git a/scripts/commands/database/cleanup.js b/scripts/commands/database/cleanup.js
new file mode 100644
index 0000000000..ede11d701e
--- /dev/null
+++ b/scripts/commands/database/cleanup.js
@@ -0,0 +1,25 @@
+const { db, logger } = require('../../core')
+
+async function main() {
+  logger.info(`loading streams...`)
+  await db.streams.load()
+  let streams = await db.streams.find({})
+
+  logger.info(`removing broken links...`)
+  let removed = 0
+  const buffer = {}
+  for (const stream of streams) {
+    const duplicate = buffer[stream.channel_id]
+    if (duplicate && ['offline', 'timeout'].includes(stream.status.code)) {
+      await db.streams.remove({ _id: stream._id })
+      removed++
+    } else {
+      buffer[stream.channel_id] = stream
+    }
+  }
+  db.streams.compact()
+
+  logger.info(`removed ${removed} links`)
+}
+
+main()
diff --git a/scripts/commands/database/create.js b/scripts/commands/database/create.js
new file mode 100644
index 0000000000..7611872adf
--- /dev/null
+++ b/scripts/commands/database/create.js
@@ -0,0 +1,82 @@
+const { db, file, parser, store, logger, id } = require('../../core')
+const { program } = require('commander')
+const _ = require('lodash')
+
+const options = program
+  .option(
+    '--max-clusters <number>',
+    'Set maximum number of clusters',
+    parser.parseNumber,
+    256
+  )
+  .option('--input-dir <dir>', 'Set path to input directory', 'channels')
+  .parse(process.argv)
+  .opts()
+
+async function main() {
+  logger.info('starting...')
+  logger.info(`number of clusters: ${options.maxClusters}`)
+
+  await saveToDatabase(await findStreams())
+
+  logger.info('done')
+}
+
+main()
+
+async function findStreams() {
+  logger.info(`looking for streams...`)
+
+  await db.streams.load()
+  const files = await file.list(`${options.inputDir}/**/*.m3u`)
+  const streams = []
+  for (const filepath of files) {
+    const items = await parser.parsePlaylist(filepath)
+    for (const item of items) {
+      item.filepath = filepath
+      streams.push(item)
+    }
+  }
+  logger.info(`found ${streams.length} streams`)
+
+  return streams
+}
+
+async function saveToDatabase(streams = []) {
+  logger.info('saving to the database...')
+
+  await db.streams.reset()
+  const chunks = split(_.shuffle(streams), options.maxClusters)
+  for (const [i, chunk] of chunks.entries()) {
+    for (const item of chunk) {
+      const stream = store.create()
+      stream.set('channel_id', { channel_id: item.tvg.id })
+      stream.set('channel_name', { title: item.name })
+      stream.set('filepath', { filepath: item.filepath })
+      stream.set('resolution', { title: item.name })
+      stream.set('status', { title: item.name })
+      stream.set('url', { url: item.url })
+      stream.set('http', { http: item.http })
+      stream.set('is_broken', { status: stream.get('status') })
+      stream.set('updated', { updated: false })
+      stream.set('cluster_id', { cluster_id: i + 1 })
+
+      if (!stream.get('channel_id')) {
+        const channel_id = id.generate(item.name, item.filepath)
+
+        stream.set('channel_id', { channel_id })
+        stream.set('updated', { updated: true })
+      }
+
+      await db.streams.insert(stream.data())
+    }
+  }
+}
+
+function split(arr, n) {
+  let result = []
+  for (let i = n; i > 0; i--) {
+    result.push(arr.splice(0, Math.ceil(arr.length / i)))
+  }
+  return result
+}
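Note on create.js: split() shards the shuffled stream list into at most --max-clusters chunks, taking ceil(remaining / chunksLeft) items per pass, so chunk sizes differ by at most one (and trailing chunks are empty when there are fewer streams than clusters). A minimal standalone run of the same helper, outside the patch:

    // split() mutates its input via splice(), so pass a copy if the
    // original array is still needed
    const split = (arr, n) => {
      let result = []
      for (let i = n; i > 0; i--) {
        result.push(arr.splice(0, Math.ceil(arr.length / i)))
      }
      return result
    }

    console.log(split([1, 2, 3, 4, 5, 6, 7], 3))
    // => [ [ 1, 2, 3 ], [ 4, 5 ], [ 6, 7 ] ]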
diff --git a/scripts/commands/database/matrix.js b/scripts/commands/database/matrix.js
new file mode 100644
index 0000000000..f51e37f0b1
--- /dev/null
+++ b/scripts/commands/database/matrix.js
@@ -0,0 +1,16 @@
+const { logger, db } = require('../../core')
+
+async function main() {
+  await db.streams.load()
+  const docs = await db.streams.find({}).sort({ cluster_id: 1 })
+  const cluster_id = docs.reduce((acc, curr) => {
+    if (!acc.includes(curr.cluster_id)) acc.push(curr.cluster_id)
+    return acc
+  }, [])
+
+  const matrix = { cluster_id }
+  const output = `::set-output name=matrix::${JSON.stringify(matrix)}`
+  logger.info(output)
+}
+
+main()
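Note on matrix.js: ::set-output is the (since-deprecated) GitHub Actions workflow command for passing a value to later steps, here a matrix so the workflow can fan out one job per cluster. The reduce() collects unique cluster ids in first-seen order, which is ascending given the sort above. A small illustration, outside the patch:

    const docs = [{ cluster_id: 1 }, { cluster_id: 1 }, { cluster_id: 2 }]
    const cluster_id = docs.reduce((acc, curr) => {
      if (!acc.includes(curr.cluster_id)) acc.push(curr.cluster_id)
      return acc
    }, [])
    console.log(`::set-output name=matrix::${JSON.stringify({ cluster_id })}`)
    // => ::set-output name=matrix::{"cluster_id":[1,2]}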
diff --git a/scripts/commands/database/update.js b/scripts/commands/database/update.js
new file mode 100644
index 0000000000..c1786bc9ad
--- /dev/null
+++ b/scripts/commands/database/update.js
@@ -0,0 +1,176 @@
+const { db, store, parser, file, logger } = require('../../core')
+const statuses = require('../../data/statuses')
+const _ = require('lodash')
+
+const items = []
+
+const LOGS_DIR = process.env.LOGS_DIR || 'scripts/logs/cluster/load'
+
+async function main() {
+  let streams = await loadStreams()
+  const results = await loadResults()
+  const origins = await findOrigins(results)
+  streams = await updateStreams(streams, results, origins)
+
+  await updateDatabase(streams)
+}
+
+main()
+
+async function loadStreams() {
+  logger.info('loading streams...')
+
+  await db.streams.load()
+  const streams = await db.streams.find({})
+
+  logger.info(`found ${streams.length} streams`)
+
+  return streams
+}
+
+async function loadResults() {
+  logger.info('loading results from logs...')
+
+  const results = {}
+  const files = await file.list(`${LOGS_DIR}/cluster_*.log`)
+  for (const filepath of files) {
+    const parsed = await parser.parseLogs(filepath)
+    for (const item of parsed) {
+      results[item._id] = item
+    }
+  }
+
+  logger.info(`found ${Object.values(results).length} results`)
+
+  return results
+}
+
+async function findOrigins(results = {}) {
+  logger.info('searching for stream origins...')
+
+  const origins = {}
+  for (const { error, requests } of Object.values(results)) {
+    if (error || !Array.isArray(requests) || !requests.length) continue
+
+    let origin = requests.shift()
+    origin = new URL(origin.url)
+    for (const request of requests) {
+      const curr = new URL(request.url)
+      const key = curr.href.replace(/(^\w+:|^)/, '')
+      if (!origins[key] && curr.host === origin.host) {
+        origins[key] = origin.href
+      }
+    }
+  }
+
+  logger.info(`found ${_.uniq(Object.values(origins)).length} origins`)
+
+  return origins
+}
+
+async function updateStreams(items = [], results = {}, origins = {}) {
+  logger.info('updating streams...')
+
+  let updated = 0
+  const output = []
+  for (const item of items) {
+    const stream = store.create(item)
+    const result = results[item._id]
+
+    if (result) {
+      const { error, streams, requests } = result
+      const resolution = parseResolution(streams)
+      const origin = findOrigin(requests, origins)
+      let status = parseStatus(error)
+
+      if (status) {
+        const prevStatus = item.status
+        if (prevStatus.code === 'not_247')
+          // not_247 -> * = not_247
+          status = item.status
+        else if (prevStatus.code === 'geo_blocked')
+          // geo_blocked -> * = geo_blocked
+          status = item.status
+        else if (status.code === 'geo_blocked')
+          // * -> geo_blocked = *
+          status = item.status
+        else if (prevStatus.code === 'offline' && status.code === 'online')
+          // offline -> online = not_247
+          status = statuses['not_247']
+
+        stream.set('status', { status })
+        stream.set('is_broken', { status: stream.get('status') })
+      }
+
+      if (resolution) {
+        stream.set('resolution', { resolution })
+      }
+
+      if (origin) {
+        stream.set('url', { url: origin })
+      }
+    }
+
+    if (stream.changed) {
+      stream.set('updated', true)
+      output.push(stream.data())
+      updated++
+    }
+  }
+
+  logger.info(`updated ${updated} streams`)
+
+  return output
+}
+
+async function updateDatabase(streams = []) {
+  logger.info('updating database...')
+
+  for (const stream of streams) {
+    await db.streams.update({ _id: stream._id }, stream)
+  }
+  db.streams.compact()
+
+  logger.info('done')
+}
+
+function findOrigin(requests = [], origins = {}) {
+  if (origins && Array.isArray(requests)) {
+    requests = requests.map(r => r.url.replace(/(^\w+:|^)/, ''))
+    for (const url of requests) {
+      if (origins[url]) {
+        return origins[url]
+      }
+    }
+  }
+
+  return null
+}
+
+function parseResolution(streams) {
+  const resolution = streams
+    .filter(s => s.codec_type === 'video')
+    .reduce(
+      (acc, curr) => {
+        if (curr.height > acc.height) return { width: curr.width, height: curr.height }
+        return acc
+      },
+      { width: 0, height: 0 }
+    )
+
+  if (resolution.width > 0 && resolution.height > 0) return resolution
+  return null
+}
+
+function parseStatus(error) {
+  if (error) {
+    if (error.includes('timed out')) {
+      return statuses['timeout']
+    } else if (error.includes('403')) {
+      return statuses['geo_blocked']
+    }
+    return statuses['offline']
+  }
+
+  return statuses['online']
+}
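Note on update.js: the if/else chain in updateStreams() encodes a small status-transition table. A sketch of the same rules as a standalone function, outside the patch (`statuses` is assumed to map status codes to status objects, as in scripts/data/statuses):

    function nextStatus(prev, next, statuses) {
      if (prev.code === 'not_247') return prev // not_247 is sticky
      if (prev.code === 'geo_blocked') return prev // geo_blocked is sticky
      if (next.code === 'geo_blocked') return prev // a 403 never overwrites an existing status
      if (prev.code === 'offline' && next.code === 'online') {
        return statuses['not_247'] // back online after being offline => not 24/7
      }
      return next
    }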