add logs, refact resolver

This commit is contained in:
wryk 2020-02-04 20:07:34 +01:00
parent 66c57e566e
commit a322474ba3
2 changed files with 40 additions and 23 deletions

View File

@ -15,14 +15,29 @@ function parseLinkHeader(link) {
export const hashtagStreamingObservable = (domain, hashtag) => { export const hashtagStreamingObservable = (domain, hashtag) => {
return new Observable(observer => { return new Observable(observer => {
const onStatus = (event) => observer.next(JSON.parse(event.data)) const onOpen = () => {
const onError = (error) => observer.error(error) console.log(`Streaming ${domain} #${hashtag} : open`)
}
const onStatus = event => {
const status = JSON.parse(event.data)
console.log(`Streaming ${domain} #${hashtag} : status ${status.id}`)
observer.next(status)
}
const onError = error => {
console.error(`Streaming ${domain} #${hashtag} : error`)
console.error(error)
observer.error(error)
}
const eventSource = new EventSource(`https://${domain}/api/v1/streaming/hashtag?tag=${hashtag}`) const eventSource = new EventSource(`https://${domain}/api/v1/streaming/hashtag?tag=${hashtag}`)
eventSource.addEventListener('open', onOpen)
eventSource.addEventListener('update', onStatus) eventSource.addEventListener('update', onStatus)
eventSource.addEventListener('error', onError) eventSource.addEventListener('error', onError)
return () => { return () => {
eventSource.removeEventListener('open', onOpen)
eventSource.removeEventListener('update', onStatus) eventSource.removeEventListener('update', onStatus)
eventSource.removeEventListener('error', onError) eventSource.removeEventListener('error', onError)
} }
@ -39,7 +54,11 @@ export async function* hashtagTimelineIterator (domain, hashtag) {
? parseLinkHeader(response.headers.get('link')).next ? parseLinkHeader(response.headers.get('link')).next
: null : null
yield* await response.json() const statuses = await response.json()
console.log(`Timeline ${domain} #${hashtag} : fetched ${statuses.length} statuses`)
yield* statuses
} }
} }
@ -47,26 +66,16 @@ export async function* hashtagIterator(domain, hashtag) {
const newerIterator = observableToAsyncIterator(hashtagStreamingObservable(domain, hashtag)) const newerIterator = observableToAsyncIterator(hashtagStreamingObservable(domain, hashtag))
const olderIterator = hashtagTimelineIterator(domain, hashtag) const olderIterator = hashtagTimelineIterator(domain, hashtag)
let newer = newerIterator.next() const iterators = [newerIterator, olderIterator]
let older = olderIterator.next() const values = iterators.map(iterator => iterator.next())
while (true) { while (true) {
const promises = [newer, older].map((promise, index) => promise.then(result => ({ index, result }))) const promises = values.map((promise, index) => promise.then(result => ({ index, result })))
const { index, result: { done, value } } = await Promise.race(promises) const { index, result: { done, value } } = await Promise.race(promises)
switch (index) { values[index] = iterators[index].next()
default:
throw new Error()
case 0:
newer = newerIterator.next()
break;
case 1:
older = olderIterator.next()
break;
}
console.log(`Resolver ${domain} #${hashtag} : resolved with iterator ${index} status ${value.id}`)
yield value yield value
} }
} }

View File

@ -54,7 +54,7 @@ export async function* observableToAsyncIterator(observable) {
const value = await buffer[0].promise const value = await buffer[0].promise
buffer.unshift() buffer.unshift()
// might cause a early complete because done can be true true when more than one item are in buffer // might cause a early complete because done can be true when more than one item are in buffer
if (done) { if (done) {
return value return value
} else { } else {
@ -90,19 +90,27 @@ export async function* mkTracksIterator(statusesIterator) {
const tracks = execPipe( const tracks = execPipe(
statusesIterator, statusesIterator,
asyncFilter(status => { asyncFilter(status => {
if (knownStatus.has(status.id)) { if (!status) {
console.error(`No status, should not happen here`)
return false return false
} else { } else {
knownStatus.add(status.id) if (knownStatus.has(status.id)) {
return true console.log(`Drop already processed status ${status.id}`)
return false
} else {
knownStatus.add(status.id)
return true
}
} }
}), }),
asyncMap(status => ({ status, data: mkData(status) })), asyncMap(status => ({ status, data: mkData(status) })),
asyncFilter(({ data }) => { asyncFilter(({ status, data }) => {
if (!data) { if (!data) {
console.log(`Drop non processable status ${status.id}`)
return false return false
} else { } else {
if (knownYoutube.has(data.id)) { if (knownYoutube.has(data.id)) {
console.log(`Drop already processed youtube ${data.id}`)
return false return false
} else { } else {
knownYoutube.add(data.id) knownYoutube.add(data.id)