fix memory leak

This commit is contained in:
wryk 2020-02-21 23:31:30 +01:00
parent 8037bc8711
commit c9e2803519
2 changed files with 66 additions and 28 deletions

View File

@ -1,6 +1,6 @@
import Observable from 'core-js-pure/features/observable' import Observable from 'core-js-pure/features/observable'
import getUrls from 'get-urls' import getUrls from 'get-urls'
import { observableToAsyncIterator, raceIterator, urlsToMedia } from '/services/misc.js' import { urlsToMedia } from '/services/misc.js'
const LINK_RE = /<(.+?)>; rel="(\w+)"/gi const LINK_RE = /<(.+?)>; rel="(\w+)"/gi
@ -22,7 +22,6 @@ export async function* statusIterator({ domain, id }) {
yield await fetchStatus(domain, id) yield await fetchStatus(domain, id)
} }
// Observable<{ domain : string, hashtag : string, status : Status}>
export const hashtagStreamingObservable = (domain, hashtag) => { export const hashtagStreamingObservable = (domain, hashtag) => {
return new Observable(observer => { return new Observable(observer => {
const onOpen = () => { const onOpen = () => {
@ -47,6 +46,7 @@ export const hashtagStreamingObservable = (domain, hashtag) => {
eventSource.addEventListener('error', onError) eventSource.addEventListener('error', onError)
return () => { return () => {
console.log(`Streaming ${domain} #${hashtag} : closed`)
eventSource.removeEventListener('open', onOpen) eventSource.removeEventListener('open', onOpen)
eventSource.removeEventListener('update', onStatus) eventSource.removeEventListener('update', onStatus)
eventSource.removeEventListener('error', onError) eventSource.removeEventListener('error', onError)
@ -55,6 +55,18 @@ export const hashtagStreamingObservable = (domain, hashtag) => {
}) })
} }
export const hashtagsStreamingObservable = (domain, hashtags) => {
return new Observable(observer => {
const subscriptions = hashtags
.map(hashtag => hashtagStreamingObservable(domain, hashtag))
.map(observable => observable.subscribe(observer))
return () => {
subscriptions.forEach(subscription => subscription.unsubscribe())
}
})
}
export async function* hashtagTimelineIterator (domain, hashtag) { export async function* hashtagTimelineIterator (domain, hashtag) {
let nextLink = `https://${domain}/api/v1/timelines/tag/${hashtag}?limit=40` let nextLink = `https://${domain}/api/v1/timelines/tag/${hashtag}?limit=40`
@ -73,32 +85,54 @@ export async function* hashtagTimelineIterator (domain, hashtag) {
} }
} }
export const hashtagIterator = (domain, hashtag) => { export async function* hashtagsTimelineIterator (domain, hashtags) {
return raceIterator([ const iterators = hashtags.map(hashtag => hashtagTimelineIterator(domain, hashtag))
observableToAsyncIterator(hashtagStreamingObservable(domain, hashtag)), const promises = iterators.map(iterator => iterator.next())
hashtagTimelineIterator(domain, hashtag)
])
}
export async function* hashtagsIterator (domain, hashtags) {
const iterators = hashtags.map(hashtag => hashtagIterator(domain, hashtag))
const values = iterators.map(iterator => iterator.next())
while (true) { while (true) {
const promises = values.map((promise, index) => promise.then(result => ({ index, result }))) const results = (await Promise.all(promises))
const promisesValues = await Promise.all(promises) .map((result, index) => ({ index, result }))
.filter(({ result }) => !result.done)
const sorted = promisesValues if (results.length > 0) {
.sort((a, b) =>{ const sorted = results.sort((a, b) => b.result.value.date - a.result.value.date)
a.result.value.date - b.result.value.date const { index, result: { value } } = sorted[0]
promises[index] = iterators[index].next()
yield value
} else {
break
}
}
}
export async function* hashtagsIterator(domain, hashtags) {
const buffer = []
const streamingSubscription = hashtagsStreamingObservable(domain, hashtags).subscribe({
next: value => buffer.push(value),
error: error => console.error(error),
complete: () => console.log('complete')
}) })
const { index, result: { done, value } } = sorted[0] const timelineGenerator = hashtagsTimelineIterator(domain, hashtags)
values[index] = iterators[index].next()
yield value try {
while (true) {
if (buffer.length > 0) {
yield buffer.pop()
} else {
yield (await timelineGenerator.next()).value
}
}
} finally {
streamingSubscription.unsubscribe()
timelineGenerator.return()
} }
} }
const processStatus = (domain, status) => ({ const processStatus = (domain, status) => ({
title: '', title: '',
date: new Date(status.created_at), date: new Date(status.created_at),

View File

@ -91,13 +91,17 @@ export async function* raceIterator(iterators) {
} }
} }
export async function* tracksIterator(statusesIterator, cache) { export async function* tracksIterator(statusesGenerator, cache) {
try {
yield* execPipe( yield* execPipe(
statusesIterator, statusesGenerator,
asyncFilter(track => track != null), // should not be necessary asyncFilter(track => track != null), // should not be necessary
asyncFilter(notKnown(cache)), asyncFilter(notKnown(cache)),
asyncMap(completeTrack) asyncMap(completeTrack)
) )
} finally {
statusesGenerator.return()
}
} }
const notKnown = cache => track => { const notKnown = cache => track => {