diff --git a/changelog.d/4612.misc b/changelog.d/4612.misc new file mode 100644 index 0000000000..43b5007b7e --- /dev/null +++ b/changelog.d/4612.misc @@ -0,0 +1 @@ +Workaround to fetch all the pending toDevice events from a Synapse homeserver \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/DefaultCryptoService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/DefaultCryptoService.kt index 7d9c351410..9dd369f426 100755 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/DefaultCryptoService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/DefaultCryptoService.kt @@ -429,7 +429,17 @@ internal class DefaultCryptoService @Inject constructor( val currentCount = syncResponse.deviceOneTimeKeysCount.signedCurve25519 ?: 0 oneTimeKeysUploader.updateOneTimeKeyCount(currentCount) } - if (isStarted()) { + // There is a limit of to_device events returned per sync. + // If we are in a case of such limited to_device sync we can't try to generate/upload + // new otk now, because there might be some pending olm pre-key to_device messages that would fail if we rotate + // the old otk too early. In this case we want to wait for the pending to_device before doing anything + // As per spec: + // If there is a large queue of send-to-device messages, the server should limit the number sent in each /sync response. + // 100 messages is recommended as a reasonable limit. + // The limit is not part of the spec, so it's probably safer to handle that when there are no more to_device ( so we are sure + // that there are no pending to_device + val toDevices = syncResponse.toDevice?.events.orEmpty() + if (isStarted() && toDevices.isEmpty()) { // Make sure we process to-device messages before generating new one-time-keys #2782 deviceListManager.refreshOutdatedDeviceLists() // The presence of device_unused_fallback_key_types indicates that the server supports fallback keys. diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncThread.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncThread.kt index 3faa0c9488..b6ea7a68f7 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncThread.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncThread.kt @@ -30,6 +30,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import org.matrix.android.sdk.api.extensions.orFalse import org.matrix.android.sdk.api.failure.Failure import org.matrix.android.sdk.api.failure.isTokenError import org.matrix.android.sdk.api.logger.LoggerTag @@ -71,6 +72,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, private var isStarted = false private var isTokenValid = true private var retryNoNetworkTask: TimerTask? = null + private var previousSyncResponseHasToDevice = false private val activeCallListObserver = Observer> { activeCalls -> if (activeCalls.isEmpty() && backgroundDetectionObserver.isInBackground) { @@ -171,12 +173,15 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, if (state !is SyncState.Running) { updateStateTo(SyncState.Running(afterPause = true)) } - // No timeout after a pause - val timeout = state.let { if (it is SyncState.Running && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT } + val timeout = when { + previousSyncResponseHasToDevice -> 0L /* Force timeout to 0 */ + state.let { it is SyncState.Running && it.afterPause } -> 0L /* No timeout after a pause */ + else -> DEFAULT_LONG_POOL_TIMEOUT + } Timber.tag(loggerTag.value).d("Execute sync request with timeout $timeout") val params = SyncTask.Params(timeout, SyncPresence.Online) val sync = syncScope.launch { - doSync(params) + previousSyncResponseHasToDevice = doSync(params) } runBlocking { sync.join() @@ -203,10 +208,14 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, } } - private suspend fun doSync(params: SyncTask.Params) { - try { + /** + * Will return true if the sync response contains some toDevice events. + */ + private suspend fun doSync(params: SyncTask.Params): Boolean { + return try { val syncResponse = syncTask.execute(params) _syncFlow.emit(syncResponse) + syncResponse.toDevice?.events?.isNotEmpty().orFalse() } catch (failure: Throwable) { if (failure is Failure.NetworkConnection) { canReachServer = false @@ -229,6 +238,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, delay(RETRY_WAIT_TIME_MS) } } + false } finally { state.let { if (it is SyncState.Running && it.afterPause) { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncWorker.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncWorker.kt index 763cd55714..2f1241f4d8 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncWorker.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncWorker.kt @@ -20,6 +20,7 @@ import androidx.work.BackoffPolicy import androidx.work.ExistingWorkPolicy import androidx.work.WorkerParameters import com.squareup.moshi.JsonClass +import org.matrix.android.sdk.api.extensions.orFalse import org.matrix.android.sdk.api.failure.isTokenError import org.matrix.android.sdk.internal.SessionManager import org.matrix.android.sdk.internal.di.WorkManagerProvider @@ -34,8 +35,8 @@ import timber.log.Timber import java.util.concurrent.TimeUnit import javax.inject.Inject -private const val DEFAULT_LONG_POOL_TIMEOUT = 6L -private const val DEFAULT_DELAY_TIMEOUT = 30_000L +private const val DEFAULT_LONG_POOL_TIMEOUT_SECONDS = 6L +private const val DEFAULT_DELAY_MILLIS = 30_000L /** * Possible previous worker: None @@ -47,9 +48,12 @@ internal class SyncWorker(context: Context, workerParameters: WorkerParameters, @JsonClass(generateAdapter = true) internal data class Params( override val sessionId: String, - val timeout: Long = DEFAULT_LONG_POOL_TIMEOUT, - val delay: Long = DEFAULT_DELAY_TIMEOUT, + // In seconds + val timeout: Long = DEFAULT_LONG_POOL_TIMEOUT_SECONDS, + // In milliseconds + val delay: Long = DEFAULT_DELAY_MILLIS, val periodic: Boolean = false, + val forceImmediate: Boolean = false, override val lastFailureMessage: String? = null ) : SessionWorkerParams @@ -65,13 +69,26 @@ internal class SyncWorker(context: Context, workerParameters: WorkerParameters, Timber.i("Sync work starting") return runCatching { - doSync(params.timeout) + doSync(if (params.forceImmediate) 0 else params.timeout) }.fold( - { + { hasToDeviceEvents -> Result.success().also { if (params.periodic) { - // we want to schedule another one after delay - automaticallyBackgroundSync(workManagerProvider, params.sessionId, params.timeout, params.delay) + // we want to schedule another one after a delay, or immediately if hasToDeviceEvents + automaticallyBackgroundSync( + workManagerProvider = workManagerProvider, + sessionId = params.sessionId, + serverTimeoutInSeconds = params.timeout, + delayInSeconds = params.delay, + forceImmediate = hasToDeviceEvents + ) + } else if (hasToDeviceEvents) { + // Previous response has toDevice events, request an immediate sync request + requireBackgroundSync( + workManagerProvider = workManagerProvider, + sessionId = params.sessionId, + serverTimeoutInSeconds = 0 + ) } } }, @@ -92,16 +109,29 @@ internal class SyncWorker(context: Context, workerParameters: WorkerParameters, return params.copy(lastFailureMessage = params.lastFailureMessage ?: message) } - private suspend fun doSync(timeout: Long) { + /** + * Will return true if the sync response contains some toDevice events. + */ + private suspend fun doSync(timeout: Long): Boolean { val taskParams = SyncTask.Params(timeout * 1000, SyncPresence.Offline) - syncTask.execute(taskParams) + val syncResponse = syncTask.execute(taskParams) + return syncResponse.toDevice?.events?.isNotEmpty().orFalse() } companion object { private const val BG_SYNC_WORK_NAME = "BG_SYNCP" - fun requireBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0) { - val data = WorkerParamsFactory.toData(Params(sessionId, serverTimeout, 0L, false)) + fun requireBackgroundSync(workManagerProvider: WorkManagerProvider, + sessionId: String, + serverTimeoutInSeconds: Long = 0) { + val data = WorkerParamsFactory.toData( + Params( + sessionId = sessionId, + timeout = serverTimeoutInSeconds, + delay = 0L, + periodic = false + ) + ) val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder() .setConstraints(WorkManagerProvider.workConstraints) .setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS) @@ -111,13 +141,24 @@ internal class SyncWorker(context: Context, workerParameters: WorkerParameters, .enqueueUniqueWork(BG_SYNC_WORK_NAME, ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest) } - fun automaticallyBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0, delayInSeconds: Long = 30) { - val data = WorkerParamsFactory.toData(Params(sessionId, serverTimeout, delayInSeconds, true)) + fun automaticallyBackgroundSync(workManagerProvider: WorkManagerProvider, + sessionId: String, + serverTimeoutInSeconds: Long = 0, + delayInSeconds: Long = 30, + forceImmediate: Boolean = false) { + val data = WorkerParamsFactory.toData( + Params( + sessionId = sessionId, + timeout = serverTimeoutInSeconds, + delay = delayInSeconds, + forceImmediate = forceImmediate + ) + ) val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder() .setConstraints(WorkManagerProvider.workConstraints) .setInputData(data) .setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS) - .setInitialDelay(delayInSeconds, TimeUnit.SECONDS) + .setInitialDelay(if (forceImmediate) 0 else delayInSeconds, TimeUnit.SECONDS) .build() // Avoid risking multiple chains of syncs by replacing the existing chain workManagerProvider.workManager