Merge pull request #6341 from artkoenig/develop
Fixed issues with reporting sync state events from different threads
This commit is contained in:
commit
c28e7c86d4
|
@ -0,0 +1 @@
|
|||
Fixed issues with reporting sync state events from different threads
|
|
@ -60,9 +60,9 @@ interface SyncService {
|
|||
fun getSyncStateLive(): LiveData<SyncState>
|
||||
|
||||
/**
|
||||
* Get the [SyncRequestState] as a LiveData.
|
||||
* Get the [SyncRequestState] as a SharedFlow.
|
||||
*/
|
||||
fun getSyncRequestStateLive(): LiveData<SyncRequestState>
|
||||
fun getSyncRequestStateFlow(): SharedFlow<SyncRequestState>
|
||||
|
||||
/**
|
||||
* This method returns a flow of SyncResponse. New value will be pushed through the sync thread.
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.sync
|
||||
|
||||
import androidx.lifecycle.LiveData
|
||||
import org.matrix.android.sdk.api.session.sync.SyncRequestState
|
||||
import org.matrix.android.sdk.api.session.sync.SyncService
|
||||
import org.matrix.android.sdk.internal.di.SessionId
|
||||
import org.matrix.android.sdk.internal.di.WorkManagerProvider
|
||||
|
@ -75,9 +73,7 @@ internal class DefaultSyncService @Inject constructor(
|
|||
|
||||
override fun getSyncState() = getSyncThread().currentState()
|
||||
|
||||
override fun getSyncRequestStateLive(): LiveData<SyncRequestState> {
|
||||
return syncRequestStateTracker.syncRequestState
|
||||
}
|
||||
override fun getSyncRequestStateFlow() = syncRequestStateTracker.syncRequestState
|
||||
|
||||
override fun hasAlreadySynced(): Boolean {
|
||||
return syncTokenStore.getLastToken() != null
|
||||
|
|
|
@ -16,23 +16,26 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.sync
|
||||
|
||||
import androidx.lifecycle.MutableLiveData
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import org.matrix.android.sdk.api.session.sync.InitialSyncStep
|
||||
import org.matrix.android.sdk.api.session.sync.SyncRequestState
|
||||
import org.matrix.android.sdk.internal.session.SessionScope
|
||||
import javax.inject.Inject
|
||||
|
||||
@SessionScope
|
||||
internal class SyncRequestStateTracker @Inject constructor() :
|
||||
ProgressReporter {
|
||||
internal class SyncRequestStateTracker @Inject constructor(
|
||||
private val coroutineScope: CoroutineScope
|
||||
) : ProgressReporter {
|
||||
|
||||
val syncRequestState = MutableLiveData<SyncRequestState>()
|
||||
val syncRequestState = MutableSharedFlow<SyncRequestState>()
|
||||
|
||||
private var rootTask: TaskInfo? = null
|
||||
|
||||
// Only to be used for incremental sync
|
||||
fun setSyncRequestState(newSyncRequestState: SyncRequestState.IncrementalSyncRequestState) {
|
||||
syncRequestState.postValue(newSyncRequestState)
|
||||
emitSyncState(newSyncRequestState)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -42,7 +45,9 @@ internal class SyncRequestStateTracker @Inject constructor() :
|
|||
initialSyncStep: InitialSyncStep,
|
||||
totalProgress: Int
|
||||
) {
|
||||
endAll()
|
||||
if (rootTask != null) {
|
||||
endAll()
|
||||
}
|
||||
rootTask = TaskInfo(initialSyncStep, totalProgress, null, 1F)
|
||||
reportProgress(0F)
|
||||
}
|
||||
|
@ -71,7 +76,7 @@ internal class SyncRequestStateTracker @Inject constructor() :
|
|||
// Update the progress of the leaf and all its parents
|
||||
leaf.setProgress(progress)
|
||||
// Then update the live data using leaf wording and root progress
|
||||
syncRequestState.postValue(SyncRequestState.InitialSyncProgressing(leaf.initialSyncStep, root.currentProgress.toInt()))
|
||||
emitSyncState(SyncRequestState.InitialSyncProgressing(leaf.initialSyncStep, root.currentProgress.toInt()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -86,13 +91,19 @@ internal class SyncRequestStateTracker @Inject constructor() :
|
|||
// And close it
|
||||
endedTask.parent.child = null
|
||||
} else {
|
||||
syncRequestState.postValue(SyncRequestState.Idle)
|
||||
emitSyncState(SyncRequestState.Idle)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun endAll() {
|
||||
rootTask = null
|
||||
syncRequestState.postValue(SyncRequestState.Idle)
|
||||
emitSyncState(SyncRequestState.Idle)
|
||||
}
|
||||
|
||||
private fun emitSyncState(state: SyncRequestState) {
|
||||
coroutineScope.launch {
|
||||
syncRequestState.emit(state)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ package im.vector.app
|
|||
|
||||
import androidx.lifecycle.DefaultLifecycleObserver
|
||||
import androidx.lifecycle.LifecycleOwner
|
||||
import androidx.lifecycle.asFlow
|
||||
import arrow.core.Option
|
||||
import im.vector.app.core.di.ActiveSessionHolder
|
||||
import im.vector.app.core.utils.BehaviorDataSource
|
||||
|
@ -114,8 +113,7 @@ class AppStateHandler @Inject constructor(
|
|||
}
|
||||
|
||||
private fun observeSyncStatus(session: Session) {
|
||||
session.syncService().getSyncRequestStateLive()
|
||||
.asFlow()
|
||||
session.syncService().getSyncRequestStateFlow()
|
||||
.filterIsInstance<SyncRequestState.IncrementalSyncDone>()
|
||||
.map { session.spaceService().getRootSpaceSummaries().size }
|
||||
.distinctUntilChanged()
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package im.vector.app
|
||||
|
||||
import android.content.SharedPreferences
|
||||
import androidx.lifecycle.asFlow
|
||||
import im.vector.app.core.di.ActiveSessionHolder
|
||||
import im.vector.app.features.rageshake.BugReporter
|
||||
import im.vector.app.features.rageshake.ReportType
|
||||
|
@ -261,8 +260,7 @@ class AutoRageShaker @Inject constructor(
|
|||
this.currentActiveSessionId = sessionId
|
||||
|
||||
hasSynced = session.syncService().hasAlreadySynced()
|
||||
session.syncService().getSyncRequestStateLive()
|
||||
.asFlow()
|
||||
session.syncService().getSyncRequestStateFlow()
|
||||
.onEach {
|
||||
hasSynced = it !is SyncRequestState.InitialSyncProgressing
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
package im.vector.app.features.analytics.accountdata
|
||||
|
||||
import androidx.lifecycle.asFlow
|
||||
import com.airbnb.mvrx.MavericksViewModelFactory
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
|
@ -66,7 +65,7 @@ class AnalyticsAccountDataViewModel @AssistedInject constructor(
|
|||
|
||||
private fun observeInitSync() {
|
||||
combine(
|
||||
session.syncService().getSyncRequestStateLive().asFlow(),
|
||||
session.syncService().getSyncRequestStateFlow(),
|
||||
analytics.getUserConsent(),
|
||||
analytics.getAnalyticsId()
|
||||
) { status, userConsent, analyticsId ->
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
package im.vector.app.features.home
|
||||
|
||||
import androidx.lifecycle.asFlow
|
||||
import com.airbnb.mvrx.Mavericks
|
||||
import com.airbnb.mvrx.MavericksViewModelFactory
|
||||
import com.airbnb.mvrx.ViewModelContext
|
||||
|
@ -218,8 +217,7 @@ class HomeActivityViewModel @AssistedInject constructor(
|
|||
private fun observeInitialSync() {
|
||||
val session = activeSessionHolder.getSafeActiveSession() ?: return
|
||||
|
||||
session.syncService().getSyncRequestStateLive()
|
||||
.asFlow()
|
||||
session.syncService().getSyncRequestStateFlow()
|
||||
.onEach { status ->
|
||||
when (status) {
|
||||
is SyncRequestState.Idle -> {
|
||||
|
|
|
@ -198,8 +198,7 @@ class HomeDetailViewModel @AssistedInject constructor(
|
|||
copy(syncState = syncState)
|
||||
}
|
||||
|
||||
session.syncService().getSyncRequestStateLive()
|
||||
.asFlow()
|
||||
session.syncService().getSyncRequestStateFlow()
|
||||
.filterIsInstance<SyncRequestState.IncrementalSyncRequestState>()
|
||||
.setOnEach {
|
||||
copy(incrementalSyncRequestState = it)
|
||||
|
|
|
@ -18,7 +18,6 @@ package im.vector.app.features.home.room.detail
|
|||
|
||||
import android.net.Uri
|
||||
import androidx.annotation.IdRes
|
||||
import androidx.lifecycle.asFlow
|
||||
import com.airbnb.mvrx.Async
|
||||
import com.airbnb.mvrx.Fail
|
||||
import com.airbnb.mvrx.Loading
|
||||
|
@ -1152,8 +1151,7 @@ class TimelineViewModel @AssistedInject constructor(
|
|||
copy(syncState = syncState)
|
||||
}
|
||||
|
||||
session.syncService().getSyncRequestStateLive()
|
||||
.asFlow()
|
||||
session.syncService().getSyncRequestStateFlow()
|
||||
.filterIsInstance<SyncRequestState.IncrementalSyncRequestState>()
|
||||
.setOnEach {
|
||||
copy(incrementalSyncRequestState = it)
|
||||
|
|
Loading…
Reference in New Issue