mirror of
https://github.com/Dimillian/IceCubesApp.git
synced 2025-01-28 13:29:18 +01:00
137 lines
3.9 KiB
Swift
137 lines
3.9 KiB
Swift
import Foundation
|
|
import Models
|
|
import Network
|
|
|
|
@MainActor
|
|
public class StreamWatcher: ObservableObject {
|
|
private var client: Client?
|
|
private var task: URLSessionWebSocketTask?
|
|
private var watchedStreams: [Stream] = []
|
|
|
|
private let decoder = JSONDecoder()
|
|
private let encoder = JSONEncoder()
|
|
|
|
public enum Stream: String {
|
|
case publicTimeline = "public"
|
|
case user
|
|
case direct
|
|
}
|
|
|
|
@Published public var events: [any StreamEvent] = []
|
|
@Published public var unreadNotificationsCount: Int = 0
|
|
@Published public var latestEvent: (any StreamEvent)?
|
|
|
|
public init() {
|
|
decoder.keyDecodingStrategy = .convertFromSnakeCase
|
|
}
|
|
|
|
public func setClient(client: Client) {
|
|
if self.client != nil {
|
|
stopWatching()
|
|
}
|
|
self.client = client
|
|
connect()
|
|
}
|
|
|
|
private func connect() {
|
|
task = client?.makeWebSocketTask(endpoint: Streaming.streaming)
|
|
task?.resume()
|
|
receiveMessage()
|
|
}
|
|
|
|
public func watch(streams: [Stream]) {
|
|
if client?.isAuth == false {
|
|
return
|
|
}
|
|
if task == nil {
|
|
connect()
|
|
}
|
|
watchedStreams = streams
|
|
streams.forEach { stream in
|
|
sendMessage(message: StreamMessage(type: "subscribe", stream: stream.rawValue))
|
|
}
|
|
}
|
|
|
|
public func stopWatching() {
|
|
task?.cancel()
|
|
task = nil
|
|
}
|
|
|
|
private func sendMessage(message: StreamMessage) {
|
|
task?.send(.data(try! encoder.encode(message)),
|
|
completionHandler: { _ in })
|
|
}
|
|
|
|
private func receiveMessage() {
|
|
task?.receive(completionHandler: { result in
|
|
switch result {
|
|
case let .success(message):
|
|
switch message {
|
|
case let .string(string):
|
|
do {
|
|
guard let data = string.data(using: .utf8) else {
|
|
print("Error decoding streaming event string")
|
|
return
|
|
}
|
|
let rawEvent = try self.decoder.decode(RawStreamEvent.self, from: data)
|
|
if let event = self.rawEventToEvent(rawEvent: rawEvent) {
|
|
Task { @MainActor in
|
|
self.events.append(event)
|
|
self.latestEvent = event
|
|
if let event = event as? StreamEventNotification, event.notification.status?.visibility != .direct {
|
|
self.unreadNotificationsCount += 1
|
|
}
|
|
}
|
|
}
|
|
} catch {
|
|
print("Error decoding streaming event: \(error.localizedDescription)")
|
|
}
|
|
|
|
default:
|
|
break
|
|
}
|
|
|
|
self.receiveMessage()
|
|
|
|
case .failure:
|
|
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(10)) { [weak self] in
|
|
guard let self = self else { return }
|
|
self.stopWatching()
|
|
self.connect()
|
|
self.watch(streams: self.watchedStreams)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
private func rawEventToEvent(rawEvent: RawStreamEvent) -> (any StreamEvent)? {
|
|
guard let payloadData = rawEvent.payload.data(using: .utf8) else {
|
|
return nil
|
|
}
|
|
do {
|
|
switch rawEvent.event {
|
|
case "update":
|
|
let status = try decoder.decode(Status.self, from: payloadData)
|
|
return StreamEventUpdate(status: status)
|
|
case "status.update":
|
|
let status = try decoder.decode(Status.self, from: payloadData)
|
|
return StreamEventStatusUpdate(status: status)
|
|
case "delete":
|
|
return StreamEventDelete(status: rawEvent.payload)
|
|
case "notification":
|
|
let notification = try decoder.decode(Notification.self, from: payloadData)
|
|
return StreamEventNotification(notification: notification)
|
|
case "conversation":
|
|
let conversation = try decoder.decode(Conversation.self, from: payloadData)
|
|
return StreamEventConversation(conversation: conversation)
|
|
default:
|
|
return nil
|
|
}
|
|
} catch {
|
|
print("Error decoding streaming event to final event: \(error.localizedDescription)")
|
|
print("Raw data: \(rawEvent.payload)")
|
|
return nil
|
|
}
|
|
}
|
|
}
|