hashtag streaming is now working
This commit is contained in:
parent
d92cca787b
commit
83488ad4c5
|
@ -159,7 +159,7 @@ export class StreamComponent implements OnInit {
|
|||
}
|
||||
|
||||
private launchWebsocket(): void {
|
||||
this.websocketStreaming = this.streamingService.getStreaming(this.account, this._streamElement.type);
|
||||
this.websocketStreaming = this.streamingService.getStreaming(this.account, this._streamElement);
|
||||
this.websocketStreaming.statusUpdateSubjet.subscribe((update: StatusUpdate) => {
|
||||
if (update) {
|
||||
if (update.type === EventEnum.update) {
|
||||
|
@ -186,7 +186,6 @@ export class StreamComponent implements OnInit {
|
|||
if (this.bufferStream.length > 60) {
|
||||
this.bufferWasCleared = true;
|
||||
this.bufferStream.length = 40;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ import { Injectable } from "@angular/core";
|
|||
import { Status } from "./models/mastodon.interfaces";
|
||||
import { BehaviorSubject } from "rxjs";
|
||||
import { ApiRoutes } from "./models/api.settings";
|
||||
import { StreamTypeEnum } from "../states/streams.state";
|
||||
import { StreamTypeEnum, StreamElement } from "../states/streams.state";
|
||||
import { MastodonService } from "./mastodon.service";
|
||||
import { AccountInfo } from "../states/accounts.state";
|
||||
import { stat } from "fs";
|
||||
|
@ -12,8 +12,8 @@ export class StreamingService {
|
|||
constructor(
|
||||
private readonly mastodonService: MastodonService) { }
|
||||
|
||||
getStreaming(accountInfo: AccountInfo, streamType: StreamTypeEnum): StreamingWrapper {
|
||||
return new StreamingWrapper(this.mastodonService, accountInfo, streamType);
|
||||
getStreaming(accountInfo: AccountInfo, stream: StreamElement): StreamingWrapper {
|
||||
return new StreamingWrapper(this.mastodonService, accountInfo, stream);
|
||||
}
|
||||
|
||||
|
||||
|
@ -26,11 +26,10 @@ export class StreamingWrapper {
|
|||
|
||||
constructor(
|
||||
private readonly mastodonService: MastodonService,
|
||||
private readonly accountInfo: AccountInfo,
|
||||
private readonly streamType: StreamTypeEnum) {
|
||||
private readonly account: AccountInfo,
|
||||
private readonly stream: StreamElement) {
|
||||
|
||||
const request = this.getRequest(streamType);
|
||||
const route = `wss://${accountInfo.instance}${this.apiRoutes.getStreaming}`.replace('{0}', accountInfo.token.access_token).replace('{1}', request);
|
||||
const route = this.getRoute(account, stream);
|
||||
this.start(route);
|
||||
}
|
||||
|
||||
|
@ -39,7 +38,7 @@ export class StreamingWrapper {
|
|||
this.eventSource.onmessage = x => this.statusParsing(<WebSocketEvent>JSON.parse(x.data));
|
||||
this.eventSource.onerror = x => this.webSocketGotError(x);
|
||||
this.eventSource.onopen = x => console.log(x);
|
||||
this.eventSource.onclose = x => this.webSocketClosed(route, x);
|
||||
this.eventSource.onclose = x => this.webSocketClosed(route, x);
|
||||
}
|
||||
|
||||
private errorClosing: boolean;
|
||||
|
@ -57,26 +56,26 @@ export class StreamingWrapper {
|
|||
}
|
||||
}
|
||||
|
||||
private pullNewStatuses(domain){
|
||||
this.mastodonService.getTimeline(this.accountInfo, this.streamType, null, this.since_id)
|
||||
.then((status: Status[]) => {
|
||||
// status = status.sort((n1, n2) => { return (<number>n1.id) < (<number>n2.id); });
|
||||
status = status.sort((a, b) => a.id.localeCompare(b.id));
|
||||
for (const s of status) {
|
||||
const update = new StatusUpdate();
|
||||
update.status = s;
|
||||
update.type = EventEnum.update;
|
||||
this.since_id = update.status.id;
|
||||
this.statusUpdateSubjet.next(update);
|
||||
}
|
||||
})
|
||||
.catch(err => {
|
||||
console.error(err);
|
||||
})
|
||||
.then(() => {
|
||||
// setTimeout(() => { this.start(domain) }, 20 * 1000);
|
||||
setTimeout(() => { this.pullNewStatuses(domain) }, 15 * 1000);
|
||||
});
|
||||
private pullNewStatuses(domain) {
|
||||
this.mastodonService.getTimeline(this.account, this.stream.type, null, this.since_id, 20, this.stream.tag, this.stream.list)
|
||||
.then((status: Status[]) => {
|
||||
// status = status.sort((n1, n2) => { return (<number>n1.id) < (<number>n2.id); });
|
||||
status = status.sort((a, b) => a.id.localeCompare(b.id));
|
||||
for (const s of status) {
|
||||
const update = new StatusUpdate();
|
||||
update.status = s;
|
||||
update.type = EventEnum.update;
|
||||
this.since_id = update.status.id;
|
||||
this.statusUpdateSubjet.next(update);
|
||||
}
|
||||
})
|
||||
.catch(err => {
|
||||
console.error(err);
|
||||
})
|
||||
.then(() => {
|
||||
// setTimeout(() => { this.start(domain) }, 20 * 1000);
|
||||
setTimeout(() => { this.pullNewStatuses(domain) }, 15 * 1000);
|
||||
});
|
||||
}
|
||||
|
||||
private statusParsing(event: WebSocketEvent) {
|
||||
|
@ -98,7 +97,17 @@ export class StreamingWrapper {
|
|||
this.statusUpdateSubjet.next(newUpdate);
|
||||
}
|
||||
|
||||
private getRequest(type: StreamTypeEnum): string {
|
||||
private getRoute(account: AccountInfo, stream: StreamElement): string {
|
||||
const streamingRouteType = this.getStreamingRouteType(stream.type);
|
||||
let route = `wss://${account.instance}${this.apiRoutes.getStreaming}`.replace('{0}', account.token.access_token).replace('{1}', streamingRouteType);
|
||||
|
||||
if(stream.tag) route = `${route}&tag=${stream.tag}`;
|
||||
if(stream.list) route = `${route}&tag=${stream.list}`;
|
||||
|
||||
return route;
|
||||
}
|
||||
|
||||
private getStreamingRouteType(type: StreamTypeEnum): string {
|
||||
switch (type) {
|
||||
case StreamTypeEnum.global:
|
||||
return 'public';
|
||||
|
@ -106,6 +115,14 @@ export class StreamingWrapper {
|
|||
return 'public:local';
|
||||
case StreamTypeEnum.personnal:
|
||||
return 'user';
|
||||
case StreamTypeEnum.directmessages:
|
||||
return 'direct';
|
||||
case StreamTypeEnum.tag:
|
||||
return 'hashtag';
|
||||
case StreamTypeEnum.list:
|
||||
return 'list';
|
||||
default:
|
||||
throw Error('Not supported');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue