package jp.juggler.subwaytooter import android.content.Context import android.content.SharedPreferences import android.os.Handler import java.net.ProtocolException import java.util.LinkedList import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference import java.util.regex.Pattern import jp.juggler.subwaytooter.api.TootApiClient import jp.juggler.subwaytooter.api.TootApiResult import jp.juggler.subwaytooter.api.TootTask import jp.juggler.subwaytooter.api.TootTaskRunner import jp.juggler.subwaytooter.api.TootParser import jp.juggler.subwaytooter.api.entity.TimelineItem import jp.juggler.subwaytooter.api.entity.TootPayload import jp.juggler.subwaytooter.table.SavedAccount import jp.juggler.subwaytooter.util.LogCategory import jp.juggler.subwaytooter.util.WordTrieTree import jp.juggler.subwaytooter.util.runOnMainLooper import jp.juggler.subwaytooter.util.toJsonObject import okhttp3.Response import okhttp3.WebSocket import okhttp3.WebSocketListener internal class StreamReader( val context : Context, private val handler : Handler, val pref : SharedPreferences ) { internal interface StreamCallback{ fun onTimelineItem(item : TimelineItem) fun onListeningStateChanged() } companion object { val log = LogCategory("StreamReader") @Suppress("HasPlatformType") val reAuthorizeError = Pattern.compile("authorize", Pattern.CASE_INSENSITIVE) } private val reader_list = LinkedList() private inner class Reader( internal val access_info : SavedAccount, internal val end_point : String, highlight_trie : WordTrieTree? ) : WebSocketListener() { internal val bDisposed = AtomicBoolean() internal val bListening = AtomicBoolean() internal val socket = AtomicReference(null) internal val callback_list = LinkedList() internal val parser : TootParser init { this.parser = TootParser(context, access_info, highlightTrie = highlight_trie) } internal fun dispose() { bDisposed.set(true) socket.get()?.cancel() } internal val proc_reconnect : Runnable = Runnable { if(bDisposed.get()) return@Runnable startRead() } @Synchronized internal fun setHighlightTrie(highlight_trie : WordTrieTree) { this.parser.highlightTrie = highlight_trie } @Synchronized internal fun addCallback(stream_callback : StreamCallback) { for(c in callback_list) { if(c === stream_callback) return } callback_list.add(stream_callback) } @Synchronized internal fun removeCallback(stream_callback : StreamCallback) { val it = callback_list.iterator() while(it.hasNext()) { val c = it.next() if(c === stream_callback) it.remove() } } fun containsCallback(streamCallback : StreamCallback) : Boolean { return callback_list.contains(streamCallback) } @Synchronized fun fireListeningChanged(){ for(c in callback_list) { try{ c.onListeningStateChanged() }catch(ex:Throwable){ log.trace(ex) } } } /** * Invoked when a web socket has been accepted by the remote peer and may begin transmitting * messages. */ override fun onOpen(webSocket : WebSocket, response : Response) { log.d("WebSocket onOpen. url=%s .", webSocket.request().url()) } /** * Invoked when a text (type `0x1`) message has been received. */ override fun onMessage(webSocket : WebSocket, text : String) { // warning.d( "WebSocket onMessage. url=%s, message=%s", webSocket.request().url(), text ); try { val obj = text.toJsonObject() val event = obj.optString("event") if(event == null || event.isEmpty()) { log.d("onMessage: missing event parameter") return } val payload = TootPayload.parsePayload(parser, event, obj, text) runOnMainLooper { synchronized(this) { if(bDisposed.get()) return@runOnMainLooper when(event) { "delete" -> { if(payload is Long) { val tl_host = access_info.host for(column in App1.getAppState(context).column_list) { try { column.onStatusRemoved(tl_host, payload) } catch(ex : Throwable) { log.trace(ex) } } } else { log.d("payload is not long. $payload") } } else -> { if(payload is TimelineItem) { for(callback in callback_list) { try { callback.onTimelineItem(payload) } catch(ex : Throwable) { log.trace(ex) } } } else { log.d("payload is not TimelineItem. $payload") } } } } } } catch(ex : Throwable) { log.trace(ex) } } /** * Invoked when the peer has indicated that no more incoming messages will be transmitted. */ override fun onClosing(webSocket : WebSocket, code : Int, reason : String?) { log.d( "WebSocket onClosing. code=%s,reason=%s,url=%s .", code, reason, webSocket.request().url() ) webSocket.cancel() bListening.set(false) handler.removeCallbacks(proc_reconnect) handler.postDelayed(proc_reconnect, 10000L) fireListeningChanged() } /** * Invoked when both peers have indicated that no more messages will be transmitted and the * connection has been successfully released. No further calls to this listener will be made. */ override fun onClosed(webSocket : WebSocket, code : Int, reason : String?) { log.d( "WebSocket onClosed. code=%s,reason=%s,url=%s .", code, reason, webSocket.request().url() ) bListening.set(false) handler.removeCallbacks(proc_reconnect) handler.postDelayed(proc_reconnect, 10000L) fireListeningChanged() } /** * Invoked when a web socket has been closed due to an error reading from or writing to the * network. Both outgoing and incoming messages may have been lost. No further calls to this * listener will be made. */ override fun onFailure(webSocket : WebSocket, ex : Throwable, response : Response?) { log.e(ex, "WebSocket onFailure. url=%s .", webSocket.request().url()) bListening.set(false) handler.removeCallbacks(proc_reconnect) fireListeningChanged() if(ex is ProtocolException) { val msg = ex.message if(msg != null && reAuthorizeError.matcher(msg).find()) { log.e("seems old instance that does not support streaming public timeline without access token. don't retry...") return } } handler.postDelayed(proc_reconnect, 10000L) } internal fun startRead() { if(bDisposed.get()) { log.d("startRead: disposed.") return } else if(bListening.get()) { log.d("startRead: already listening.") return } bListening.set(true) fireListeningChanged() TootTaskRunner(context).run(access_info, object : TootTask { override fun background(client : TootApiClient) : TootApiResult? { val result = client.webSocket(end_point, this@Reader) if(result == null) { log.d("startRead: cancelled.") bListening.set(false) fireListeningChanged() } else { val ws = result.data as? WebSocket if(ws != null) { socket.set(ws) } else { val error = result.error log.d("startRead: error. $error") } } return result } override fun handleResult(result : TootApiResult?) { } }) } } private fun prepareReader( accessInfo : SavedAccount, endPoint : String, highlightTrie : WordTrieTree? ) : Reader { synchronized(reader_list) { // アカウントとエンドポイントが同じリーダーがあればそれを使う for(reader in reader_list) { if(reader.access_info.db_id == accessInfo.db_id && reader.end_point == endPoint) { if(highlightTrie != null) reader.setHighlightTrie(highlightTrie) return reader } } // リーダーを作成する val reader = Reader(accessInfo, endPoint, highlightTrie) reader_list.add(reader) return reader } } // onResume や ロード完了ののタイミングで登録される fun register( accessInfo : SavedAccount, endPoint : String, highlightTrie : WordTrieTree?, streamCallback : StreamCallback ) { val reader = prepareReader(accessInfo, endPoint, highlightTrie) reader.addCallback(streamCallback) if(! reader.bListening.get()) { reader.startRead() } } // カラム破棄やリロードのタイミングで呼ばれる fun unregister( accessInfo : SavedAccount, endPoint : String, streamCallback : StreamCallback ) { synchronized(reader_list) { val it = reader_list.iterator() while(it.hasNext()) { val reader = it.next() if(reader.access_info.db_id == accessInfo.db_id && reader.end_point == endPoint) { log.d("unregister: removeCallback $endPoint") reader.removeCallback(streamCallback) if(reader.callback_list.isEmpty()) { log.d("unregister: dispose $endPoint") reader.dispose() it.remove() } } } } } // onPauseのタイミングで全てのStreaming接続を破棄する fun stopAll() { synchronized(reader_list) { for(reader in reader_list) { reader.dispose() } reader_list.clear() } } fun getStreamingStatus( accessInfo : SavedAccount, endPoint : String, streamCallback : StreamCallback ) : StreamingIndicatorState { synchronized(reader_list) { for( reader in reader_list){ if(reader.access_info.db_id == accessInfo.db_id && reader.end_point == endPoint && reader.containsCallback( streamCallback ) ) { return when( reader.bListening.get() ){ true -> StreamingIndicatorState.LISTENING else -> StreamingIndicatorState.REGISTERED } } } } return StreamingIndicatorState.NONE } }