fix streaming

This commit is contained in:
wryk 2020-03-06 20:23:44 +01:00
parent 1fa83eadaa
commit d7bc1fd266

View File

@ -16,7 +16,6 @@ function parseLinkHeader(linkHeader) {
export const fetchStatus = (domain, id) => fetch(`https://${domain}/api/v1/statuses/${id}`) export const fetchStatus = (domain, id) => fetch(`https://${domain}/api/v1/statuses/${id}`)
.then(response => response.json()) .then(response => response.json())
.then(status => processStatus(domain, status))
export async function* statusIterator({ domain, id }) { export async function* statusIterator({ domain, id }) {
const partialTrack = await fetchStatus(domain, id) const partialTrack = await fetchStatus(domain, id)
@ -26,7 +25,7 @@ export async function* statusIterator({ domain, id }) {
} }
} }
export const hashtagStreamingObservable = (domain, hashtag) => { export const hashtagStreamingStatusesObservable = (domain, hashtag) => {
return new Observable(observer => { return new Observable(observer => {
const onOpen = () => { const onOpen = () => {
console.log(`Streaming ${domain} #${hashtag} : open`) console.log(`Streaming ${domain} #${hashtag} : open`)
@ -35,7 +34,7 @@ export const hashtagStreamingObservable = (domain, hashtag) => {
const onStatus = event => { const onStatus = event => {
const status = JSON.parse(event.data) const status = JSON.parse(event.data)
console.log(`Streaming ${domain} #${hashtag} : status ${status.id}`) console.log(`Streaming ${domain} #${hashtag} : status ${status.id}`)
observer.next(processStatus(domain, status)) observer.next(status)
} }
const onError = error => { const onError = error => {
@ -59,6 +58,27 @@ export const hashtagStreamingObservable = (domain, hashtag) => {
}) })
} }
export const hashtagStreamingObservable = (domain, hashtag) => {
return new Observable(observer => {
const subscription = hashtagStreamingStatusesObservable(domain, hashtag).subscribe({
next: status => {
const partialMedia = processStatus(domain, status)
if (partialMedia !== null) {
observer.next(partialMedia)
}
},
error: observer.error,
complete: observer.complete
})
return () => {
subscription.unsubscribe()
}
})
}
// don't handle correctly complete
export const hashtagsStreamingObservable = (domain, hashtags) => { export const hashtagsStreamingObservable = (domain, hashtags) => {
return new Observable(observer => { return new Observable(observer => {
const subscriptions = hashtags const subscriptions = hashtags