From 49deded9bd9c9201f0101d70a8193113eee1a977 Mon Sep 17 00:00:00 2001 From: wryk Date: Sat, 15 Feb 2020 23:12:53 +0100 Subject: [PATCH] refact status processing --- src/components/Queue.svelte | 10 +- src/components/Radio.svelte | 11 +- src/components/Viewer.svelte | 3 +- src/services/mastodon.js | 53 +++++----- src/services/misc.js | 196 ++++++++++++++++++++++++++--------- src/store.js | 2 +- 6 files changed, 183 insertions(+), 92 deletions(-) diff --git a/src/components/Queue.svelte b/src/components/Queue.svelte index 6645dec..1c2fafa 100644 --- a/src/components/Queue.svelte +++ b/src/components/Queue.svelte @@ -3,8 +3,8 @@ {#if $next}
select($next)}> -
{$next.metadata.title}
-
by {$next.status.account.acct}
+
{$next.title}
+
by {$next.username}
{/if} @@ -15,10 +15,10 @@
HISTORY
- {#each history as track (track.status.id)} + {#each history as track}
select(track)}> -
{track.metadata.title}
-
shared by {track.status.account.acct}
+
{track.title}
+
shared by {track.username}
{/each} diff --git a/src/components/Radio.svelte b/src/components/Radio.svelte index ceeb82f..dbc13ed 100644 --- a/src/components/Radio.svelte +++ b/src/components/Radio.svelte @@ -1,5 +1,5 @@ - {`${ $current ? `${$current.metadata.title} ∴ ` : ''}Eldritch Radio`} + {`${ $current ? `${$current.title} ∴ ` : ''}Eldritch Radio`}
@@ -28,8 +28,8 @@ import Controls from '/components/Controls.svelte' import Queue from '/components/Queue.svelte' import Viewer from '/components/Viewer.svelte' - import { hashtagIterator, combinedIterator } from '/services/mastodon.js' - import { mkTracksIterator } from '/services/misc.js' + import { hashtagsIterator } from '/services/mastodon.js' + import { tracksIterator } from '/services/misc.js' import { domain, hashtags, queue, next, current, enqueueing, select } from '/store.js' @@ -37,13 +37,10 @@ let currentUnsubcribe = null onMount(async () => { - // const iterator = mkTracksIterator(hashtagIterator(get(domain), get(hashtags)[0])) const domainValue = get(domain) const hashtagsValue = get(hashtags) - const iterator = combinedIterator( - hashtagsValue.map(hashtag => mkTracksIterator(hashtagIterator(domainValue, hashtag))) - ) + const iterator = tracksIterator(hashtagsIterator(domainValue, hashtagsValue)) const { value: first } = await iterator.next() diff --git a/src/components/Viewer.svelte b/src/components/Viewer.svelte index 863b092..f5c41e6 100644 --- a/src/components/Viewer.svelte +++ b/src/components/Viewer.svelte @@ -1,7 +1,7 @@
; rel="(\w+)"/gi -export const fetchStatus = (domain, id) => fetch(`https://${domain}/api/v1/statuses/${id}`).then(x => x.json()) - function parseLinkHeader(link) { const links = {} @@ -15,6 +13,9 @@ function parseLinkHeader(link) { return links } +export const fetchStatus = (domain, id) => fetch(`https://${domain}/api/v1/statuses/${id}`).then(x => x.json()) + +// Observable<{ domain : string, hashtag : string, status : Status}> export const hashtagStreamingObservable = (domain, hashtag) => { return new Observable(observer => { const onOpen = () => { @@ -24,7 +25,7 @@ export const hashtagStreamingObservable = (domain, hashtag) => { const onStatus = event => { const status = JSON.parse(event.data) console.log(`Streaming ${domain} #${hashtag} : status ${status.id}`) - observer.next(status) + observer.next(processStatus(domain, status)) } const onError = error => { @@ -60,29 +61,19 @@ export async function* hashtagTimelineIterator (domain, hashtag) { console.log(`Timeline ${domain} #${hashtag} : fetched ${statuses.length} statuses`) - yield* statuses + yield* statuses.map(status => processStatus(domain, status)) } } -export async function* hashtagIterator(domain, hashtag) { - const newerIterator = observableToAsyncIterator(hashtagStreamingObservable(domain, hashtag)) - const olderIterator = hashtagTimelineIterator(domain, hashtag) - - const iterators = [newerIterator, olderIterator] - const values = iterators.map(iterator => iterator.next()) - - while (true) { - const promises = values.map((promise, index) => promise.then(result => ({ index, result }))) - const { index, result: { done, value } } = await Promise.race(promises) - - values[index] = iterators[index].next() - - console.log(`Resolver ${domain} #${hashtag} : resolved with iterator ${index}`) - yield value - } +export const hashtagIterator = (domain, hashtag) => { + return raceIterator([ + observableToAsyncIterator(hashtagStreamingObservable(domain, hashtag)), + hashtagTimelineIterator(domain, hashtag) + ]) } -export async function* combinedIterator(iterators) { +export async function* hashtagsIterator (domain, hashtags) { + const iterators = hashtags.map(hashtag => hashtagIterator(domain, hashtag)) const values = iterators.map(iterator => iterator.next()) while (true) { @@ -91,14 +82,24 @@ export async function* combinedIterator(iterators) { const sorted = promisesValues .sort((a, b) =>{ - new Date(a.result.value.status.created_at) - new Date(b.result.value.status.created_at) + a.result.value.date - b.result.value.date }) const { index, result: { done, value } } = sorted[0] - values[index] = iterators[index].next() - - console.log(`CombinedResolver : resolved with iterator ${index}`) yield value } } + +const processStatus = (domain, status) => ({ + title: null, + username: status.account.username, + date: new Date(status.createdAt), + content: status.content, + referer: { + url: status.url, + credentials: { type: 'mastodon', domain, id: status.id } + }, + media: null +}) + diff --git a/src/services/misc.js b/src/services/misc.js index 8794889..fbafd46 100644 --- a/src/services/misc.js +++ b/src/services/misc.js @@ -1,5 +1,5 @@ import getUrls from 'get-urls' -import { execPipe, asyncFilter, asyncMap } from 'iter-tools' +import { execPipe, asyncFilter, asyncMap, map, findOr } from 'iter-tools' export const tap = f => x => { f(x) @@ -80,65 +80,159 @@ export const secondsToElapsedTime = (seconds) => { .join(':') } -export async function* mkTracksIterator(statusesIterator) { - const knownStatus = new Set() - const knownYoutube = new Set() +export async function* raceIterator(iterators) { + const values = iterators.map(iterator => iterator.next()) - const tracks = execPipe( - statusesIterator, - asyncFilter(status => { - if (!status) { - console.error(`No status, should not happen here`) - return false - } else { - if (knownStatus.has(status.id)) { - console.log(`Drop already processed status ${status.id}`) - return false - } else { - knownStatus.add(status.id) - return true - } - } - }), - asyncMap(status => ({ status, data: mkData(status) })), - asyncFilter(({ status, data }) => { - if (!data) { - console.log(`Drop non processable status ${status.id}`) - return false - } else { - if (knownYoutube.has(data.id)) { - console.log(`Drop already processed youtube ${data.id}`) - return false - } else { - knownYoutube.add(data.id) - return true - } - } - }), - asyncMap(async ({ status, data }) => ({ status, data, metadata: await mkMetadata(data) })) - ) + while (true) { + const promises = values.map((promise, index) => promise.then(result => ({ index, result }))) + const { index, result: { done, value } } = await Promise.race(promises) - yield* tracks + values[index] = iterators[index].next() + yield value + } } -function mkData(status) -{ - const urls = getUrls(status.content) +const mkMapSet = () => ({ set: new Set(), children: new Map() }) - for (const urlAsString of urls) { - const url = new URL(urlAsString) +const pathSet = () => { + const root = mkMapSet() - if (['youtube.com', 'm.youtube.com', 'music.youtube.com'].includes(url.hostname) && url.searchParams.has('v')) { - return { url: urlAsString, id: url.searchParams.get('v') } - } else if (url.hostname === 'youtu.be') { - return { url: urlAsString, id: url.pathname.substring(1) } + const has = (keys, value) => { + let x = root + + for (const key of keys) { + if (x.children.has(key)) { + x = x.children.get(key) + } else { + return false + } } + + return x.set.has(value) } - return null + const add = (keys, value) => { + let x = root + + for (const key of keys) { + if (!x.children.has(key)) { + x.children.set(key, mkMapSet()) + } + + x = x.children.get(key) + } + + x.set.add(value) + } + + return { root, has, add } } -async function mkMetadata(entry) { - return fetch(`https://noembed.com/embed?url=https://www.youtube.com/watch?v=${entry.id}`) - .then(response => response.json()) +export async function* tracksIterator(statusesIterator) { + const known = pathSet() + + yield* execPipe( + statusesIterator, + asyncFilter(knownByReferer(known)), + asyncMap(processReferer), + asyncFilter(knownByMedia(known)), + asyncMap(processMedia) + ) +} + +const knownByReferer = known => track => { + if (!track) { + console.error(`No status, should not happen here`) + return false + } else { + switch (track.referer.credentials.type) { + default: + throw new Error() + + case 'mastodon': + const path = [ + 'referer', + 'mastodon', + track.referer.credentials.domain + ] + + const id = track.referer.credentials.id + + if (known.has(path, id)) { + console.log(`Drop already processed referer ${id}`) + return false + } else { + known.add(path, id) + return true + } + } + } +} + +const knownByMedia = known => track => { + if (track !== null) { + switch (track.media.credentials.type) { + default: + throw new Error() + + case 'youtube': + const path = [ + 'media', + 'youtube' + ] + + const id = track.media.credentials.id + + if (known.has(path, id)) { + console.log(`Drop already processed media ${id}`) + return false + } else { + known.add(path, id) + return true + } + } + } else { + return false + } +} + +const processReferer = track => { + const urls = getUrls(track.content) + + const media = execPipe( + urls, + map(parseSource), + findOr(null, x => x !== null) + ) + + if (media) { + return { ...track, media } + } else { + return null + } +} + +const processMedia = async track => { + const metadata = await fetchMetadata(track.media) + return { ...track, title: metadata.title } +} + +const parseSource = (url) => { + const { hostname, pathname, searchParams } = new URL(url) + + if (['youtube.com', 'm.youtube.com', 'music.youtube.com'].includes(hostname) && searchParams.has('v')) { + return { url, credentials: { type: 'youtube', id: searchParams.get('v') } } + } else if (hostname === 'youtu.be') { + return { url, credentials: { type: 'youtube', id: pathname.substring(1) } } + } else { + return null + } +} + +const fetchMetadata = (media) => { + switch (media.credentials.type) { + case 'youtube': + return fetch(`https://noembed.com/embed?url=https://www.youtube.com/watch?v=${media.credentials.id}`) + .then(response => response.json()) + } } \ No newline at end of file diff --git a/src/store.js b/src/store.js index 08a6aba..0459860 100644 --- a/src/store.js +++ b/src/store.js @@ -31,7 +31,7 @@ export const canNext = derived([queue, index], ([$queue, $index]) => $index !== export const select = track => { - console.log(`Select ${track.metadata.title}`) + console.log(`Select ${track.title}`) current.set(track) }