Fix streams

This commit is contained in:
Bleak Grey 2020-05-30 13:31:02 +03:00
parent f0326fec29
commit 41208fa8eb
7 changed files with 85 additions and 67 deletions

View File

@ -207,7 +207,7 @@ public class Tootle.API.Status : GLib.Object {
new Request.DELETE (@"/api/v1/statuses/$id") new Request.DELETE (@"/api/v1/statuses/$id")
.with_account (accounts.active) .with_account (accounts.active)
.then ((sess, msg) => { .then ((sess, msg) => {
streams.status_removed (id); streams.force_delete (id);
cb (sess, msg); cb (sess, msg);
}) })
.on_error ((status, reason) => err (status, reason)) .on_error ((status, reason) => err (status, reason))

View File

@ -133,23 +133,20 @@ public class Tootle.InstanceAccount : API.Account, IStreamListener {
if (settings.notifications) if (settings.notifications)
app.send_notification (app.application_id + ":" + obj.id.to_string (), notification); app.send_notification (app.application_id + ":" + obj.id.to_string (), notification);
if (is_current ())
streams.notification (obj);
if (obj.kind == API.NotificationType.WATCHLIST) { if (obj.kind == API.NotificationType.WATCHLIST) {
cached_notifications.add (obj); cached_notifications.add (obj);
accounts.save (); accounts.save ();
} }
} }
public override void on_status_removed (int64 id) { // public override void on_status_removed (int64 id) {
if (is_current ()) // if (is_current ())
streams.status_removed (id); // streams.force_delete (id);
} // }
public override void on_status_added (API.Status status) { // public override void on_status_added (API.Status status) {
if (!is_current ()) // if (!is_current ())
return; // return;
// watchlist.users.@foreach (item => { // watchlist.users.@foreach (item => {
// var acct = status.account.acct; // var acct = status.account.acct;
@ -162,6 +159,6 @@ public class Tootle.InstanceAccount : API.Account, IStreamListener {
// } // }
// return true; // return true;
// }); // });
} // }
} }

View File

@ -1,12 +1,8 @@
using GLib;
using Soup; using Soup;
using Gee; using Gee;
public class Tootle.Streams : Object { public class Tootle.Streams : Object {
public signal void notification (API.Notification n);
public signal void status_removed (int64 id);
protected HashTable<string, Connection> connections { protected HashTable<string, Connection> connections {
get; get;
set; set;
@ -34,7 +30,7 @@ public class Tootle.Streams : Object {
} }
public bool start () { public bool start () {
//info (@"Opening stream: $name"); info (@"Opening stream: $name");
network.session.websocket_connect_async.begin (msg, null, null, null, (obj, res) => { network.session.websocket_connect_async.begin (msg, null, null, null, (obj, res) => {
socket = network.session.websocket_connect_async.end (res); socket = network.session.websocket_connect_async.end (res);
socket.error.connect (on_error); socket.error.connect (on_error);
@ -69,10 +65,11 @@ public class Tootle.Streams : Object {
void on_closed () { void on_closed () {
if (!closing) { if (!closing) {
warning (@"CLOSED: $name. Reconnecting in $timeout seconds."); warning (@"DISCONNECTED: $name. Reconnecting in $timeout seconds.");
GLib.Timeout.add_seconds (timeout, start); GLib.Timeout.add_seconds (timeout, start);
timeout = int.min (timeout*2, 30); timeout = int.min (timeout*2, 30);
} }
warning (@"Closing stream: $name");
} }
void on_message (int i, Bytes bytes) { void on_message (int i, Bytes bytes) {
@ -134,44 +131,54 @@ public class Tootle.Streams : Object {
if (!settings.live_updates) if (!settings.live_updates)
return; return;
string event; string e;
Json.Object root; Json.Object root;
decode (bytes, out event, out root); decode (bytes, out e, out root);
// c.subscribers.@foreach (s => { // c.subscribers.@foreach (s => {
// warning ("%s: %s for %s", c.name, event, get_subscriber_name (s)); // warning ("%s: %s for %s", c.name, e, get_subscriber_name (s));
// return false; // return false;
// }); // });
switch (event) { switch (e) {
case "update": case "update":
var entity = new API.Status (sanitize (root)); var obj = new API.Status (sanitize (root));
c.subscribers.@foreach (s => { c.subscribers.@foreach (s => {
if (s.accepts (ref event)) if (s.accepts (ref e))
s.on_status_added (entity); s.on_status_added (obj);
return false; return true;
}); });
break; break;
case "delete": case "delete":
var id = int64.parse (root.get_string_member ("payload")); var id = int64.parse (root.get_string_member ("payload"));
c.subscribers.@foreach (s => { c.subscribers.@foreach (s => {
if (s.accepts (ref event)) if (s.accepts (ref e))
s.on_status_removed (id); s.on_status_removed (id);
return false; return true;
}); });
break; break;
case "notification": case "notification":
var entity = new API.Notification (sanitize (root)); var obj = new API.Notification (sanitize (root));
c.subscribers.@foreach (s => { c.subscribers.@foreach (s => {
if (s.accepts (ref event)) if (s.accepts (ref e))
s.on_notification (entity); s.on_notification (obj);
return false; return true;
}); });
break; break;
default: default:
warning (@"Unknown websocket event: \"$event\". Ignoring."); warning (@"Unknown websocket event: \"$e\". Ignoring.");
break; break;
} }
} }
public void force_delete (int64 id) {
warning (@"Force removing status id $id");
connections.get_values ().@foreach (c => {
c.subscribers.@foreach (s => {
s.on_status_removed (id);
return false;
});
});
}
} }

View File

@ -1,18 +1,22 @@
using Gtk; using Gtk;
using Gdk; using Gdk;
public class Tootle.Views.Notifications : Views.Base, IAccountListener { public class Tootle.Views.Notifications : Views.Base, IAccountListener, IStreamListener { //TODO: make this a timeline
protected InstanceAccount? account = null; protected InstanceAccount? account = null;
protected int64 last_id = 0; protected int64 last_id = 0;
protected bool force_dot = false; protected bool force_dot = false;
protected string? stream;
public Notifications () { public Notifications () {
app.refresh.connect (on_refresh); app.refresh.connect (on_refresh);
status_button.clicked.connect (on_refresh); status_button.clicked.connect (on_refresh);
streams.notification.connect (prepend);
connect_account (); connect_account ();
} }
~Notifications () {
streams.unsubscribe (stream, this);
}
private bool has_unread () { private bool has_unread () {
if (account == null) if (account == null)
@ -85,6 +89,7 @@ public class Tootle.Views.Notifications : Views.Base, IAccountListener {
public virtual void on_account_changed (InstanceAccount? acc) { public virtual void on_account_changed (InstanceAccount? acc) {
account = acc; account = acc;
streams.unsubscribe (stream, this);
if (account == null) { if (account == null) {
last_id = 0; last_id = 0;
force_dot = false; force_dot = false;
@ -92,10 +97,24 @@ public class Tootle.Views.Notifications : Views.Base, IAccountListener {
else { else {
last_id = account.last_seen_notification; last_id = account.last_seen_notification;
force_dot = account.has_unread_notifications; force_dot = account.has_unread_notifications;
streams.subscribe (get_stream_url (), this, out stream);
} }
on_refresh (); on_refresh ();
} }
public virtual string? get_stream_url () {
return account != null ? @"$(account.instance)/api/v1/streaming/?stream=user&access_token=$(account.token)" : null;
}
public override bool accepts (ref string event) {
warning (event);
return true;
}
public override void on_notification (API.Notification n) {
prepend (n);
}
public bool request () { public bool request () {
if (account != null) { if (account != null) {
account.cached_notifications.@foreach (notification => { account.cached_notifications.@foreach (notification => {

View File

@ -132,14 +132,6 @@ public class Tootle.Views.Timeline : Views.Base, IAccountListener, IStreamListen
on_refresh (); on_refresh ();
} }
protected override bool accepts (ref string event) {
var allowed_public = true;
if (is_public)
allowed_public = settings.live_updates_public;
return settings.live_updates && allowed_public;
}
protected override void on_bottom_reached () { protected override void on_bottom_reached () {
if (is_last_page) { if (is_last_page) {
info ("Last page reached"); info ("Last page reached");
@ -148,4 +140,20 @@ public class Tootle.Views.Timeline : Views.Base, IAccountListener, IStreamListen
request (); request ();
} }
public override bool accepts (ref string event) {
var allowed_public = true;
if (is_public)
allowed_public = settings.live_updates_public;
return settings.live_updates && allowed_public;
}
public override void on_status_removed (int64 id) {
content.get_children ().@foreach (w => {
var sw = w as Widgets.Status;
if (sw.status.id == id)
sw.destroy ();
});
}
} }

View File

@ -25,10 +25,4 @@ public class Tootle.Widgets.Notification : Widgets.Status {
header_label.label = kind.get_desc (notification.account); header_label.label = kind.get_desc (notification.account);
} }
protected override void on_status_removed (int64 id) {
if (id == notification.status.id)
notification.dismiss ();
base.on_status_removed (id);
}
} }

View File

@ -85,7 +85,6 @@ public class Tootle.Widgets.Status : EventBox {
construct { construct {
button_press_event.connect (on_clicked); button_press_event.connect (on_clicked);
streams.status_removed.connect (on_status_removed);
content.activate_link.connect (on_toggle_spoiler); content.activate_link.connect (on_toggle_spoiler);
notify["kind"].connect (on_kind_changed); notify["kind"].connect (on_kind_changed);
@ -157,15 +156,9 @@ public class Tootle.Widgets.Status : EventBox {
~Status () { ~Status () {
button_press_event.disconnect (on_clicked); button_press_event.disconnect (on_clicked);
streams.status_removed.disconnect (on_status_removed);
notify["kind"].disconnect (on_kind_changed); notify["kind"].disconnect (on_kind_changed);
} }
protected virtual void on_status_removed (int64 id) {
if (id == status.id)
destroy ();
}
protected bool on_toggle_spoiler (string uri) { protected bool on_toggle_spoiler (string uri) {
if (uri == "tootle://toggle") { if (uri == "tootle://toggle") {
revealer.reveal_child = !revealer.reveal_child; revealer.reveal_child = !revealer.reveal_child;