マストドン3.3.0でストリーミングイベント中のストリーム購読を示す部分の判定に失敗していた

This commit is contained in:
tateisu 2020-12-27 22:01:41 +09:00
parent 0f716a1da8
commit 5dba1dacef
4 changed files with 20 additions and 9 deletions

View File

@ -590,6 +590,7 @@ class Column(
}
override fun onTimelineItem(item: TimelineItem, channelId: String?, stream: JsonArray?) {
if(StreamManager.traceDelivery) log.v("${access_info.acct} onTimelineItem")
if (!canHandleStreamingMessage()) return
when (item) {
@ -2780,13 +2781,13 @@ class Column(
fun canStartStreaming() = when {
// 未初期化なら何もしない
!bFirstInitialized -> {
log.v("canStartStreaming: column is not initialized.")
if(StreamManager.traceDelivery) log.v("canStartStreaming: column is not initialized.")
false
}
// 初期ロード中なら何もしない
bInitialLoading -> {
log.v("canStartStreaming: is in initial loading.")
if(StreamManager.traceDelivery) log.v("canStartStreaming: is in initial loading.")
false
}

View File

@ -81,6 +81,7 @@ class StreamConnection(
item:TimelineItem?=null,
block: (callback: StreamCallback) -> Unit
) {
if(StreamManager.traceDelivery) log.v("$name eachCallback spec=${spec?.name}")
if (spec != null) {
eachCallbackForSpec(spec,channelId,stream,item,block)
}else {
@ -103,6 +104,7 @@ class StreamConnection(
private fun fireTimelineItem(item: TimelineItem?, channelId: String? = null,stream:JsonArray?=null) {
item?:return
if(StreamManager.traceDelivery) log.v("$name fireTimelineItem")
eachCallback(channelId,stream,item=item) { it.onTimelineItem(item, channelId,stream) }
}
@ -184,7 +186,7 @@ class StreamConnection(
fireTimelineItem(acctGroup.parser.notification(body), channelId)
}
else -> log.v("$name ignore streaming event $type")
else -> log.w("$name ignore streaming event $type")
}
}
@ -195,7 +197,7 @@ class StreamConnection(
when (val event = obj.string("event")) {
null, "" ->
log.d("$name onMessage: missing event parameter")
log.d("$name handleMastodonMessage: missing event parameter")
"filters_changed" ->
Column.onFiltersChanged(manager.context, acctGroup.account)
@ -254,7 +256,7 @@ class StreamConnection(
override fun onMessage(webSocket: WebSocket, text: String) {
manager.enqueue {
log.v("$name WebSocket onMessage.")
if(StreamManager.traceDelivery) log.v("$name WebSocket onMessage.")
try {
val obj = text.decodeJsonObject()
when {
@ -263,7 +265,7 @@ class StreamConnection(
}
} catch (ex: Throwable) {
log.trace(ex)
log.e("data=$text")
log.e("$name onMessage error. data=$text")
}
}
}

View File

@ -23,15 +23,21 @@ class StreamGroup(val spec: StreamSpec) {
fun eachCallback(channelId: String?, stream: JsonArray?, item: TimelineItem?, block: (callback: StreamCallback) -> Unit) {
// skip if channel id is provided and not match
if (channelId?.isNotEmpty() == true && channelId != spec.channelId) return
if (channelId?.isNotEmpty() == true && channelId != spec.channelId){
if(StreamManager.traceDelivery) log.v("${spec.name} channelId not match.")
return
}
val strStream = stream?.joinToString { "," }
val strStream = stream?.joinToString( "," )
destinations.values.forEach { dst ->
try {
if (strStream != null && item != null) {
val column = dst.refColumn.get() ?: return@forEach
if (!dst.spec.streamFilter(column, strStream, item)) return@forEach
if (!dst.spec.streamFilter(column, strStream, item)){
if(StreamManager.traceDelivery) log.v("${spec.name} streamFilter not match. strStream=$strStream")
return@forEach
}
}
dst.refCallback.get()?.let { block(it) }
} catch (ex: Throwable) {

View File

@ -23,6 +23,8 @@ class StreamManager(val appState: AppState) {
companion object {
private val log = LogCategory("StreamManager")
val traceDelivery = "false".toBoolean()
// 画面ONの間は定期的に状況を更新する
const val updateInterval = 5000L