diff --git a/src/app/components/stream/stream.component.ts b/src/app/components/stream/stream.component.ts index 7f153bdc..bc78b739 100644 --- a/src/app/components/stream/stream.component.ts +++ b/src/app/components/stream/stream.component.ts @@ -64,11 +64,15 @@ export class StreamComponent implements OnInit { } private launchWebsocket(): void { - this.websocketStreaming = this.streamingService.getStreaming(this.account.instance, this.account.token.access_token, this._streamElement.type); + this.websocketStreaming = this.streamingService.getStreaming(this.account, this._streamElement.type); this.websocketStreaming.statusUpdateSubjet.subscribe((update: StatusUpdate) => { if (update) { if (update.type === EventEnum.update) { - this.statuses.unshift(update.status); + console.log(update.status); + if (!this.statuses.find(x => x.id == update.status.id)) { + console.log('added'); + this.statuses.unshift(update.status); + } } } }); diff --git a/src/app/services/models/api.settings.ts b/src/app/services/models/api.settings.ts index 55297630..47e1d6ec 100644 --- a/src/app/services/models/api.settings.ts +++ b/src/app/services/models/api.settings.ts @@ -47,4 +47,5 @@ export class ApiRoutes { getDirectTimeline = '/api/v1/timelines/direct'; getTagTimeline = '/api/v1/timelines/tag/{0}'; getListTimeline = '/api/v1/timelines/list/{0}'; + getStreaming = '/api/v1/streaming?access_token={0}&stream={1}'; } diff --git a/src/app/services/streaming.service.ts b/src/app/services/streaming.service.ts index 677dd623..697a0b30 100644 --- a/src/app/services/streaming.service.ts +++ b/src/app/services/streaming.service.ts @@ -3,64 +3,78 @@ import { Status } from "./models/mastodon.interfaces"; import { BehaviorSubject } from "rxjs"; import { ApiRoutes } from "./models/api.settings"; import { StreamTypeEnum } from "../states/streams.state"; +import { MastodonService } from "./mastodon.service"; +import { AccountInfo } from "../states/accounts.state"; +import { stat } from "fs"; @Injectable() export class StreamingService { - private apiRoutes = new ApiRoutes(); + constructor( + private readonly mastodonService: MastodonService) { } - constructor() { } - - getStreaming(instance: string, accessToken: string, streamType: StreamTypeEnum): StreamingWrapper { - const request = this.getRequest(streamType); - const route = `wss://${instance}/api/v1/streaming?access_token=${accessToken}&stream=${request}` - return new StreamingWrapper(route); + getStreaming(accountInfo: AccountInfo, streamType: StreamTypeEnum): StreamingWrapper { + return new StreamingWrapper(this.mastodonService, accountInfo, streamType); } - private getRequest(type: StreamTypeEnum): string { - switch (type) { - case StreamTypeEnum.global: - return 'public'; - case StreamTypeEnum.local: - return 'public:local'; - case StreamTypeEnum.personnal: - return 'user'; - } - } + } export class StreamingWrapper { statusUpdateSubjet = new BehaviorSubject(null); eventSource: WebSocket; + private apiRoutes = new ApiRoutes(); - constructor(private readonly domain: string) { - this.start(domain); + constructor( + private readonly mastodonService: MastodonService, + private readonly accountInfo: AccountInfo, + private readonly streamType: StreamTypeEnum) { + + const request = this.getRequest(streamType); + const route = `wss://${accountInfo.instance}${this.apiRoutes.getStreaming}`.replace('{0}', accountInfo.token.access_token).replace('{1}', request); + this.start(route); } - private start(domain: string) { - this.eventSource = new WebSocket(domain); + private start(route: string) { + this.eventSource = new WebSocket(route); this.eventSource.onmessage = x => this.statusParsing(JSON.parse(x.data)); this.eventSource.onerror = x => this.webSocketGotError(x); this.eventSource.onopen = x => console.log(x); - this.eventSource.onclose = x => this.webSocketClosed(domain, x); + this.eventSource.onclose = x => this.webSocketClosed(route, x); } private errorClosing: boolean; private webSocketGotError(x: Event) { - console.error(x); this.errorClosing = true; - // this.eventSource.close(); } + private since_id: string; private webSocketClosed(domain, x: Event) { console.log(x); - if(this.errorClosing){ - + if (this.errorClosing) { + this.mastodonService.getTimeline(this.accountInfo, this.streamType, null, this.since_id) + .then((status: Status[]) => { + // status = status.sort((n1, n2) => { return (n1.id) < (n2.id); }); + status = status.sort((a, b) => a.id.localeCompare(b.id)); + for (const s of status) { + const update = new StatusUpdate(); + update.status = s; + update.type = EventEnum.update; + this.since_id = update.status.id; + this.statusUpdateSubjet.next(update); + } + }) + .catch(err => { + console.error(err); + }) + .then(() => { + setTimeout(() => { this.start(domain) }, 20 * 1000); + }); this.errorClosing = false; } else { - setTimeout(() => { this.start(domain) }, 3000); - } + setTimeout(() => { this.start(domain) }, 5000); + } } private statusParsing(event: WebSocketEvent) { @@ -82,7 +96,16 @@ export class StreamingWrapper { this.statusUpdateSubjet.next(newUpdate); } - + private getRequest(type: StreamTypeEnum): string { + switch (type) { + case StreamTypeEnum.global: + return 'public'; + case StreamTypeEnum.local: + return 'public:local'; + case StreamTypeEnum.personnal: + return 'user'; + } + } } class WebSocketEvent {