Workaround to fetch all the pending toDevice events from a Synapse homeserver (#4614)

Workaround to fetch all the pending toDevice events from a Synapse homeserver

Co-authored-by: Valere <valeref@matrix.org>
This commit is contained in:
Benoit Marty 2021-12-30 10:28:52 +01:00 committed by GitHub
parent 55c0f1fcb3
commit 3abba1932f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 83 additions and 21 deletions

1
changelog.d/4612.misc Normal file
View File

@ -0,0 +1 @@
Workaround to fetch all the pending toDevice events from a Synapse homeserver

View File

@ -429,7 +429,17 @@ internal class DefaultCryptoService @Inject constructor(
val currentCount = syncResponse.deviceOneTimeKeysCount.signedCurve25519 ?: 0 val currentCount = syncResponse.deviceOneTimeKeysCount.signedCurve25519 ?: 0
oneTimeKeysUploader.updateOneTimeKeyCount(currentCount) oneTimeKeysUploader.updateOneTimeKeyCount(currentCount)
} }
if (isStarted()) { // There is a limit of to_device events returned per sync.
// If we are in a case of such limited to_device sync we can't try to generate/upload
// new otk now, because there might be some pending olm pre-key to_device messages that would fail if we rotate
// the old otk too early. In this case we want to wait for the pending to_device before doing anything
// As per spec:
// If there is a large queue of send-to-device messages, the server should limit the number sent in each /sync response.
// 100 messages is recommended as a reasonable limit.
// The limit is not part of the spec, so it's probably safer to handle that when there are no more to_device ( so we are sure
// that there are no pending to_device
val toDevices = syncResponse.toDevice?.events.orEmpty()
if (isStarted() && toDevices.isEmpty()) {
// Make sure we process to-device messages before generating new one-time-keys #2782 // Make sure we process to-device messages before generating new one-time-keys #2782
deviceListManager.refreshOutdatedDeviceLists() deviceListManager.refreshOutdatedDeviceLists()
// The presence of device_unused_fallback_key_types indicates that the server supports fallback keys. // The presence of device_unused_fallback_key_types indicates that the server supports fallback keys.

View File

@ -30,6 +30,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.failure.Failure import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.failure.isTokenError import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.api.logger.LoggerTag import org.matrix.android.sdk.api.logger.LoggerTag
@ -71,6 +72,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
private var isStarted = false private var isStarted = false
private var isTokenValid = true private var isTokenValid = true
private var retryNoNetworkTask: TimerTask? = null private var retryNoNetworkTask: TimerTask? = null
private var previousSyncResponseHasToDevice = false
private val activeCallListObserver = Observer<MutableList<MxCall>> { activeCalls -> private val activeCallListObserver = Observer<MutableList<MxCall>> { activeCalls ->
if (activeCalls.isEmpty() && backgroundDetectionObserver.isInBackground) { if (activeCalls.isEmpty() && backgroundDetectionObserver.isInBackground) {
@ -171,12 +173,15 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
if (state !is SyncState.Running) { if (state !is SyncState.Running) {
updateStateTo(SyncState.Running(afterPause = true)) updateStateTo(SyncState.Running(afterPause = true))
} }
// No timeout after a pause val timeout = when {
val timeout = state.let { if (it is SyncState.Running && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT } previousSyncResponseHasToDevice -> 0L /* Force timeout to 0 */
state.let { it is SyncState.Running && it.afterPause } -> 0L /* No timeout after a pause */
else -> DEFAULT_LONG_POOL_TIMEOUT
}
Timber.tag(loggerTag.value).d("Execute sync request with timeout $timeout") Timber.tag(loggerTag.value).d("Execute sync request with timeout $timeout")
val params = SyncTask.Params(timeout, SyncPresence.Online) val params = SyncTask.Params(timeout, SyncPresence.Online)
val sync = syncScope.launch { val sync = syncScope.launch {
doSync(params) previousSyncResponseHasToDevice = doSync(params)
} }
runBlocking { runBlocking {
sync.join() sync.join()
@ -203,10 +208,14 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
} }
} }
private suspend fun doSync(params: SyncTask.Params) { /**
try { * Will return true if the sync response contains some toDevice events.
*/
private suspend fun doSync(params: SyncTask.Params): Boolean {
return try {
val syncResponse = syncTask.execute(params) val syncResponse = syncTask.execute(params)
_syncFlow.emit(syncResponse) _syncFlow.emit(syncResponse)
syncResponse.toDevice?.events?.isNotEmpty().orFalse()
} catch (failure: Throwable) { } catch (failure: Throwable) {
if (failure is Failure.NetworkConnection) { if (failure is Failure.NetworkConnection) {
canReachServer = false canReachServer = false
@ -229,6 +238,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
delay(RETRY_WAIT_TIME_MS) delay(RETRY_WAIT_TIME_MS)
} }
} }
false
} finally { } finally {
state.let { state.let {
if (it is SyncState.Running && it.afterPause) { if (it is SyncState.Running && it.afterPause) {

View File

@ -20,6 +20,7 @@ import androidx.work.BackoffPolicy
import androidx.work.ExistingWorkPolicy import androidx.work.ExistingWorkPolicy
import androidx.work.WorkerParameters import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.failure.isTokenError import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.internal.SessionManager import org.matrix.android.sdk.internal.SessionManager
import org.matrix.android.sdk.internal.di.WorkManagerProvider import org.matrix.android.sdk.internal.di.WorkManagerProvider
@ -34,8 +35,8 @@ import timber.log.Timber
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import javax.inject.Inject import javax.inject.Inject
private const val DEFAULT_LONG_POOL_TIMEOUT = 6L private const val DEFAULT_LONG_POOL_TIMEOUT_SECONDS = 6L
private const val DEFAULT_DELAY_TIMEOUT = 30_000L private const val DEFAULT_DELAY_MILLIS = 30_000L
/** /**
* Possible previous worker: None * Possible previous worker: None
@ -47,9 +48,12 @@ internal class SyncWorker(context: Context, workerParameters: WorkerParameters,
@JsonClass(generateAdapter = true) @JsonClass(generateAdapter = true)
internal data class Params( internal data class Params(
override val sessionId: String, override val sessionId: String,
val timeout: Long = DEFAULT_LONG_POOL_TIMEOUT, // In seconds
val delay: Long = DEFAULT_DELAY_TIMEOUT, val timeout: Long = DEFAULT_LONG_POOL_TIMEOUT_SECONDS,
// In milliseconds
val delay: Long = DEFAULT_DELAY_MILLIS,
val periodic: Boolean = false, val periodic: Boolean = false,
val forceImmediate: Boolean = false,
override val lastFailureMessage: String? = null override val lastFailureMessage: String? = null
) : SessionWorkerParams ) : SessionWorkerParams
@ -65,13 +69,26 @@ internal class SyncWorker(context: Context, workerParameters: WorkerParameters,
Timber.i("Sync work starting") Timber.i("Sync work starting")
return runCatching { return runCatching {
doSync(params.timeout) doSync(if (params.forceImmediate) 0 else params.timeout)
}.fold( }.fold(
{ { hasToDeviceEvents ->
Result.success().also { Result.success().also {
if (params.periodic) { if (params.periodic) {
// we want to schedule another one after delay // we want to schedule another one after a delay, or immediately if hasToDeviceEvents
automaticallyBackgroundSync(workManagerProvider, params.sessionId, params.timeout, params.delay) automaticallyBackgroundSync(
workManagerProvider = workManagerProvider,
sessionId = params.sessionId,
serverTimeoutInSeconds = params.timeout,
delayInSeconds = params.delay,
forceImmediate = hasToDeviceEvents
)
} else if (hasToDeviceEvents) {
// Previous response has toDevice events, request an immediate sync request
requireBackgroundSync(
workManagerProvider = workManagerProvider,
sessionId = params.sessionId,
serverTimeoutInSeconds = 0
)
} }
} }
}, },
@ -92,16 +109,29 @@ internal class SyncWorker(context: Context, workerParameters: WorkerParameters,
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message) return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
} }
private suspend fun doSync(timeout: Long) { /**
* Will return true if the sync response contains some toDevice events.
*/
private suspend fun doSync(timeout: Long): Boolean {
val taskParams = SyncTask.Params(timeout * 1000, SyncPresence.Offline) val taskParams = SyncTask.Params(timeout * 1000, SyncPresence.Offline)
syncTask.execute(taskParams) val syncResponse = syncTask.execute(taskParams)
return syncResponse.toDevice?.events?.isNotEmpty().orFalse()
} }
companion object { companion object {
private const val BG_SYNC_WORK_NAME = "BG_SYNCP" private const val BG_SYNC_WORK_NAME = "BG_SYNCP"
fun requireBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0) { fun requireBackgroundSync(workManagerProvider: WorkManagerProvider,
val data = WorkerParamsFactory.toData(Params(sessionId, serverTimeout, 0L, false)) sessionId: String,
serverTimeoutInSeconds: Long = 0) {
val data = WorkerParamsFactory.toData(
Params(
sessionId = sessionId,
timeout = serverTimeoutInSeconds,
delay = 0L,
periodic = false
)
)
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SyncWorker>() val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SyncWorker>()
.setConstraints(WorkManagerProvider.workConstraints) .setConstraints(WorkManagerProvider.workConstraints)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS) .setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
@ -111,13 +141,24 @@ internal class SyncWorker(context: Context, workerParameters: WorkerParameters,
.enqueueUniqueWork(BG_SYNC_WORK_NAME, ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest) .enqueueUniqueWork(BG_SYNC_WORK_NAME, ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest)
} }
fun automaticallyBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0, delayInSeconds: Long = 30) { fun automaticallyBackgroundSync(workManagerProvider: WorkManagerProvider,
val data = WorkerParamsFactory.toData(Params(sessionId, serverTimeout, delayInSeconds, true)) sessionId: String,
serverTimeoutInSeconds: Long = 0,
delayInSeconds: Long = 30,
forceImmediate: Boolean = false) {
val data = WorkerParamsFactory.toData(
Params(
sessionId = sessionId,
timeout = serverTimeoutInSeconds,
delay = delayInSeconds,
forceImmediate = forceImmediate
)
)
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SyncWorker>() val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SyncWorker>()
.setConstraints(WorkManagerProvider.workConstraints) .setConstraints(WorkManagerProvider.workConstraints)
.setInputData(data) .setInputData(data)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS) .setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
.setInitialDelay(delayInSeconds, TimeUnit.SECONDS) .setInitialDelay(if (forceImmediate) 0 else delayInSeconds, TimeUnit.SECONDS)
.build() .build()
// Avoid risking multiple chains of syncs by replacing the existing chain // Avoid risking multiple chains of syncs by replacing the existing chain
workManagerProvider.workManager workManagerProvider.workManager