fix observable to async iterator

This commit is contained in:
wryk 2020-02-13 16:30:25 +01:00
parent a322474ba3
commit 48d484bca2
1 changed files with 10 additions and 13 deletions

View File

@ -31,34 +31,31 @@ export const defer = () => {
export async function* observableToAsyncIterator(observable) { export async function* observableToAsyncIterator(observable) {
const buffer = [defer()] const buffer = [defer()]
let done = false
const next = (x) => { const next = value => {
buffer[buffer.length - 1].resolve(x) buffer[buffer.length - 1].resolve(value)
buffer.push(defer()) buffer.push(defer())
} }
const complete = value => {
buffer[buffer.length - 1].resolve(value)
}
const error = (error) => { const error = (error) => {
buffer[buffer.length - 1].reject(error) buffer[buffer.length - 1].reject(error)
} }
const complete = (x) => {
buffer[buffer.length - 1].resolve(x)
done = true
}
const subscription = observable.subscribe({ next, error, complete }) const subscription = observable.subscribe({ next, error, complete })
try { try {
while (true) { while (true) {
const value = await buffer[0].promise const value = await buffer[0].promise
buffer.unshift() buffer.shift()
// might cause a early complete because done can be true when more than one item are in buffer if (buffer.length) {
if (done) {
return value
} else {
yield value yield value
} else {
return value
} }
} }
} finally { } finally {