Workaround to fetch all the pending toDevice events from a Synapse homeserver

This commit is contained in:
Benoit Marty 2021-12-02 00:23:41 +01:00
parent fa256cca25
commit 59db27ba97
2 changed files with 17 additions and 4 deletions

1
changelog.d/4612.misc Normal file
View File

@ -0,0 +1 @@
Workaround to fetch all the pending toDevice events from a Synapse homeserver

View File

@ -30,6 +30,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.api.logger.LoggerTag
@ -71,6 +72,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
private var isStarted = false
private var isTokenValid = true
private var retryNoNetworkTask: TimerTask? = null
private var previousSyncResponseHasToDevice = false
private val activeCallListObserver = Observer<MutableList<MxCall>> { activeCalls ->
if (activeCalls.isEmpty() && backgroundDetectionObserver.isInBackground) {
@ -172,11 +174,16 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
updateStateTo(SyncState.Running(afterPause = true))
}
// No timeout after a pause
val timeout = state.let { if (it is SyncState.Running && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT }
val timeout = when {
previousSyncResponseHasToDevice -> 0L
.also { Timber.tag(loggerTag.value).d("Force timeout to 0") }
state.let { it is SyncState.Running && it.afterPause } -> 0L
else -> DEFAULT_LONG_POOL_TIMEOUT
}
Timber.tag(loggerTag.value).d("Execute sync request with timeout $timeout")
val params = SyncTask.Params(timeout, SyncPresence.Online)
val sync = syncScope.launch {
doSync(params)
previousSyncResponseHasToDevice = doSync(params)
}
runBlocking {
sync.join()
@ -203,10 +210,14 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
}
}
private suspend fun doSync(params: SyncTask.Params) {
try {
/**
* Will return true if the sync response contains some toDevice events.
*/
private suspend fun doSync(params: SyncTask.Params): Boolean {
return try {
val syncResponse = syncTask.execute(params)
_syncFlow.emit(syncResponse)
syncResponse.toDevice?.events?.isNotEmpty().orFalse()
} catch (failure: Throwable) {
if (failure is Failure.NetworkConnection) {
canReachServer = false
@ -229,6 +240,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
delay(RETRY_WAIT_TIME_MS)
}
}
false
} finally {
state.let {
if (it is SyncState.Running && it.afterPause) {