diff --git a/src/services/mastodon.js b/src/services/mastodon.js index c3e577b..8e97aef 100644 --- a/src/services/mastodon.js +++ b/src/services/mastodon.js @@ -15,14 +15,29 @@ function parseLinkHeader(link) { export const hashtagStreamingObservable = (domain, hashtag) => { return new Observable(observer => { - const onStatus = (event) => observer.next(JSON.parse(event.data)) - const onError = (error) => observer.error(error) + const onOpen = () => { + console.log(`Streaming ${domain} #${hashtag} : open`) + } + + const onStatus = event => { + const status = JSON.parse(event.data) + console.log(`Streaming ${domain} #${hashtag} : status ${status.id}`) + observer.next(status) + } + + const onError = error => { + console.error(`Streaming ${domain} #${hashtag} : error`) + console.error(error) + observer.error(error) + } const eventSource = new EventSource(`https://${domain}/api/v1/streaming/hashtag?tag=${hashtag}`) + eventSource.addEventListener('open', onOpen) eventSource.addEventListener('update', onStatus) eventSource.addEventListener('error', onError) return () => { + eventSource.removeEventListener('open', onOpen) eventSource.removeEventListener('update', onStatus) eventSource.removeEventListener('error', onError) } @@ -39,7 +54,11 @@ export async function* hashtagTimelineIterator (domain, hashtag) { ? parseLinkHeader(response.headers.get('link')).next : null - yield* await response.json() + const statuses = await response.json() + + console.log(`Timeline ${domain} #${hashtag} : fetched ${statuses.length} statuses`) + + yield* statuses } } @@ -47,26 +66,16 @@ export async function* hashtagIterator(domain, hashtag) { const newerIterator = observableToAsyncIterator(hashtagStreamingObservable(domain, hashtag)) const olderIterator = hashtagTimelineIterator(domain, hashtag) - let newer = newerIterator.next() - let older = olderIterator.next() + const iterators = [newerIterator, olderIterator] + const values = iterators.map(iterator => iterator.next()) while (true) { - const promises = [newer, older].map((promise, index) => promise.then(result => ({ index, result }))) + const promises = values.map((promise, index) => promise.then(result => ({ index, result }))) const { index, result: { done, value } } = await Promise.race(promises) - switch (index) { - default: - throw new Error() - - case 0: - newer = newerIterator.next() - break; - - case 1: - older = olderIterator.next() - break; - } + values[index] = iterators[index].next() + console.log(`Resolver ${domain} #${hashtag} : resolved with iterator ${index} status ${value.id}`) yield value } } diff --git a/src/services/misc.js b/src/services/misc.js index 767b4c0..d78047e 100644 --- a/src/services/misc.js +++ b/src/services/misc.js @@ -54,7 +54,7 @@ export async function* observableToAsyncIterator(observable) { const value = await buffer[0].promise buffer.unshift() - // might cause a early complete because done can be true true when more than one item are in buffer + // might cause a early complete because done can be true when more than one item are in buffer if (done) { return value } else { @@ -90,19 +90,27 @@ export async function* mkTracksIterator(statusesIterator) { const tracks = execPipe( statusesIterator, asyncFilter(status => { - if (knownStatus.has(status.id)) { + if (!status) { + console.error(`No status, should not happen here`) return false } else { - knownStatus.add(status.id) - return true + if (knownStatus.has(status.id)) { + console.log(`Drop already processed status ${status.id}`) + return false + } else { + knownStatus.add(status.id) + return true + } } }), asyncMap(status => ({ status, data: mkData(status) })), - asyncFilter(({ data }) => { + asyncFilter(({ status, data }) => { if (!data) { + console.log(`Drop non processable status ${status.id}`) return false } else { if (knownYoutube.has(data.id)) { + console.log(`Drop already processed youtube ${data.id}`) return false } else { knownYoutube.add(data.id)