From c9e2803519ff500d0a1fbf389f58421a9f88b291 Mon Sep 17 00:00:00 2001 From: wryk Date: Fri, 21 Feb 2020 23:31:30 +0100 Subject: [PATCH] fix memory leak --- src/services/mastodon.js | 76 +++++++++++++++++++++++++++++----------- src/services/misc.js | 18 ++++++---- 2 files changed, 66 insertions(+), 28 deletions(-) diff --git a/src/services/mastodon.js b/src/services/mastodon.js index c2b5484..4ca5657 100644 --- a/src/services/mastodon.js +++ b/src/services/mastodon.js @@ -1,6 +1,6 @@ import Observable from 'core-js-pure/features/observable' import getUrls from 'get-urls' -import { observableToAsyncIterator, raceIterator, urlsToMedia } from '/services/misc.js' +import { urlsToMedia } from '/services/misc.js' const LINK_RE = /<(.+?)>; rel="(\w+)"/gi @@ -22,7 +22,6 @@ export async function* statusIterator({ domain, id }) { yield await fetchStatus(domain, id) } -// Observable<{ domain : string, hashtag : string, status : Status}> export const hashtagStreamingObservable = (domain, hashtag) => { return new Observable(observer => { const onOpen = () => { @@ -47,6 +46,7 @@ export const hashtagStreamingObservable = (domain, hashtag) => { eventSource.addEventListener('error', onError) return () => { + console.log(`Streaming ${domain} #${hashtag} : closed`) eventSource.removeEventListener('open', onOpen) eventSource.removeEventListener('update', onStatus) eventSource.removeEventListener('error', onError) @@ -55,6 +55,18 @@ export const hashtagStreamingObservable = (domain, hashtag) => { }) } +export const hashtagsStreamingObservable = (domain, hashtags) => { + return new Observable(observer => { + const subscriptions = hashtags + .map(hashtag => hashtagStreamingObservable(domain, hashtag)) + .map(observable => observable.subscribe(observer)) + + return () => { + subscriptions.forEach(subscription => subscription.unsubscribe()) + } + }) +} + export async function* hashtagTimelineIterator (domain, hashtag) { let nextLink = `https://${domain}/api/v1/timelines/tag/${hashtag}?limit=40` @@ -73,32 +85,54 @@ export async function* hashtagTimelineIterator (domain, hashtag) { } } -export const hashtagIterator = (domain, hashtag) => { - return raceIterator([ - observableToAsyncIterator(hashtagStreamingObservable(domain, hashtag)), - hashtagTimelineIterator(domain, hashtag) - ]) -} - -export async function* hashtagsIterator (domain, hashtags) { - const iterators = hashtags.map(hashtag => hashtagIterator(domain, hashtag)) - const values = iterators.map(iterator => iterator.next()) +export async function* hashtagsTimelineIterator (domain, hashtags) { + const iterators = hashtags.map(hashtag => hashtagTimelineIterator(domain, hashtag)) + const promises = iterators.map(iterator => iterator.next()) while (true) { - const promises = values.map((promise, index) => promise.then(result => ({ index, result }))) - const promisesValues = await Promise.all(promises) + const results = (await Promise.all(promises)) + .map((result, index) => ({ index, result })) + .filter(({ result }) => !result.done) - const sorted = promisesValues - .sort((a, b) =>{ - a.result.value.date - b.result.value.date - }) + if (results.length > 0) { + const sorted = results.sort((a, b) => b.result.value.date - a.result.value.date) + const { index, result: { value } } = sorted[0] - const { index, result: { done, value } } = sorted[0] - values[index] = iterators[index].next() - yield value + promises[index] = iterators[index].next() + yield value + } else { + break + } + } + +} + +export async function* hashtagsIterator(domain, hashtags) { + const buffer = [] + + const streamingSubscription = hashtagsStreamingObservable(domain, hashtags).subscribe({ + next: value => buffer.push(value), + error: error => console.error(error), + complete: () => console.log('complete') + }) + + const timelineGenerator = hashtagsTimelineIterator(domain, hashtags) + + try { + while (true) { + if (buffer.length > 0) { + yield buffer.pop() + } else { + yield (await timelineGenerator.next()).value + } + } + } finally { + streamingSubscription.unsubscribe() + timelineGenerator.return() } } + const processStatus = (domain, status) => ({ title: '', date: new Date(status.created_at), diff --git a/src/services/misc.js b/src/services/misc.js index a8b8c91..db7e2e3 100644 --- a/src/services/misc.js +++ b/src/services/misc.js @@ -91,13 +91,17 @@ export async function* raceIterator(iterators) { } } -export async function* tracksIterator(statusesIterator, cache) { - yield* execPipe( - statusesIterator, - asyncFilter(track => track != null), // should not be necessary - asyncFilter(notKnown(cache)), - asyncMap(completeTrack) - ) +export async function* tracksIterator(statusesGenerator, cache) { + try { + yield* execPipe( + statusesGenerator, + asyncFilter(track => track != null), // should not be necessary + asyncFilter(notKnown(cache)), + asyncMap(completeTrack) + ) + } finally { + statusesGenerator.return() + } } const notKnown = cache => track => {