ストリーミングインジケータの表示がおかしい、カラムを復帰してから数秒待たないとストリーミングイベントの配信がおかしい等の問題を修正

This commit is contained in:
tateisu 2020-12-27 16:10:58 +09:00
parent 5adf857661
commit 04621ec97e
8 changed files with 132 additions and 81 deletions

View File

@ -576,14 +576,15 @@ class Column(
val streamCallback = object : StreamCallback {
override fun onListeningStateChanged(status: StreamStatus) {
if (!canHandleStreamingMessage()) return
override fun onStreamStatusChanged(status: StreamStatus) {
log.d("onStreamStatusChanged status=${status}, bFirstInitialized=$bFirstInitialized, bInitialLoading=$bInitialLoading, column=${access_info.acct}/${getColumnName(true)}")
if (status == StreamStatus.Open) {
if (status == StreamStatus.Subscribed) {
updateMisskeyCapture()
}
runOnMainLooperForStreamingEvent {
if(is_dispose.get()) return@runOnMainLooperForStreamingEvent
fireShowColumnStatus()
}
}

View File

@ -779,16 +779,17 @@ class ColumnViewHolder(
}
)
}
when (column.getStreamingStatus()) {
StreamIndicatorState.NONE -> {
}
val streamStatus =column.getStreamingStatus()
log.d("procShowColumnStatus: streamStatus=${streamStatus}, column=${column.access_info.acct}/${column.getColumnName(true)}")
StreamIndicatorState.REGISTERED -> {
when (streamStatus) {
StreamStatus.Missing,StreamStatus.Closed-> {
}
StreamStatus.Connecting,StreamStatus.Open->{
sb.appendColorShadeIcon(activity, R.drawable.ic_pulse, "Streaming")
sb.append("?")
}
StreamIndicatorState.LISTENING -> {
StreamStatus.Subscribed->{
sb.appendColorShadeIcon(activity, R.drawable.ic_pulse, "Streaming")
}
}

View File

@ -7,8 +7,9 @@ import jp.juggler.subwaytooter.api.entity.TootAnnouncement
import jp.juggler.util.JsonArray
interface StreamCallback {
fun onStreamStatusChanged(status: StreamStatus)
fun onTimelineItem(item: TimelineItem, channelId: String?,stream: JsonArray?)
fun onListeningStateChanged(status: StreamStatus)
fun onNoteUpdated(ev: MisskeyNoteUpdate, channelId: String?)
fun onAnnouncementUpdate(item: TootAnnouncement)
fun onAnnouncementDelete(id: EntityId)

View File

@ -49,14 +49,14 @@ class StreamConnection(
get() = _status.get()
set(value) {
_status.set(value)
eachCallback { it.onListeningStateChanged(value) }
eachCallback { it.onStreamStatusChanged(value) }
}
private val socket = AtomicReference<WebSocket>(null)
private var lastAliveSend = 0L
private val subscription = ConcurrentHashMap<StreamSpec, StreamGroup>()
private val subscriptions = ConcurrentHashMap<StreamSpec, StreamGroup>()
// Misskeyの投稿キャプチャ
private val capturedId = HashSet<EntityId>()
@ -64,22 +64,15 @@ class StreamConnection(
///////////////////////////////////////////////////////////////////
// methods
fun dispose() {
isDisposed.set(true)
socket.get()?.cancel()
socket.set(null)
}
fun getStreamingStatus(streamSpec: StreamSpec?) = when {
streamSpec == null -> null
status != StreamStatus.Open || null == socket.get() ->
StreamIndicatorState.REGISTERED
subscription[streamSpec] == null ->
StreamIndicatorState.REGISTERED
else -> StreamIndicatorState.LISTENING
private fun eachCallbackForSpec(
spec:StreamSpec,
channelId: String? = null,
stream:JsonArray? = null,
item:TimelineItem?=null,
block: (callback: StreamCallback) -> Unit
) {
if (isDisposed.get()) return
acctGroup.keyGroups[spec]?.eachCallback(channelId,stream,item,block)
}
private fun eachCallback(
@ -88,14 +81,26 @@ class StreamConnection(
item:TimelineItem?=null,
block: (callback: StreamCallback) -> Unit
) {
if (isDisposed.get()) return
if (spec == null) {
acctGroup.keyGroups.values.forEach { it.eachCallback(channelId,stream,item,block) }
} else {
acctGroup.keyGroups[spec]?.eachCallback(channelId,stream,item,block)
if (spec != null) {
eachCallbackForSpec(spec,channelId,stream,item,block)
}else {
if (isDisposed.get()) return
acctGroup.keyGroups.values.forEach { it.eachCallback(channelId, stream, item, block) }
}
}
fun dispose() {
status = StreamStatus.Closed
isDisposed.set(true)
socket.get()?.cancel()
socket.set(null)
}
fun getStreamStatus(streamSpec: StreamSpec) :StreamStatus = when {
subscriptions[streamSpec] != null -> StreamStatus.Subscribed
else -> status
}
private fun fireTimelineItem(item: TimelineItem?, channelId: String? = null,stream:JsonArray?=null) {
item?:return
eachCallback(channelId,stream,item=item) { it.onTimelineItem(item, channelId,stream) }
@ -106,7 +111,7 @@ class StreamConnection(
}
private fun fireDeleteId(id: EntityId) {
if (Pref.bpDontRemoveDeletedToot(manager.appState.pref)) return
if (Pref.bpDontRemoveDeletedToot.invoke(manager.appState.pref)) return
val tl_host = acctGroup.account.apiHost
manager.appState.columnList.forEach {
runOnMainLooper {
@ -313,8 +318,10 @@ class StreamConnection(
}
private fun unsubscribe(spec:StreamSpec) {
subscription.remove(spec)
try {
subscriptions.remove(spec)
eachCallbackForSpec(spec) { it.onStreamStatusChanged(getStreamStatus(spec) ) }
val jsonObject = if (acctGroup.account.isMastodon) {
/*
Mastodonの場合
@ -365,7 +372,8 @@ class StreamConnection(
} catch (ex: Throwable) {
log.e(ex, "send failed.")
} finally {
subscription[spec] = group
subscriptions[spec] = group
eachCallbackForSpec(spec) { it.onStreamStatusChanged(getStreamStatus(spec)) }
}
}
@ -377,18 +385,18 @@ class StreamConnection(
postMisskeyAlive()
if (spec != null) {
val group = acctGroup.keyGroups[spec]
if (group != null) subscribeIfChanged(group, subscription[spec])
if (group != null) subscribeIfChanged(group, subscriptions[spec])
} else {
val existsIds = HashSet<StreamSpec>()
// 購読するべきものを購読する
acctGroup.keyGroups.entries.forEach {
existsIds.add(it.key)
subscribeIfChanged(it.value, subscription[it.key])
subscribeIfChanged(it.value, subscriptions[it.key])
}
// 購読するべきでないものを購読解除する
subscription.entries.toList().forEach {
subscriptions.entries.toList().forEach {
if (!existsIds.contains(it.key)) unsubscribe(it.key)
}
}
@ -425,7 +433,7 @@ class StreamConnection(
else -> Unit //fall thru
}
subscription.clear()
subscriptions.clear()
socket.set(null)
synchronized(capturedId) {

View File

@ -1,21 +0,0 @@
package jp.juggler.subwaytooter.streaming
import jp.juggler.subwaytooter.Column
// ストリーミング接続の状態
enum class StreamStatus {
Closed,
Connecting,
Open,
}
// インジケータの状態
enum class StreamIndicatorState {
NONE,
REGISTERED, // registered, but not listening
LISTENING,
}
fun Column.getStreamingStatus() =
app_state.streamManager.getStreamingStatus(access_info, internalId)
?: StreamIndicatorState.NONE

View File

@ -3,6 +3,7 @@ package jp.juggler.subwaytooter.streaming
import jp.juggler.subwaytooter.api.TootParser
import jp.juggler.subwaytooter.api.entity.TootInstance
import jp.juggler.subwaytooter.table.SavedAccount
import jp.juggler.util.LogCategory
import java.util.concurrent.ConcurrentHashMap
@ -12,6 +13,9 @@ class StreamGroupAcct(
val account: SavedAccount,
var ti: TootInstance
) {
companion object{
private val log = LogCategory("StreamGroupAcct")
}
val parser: TootParser = TootParser(manager.appState.context, linkHelper = account)
val keyGroups = ConcurrentHashMap<StreamSpec, StreamGroup>()
@ -36,6 +40,21 @@ class StreamGroupAcct(
group.destinations[ dst.columnInternalId] = dst
}
private fun updateDestinations(){
destinations = ConcurrentHashMap<Int, StreamRelation>().apply {
keyGroups.values.forEach { group ->
group.destinations.values.forEach { relation ->
put(relation.columnInternalId, relation)
}
}
}
}
// マージではなく、作られてデータを追加された直後に呼ばれる
fun initialize() {
updateDestinations()
}
fun merge(newServer: StreamGroupAcct) {
// 新スペックの値をコピー
@ -58,11 +77,7 @@ class StreamGroupAcct(
}
}
this.destinations = ConcurrentHashMap<Int, StreamRelation>().apply {
keyGroups.values.forEach { group ->
group.destinations.values.forEach { spec -> put(spec.columnInternalId, spec) }
}
}
updateDestinations()
}
// このオブジェクトはもう使われなくなる
@ -76,14 +91,27 @@ class StreamGroupAcct(
keyGroups.clear()
}
private fun findConnection(streamSpec: StreamSpec?) =
mergedConnection ?: connections.values.firstOrNull { it.spec == streamSpec }
private fun findConnection(spec: StreamSpec) :StreamConnection? =
when(val mergedConnection = this.mergedConnection){
null ->when( val conn = connections[spec]){
null->{
log.w("findConnection: missing connection for ${spec.name}")
null
}
else -> conn
}
else-> mergedConnection
}
// ストリーミング接続インジケータ
fun getStreamingStatus(columnInternalId: Int): StreamIndicatorState? {
val spec = destinations[columnInternalId]?.spec
return findConnection(spec)?.getStreamingStatus(spec)
}
fun getStreamStatus(columnInternalId: Int): StreamStatus =
when( val spec = destinations[columnInternalId]?.spec ){
null ->{
log.w("getStreamStatus: missing destination for ${account.acct.pretty}")
StreamStatus.Missing
}
else-> findConnection(spec)?.getStreamStatus(spec) ?: StreamStatus.Missing
}
suspend fun updateConnection() {
@ -135,5 +163,7 @@ class StreamGroupAcct(
fun getConnection(internalId: Int)=
mergedConnection ?: destinations[internalId]?.spec?.let{ connections[it]}
}

View File

@ -13,7 +13,6 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.collections.HashMap
@ -76,14 +75,19 @@ class StreamManager(val appState: AppState) {
val accessInfo = column.access_info
if (column.is_dispose.get() || column.dont_streaming || accessInfo.isNA) continue
val server = prepareAcctGroup(accessInfo) ?: continue
val acctGroup = prepareAcctGroup(accessInfo) ?: continue
val streamSpec = column.getStreamDestination()
if (streamSpec != null)
server.addSpec(streamSpec)
acctGroup.addSpec(streamSpec)
}
}
if( newMap.size != acctGroups.size){
log.d("updateConnection: acctGroups.size changed. ${acctGroups.size} => ${newMap.size}")
}
// 新構成にないサーバは破棄する
acctGroups.entries.toList().forEach {
if (!newMap.containsKey(it.key)) {
@ -95,7 +99,7 @@ class StreamManager(val appState: AppState) {
// 追加.変更されたサーバをマージする
newMap.entries.forEach {
when (val current = acctGroups[it.key]) {
null -> acctGroups[it.key] = it.value
null -> acctGroups[it.key] = it.value.apply{ initialize() }
else -> current.merge(it.value)
}
}
@ -123,7 +127,7 @@ class StreamManager(val appState: AppState) {
//////////////////////////////////////////////////
// methods
fun enqueue(block: suspend () -> Unit) = runBlocking { queue.send(block) }
fun enqueue(block: suspend () -> Unit) = GlobalScope.launch(Dispatchers.Default) { queue.send(block) }
// UIスレッドから呼ばれる
fun updateStreamingColumns() {
@ -144,11 +148,23 @@ class StreamManager(val appState: AppState) {
// カラムヘッダの表示更新から、インジケータを取得するために呼ばれる
// UIスレッドから呼ばれる
fun getStreamingStatus(accessInfo: SavedAccount, columnInternalId: Int) =
acctGroups[accessInfo.acct]?.getStreamingStatus(columnInternalId)
fun getStreamStatus(column: Column) :StreamStatus=
when (val acctGroup =acctGroups[column.access_info.acct]) {
null -> {
log.w("getStreamStatus: missing acctGroup for ${column.access_info.acct}")
StreamStatus.Missing
}
else -> acctGroup.getStreamStatus(column.internalId)
}
fun getConnection(column: Column) =
acctGroups[column.access_info.acct]?.getConnection(column.internalId)
fun getConnection(column: Column) :StreamConnection? =
when (val acctGroup =acctGroups[column.access_info.acct]) {
null -> {
log.w("getConnection: missing acctGroup for ${column.access_info.acct}")
null
}
else -> acctGroup.getConnection(column.internalId)
}
////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,15 @@
package jp.juggler.subwaytooter.streaming
import jp.juggler.subwaytooter.Column
// ストリーミング接続の状態
enum class StreamStatus {
Missing,
Closed,
Connecting,
Open,
Subscribed
}
fun Column.getStreamingStatus() =
app_state.streamManager.getStreamStatus(this)