@@ -28,8 +28,8 @@
import Controls from '/components/Controls.svelte'
import Queue from '/components/Queue.svelte'
import Viewer from '/components/Viewer.svelte'
- import { hashtagIterator, combinedIterator } from '/services/mastodon.js'
- import { mkTracksIterator } from '/services/misc.js'
+ import { hashtagsIterator } from '/services/mastodon.js'
+ import { tracksIterator } from '/services/misc.js'
import { domain, hashtags, queue, next, current, enqueueing, select } from '/store.js'
@@ -37,13 +37,10 @@
let currentUnsubcribe = null
onMount(async () => {
- // const iterator = mkTracksIterator(hashtagIterator(get(domain), get(hashtags)[0]))
const domainValue = get(domain)
const hashtagsValue = get(hashtags)
- const iterator = combinedIterator(
- hashtagsValue.map(hashtag => mkTracksIterator(hashtagIterator(domainValue, hashtag)))
- )
+ const iterator = tracksIterator(hashtagsIterator(domainValue, hashtagsValue))
const { value: first } = await iterator.next()
diff --git a/src/components/Viewer.svelte b/src/components/Viewer.svelte
index 863b092..f5c41e6 100644
--- a/src/components/Viewer.svelte
+++ b/src/components/Viewer.svelte
@@ -1,7 +1,7 @@
; rel="(\w+)"/gi
-export const fetchStatus = (domain, id) => fetch(`https://${domain}/api/v1/statuses/${id}`).then(x => x.json())
-
function parseLinkHeader(link) {
const links = {}
@@ -15,6 +13,9 @@ function parseLinkHeader(link) {
return links
}
+export const fetchStatus = (domain, id) => fetch(`https://${domain}/api/v1/statuses/${id}`).then(x => x.json())
+
+// Observable<{ domain : string, hashtag : string, status : Status}>
export const hashtagStreamingObservable = (domain, hashtag) => {
return new Observable(observer => {
const onOpen = () => {
@@ -24,7 +25,7 @@ export const hashtagStreamingObservable = (domain, hashtag) => {
const onStatus = event => {
const status = JSON.parse(event.data)
console.log(`Streaming ${domain} #${hashtag} : status ${status.id}`)
- observer.next(status)
+ observer.next(processStatus(domain, status))
}
const onError = error => {
@@ -60,29 +61,19 @@ export async function* hashtagTimelineIterator (domain, hashtag) {
console.log(`Timeline ${domain} #${hashtag} : fetched ${statuses.length} statuses`)
- yield* statuses
+ yield* statuses.map(status => processStatus(domain, status))
}
}
-export async function* hashtagIterator(domain, hashtag) {
- const newerIterator = observableToAsyncIterator(hashtagStreamingObservable(domain, hashtag))
- const olderIterator = hashtagTimelineIterator(domain, hashtag)
-
- const iterators = [newerIterator, olderIterator]
- const values = iterators.map(iterator => iterator.next())
-
- while (true) {
- const promises = values.map((promise, index) => promise.then(result => ({ index, result })))
- const { index, result: { done, value } } = await Promise.race(promises)
-
- values[index] = iterators[index].next()
-
- console.log(`Resolver ${domain} #${hashtag} : resolved with iterator ${index}`)
- yield value
- }
+export const hashtagIterator = (domain, hashtag) => {
+ return raceIterator([
+ observableToAsyncIterator(hashtagStreamingObservable(domain, hashtag)),
+ hashtagTimelineIterator(domain, hashtag)
+ ])
}
-export async function* combinedIterator(iterators) {
+export async function* hashtagsIterator (domain, hashtags) {
+ const iterators = hashtags.map(hashtag => hashtagIterator(domain, hashtag))
const values = iterators.map(iterator => iterator.next())
while (true) {
@@ -91,14 +82,24 @@ export async function* combinedIterator(iterators) {
const sorted = promisesValues
.sort((a, b) =>{
- new Date(a.result.value.status.created_at) - new Date(b.result.value.status.created_at)
+ a.result.value.date - b.result.value.date
})
const { index, result: { done, value } } = sorted[0]
-
values[index] = iterators[index].next()
-
- console.log(`CombinedResolver : resolved with iterator ${index}`)
yield value
}
}
+
+const processStatus = (domain, status) => ({
+ title: null,
+ username: status.account.username,
+ date: new Date(status.createdAt),
+ content: status.content,
+ referer: {
+ url: status.url,
+ credentials: { type: 'mastodon', domain, id: status.id }
+ },
+ media: null
+})
+
diff --git a/src/services/misc.js b/src/services/misc.js
index 8794889..fbafd46 100644
--- a/src/services/misc.js
+++ b/src/services/misc.js
@@ -1,5 +1,5 @@
import getUrls from 'get-urls'
-import { execPipe, asyncFilter, asyncMap } from 'iter-tools'
+import { execPipe, asyncFilter, asyncMap, map, findOr } from 'iter-tools'
export const tap = f => x => {
f(x)
@@ -80,65 +80,159 @@ export const secondsToElapsedTime = (seconds) => {
.join(':')
}
-export async function* mkTracksIterator(statusesIterator) {
- const knownStatus = new Set()
- const knownYoutube = new Set()
+export async function* raceIterator(iterators) {
+ const values = iterators.map(iterator => iterator.next())
- const tracks = execPipe(
- statusesIterator,
- asyncFilter(status => {
- if (!status) {
- console.error(`No status, should not happen here`)
- return false
- } else {
- if (knownStatus.has(status.id)) {
- console.log(`Drop already processed status ${status.id}`)
- return false
- } else {
- knownStatus.add(status.id)
- return true
- }
- }
- }),
- asyncMap(status => ({ status, data: mkData(status) })),
- asyncFilter(({ status, data }) => {
- if (!data) {
- console.log(`Drop non processable status ${status.id}`)
- return false
- } else {
- if (knownYoutube.has(data.id)) {
- console.log(`Drop already processed youtube ${data.id}`)
- return false
- } else {
- knownYoutube.add(data.id)
- return true
- }
- }
- }),
- asyncMap(async ({ status, data }) => ({ status, data, metadata: await mkMetadata(data) }))
- )
+ while (true) {
+ const promises = values.map((promise, index) => promise.then(result => ({ index, result })))
+ const { index, result: { done, value } } = await Promise.race(promises)
- yield* tracks
+ values[index] = iterators[index].next()
+ yield value
+ }
}
-function mkData(status)
-{
- const urls = getUrls(status.content)
+const mkMapSet = () => ({ set: new Set(), children: new Map() })
- for (const urlAsString of urls) {
- const url = new URL(urlAsString)
+const pathSet = () => {
+ const root = mkMapSet()
- if (['youtube.com', 'm.youtube.com', 'music.youtube.com'].includes(url.hostname) && url.searchParams.has('v')) {
- return { url: urlAsString, id: url.searchParams.get('v') }
- } else if (url.hostname === 'youtu.be') {
- return { url: urlAsString, id: url.pathname.substring(1) }
+ const has = (keys, value) => {
+ let x = root
+
+ for (const key of keys) {
+ if (x.children.has(key)) {
+ x = x.children.get(key)
+ } else {
+ return false
+ }
}
+
+ return x.set.has(value)
}
- return null
+ const add = (keys, value) => {
+ let x = root
+
+ for (const key of keys) {
+ if (!x.children.has(key)) {
+ x.children.set(key, mkMapSet())
+ }
+
+ x = x.children.get(key)
+ }
+
+ x.set.add(value)
+ }
+
+ return { root, has, add }
}
-async function mkMetadata(entry) {
- return fetch(`https://noembed.com/embed?url=https://www.youtube.com/watch?v=${entry.id}`)
- .then(response => response.json())
+export async function* tracksIterator(statusesIterator) {
+ const known = pathSet()
+
+ yield* execPipe(
+ statusesIterator,
+ asyncFilter(knownByReferer(known)),
+ asyncMap(processReferer),
+ asyncFilter(knownByMedia(known)),
+ asyncMap(processMedia)
+ )
+}
+
+const knownByReferer = known => track => {
+ if (!track) {
+ console.error(`No status, should not happen here`)
+ return false
+ } else {
+ switch (track.referer.credentials.type) {
+ default:
+ throw new Error()
+
+ case 'mastodon':
+ const path = [
+ 'referer',
+ 'mastodon',
+ track.referer.credentials.domain
+ ]
+
+ const id = track.referer.credentials.id
+
+ if (known.has(path, id)) {
+ console.log(`Drop already processed referer ${id}`)
+ return false
+ } else {
+ known.add(path, id)
+ return true
+ }
+ }
+ }
+}
+
+const knownByMedia = known => track => {
+ if (track !== null) {
+ switch (track.media.credentials.type) {
+ default:
+ throw new Error()
+
+ case 'youtube':
+ const path = [
+ 'media',
+ 'youtube'
+ ]
+
+ const id = track.media.credentials.id
+
+ if (known.has(path, id)) {
+ console.log(`Drop already processed media ${id}`)
+ return false
+ } else {
+ known.add(path, id)
+ return true
+ }
+ }
+ } else {
+ return false
+ }
+}
+
+const processReferer = track => {
+ const urls = getUrls(track.content)
+
+ const media = execPipe(
+ urls,
+ map(parseSource),
+ findOr(null, x => x !== null)
+ )
+
+ if (media) {
+ return { ...track, media }
+ } else {
+ return null
+ }
+}
+
+const processMedia = async track => {
+ const metadata = await fetchMetadata(track.media)
+ return { ...track, title: metadata.title }
+}
+
+const parseSource = (url) => {
+ const { hostname, pathname, searchParams } = new URL(url)
+
+ if (['youtube.com', 'm.youtube.com', 'music.youtube.com'].includes(hostname) && searchParams.has('v')) {
+ return { url, credentials: { type: 'youtube', id: searchParams.get('v') } }
+ } else if (hostname === 'youtu.be') {
+ return { url, credentials: { type: 'youtube', id: pathname.substring(1) } }
+ } else {
+ return null
+ }
+}
+
+const fetchMetadata = (media) => {
+ switch (media.credentials.type) {
+ case 'youtube':
+ return fetch(`https://noembed.com/embed?url=https://www.youtube.com/watch?v=${media.credentials.id}`)
+ .then(response => response.json())
+ }
}
\ No newline at end of file
diff --git a/src/store.js b/src/store.js
index 08a6aba..0459860 100644
--- a/src/store.js
+++ b/src/store.js
@@ -31,7 +31,7 @@ export const canNext = derived([queue, index], ([$queue, $index]) => $index !==
export const select = track => {
- console.log(`Select ${track.metadata.title}`)
+ console.log(`Select ${track.title}`)
current.set(track)
}