diff --git a/src/Services/Streams/IStreamListener.vala b/src/Services/Streams/IStreamListener.vala new file mode 100644 index 0000000..7033829 --- /dev/null +++ b/src/Services/Streams/IStreamListener.vala @@ -0,0 +1,7 @@ +public interface Tootle.IStreamListener : GLib.Object { + + public signal void on_status_removed (string id); + public signal void on_status_added (API.Status s); + public signal void on_notification (API.Notification n); + +} diff --git a/src/Services/Streams/Streams.vala b/src/Services/Streams/Streams.vala new file mode 100644 index 0000000..355ad34 --- /dev/null +++ b/src/Services/Streams/Streams.vala @@ -0,0 +1,184 @@ +using Soup; +using Gee; + +public class Tootle.Streams : Object { + + protected HashTable connections { + get; + set; + default = new HashTable (GLib.str_hash, GLib.str_equal); + } + + protected class Connection : Object { + public ArrayList subscribers; + protected WebsocketConnection socket; + protected Message msg; + + protected bool closing = false; + protected int timeout = 1; + + public string name { + owned get { + var url = msg.get_uri ().to_string (false); + return url.slice (0, url.last_index_of ("&access_token")); + } + } + + public Connection (string url) { + this.subscribers = new ArrayList (); + this.msg = new Message ("GET", url); + } + + public bool start () { + message (@"Opening stream: $name"); + network.session.websocket_connect_async.begin (msg, null, null, null, (obj, res) => { + socket = network.session.websocket_connect_async.end (res); + socket.error.connect (on_error); + socket.closed.connect (on_closed); + socket.message.connect (on_message); + }); + return false; + } + + public void add (IStreamListener s) { + info ("%s > %s", get_subscriber_name (s), name); + subscribers.add (s); + } + + public void remove (IStreamListener s) { + if (subscribers.contains (s)) { + info ("%s X %s", get_subscriber_name (s), name); + subscribers.remove (s); + } + + if (subscribers.size <= 0) { + info (@"Closing: $name"); + closing = true; + socket.close (0, null); + } + } + + void on_error (Error e) { + if (!closing) + warning (@"Error in $name: $(e.message)"); + } + + void on_closed () { + if (!closing) { + warning (@"DISCONNECTED: $name. Reconnecting in $timeout seconds."); + GLib.Timeout.add_seconds (timeout, start); + timeout = int.min (timeout*2, 6); + } + message (@"Closing stream: $name"); + } + + void on_message (int i, Bytes bytes) { + try { + emit (bytes, this); + } + catch (Error e) { + warning (@"Couldn't handle websocket event. Reason: $(e.message)"); + } + } + } + + public void subscribe (string? url, IStreamListener s, out string cookie) { + if (url == null) + return; + + if (connections.contains (url)) { + connections[url].add (s); + } + else { + var con = new Connection (url); + connections[url] = con; + con.add (s); + con.start (); + } + cookie = url; + } + + public void unsubscribe (string? cookie, IStreamListener s) { + var url = cookie; + if (url == null) + return; + + if (connections.contains (url)) + connections.@get (url).remove (s); + } + + static string get_subscriber_name (Object s) { + return s.get_type ().name (); + } + + static void decode (Bytes bytes, out Json.Node root, out Json.Object obj, out string event) throws Error { + var msg = (string) bytes.get_data (); + var parser = new Json.Parser (); + parser.load_from_data (msg, -1); + root = parser.steal_root (); + obj = root.get_object (); + event = obj.get_string_member ("event"); + } + + static Json.Node payload (Json.Object obj) { + var payload = obj.get_string_member ("payload"); + var data = Soup.URI.decode (payload); + var parser = new Json.Parser (); + parser.load_from_data (data, -1); + return parser.steal_root (); + } + + static void emit (Bytes bytes, Connection c) throws Error { + if (!settings.live_updates) + return; + + Json.Node root; + Json.Object root_obj; + string ev; + decode (bytes, out root, out root_obj, out ev); + + // c.subscribers.@foreach (s => { + // message (@"$(c.name): $ev for $(get_subscriber_name (s))"); + // return true; + // }); + + switch (ev) { + case "update": + var node = payload (root_obj); + var status = Entity.from_json (typeof (API.Status), node) as API.Status; + c.subscribers.@foreach (s => { + s.on_status_added (status); + return true; + }); + break; + case "delete": + var id = root_obj.get_string_member ("payload"); + c.subscribers.@foreach (s => { + s.on_status_removed (id); + return true; + }); + break; + case "notification": + var node = payload (root_obj); + var notif = Entity.from_json (typeof (API.Notification), node) as API.Notification; + c.subscribers.@foreach (s => { + s.on_notification (notif); + return true; + }); + break; + default: + warning (@"Unknown websocket event: \"$ev\". Ignoring."); + break; + } + } + + public void force_delete (string id) { + connections.get_values ().@foreach (c => { + c.subscribers.@foreach (s => { + s.on_status_removed (id); + return true; + }); + }); + } + +}