TL: added a fallback strategy in case websocket isn't working

This commit is contained in:
Nicolas Constant 2018-09-16 03:00:35 -04:00
parent c5cd5e8f7a
commit 703225ffc8
No known key found for this signature in database
GPG Key ID: 1E9F677FB01A5688
3 changed files with 59 additions and 31 deletions

View File

@ -64,11 +64,15 @@ export class StreamComponent implements OnInit {
}
private launchWebsocket(): void {
this.websocketStreaming = this.streamingService.getStreaming(this.account.instance, this.account.token.access_token, this._streamElement.type);
this.websocketStreaming = this.streamingService.getStreaming(this.account, this._streamElement.type);
this.websocketStreaming.statusUpdateSubjet.subscribe((update: StatusUpdate) => {
if (update) {
if (update.type === EventEnum.update) {
this.statuses.unshift(update.status);
console.log(update.status);
if (!this.statuses.find(x => x.id == update.status.id)) {
console.log('added');
this.statuses.unshift(update.status);
}
}
}
});

View File

@ -47,4 +47,5 @@ export class ApiRoutes {
getDirectTimeline = '/api/v1/timelines/direct';
getTagTimeline = '/api/v1/timelines/tag/{0}';
getListTimeline = '/api/v1/timelines/list/{0}';
getStreaming = '/api/v1/streaming?access_token={0}&stream={1}';
}

View File

@ -3,64 +3,78 @@ import { Status } from "./models/mastodon.interfaces";
import { BehaviorSubject } from "rxjs";
import { ApiRoutes } from "./models/api.settings";
import { StreamTypeEnum } from "../states/streams.state";
import { MastodonService } from "./mastodon.service";
import { AccountInfo } from "../states/accounts.state";
import { stat } from "fs";
@Injectable()
export class StreamingService {
private apiRoutes = new ApiRoutes();
constructor(
private readonly mastodonService: MastodonService) { }
constructor() { }
getStreaming(instance: string, accessToken: string, streamType: StreamTypeEnum): StreamingWrapper {
const request = this.getRequest(streamType);
const route = `wss://${instance}/api/v1/streaming?access_token=${accessToken}&stream=${request}`
return new StreamingWrapper(route);
getStreaming(accountInfo: AccountInfo, streamType: StreamTypeEnum): StreamingWrapper {
return new StreamingWrapper(this.mastodonService, accountInfo, streamType);
}
private getRequest(type: StreamTypeEnum): string {
switch (type) {
case StreamTypeEnum.global:
return 'public';
case StreamTypeEnum.local:
return 'public:local';
case StreamTypeEnum.personnal:
return 'user';
}
}
}
export class StreamingWrapper {
statusUpdateSubjet = new BehaviorSubject<StatusUpdate>(null);
eventSource: WebSocket;
private apiRoutes = new ApiRoutes();
constructor(private readonly domain: string) {
this.start(domain);
constructor(
private readonly mastodonService: MastodonService,
private readonly accountInfo: AccountInfo,
private readonly streamType: StreamTypeEnum) {
const request = this.getRequest(streamType);
const route = `wss://${accountInfo.instance}${this.apiRoutes.getStreaming}`.replace('{0}', accountInfo.token.access_token).replace('{1}', request);
this.start(route);
}
private start(domain: string) {
this.eventSource = new WebSocket(domain);
private start(route: string) {
this.eventSource = new WebSocket(route);
this.eventSource.onmessage = x => this.statusParsing(<WebSocketEvent>JSON.parse(x.data));
this.eventSource.onerror = x => this.webSocketGotError(x);
this.eventSource.onopen = x => console.log(x);
this.eventSource.onclose = x => this.webSocketClosed(domain, x);
this.eventSource.onclose = x => this.webSocketClosed(route, x);
}
private errorClosing: boolean;
private webSocketGotError(x: Event) {
console.error(x);
this.errorClosing = true;
// this.eventSource.close();
}
private since_id: string;
private webSocketClosed(domain, x: Event) {
console.log(x);
if(this.errorClosing){
if (this.errorClosing) {
this.mastodonService.getTimeline(this.accountInfo, this.streamType, null, this.since_id)
.then((status: Status[]) => {
// status = status.sort((n1, n2) => { return (<number>n1.id) < (<number>n2.id); });
status = status.sort((a, b) => a.id.localeCompare(b.id));
for (const s of status) {
const update = new StatusUpdate();
update.status = s;
update.type = EventEnum.update;
this.since_id = update.status.id;
this.statusUpdateSubjet.next(update);
}
})
.catch(err => {
console.error(err);
})
.then(() => {
setTimeout(() => { this.start(domain) }, 20 * 1000);
});
this.errorClosing = false;
} else {
setTimeout(() => { this.start(domain) }, 3000);
}
setTimeout(() => { this.start(domain) }, 5000);
}
}
private statusParsing(event: WebSocketEvent) {
@ -82,7 +96,16 @@ export class StreamingWrapper {
this.statusUpdateSubjet.next(newUpdate);
}
private getRequest(type: StreamTypeEnum): string {
switch (type) {
case StreamTypeEnum.global:
return 'public';
case StreamTypeEnum.local:
return 'public:local';
case StreamTypeEnum.personnal:
return 'user';
}
}
}
class WebSocketEvent {