mastoradio-fork/src/services/mastodon.js

143 lines
4.4 KiB
JavaScript
Raw Normal View History

2020-01-20 12:23:00 +01:00
import Observable from 'core-js-pure/features/observable'
2020-02-16 17:02:39 +01:00
import getUrls from 'get-urls'
2020-02-21 23:31:30 +01:00
import { urlsToMedia } from '/services/misc.js'
2020-01-20 03:26:18 +01:00
const LINK_RE = /<(.+?)>; rel="(\w+)"/gi
2020-02-22 03:39:15 +01:00
function parseLinkHeader(linkHeader) {
const links = new Map()
2020-01-20 03:26:18 +01:00
2020-02-22 03:39:15 +01:00
for (const [ , url, name ] of linkHeader.matchAll(LINK_RE)) {
links.set(name, url)
2020-01-20 03:26:18 +01:00
}
return links
}
2020-02-16 17:02:39 +01:00
export const fetchStatus = (domain, id) => fetch(`https://${domain}/api/v1/statuses/${id}`)
.then(response => response.json())
.then(status => processStatus(domain, status))
2020-02-15 23:12:53 +01:00
export async function* statusIterator({ domain, id }) {
yield await fetchStatus(domain, id)
}
2020-01-20 03:26:18 +01:00
export const hashtagStreamingObservable = (domain, hashtag) => {
return new Observable(observer => {
2020-02-04 20:07:34 +01:00
const onOpen = () => {
console.log(`Streaming ${domain} #${hashtag} : open`)
}
const onStatus = event => {
const status = JSON.parse(event.data)
console.log(`Streaming ${domain} #${hashtag} : status ${status.id}`)
2020-02-15 23:12:53 +01:00
observer.next(processStatus(domain, status))
2020-02-04 20:07:34 +01:00
}
const onError = error => {
console.error(`Streaming ${domain} #${hashtag} : error`)
console.error(error)
observer.error(error)
}
2020-01-20 03:26:18 +01:00
const eventSource = new EventSource(`https://${domain}/api/v1/streaming/hashtag?tag=${hashtag}`)
2020-02-04 20:07:34 +01:00
eventSource.addEventListener('open', onOpen)
2020-01-20 03:26:18 +01:00
eventSource.addEventListener('update', onStatus)
eventSource.addEventListener('error', onError)
return () => {
2020-02-21 23:31:30 +01:00
console.log(`Streaming ${domain} #${hashtag} : closed`)
2020-02-04 20:07:34 +01:00
eventSource.removeEventListener('open', onOpen)
2020-01-20 03:26:18 +01:00
eventSource.removeEventListener('update', onStatus)
eventSource.removeEventListener('error', onError)
2020-02-16 17:02:39 +01:00
eventSource.close()
2020-01-20 03:26:18 +01:00
}
})
}
2020-02-21 23:31:30 +01:00
export const hashtagsStreamingObservable = (domain, hashtags) => {
return new Observable(observer => {
const subscriptions = hashtags
.map(hashtag => hashtagStreamingObservable(domain, hashtag))
.map(observable => observable.subscribe(observer))
return () => {
subscriptions.forEach(subscription => subscription.unsubscribe())
}
})
}
2020-01-20 03:26:18 +01:00
export async function* hashtagTimelineIterator (domain, hashtag) {
let nextLink = `https://${domain}/api/v1/timelines/tag/${hashtag}?limit=40`
while (nextLink) {
const response = await fetch(nextLink)
nextLink = response.headers.has('link')
2020-02-22 03:39:15 +01:00
? parseLinkHeader(response.headers.get('link')).get('next')
2020-01-20 03:26:18 +01:00
: null
2020-02-04 20:07:34 +01:00
const statuses = await response.json()
console.log(`Timeline ${domain} #${hashtag} : fetched ${statuses.length} statuses`)
2020-02-15 23:12:53 +01:00
yield* statuses.map(status => processStatus(domain, status))
2020-01-20 03:26:18 +01:00
}
}
2020-02-21 23:31:30 +01:00
export async function* hashtagsTimelineIterator (domain, hashtags) {
const iterators = hashtags.map(hashtag => hashtagTimelineIterator(domain, hashtag))
const promises = iterators.map(iterator => iterator.next())
while (true) {
const results = (await Promise.all(promises))
.map((result, index) => ({ index, result }))
.filter(({ result }) => !result.done)
if (results.length > 0) {
const sorted = results.sort((a, b) => b.result.value.date - a.result.value.date)
const { index, result: { value } } = sorted[0]
promises[index] = iterators[index].next()
yield value
} else {
break
}
}
2020-01-20 03:26:18 +01:00
}
2020-02-21 23:31:30 +01:00
export async function* hashtagsIterator(domain, hashtags) {
const buffer = []
2020-02-14 17:49:56 +01:00
2020-02-21 23:31:30 +01:00
const streamingSubscription = hashtagsStreamingObservable(domain, hashtags).subscribe({
next: value => buffer.push(value),
error: error => console.error(error),
complete: () => console.log('complete')
})
2020-02-14 17:49:56 +01:00
2020-02-21 23:31:30 +01:00
const timelineGenerator = hashtagsTimelineIterator(domain, hashtags)
2020-02-14 17:49:56 +01:00
2020-02-21 23:31:30 +01:00
try {
while (true) {
if (buffer.length > 0) {
yield buffer.pop()
} else {
yield (await timelineGenerator.next()).value
}
}
} finally {
streamingSubscription.unsubscribe()
timelineGenerator.return()
2020-02-14 17:49:56 +01:00
}
}
2020-02-15 23:12:53 +01:00
const processStatus = (domain, status) => ({
2020-02-22 03:39:15 +01:00
username: status.account.username,
content: status.content,
2020-02-17 01:23:35 +01:00
date: new Date(status.created_at),
2020-02-22 03:39:15 +01:00
url: status.url,
credentials: { type: 'mastodon', domain, id: status.id }
2020-02-15 23:12:53 +01:00
})