first implementation of websockets (and displaying the toots!)

This commit is contained in:
Nicolas Constant 2018-09-13 20:01:25 -04:00
parent f129c17dcd
commit 5b46d4ccf1
No known key found for this signature in database
GPG Key ID: 1E9F677FB01A5688
4 changed files with 86 additions and 35 deletions

View File

@ -13,9 +13,9 @@ export class StreamComponent implements OnInit {
@Input() @Input()
set stream(stream: Stream) { set stream(stream: Stream) {
this._stream = stream; this._stream = stream;
this._stream.statuses.subscribe((toots: TootWrapper[]) => { this._stream.statuses.subscribe((results: TootWrapper[]) => {
for (let t of toots) { for (let t of results) {
this.toots.push(t); this.toots.unshift(t);
} }
}); });
} }

View File

@ -5,7 +5,7 @@ import { BehaviorSubject } from "rxjs";
import { AccountWrapper } from "./account.models"; import { AccountWrapper } from "./account.models";
import { ApiRoutes } from "../services/models/api.settings"; import { ApiRoutes } from "../services/models/api.settings";
import { Account, Status } from "../services/models/mastodon.interfaces"; import { Account, Status } from "../services/models/mastodon.interfaces";
import { StreamingService, StreamingWrapper } from "../services/streaming.service"; import { StreamingService, StreamingWrapper, StatusUpdate, EventEnum } from "../services/streaming.service";
import { StreamTypeEnum } from "../states/streams.state"; import { StreamTypeEnum } from "../states/streams.state";
import { AccountInfo } from "../states/accounts.state"; import { AccountInfo } from "../states/accounts.state";
@ -13,10 +13,12 @@ import { AccountInfo } from "../states/accounts.state";
export class Stream { export class Stream {
private apiRoutes = new ApiRoutes(); private apiRoutes = new ApiRoutes();
private account: AccountInfo; private account: AccountInfo;
private websocketStreaming: StreamingWrapper;
statuses = new BehaviorSubject<TootWrapper[]>([]); statuses = new BehaviorSubject<TootWrapper[]>([]);
constructor( constructor(
private readonly streamingService: StreamingService,
private readonly httpClient: HttpClient, private readonly httpClient: HttpClient,
private readonly store: Store, private readonly store: Store,
public streamName: string, public streamName: string,
@ -29,6 +31,7 @@ export class Stream {
this.account = this.getRegisteredAccounts().find(x => x.username == user && x.instance == instance); this.account = this.getRegisteredAccounts().find(x => x.username == user && x.instance == instance);
this.retrieveToots(); //TODO change this for WebSockets this.retrieveToots(); //TODO change this for WebSockets
this.launchWebsocket();
} }
private getRegisteredAccounts(): AccountInfo[] { private getRegisteredAccounts(): AccountInfo[] {
@ -36,13 +39,8 @@ export class Stream {
return regAccounts; return regAccounts;
} }
private test: StreamingWrapper;
private retrieveToots(): void {
//TEST
const service = new StreamingService();
this.test = service.getStreaming(this.account.instance, this.account.token.access_token);
//END TEST
private retrieveToots(): void {
const route = `https://${this.account.instance}${this.getTimelineRoute()}`; const route = `https://${this.account.instance}${this.getTimelineRoute()}`;
const headers = new HttpHeaders({ 'Authorization': `Bearer ${this.account.token.access_token}` }); const headers = new HttpHeaders({ 'Authorization': `Bearer ${this.account.token.access_token}` });
@ -53,7 +51,34 @@ export class Stream {
}); });
this.statuses.next(statuses); this.statuses.next(statuses);
}); });
}
private launchWebsocket(): void {
//Web socket
let streamRequest: string;
switch (this.type) {
case StreamTypeEnum.global:
streamRequest = 'public';
break;
case StreamTypeEnum.local:
streamRequest = 'public:local';
break;
case StreamTypeEnum.personnal:
streamRequest = 'user';
break;
}
this.websocketStreaming = this.streamingService.getStreaming(this.account.instance, this.account.token.access_token, streamRequest);
this.websocketStreaming.statusUpdateSubjet.subscribe((update: StatusUpdate) => {
if (update) {
if (update.type === EventEnum.update) {
this.statuses.next([new TootWrapper(update.status)]);
}
}
});
} }

View File

@ -7,6 +7,7 @@ import { Store } from "@ngxs/store";
import { Http } from "@angular/http"; import { Http } from "@angular/http";
import { NavigationService } from "../../services/navigation.service"; import { NavigationService } from "../../services/navigation.service";
import { HttpClient } from "@angular/common/http"; import { HttpClient } from "@angular/common/http";
import { StreamingService } from "../../services/streaming.service";
@Component({ @Component({
@ -23,6 +24,7 @@ export class StreamsMainDisplayComponent implements OnInit, OnDestroy {
private columnSelectedSub: Subscription; private columnSelectedSub: Subscription;
constructor( constructor(
private readonly streamingService: StreamingService,
private readonly navigationService: NavigationService, private readonly navigationService: NavigationService,
private readonly httpClient: HttpClient, private readonly httpClient: HttpClient,
private readonly store: Store) { private readonly store: Store) {
@ -34,7 +36,7 @@ export class StreamsMainDisplayComponent implements OnInit, OnDestroy {
this.streamsStateSub = this.streams$.subscribe((streams: StreamElement[]) => { this.streamsStateSub = this.streams$.subscribe((streams: StreamElement[]) => {
this.streams.length = 0; this.streams.length = 0;
for (const stream of streams) { for (const stream of streams) {
const newStream = new Stream(this.httpClient, this.store, stream.name, stream.type, stream.username); const newStream = new Stream(this.streamingService, this.httpClient, this.store, stream.name, stream.type, stream.username);
this.streams.push(newStream); this.streams.push(newStream);
} }

View File

@ -1,48 +1,72 @@
import { Injectable } from "@angular/core"; import { Injectable } from "@angular/core";
import { Status } from "./models/mastodon.interfaces"; import { Status } from "./models/mastodon.interfaces";
import { BehaviorSubject } from "rxjs";
import { ApiRoutes } from "./models/api.settings";
@Injectable() @Injectable()
export class StreamingService { export class StreamingService {
private apiRoutes = new ApiRoutes();
constructor() { } constructor() { }
//TODO restructure this to handle real domain objects //TODO restructure this to handle real domain objects
getStreaming(instance: string, accessToken: string): StreamingWrapper { getStreaming(instance: string, accessToken: string, streamRequest: string): StreamingWrapper {
const route = `wss://${instance}/api/v1/streaming?access_token=${accessToken}&stream=public` const route = `wss://${instance}/api/v1/streaming?access_token=${accessToken}&stream=${streamRequest}`
return new StreamingWrapper(route); return new StreamingWrapper(route);
} }
} }
export class StreamingWrapper { export class StreamingWrapper {
statusUpdateSubjet = new BehaviorSubject<StatusUpdate>(null);
eventSource: WebSocket;
constructor(private readonly domain: string) { constructor(private readonly domain: string) {
const eventSource = new WebSocket(domain); this.start(domain);
eventSource.onmessage = x => this.tootParsing(<WebSocketEvent>JSON.parse(x.data)); }
eventSource.onerror = x => console.error(x);
eventSource.onopen = x => console.log(x);
eventSource.onclose = x => console.log(x);
}
tootParsing(event: WebSocketEvent){ private start(domain: string) {
console.warn(event.event); this.eventSource = new WebSocket(domain);
console.warn(event.payload); this.eventSource.onmessage = x => this.tootParsing(<WebSocketEvent>JSON.parse(x.data));
this.eventSource.onerror = x => console.error(x);
this.eventSource.onopen = x => console.log(x);
this.eventSource.onclose = x => { console.log(x);
setTimeout(() => {this.start(domain)}, 3000);}
}
} private tootParsing(event: WebSocketEvent) {
const newUpdate = new StatusUpdate();
switch (event.event) {
case 'update':
newUpdate.type = EventEnum.update;
newUpdate.status = <Status>JSON.parse(event.payload);
break;
case 'delete':
newUpdate.type = EventEnum.delete;
newUpdate.messageId = event.payload;
break;
default:
newUpdate.type = EventEnum.unknow;
}
this.statusUpdateSubjet.next(newUpdate);
}
} }
class WebSocketEvent { class WebSocketEvent {
event: string; event: string;
payload: Status; payload: any;
} }
export class StatusUpdate { export class StatusUpdate {
type: EventEnum; type: EventEnum;
status: Status; status: Status;
messageId: number;
} }
export enum EventEnum { export enum EventEnum {
unknow = 0, unknow = 0,
update = 1, update = 1,
delete = 2 delete = 2
} }