From 4a74f5851626d9ae0f6578c0f8c22b3312cabfe3 Mon Sep 17 00:00:00 2001 From: ganfra Date: Mon, 5 Aug 2019 17:27:40 +0200 Subject: [PATCH] Task: use a builder with DSL and introduce Constraints (only boolean connectedToNetwork at the moment) --- .../android/internal/crypto/CryptoManager.kt | 28 +- .../crypto/OutgoingRoomKeyRequestManager.kt | 10 +- .../internal/crypto/keysbackup/KeysBackup.kt | 350 +++++++++--------- .../DefaultSasVerificationService.kt | 19 +- .../SASVerificationTransaction.kt | 32 +- .../network/NetworkConnectivityChecker.kt | 25 +- .../session/cache/DefaultCacheService.kt | 7 +- .../notification/DefaultPushRuleService.kt | 5 +- .../session/pushers/DefaultPusherService.kt | 8 +- .../session/pushers/RemovePusherTask.kt | 12 +- .../room/DefaultRoomDirectoryService.kt | 16 +- .../session/room/DefaultRoomService.kt | 10 +- .../membership/DefaultMembershipService.kt | 24 +- .../session/room/read/DefaultReadService.kt | 26 +- .../room/relation/DefaultRelationService.kt | 46 +-- .../session/room/state/DefaultStateService.kt | 6 +- .../session/room/timeline/DefaultTimeline.kt | 38 +- .../session/signout/DefaultSignOutService.kt | 7 +- .../internal/session/sync/job/SyncService.kt | 97 +++-- .../internal/session/sync/job/SyncThread.kt | 60 +-- .../session/user/DefaultUserService.kt | 7 +- .../android/internal/task/ConfigurableTask.kt | 65 ++-- .../android/internal/task/TaskConstraints.kt | 22 ++ .../android/internal/task/TaskExecutor.kt | 12 +- 24 files changed, 528 insertions(+), 404 deletions(-) create mode 100644 matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskConstraints.kt diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/CryptoManager.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/CryptoManager.kt index 136f97081f..96d337bd2d 100755 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/CryptoManager.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/CryptoManager.kt @@ -72,7 +72,6 @@ import im.vector.matrix.android.internal.session.room.membership.RoomMembers import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.configureWith -import im.vector.matrix.android.internal.task.toConfigurableTask import im.vector.matrix.android.internal.util.JsonCanonicalizer import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers import im.vector.matrix.android.internal.util.fetchCopied @@ -167,22 +166,25 @@ internal class CryptoManager @Inject constructor( override fun setDeviceName(deviceId: String, deviceName: String, callback: MatrixCallback) { setDeviceNameTask - .configureWith(SetDeviceNameTask.Params(deviceId, deviceName)) - .dispatchTo(callback) + .configureWith(SetDeviceNameTask.Params(deviceId, deviceName)) { + this.callback = callback + } .executeBy(taskExecutor) } override fun deleteDevice(deviceId: String, callback: MatrixCallback) { deleteDeviceTask - .configureWith(DeleteDeviceTask.Params(deviceId)) - .dispatchTo(callback) + .configureWith(DeleteDeviceTask.Params(deviceId)) { + this.callback = callback + } .executeBy(taskExecutor) } override fun deleteDeviceWithUserPassword(deviceId: String, authSession: String?, password: String, callback: MatrixCallback) { deleteDeviceWithUserPasswordTask - .configureWith(DeleteDeviceWithUserPasswordTask.Params(deviceId, authSession, password)) - .dispatchTo(callback) + .configureWith(DeleteDeviceWithUserPasswordTask.Params(deviceId, authSession, password)) { + this.callback = callback + } .executeBy(taskExecutor) } @@ -196,8 +198,9 @@ internal class CryptoManager @Inject constructor( override fun getDevicesList(callback: MatrixCallback) { getDevicesTask - .toConfigurableTask() - .dispatchTo(callback) + .configureWith { + this.callback = callback + } .executeBy(taskExecutor) } @@ -283,7 +286,7 @@ internal class CryptoManager @Inject constructor( /** * Close the crypto */ - fun close() = runBlocking(coroutineDispatchers.crypto){ + fun close() = runBlocking(coroutineDispatchers.crypto) { olmDevice.release() cryptoStore.close() outgoingRoomKeyRequestManager.stop() @@ -1046,8 +1049,9 @@ internal class CryptoManager @Inject constructor( override fun clearCryptoCache(callback: MatrixCallback) { clearCryptoDataTask - .toConfigurableTask() - .dispatchTo(callback) + .configureWith { + this.callback = callback + } .executeBy(taskExecutor) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/OutgoingRoomKeyRequestManager.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/OutgoingRoomKeyRequestManager.kt index a13ae75bca..c0702f70e5 100755 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/OutgoingRoomKeyRequestManager.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/OutgoingRoomKeyRequestManager.kt @@ -299,10 +299,12 @@ internal class OutgoingRoomKeyRequestManager @Inject constructor( // TODO Change this two hard coded key to something better contentMap.setObject(recipient["userId"], recipient["deviceId"], message) } - sendToDeviceTask.configureWith(SendToDeviceTask.Params(EventType.ROOM_KEY_REQUEST, contentMap, transactionId)) - .dispatchTo(callback) - .executeOn(TaskThread.CALLER) - .callbackOn(TaskThread.CALLER) + sendToDeviceTask + .configureWith(SendToDeviceTask.Params(EventType.ROOM_KEY_REQUEST, contentMap, transactionId)) { + this.callback = callback + this.callbackThread = TaskThread.CALLER + this.executionThread = TaskThread.CALLER + } .executeBy(taskExecutor) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/keysbackup/KeysBackup.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/keysbackup/KeysBackup.kt index 2e9f84bd58..a69a732567 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/keysbackup/KeysBackup.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/keysbackup/KeysBackup.kt @@ -51,7 +51,10 @@ import im.vector.matrix.android.internal.crypto.store.db.model.KeysBackupDataEnt import im.vector.matrix.android.internal.di.MoshiProvider import im.vector.matrix.android.internal.extensions.foldToCallback import im.vector.matrix.android.internal.session.SessionScope -import im.vector.matrix.android.internal.task.* +import im.vector.matrix.android.internal.task.Task +import im.vector.matrix.android.internal.task.TaskExecutor +import im.vector.matrix.android.internal.task.TaskThread +import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.util.JsonCanonicalizer import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers import kotlinx.coroutines.GlobalScope @@ -200,31 +203,32 @@ internal class KeysBackup @Inject constructor( keysBackupStateManager.state = KeysBackupState.Enabling createKeysBackupVersionTask - .configureWith(createKeysBackupVersionBody) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(info: KeysVersion) { - // Reset backup markers. - cryptoStore.resetBackupMarkers() + .configureWith(createKeysBackupVersionBody) { + this.callback = object : MatrixCallback { + override fun onSuccess(info: KeysVersion) { + // Reset backup markers. + cryptoStore.resetBackupMarkers() - val keyBackupVersion = KeysVersionResult() - keyBackupVersion.algorithm = createKeysBackupVersionBody.algorithm - keyBackupVersion.authData = createKeysBackupVersionBody.authData - keyBackupVersion.version = info.version + val keyBackupVersion = KeysVersionResult() + keyBackupVersion.algorithm = createKeysBackupVersionBody.algorithm + keyBackupVersion.authData = createKeysBackupVersionBody.authData + keyBackupVersion.version = info.version - // We can consider that the server does not have keys yet - keyBackupVersion.count = 0 - keyBackupVersion.hash = null + // We can consider that the server does not have keys yet + keyBackupVersion.count = 0 + keyBackupVersion.hash = null - enableKeysBackup(keyBackupVersion) + enableKeysBackup(keyBackupVersion) - callback.onSuccess(info) + callback.onSuccess(info) + } + + override fun onFailure(failure: Throwable) { + keysBackupStateManager.state = KeysBackupState.Disabled + callback.onFailure(failure) + } } - - override fun onFailure(failure: Throwable) { - keysBackupStateManager.state = KeysBackupState.Disabled - callback.onFailure(failure) - } - }) + } .executeBy(taskExecutor) } @@ -239,27 +243,29 @@ internal class KeysBackup @Inject constructor( keysBackupStateManager.state = KeysBackupState.Unknown } - deleteBackupTask.configureWith(DeleteBackupTask.Params(version)) - .dispatchTo(object : MatrixCallback { - private fun eventuallyRestartBackup() { - // Do not stay in KeysBackupState.Unknown but check what is available on the homeserver - if (state == KeysBackupState.Unknown) { - checkAndStartKeysBackup() + deleteBackupTask + .configureWith(DeleteBackupTask.Params(version)) { + this.callback = object : MatrixCallback { + private fun eventuallyRestartBackup() { + // Do not stay in KeysBackupState.Unknown but check what is available on the homeserver + if (state == KeysBackupState.Unknown) { + checkAndStartKeysBackup() + } + } + + override fun onSuccess(data: Unit) { + eventuallyRestartBackup() + + uiHandler.post { callback?.onSuccess(Unit) } + } + + override fun onFailure(failure: Throwable) { + eventuallyRestartBackup() + + uiHandler.post { callback?.onFailure(failure) } } } - - override fun onSuccess(data: Unit) { - eventuallyRestartBackup() - - uiHandler.post { callback?.onSuccess(Unit) } - } - - override fun onFailure(failure: Throwable) { - eventuallyRestartBackup() - - uiHandler.post { callback?.onFailure(failure) } - } - }) + } .executeBy(taskExecutor) } } @@ -355,9 +361,10 @@ internal class KeysBackup @Inject constructor( return getKeysBackupTrustBg(params) } } - .configureWith(keysBackupVersion) - .dispatchTo(callback) - .executeOn(TaskThread.COMPUTATION) + .configureWith(keysBackupVersion) { + this.callback = callback + this.executionThread = TaskThread.COMPUTATION + } .executeBy(taskExecutor) } @@ -487,27 +494,28 @@ internal class KeysBackup @Inject constructor( // And send it to the homeserver updateKeysBackupVersionTask - .configureWith(UpdateKeysBackupVersionTask.Params(keysBackupVersion.version!!, updateKeysBackupVersionBody)) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: Unit) { - // Relaunch the state machine on this updated backup version - val newKeysBackupVersion = KeysVersionResult() + .configureWith(UpdateKeysBackupVersionTask.Params(keysBackupVersion.version!!, updateKeysBackupVersionBody)) { + this.callback = object : MatrixCallback { + override fun onSuccess(data: Unit) { + // Relaunch the state machine on this updated backup version + val newKeysBackupVersion = KeysVersionResult() - newKeysBackupVersion.version = keysBackupVersion.version - newKeysBackupVersion.algorithm = keysBackupVersion.algorithm - newKeysBackupVersion.count = keysBackupVersion.count - newKeysBackupVersion.hash = keysBackupVersion.hash - newKeysBackupVersion.authData = updateKeysBackupVersionBody.authData + newKeysBackupVersion.version = keysBackupVersion.version + newKeysBackupVersion.algorithm = keysBackupVersion.algorithm + newKeysBackupVersion.count = keysBackupVersion.count + newKeysBackupVersion.hash = keysBackupVersion.hash + newKeysBackupVersion.authData = updateKeysBackupVersionBody.authData - checkAndStartWithKeysBackupVersion(newKeysBackupVersion) + checkAndStartWithKeysBackupVersion(newKeysBackupVersion) - callback.onSuccess(data) + callback.onSuccess(data) + } + + override fun onFailure(failure: Throwable) { + callback.onFailure(failure) + } } - - override fun onFailure(failure: Throwable) { - callback.onFailure(failure) - } - }) + } .executeBy(taskExecutor) } } @@ -753,49 +761,52 @@ internal class KeysBackup @Inject constructor( if (roomId != null && sessionId != null) { // Get key for the room and for the session getRoomSessionDataTask - .configureWith(GetRoomSessionDataTask.Params(roomId, sessionId, version)) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: KeyBackupData) { - // Convert to KeysBackupData - val keysBackupData = KeysBackupData() - keysBackupData.roomIdToRoomKeysBackupData = HashMap() - val roomKeysBackupData = RoomKeysBackupData() - roomKeysBackupData.sessionIdToKeyBackupData = HashMap() - roomKeysBackupData.sessionIdToKeyBackupData[sessionId] = data - keysBackupData.roomIdToRoomKeysBackupData[roomId] = roomKeysBackupData + .configureWith(GetRoomSessionDataTask.Params(roomId, sessionId, version)) { + this.callback = object : MatrixCallback { + override fun onSuccess(data: KeyBackupData) { + // Convert to KeysBackupData + val keysBackupData = KeysBackupData() + keysBackupData.roomIdToRoomKeysBackupData = HashMap() + val roomKeysBackupData = RoomKeysBackupData() + roomKeysBackupData.sessionIdToKeyBackupData = HashMap() + roomKeysBackupData.sessionIdToKeyBackupData[sessionId] = data + keysBackupData.roomIdToRoomKeysBackupData[roomId] = roomKeysBackupData - callback.onSuccess(keysBackupData) - } + callback.onSuccess(keysBackupData) + } - override fun onFailure(failure: Throwable) { - callback.onFailure(failure) + override fun onFailure(failure: Throwable) { + callback.onFailure(failure) + } } - }) + } .executeBy(taskExecutor) } else if (roomId != null) { // Get all keys for the room getRoomSessionsDataTask - .configureWith(GetRoomSessionsDataTask.Params(roomId, version)) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: RoomKeysBackupData) { - // Convert to KeysBackupData - val keysBackupData = KeysBackupData() - keysBackupData.roomIdToRoomKeysBackupData = HashMap() - keysBackupData.roomIdToRoomKeysBackupData[roomId] = data + .configureWith(GetRoomSessionsDataTask.Params(roomId, version)) { + this.callback = object : MatrixCallback { + override fun onSuccess(data: RoomKeysBackupData) { + // Convert to KeysBackupData + val keysBackupData = KeysBackupData() + keysBackupData.roomIdToRoomKeysBackupData = HashMap() + keysBackupData.roomIdToRoomKeysBackupData[roomId] = data - callback.onSuccess(keysBackupData) - } + callback.onSuccess(keysBackupData) + } - override fun onFailure(failure: Throwable) { - callback.onFailure(failure) + override fun onFailure(failure: Throwable) { + callback.onFailure(failure) + } } - }) + } .executeBy(taskExecutor) } else { // Get all keys getSessionsDataTask - .configureWith(GetSessionsDataTask.Params(version)) - .dispatchTo(callback) + .configureWith(GetSessionsDataTask.Params(version)) { + this.callback = callback + } .executeBy(taskExecutor) } } @@ -850,45 +861,47 @@ internal class KeysBackup @Inject constructor( override fun getVersion(version: String, callback: MatrixCallback) { getKeysBackupVersionTask - .configureWith(version) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: KeysVersionResult) { - callback.onSuccess(data) - } + .configureWith(version) { + this.callback = object : MatrixCallback { + override fun onSuccess(data: KeysVersionResult) { + callback.onSuccess(data) + } - override fun onFailure(failure: Throwable) { - if (failure is Failure.ServerError - && failure.error.code == MatrixError.NOT_FOUND) { - // Workaround because the homeserver currently returns M_NOT_FOUND when there is no key backup - callback.onSuccess(null) - } else { - // Transmit the error - callback.onFailure(failure) + override fun onFailure(failure: Throwable) { + if (failure is Failure.ServerError + && failure.error.code == MatrixError.NOT_FOUND) { + // Workaround because the homeserver currently returns M_NOT_FOUND when there is no key backup + callback.onSuccess(null) + } else { + // Transmit the error + callback.onFailure(failure) + } } } - }) + } .executeBy(taskExecutor) } override fun getCurrentVersion(callback: MatrixCallback) { getKeysBackupLastVersionTask - .toConfigurableTask() - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: KeysVersionResult) { - callback.onSuccess(data) - } + .configureWith { + this.callback = object : MatrixCallback { + override fun onSuccess(data: KeysVersionResult) { + callback.onSuccess(data) + } - override fun onFailure(failure: Throwable) { - if (failure is Failure.ServerError - && failure.error.code == MatrixError.NOT_FOUND) { - // Workaround because the homeserver currently returns M_NOT_FOUND when there is no key backup - callback.onSuccess(null) - } else { - // Transmit the error - callback.onFailure(failure) + override fun onFailure(failure: Throwable) { + if (failure is Failure.ServerError + && failure.error.code == MatrixError.NOT_FOUND) { + // Workaround because the homeserver currently returns M_NOT_FOUND when there is no key backup + callback.onSuccess(null) + } else { + // Transmit the error + callback.onFailure(failure) + } } } - }) + } .executeBy(taskExecutor) } @@ -1231,69 +1244,72 @@ internal class KeysBackup @Inject constructor( Timber.v("backupKeys: 4 - Sending request") - // Make the request - storeSessionDataTask - .configureWith(StoreSessionsDataTask.Params(keysBackupVersion!!.version!!, keysBackupData)) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: BackupKeysResult) { - uiHandler.post { - Timber.v("backupKeys: 5a - Request complete") + val sendingRequestCallback = object : MatrixCallback { + override fun onSuccess(data: BackupKeysResult) { + uiHandler.post { + Timber.v("backupKeys: 5a - Request complete") - // Mark keys as backed up - cryptoStore.markBackupDoneForInboundGroupSessions(olmInboundGroupSessionWrappers) + // Mark keys as backed up + cryptoStore.markBackupDoneForInboundGroupSessions(olmInboundGroupSessionWrappers) - if (olmInboundGroupSessionWrappers.size < KEY_BACKUP_SEND_KEYS_MAX_COUNT) { - Timber.v("backupKeys: All keys have been backed up") - onServerDataRetrieved(data.count, data.hash) + if (olmInboundGroupSessionWrappers.size < KEY_BACKUP_SEND_KEYS_MAX_COUNT) { + Timber.v("backupKeys: All keys have been backed up") + onServerDataRetrieved(data.count, data.hash) - // Note: Changing state will trigger the call to backupAllGroupSessionsCallback.onSuccess() - keysBackupStateManager.state = KeysBackupState.ReadyToBackUp - } else { - Timber.v("backupKeys: Continue to back up keys") - keysBackupStateManager.state = KeysBackupState.WillBackUp + // Note: Changing state will trigger the call to backupAllGroupSessionsCallback.onSuccess() + keysBackupStateManager.state = KeysBackupState.ReadyToBackUp + } else { + Timber.v("backupKeys: Continue to back up keys") + keysBackupStateManager.state = KeysBackupState.WillBackUp - backupKeys() - } - } + backupKeys() } + } + } - override fun onFailure(failure: Throwable) { - if (failure is Failure.ServerError) { - uiHandler.post { - Timber.e(failure, "backupKeys: backupKeys failed.") + override fun onFailure(failure: Throwable) { + if (failure is Failure.ServerError) { + uiHandler.post { + Timber.e(failure, "backupKeys: backupKeys failed.") - when (failure.error.code) { - MatrixError.NOT_FOUND, - MatrixError.WRONG_ROOM_KEYS_VERSION -> { - // Backup has been deleted on the server, or we are not using the last backup version - keysBackupStateManager.state = KeysBackupState.WrongBackUpVersion - backupAllGroupSessionsCallback?.onFailure(failure) - resetBackupAllGroupSessionsListeners() - resetKeysBackupData() - keysBackupVersion = null - - // Do not stay in KeysBackupState.WrongBackUpVersion but check what is available on the homeserver - checkAndStartKeysBackup() - } - else -> - // Come back to the ready state so that we will retry on the next received key - keysBackupStateManager.state = KeysBackupState.ReadyToBackUp - } - } - } else { - uiHandler.post { + when (failure.error.code) { + MatrixError.NOT_FOUND, + MatrixError.WRONG_ROOM_KEYS_VERSION -> { + // Backup has been deleted on the server, or we are not using the last backup version + keysBackupStateManager.state = KeysBackupState.WrongBackUpVersion backupAllGroupSessionsCallback?.onFailure(failure) resetBackupAllGroupSessionsListeners() + resetKeysBackupData() + keysBackupVersion = null - Timber.e("backupKeys: backupKeys failed.") - - // Retry a bit later - keysBackupStateManager.state = KeysBackupState.ReadyToBackUp - maybeBackupKeys() + // Do not stay in KeysBackupState.WrongBackUpVersion but check what is available on the homeserver + checkAndStartKeysBackup() } + else -> + // Come back to the ready state so that we will retry on the next received key + keysBackupStateManager.state = KeysBackupState.ReadyToBackUp } } - }) + } else { + uiHandler.post { + backupAllGroupSessionsCallback?.onFailure(failure) + resetBackupAllGroupSessionsListeners() + + Timber.e("backupKeys: backupKeys failed.") + + // Retry a bit later + keysBackupStateManager.state = KeysBackupState.ReadyToBackUp + maybeBackupKeys() + } + } + } + } + + // Make the request + storeSessionDataTask + .configureWith(StoreSessionsDataTask.Params(keysBackupVersion!!.version!!, keysBackupData)){ + this.callback = sendingRequestCallback + } .executeBy(taskExecutor) } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/verification/DefaultSasVerificationService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/verification/DefaultSasVerificationService.kt index 357328053a..6eea9ab543 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/verification/DefaultSasVerificationService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/verification/DefaultSasVerificationService.kt @@ -37,6 +37,7 @@ import im.vector.matrix.android.internal.crypto.model.rest.* import im.vector.matrix.android.internal.crypto.store.IMXCryptoStore import im.vector.matrix.android.internal.crypto.tasks.SendToDeviceTask import im.vector.matrix.android.internal.session.SessionScope +import im.vector.matrix.android.internal.task.TaskConstraints import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers @@ -415,16 +416,18 @@ internal class DefaultSasVerificationService @Inject constructor(private val cre val contentMap = MXUsersDevicesMap() contentMap.setObject(userId, userDevice, cancelMessage) - sendToDeviceTask.configureWith(SendToDeviceTask.Params(EventType.KEY_VERIFICATION_CANCEL, contentMap, transactionId)) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: Unit) { - Timber.v("## SAS verification [$transactionId] canceled for reason ${code.value}") - } + sendToDeviceTask + .configureWith(SendToDeviceTask.Params(EventType.KEY_VERIFICATION_CANCEL, contentMap, transactionId)) { + this.callback = object : MatrixCallback { + override fun onSuccess(data: Unit) { + Timber.v("## SAS verification [$transactionId] canceled for reason ${code.value}") + } - override fun onFailure(failure: Throwable) { - Timber.e(failure, "## SAS verification [$transactionId] failed to cancel.") + override fun onFailure(failure: Throwable) { + Timber.e(failure, "## SAS verification [$transactionId] failed to cancel.") + } } - }) + } .executeBy(taskExecutor) } } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/verification/SASVerificationTransaction.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/verification/SASVerificationTransaction.kt index 8b179c1d1b..de4b997410 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/verification/SASVerificationTransaction.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/crypto/verification/SASVerificationTransaction.kt @@ -287,23 +287,25 @@ internal abstract class SASVerificationTransaction( val contentMap = MXUsersDevicesMap() contentMap.setObject(otherUserId, otherDeviceId, keyToDevice) - sendToDeviceTask.configureWith(SendToDeviceTask.Params(type, contentMap, transactionId)) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: Unit) { - Timber.v("## SAS verification [$transactionId] toDevice type '$type' success.") - if (onDone != null) { - onDone() - } else { - state = nextState + sendToDeviceTask + .configureWith(SendToDeviceTask.Params(type, contentMap, transactionId)) { + this.callback = object : MatrixCallback { + override fun onSuccess(data: Unit) { + Timber.v("## SAS verification [$transactionId] toDevice type '$type' success.") + if (onDone != null) { + onDone() + } else { + state = nextState + } + } + + override fun onFailure(failure: Throwable) { + Timber.e("## SAS verification [$transactionId] failed to send toDevice in state : $state") + + cancel(onErrorReason) } } - - override fun onFailure(failure: Throwable) { - Timber.e("## SAS verification [$transactionId] failed to send toDevice in state : $state") - - cancel(onErrorReason) - } - }) + } .executeBy(taskExecutor) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/NetworkConnectivityChecker.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/NetworkConnectivityChecker.kt index 9b5da44bbb..05dbe4c016 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/NetworkConnectivityChecker.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/NetworkConnectivityChecker.kt @@ -24,6 +24,8 @@ import timber.log.Timber import java.util.* import javax.inject.Inject import kotlin.collections.ArrayList +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine @MatrixScope internal class NetworkConnectivityChecker @Inject constructor(context: Context) { @@ -34,24 +36,41 @@ internal class NetworkConnectivityChecker @Inject constructor(context: Context) .build(context) private val merlinsBeard = MerlinsBeard.Builder().build(context) - private val listeners = Collections.synchronizedList(ArrayList()) + private val listeners = ArrayList() init { merlin.bind() merlin.registerDisconnectable { Timber.v("On Disconnect") - listeners.forEach { + val localListeners = Collections.synchronizedList(listeners) + localListeners.forEach { it.onDisconnect() } } merlin.registerConnectable { Timber.v("On Connect") - listeners.forEach { + val localListeners = Collections.synchronizedList(listeners) + localListeners.forEach { it.onConnect() } } } + suspend fun waitUntilConnected() { + if (isConnected()) { + return + } else { + suspendCoroutine { continuation -> + register(object : Listener { + override fun onConnect() { + unregister(this) + continuation.resume(Unit) + } + }) + } + } + } + fun register(listener: Listener) { listeners.add(listener) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/cache/DefaultCacheService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/cache/DefaultCacheService.kt index 336a7a63e3..12684965b7 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/cache/DefaultCacheService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/cache/DefaultCacheService.kt @@ -20,7 +20,7 @@ import im.vector.matrix.android.api.MatrixCallback import im.vector.matrix.android.api.session.cache.CacheService import im.vector.matrix.android.internal.di.SessionDatabase import im.vector.matrix.android.internal.task.TaskExecutor -import im.vector.matrix.android.internal.task.toConfigurableTask +import im.vector.matrix.android.internal.task.configureWith import javax.inject.Inject internal class DefaultCacheService @Inject constructor(@SessionDatabase @@ -30,8 +30,9 @@ internal class DefaultCacheService @Inject constructor(@SessionDatabase override fun clearCache(callback: MatrixCallback) { taskExecutor.cancelAll() clearCacheTask - .toConfigurableTask() - .dispatchTo(callback) + .configureWith { + this.callback = callback + } .executeBy(taskExecutor) } } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/notification/DefaultPushRuleService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/notification/DefaultPushRuleService.kt index 7cddebdeaf..83b89701b3 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/notification/DefaultPushRuleService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/notification/DefaultPushRuleService.kt @@ -83,9 +83,10 @@ internal class DefaultPushRuleService @Inject constructor( override fun updatePushRuleEnableStatus(kind: String, pushRule: PushRule, enabled: Boolean, callback: MatrixCallback): Cancelable { return updatePushRuleEnableStatusTask - .configureWith(UpdatePushRuleEnableStatusTask.Params(kind, pushRule, enabled)) + .configureWith(UpdatePushRuleEnableStatusTask.Params(kind, pushRule, enabled)) { + this.callback = callback + } // TODO Fetch the rules - .dispatchTo(callback) .executeBy(taskExecutor) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/pushers/DefaultPusherService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/pushers/DefaultPusherService.kt index 5d9af0d34a..1c03a4f741 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/pushers/DefaultPusherService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/pushers/DefaultPusherService.kt @@ -29,7 +29,6 @@ import im.vector.matrix.android.internal.database.model.PusherEntity import im.vector.matrix.android.internal.database.query.where import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.configureWith -import im.vector.matrix.android.internal.task.toConfigurableTask import im.vector.matrix.android.internal.worker.WorkManagerUtil import im.vector.matrix.android.internal.worker.WorkManagerUtil.matrixOneTimeWorkRequestBuilder import im.vector.matrix.android.internal.worker.WorkerParamsFactory @@ -50,7 +49,7 @@ internal class DefaultPusherService @Inject constructor( override fun refreshPushers() { getPusherTask - .toConfigurableTask() + .configureWith() .executeBy(taskExecutor) } @@ -85,8 +84,9 @@ internal class DefaultPusherService @Inject constructor( override fun removeHttpPusher(pushkey: String, appId: String, callback: MatrixCallback) { val params = RemovePusherTask.Params(sessionParam.credentials.userId, pushkey, appId) removePusherTask - .configureWith(params) - .dispatchTo(callback) + .configureWith(params) { + this.callback = callback + } //.enableRetry() ?? .executeBy(taskExecutor) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/pushers/RemovePusherTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/pushers/RemovePusherTask.kt index 722700fcf2..0ed7175e9a 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/pushers/RemovePusherTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/pushers/RemovePusherTask.kt @@ -18,7 +18,6 @@ package im.vector.matrix.android.internal.session.pushers import com.zhuinden.monarchy.Monarchy import im.vector.matrix.android.api.session.pushers.PusherState -import im.vector.matrix.android.internal.database.awaitTransaction import im.vector.matrix.android.internal.database.mapper.asDomain import im.vector.matrix.android.internal.database.model.PusherEntity import im.vector.matrix.android.internal.database.query.where @@ -40,12 +39,13 @@ internal class DefaultRemovePusherTask @Inject constructor( ) : RemovePusherTask { override suspend fun execute(params: RemovePusherTask.Params) { - val existing = Realm.getInstance(monarchy.realmConfiguration).use { realm -> + monarchy.awaitTransaction { realm -> val existingEntity = PusherEntity.where(realm, params.userId, params.pushKey).findFirst() - realm.awaitTransaction { - existingEntity?.state = PusherState.UNREGISTERING - } - existingEntity?.asDomain() + existingEntity?.state = PusherState.UNREGISTERING + } + + val existing = Realm.getInstance(monarchy.realmConfiguration).use { realm -> + PusherEntity.where(realm, params.userId, params.pushKey).findFirst()?.asDomain() } ?: throw Exception("No existing pusher") val deleteBody = JsonPusher( diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomDirectoryService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomDirectoryService.kt index 158802f86c..4aeace70ed 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomDirectoryService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomDirectoryService.kt @@ -27,7 +27,6 @@ import im.vector.matrix.android.internal.session.room.directory.GetThirdPartyPro import im.vector.matrix.android.internal.session.room.membership.joining.JoinRoomTask import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.configureWith -import im.vector.matrix.android.internal.task.toConfigurableTask import javax.inject.Inject internal class DefaultRoomDirectoryService @Inject constructor(private val getPublicRoomTask: GetPublicRoomTask, @@ -39,22 +38,25 @@ internal class DefaultRoomDirectoryService @Inject constructor(private val getPu publicRoomsParams: PublicRoomsParams, callback: MatrixCallback): Cancelable { return getPublicRoomTask - .configureWith(GetPublicRoomTask.Params(server, publicRoomsParams)) - .dispatchTo(callback) + .configureWith(GetPublicRoomTask.Params(server, publicRoomsParams)) { + this.callback = callback + } .executeBy(taskExecutor) } override fun joinRoom(roomId: String, callback: MatrixCallback): Cancelable { return joinRoomTask - .configureWith(JoinRoomTask.Params(roomId)) - .dispatchTo(callback) + .configureWith(JoinRoomTask.Params(roomId)) { + this.callback = callback + } .executeBy(taskExecutor) } override fun getThirdPartyProtocol(callback: MatrixCallback>): Cancelable { return getThirdPartyProtocolsTask - .toConfigurableTask() - .dispatchTo(callback) + .configureWith { + this.callback = callback + } .executeBy(taskExecutor) } } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomService.kt index 2dabb64ea3..bd5462b151 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomService.kt @@ -46,8 +46,9 @@ internal class DefaultRoomService @Inject constructor(private val monarchy: Mona override fun createRoom(createRoomParams: CreateRoomParams, callback: MatrixCallback): Cancelable { return createRoomTask - .configureWith(createRoomParams) - .dispatchTo(callback) + .configureWith(createRoomParams) { + this.callback = callback + } .executeBy(taskExecutor) } @@ -74,8 +75,9 @@ internal class DefaultRoomService @Inject constructor(private val monarchy: Mona override fun joinRoom(roomId: String, viaServers: List, callback: MatrixCallback): Cancelable { return joinRoomTask - .configureWith(JoinRoomTask.Params(roomId, viaServers)) - .dispatchTo(callback) + .configureWith(JoinRoomTask.Params(roomId, viaServers)) { + this.callback = callback + } .executeBy(taskExecutor) } } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/DefaultMembershipService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/DefaultMembershipService.kt index 85cc7dab51..7fca1e4269 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/DefaultMembershipService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/DefaultMembershipService.kt @@ -44,8 +44,10 @@ internal class DefaultMembershipService @Inject constructor(private val roomId: override fun loadRoomMembersIfNeeded(matrixCallback: MatrixCallback): Cancelable { val params = LoadRoomMembersTask.Params(roomId, Membership.LEAVE) - return loadRoomMembersTask.configureWith(params) - .dispatchTo(matrixCallback) + return loadRoomMembersTask + .configureWith(params) { + this.callback = matrixCallback + } .executeBy(taskExecutor) } @@ -77,22 +79,28 @@ internal class DefaultMembershipService @Inject constructor(private val roomId: override fun invite(userId: String, callback: MatrixCallback): Cancelable { val params = InviteTask.Params(roomId, userId) - return inviteTask.configureWith(params) - .dispatchTo(callback) + return inviteTask + .configureWith(params) { + this.callback = callback + } .executeBy(taskExecutor) } override fun join(viaServers: List, callback: MatrixCallback): Cancelable { val params = JoinRoomTask.Params(roomId, viaServers) - return joinTask.configureWith(params) - .dispatchTo(callback) + return joinTask + .configureWith(params) { + this.callback = callback + } .executeBy(taskExecutor) } override fun leave(callback: MatrixCallback): Cancelable { val params = LeaveRoomTask.Params(roomId) - return leaveRoomTask.configureWith(params) - .dispatchTo(callback) + return leaveRoomTask + .configureWith(params) { + this.callback = callback + } .executeBy(taskExecutor) } } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/read/DefaultReadService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/read/DefaultReadService.kt index 7830ce0e66..2e30c12ef6 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/read/DefaultReadService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/read/DefaultReadService.kt @@ -37,17 +37,29 @@ internal class DefaultReadService @Inject constructor(private val roomId: String override fun markAllAsRead(callback: MatrixCallback) { val params = SetReadMarkersTask.Params(roomId, markAllAsRead = true) - setReadMarkersTask.configureWith(params).dispatchTo(callback).executeBy(taskExecutor) + setReadMarkersTask + .configureWith(params) { + this.callback = callback + } + .executeBy(taskExecutor) } override fun setReadReceipt(eventId: String, callback: MatrixCallback) { val params = SetReadMarkersTask.Params(roomId, fullyReadEventId = null, readReceiptEventId = eventId) - setReadMarkersTask.configureWith(params).dispatchTo(callback).executeBy(taskExecutor) + setReadMarkersTask + .configureWith(params) { + this.callback = callback + } + .executeBy(taskExecutor) } override fun setReadMarker(fullyReadEventId: String, callback: MatrixCallback) { val params = SetReadMarkersTask.Params(roomId, fullyReadEventId = fullyReadEventId, readReceiptEventId = null) - setReadMarkersTask.configureWith(params).dispatchTo(callback).executeBy(taskExecutor) + setReadMarkersTask + .configureWith(params) { + this.callback = callback + } + .executeBy(taskExecutor) } @@ -55,13 +67,13 @@ internal class DefaultReadService @Inject constructor(private val roomId: String var isEventRead = false monarchy.doWithRealm { val readReceipt = ReadReceiptEntity.where(it, roomId, credentials.userId).findFirst() - ?: return@doWithRealm + ?: return@doWithRealm val liveChunk = ChunkEntity.findLastLiveChunkFromRoom(it, roomId) - ?: return@doWithRealm + ?: return@doWithRealm val readReceiptIndex = liveChunk.timelineEvents.find(readReceipt.eventId)?.root?.displayIndex - ?: Int.MIN_VALUE + ?: Int.MIN_VALUE val eventToCheckIndex = liveChunk.timelineEvents.find(eventId)?.root?.displayIndex - ?: Int.MAX_VALUE + ?: Int.MAX_VALUE isEventRead = eventToCheckIndex <= readReceiptIndex } return isEventRead diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/relation/DefaultRelationService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/relation/DefaultRelationService.kt index cc6213451a..b5a0e50e12 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/relation/DefaultRelationService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/relation/DefaultRelationService.kt @@ -80,28 +80,30 @@ internal class DefaultRelationService @Inject constructor(private val context: C reaction, myUserId ) - findReactionEventForUndoTask.configureWith(params) - .enableRetry() - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: FindReactionEventForUndoTask.Result) { - if (data.redactEventId == null) { - Timber.w("Cannot find reaction to undo (not yet synced?)") - //TODO? - } - data.redactEventId?.let { toRedact -> + val callback = object : MatrixCallback { + override fun onSuccess(data: FindReactionEventForUndoTask.Result) { + if (data.redactEventId == null) { + Timber.w("Cannot find reaction to undo (not yet synced?)") + //TODO? + } + data.redactEventId?.let { toRedact -> - val redactEvent = eventFactory.createRedactEvent(roomId, toRedact, null).also { - saveLocalEcho(it) - } - val redactWork = createRedactEventWork(redactEvent, toRedact, null) - - TimelineSendEventWorkCommon.postWork(context, roomId, redactWork) - - } + val redactEvent = eventFactory.createRedactEvent(roomId, toRedact, null).also { + saveLocalEcho(it) } - }) - .executeBy(taskExecutor) + val redactWork = createRedactEventWork(redactEvent, toRedact, null) + TimelineSendEventWorkCommon.postWork(context, roomId, redactWork) + + } + } + } + findReactionEventForUndoTask + .configureWith(params) { + this.retryCount = Int.MAX_VALUE + this.callback = callback + } + .executeBy(taskExecutor) } //TODO duplicate with send service? @@ -167,8 +169,10 @@ internal class DefaultRelationService @Inject constructor(private val context: C override fun fetchEditHistory(eventId: String, callback: MatrixCallback>) { val params = FetchEditHistoryTask.Params(roomId, cryptoService.isRoomEncrypted(roomId), eventId) - fetchEditHistoryTask.configureWith(params) - .dispatchTo(callback) + fetchEditHistoryTask + .configureWith(params) { + this.callback = callback + } .executeBy(taskExecutor) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/state/DefaultStateService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/state/DefaultStateService.kt index 03beb8e7e2..60999b61d3 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/state/DefaultStateService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/state/DefaultStateService.kt @@ -51,8 +51,10 @@ internal class DefaultStateService @Inject constructor(private val roomId: Strin )) - sendStateTask.configureWith(params) - .dispatchTo(callback) + sendStateTask + .configureWith(params) { + this.callback = callback + } .executeBy(taskExecutor) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/DefaultTimeline.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/DefaultTimeline.kt index 70a5b12df2..4259505131 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/DefaultTimeline.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/DefaultTimeline.kt @@ -29,6 +29,7 @@ import im.vector.matrix.android.internal.database.query.findIncludingEvent import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom import im.vector.matrix.android.internal.database.query.where import im.vector.matrix.android.internal.database.query.whereInRoom +import im.vector.matrix.android.internal.task.TaskConstraints import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.util.Debouncer @@ -133,7 +134,7 @@ internal class DefaultTimeline( builtEventsIdMap[eventId]?.let { builtIndex -> //Update the relation of existing event builtEvents[builtIndex]?.let { te -> - builtEvents[builtIndex] = eventEntity.asDomain() + builtEvents[builtIndex] = eventEntity.asDomain() hasChanged = true } } @@ -209,7 +210,7 @@ internal class DefaultTimeline( override fun pendingEventCount(): Int { var count = 0 Realm.getInstance(realmConfiguration).use { - count = RoomEntity.where(it,roomId).findFirst()?.sendingTimelineEvents?.count() ?: 0 + count = RoomEntity.where(it, roomId).findFirst()?.sendingTimelineEvents?.count() ?: 0 } return count } @@ -217,7 +218,7 @@ internal class DefaultTimeline( override fun failedToDeliverEventCount(): Int { var count = 0 Realm.getInstance(realmConfiguration).use { - count = RoomEntity.where(it,roomId).findFirst()?.sendingTimelineEvents?.filter { + count = RoomEntity.where(it, roomId).findFirst()?.sendingTimelineEvents?.filter { it.root?.sendState?.hasFailed() ?: false }?.count() ?: 0 } @@ -405,24 +406,27 @@ internal class DefaultTimeline( limit = limit) Timber.v("Should fetch $limit items $direction") - cancelableBag += paginationTask.configureWith(params) - .enableRetry() - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: TokenChunkEventPersistor.Result) { - if (data == TokenChunkEventPersistor.Result.SUCCESS) { - Timber.v("Success fetching $limit items $direction from pagination request") - } else { - // Database won't be updated, so we force pagination request - BACKGROUND_HANDLER.post { - executePaginationTask(direction, limit) + cancelableBag += paginationTask + .configureWith(params) { + this.retryCount = Int.MAX_VALUE + this.constraints = TaskConstraints(connectedToNetwork = true) + this.callback = object : MatrixCallback { + override fun onSuccess(data: TokenChunkEventPersistor.Result) { + if (data == TokenChunkEventPersistor.Result.SUCCESS) { + Timber.v("Success fetching $limit items $direction from pagination request") + } else { + // Database won't be updated, so we force pagination request + BACKGROUND_HANDLER.post { + executePaginationTask(direction, limit) + } } } - } - override fun onFailure(failure: Throwable) { - Timber.v("Failure fetching $limit items $direction from pagination request") + override fun onFailure(failure: Throwable) { + Timber.v("Failure fetching $limit items $direction from pagination request") + } } - }) + } .executeBy(taskExecutor) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/signout/DefaultSignOutService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/signout/DefaultSignOutService.kt index fff75d141a..2463a5ade5 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/signout/DefaultSignOutService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/signout/DefaultSignOutService.kt @@ -19,7 +19,7 @@ package im.vector.matrix.android.internal.session.signout import im.vector.matrix.android.api.MatrixCallback import im.vector.matrix.android.api.session.signout.SignOutService import im.vector.matrix.android.internal.task.TaskExecutor -import im.vector.matrix.android.internal.task.toConfigurableTask +import im.vector.matrix.android.internal.task.configureWith import javax.inject.Inject internal class DefaultSignOutService @Inject constructor(private val signOutTask: SignOutTask, @@ -27,8 +27,9 @@ internal class DefaultSignOutService @Inject constructor(private val signOutTask override fun signOut(callback: MatrixCallback) { signOutTask - .toConfigurableTask() - .dispatchTo(callback) + .configureWith { + this.callback = callback + } .executeBy(taskExecutor) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt index 1af5688b24..148e25b3a7 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt @@ -104,58 +104,57 @@ open class SyncService : Service() { } else { Timber.v("Execute sync request with timeout 0") val params = SyncTask.Params(TIME_OUT) - cancelableTask = syncTask.configureWith(params) - .callbackOn(TaskThread.SYNC) - .executeOn(TaskThread.SYNC) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: Unit) { - cancelableTask = null - if (!once) { - timer.schedule(object : TimerTask() { - override fun run() { - doSync() - } - }, NEXT_BATCH_DELAY) - } else { - //stop - stopMe() + cancelableTask = syncTask + .configureWith(params) { + callbackThread = TaskThread.SYNC + executionThread = TaskThread.SYNC + callback = object : MatrixCallback { + override fun onSuccess(data: Unit) { + cancelableTask = null + if (!once) { + timer.schedule(object : TimerTask() { + override fun run() { + doSync() + } + }, NEXT_BATCH_DELAY) + } else { + //stop + stopMe() + } + } + + override fun onFailure(failure: Throwable) { + Timber.e(failure) + cancelableTask = null + if (failure is Failure.NetworkConnection + && failure.cause is SocketTimeoutException) { + // Timeout are not critical + timer.schedule(object : TimerTask() { + override fun run() { + doSync() + } + }, 5_000L) + } + + if (failure !is Failure.NetworkConnection + || failure.cause is JsonEncodingException) { + // Wait 10s before retrying + timer.schedule(object : TimerTask() { + override fun run() { + doSync() + } + }, 5_000L) + } + + if (failure is Failure.ServerError + && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { + // No token or invalid token, stop the thread + stopSelf() + } } } - - override fun onFailure(failure: Throwable) { - Timber.e(failure) - cancelableTask = null - if (failure is Failure.NetworkConnection - && failure.cause is SocketTimeoutException) { - // Timeout are not critical - timer.schedule(object : TimerTask() { - override fun run() { - doSync() - } - }, 5_000L) - } - - if (failure !is Failure.NetworkConnection - || failure.cause is JsonEncodingException) { - // Wait 10s before retrying - timer.schedule(object : TimerTask() { - override fun run() { - doSync() - } - }, 5_000L) - } - - if (failure is Failure.ServerError - && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { - // No token or invalid token, stop the thread - stopSelf() - } - - } - - }) + } .executeBy(taskExecutor) - } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt index 6e609bd8b5..0b9365dc06 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt @@ -102,40 +102,42 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, Timber.v("[$this] Execute sync request with timeout $DEFAULT_LONG_POOL_TIMEOUT") val latch = CountDownLatch(1) val params = SyncTask.Params(DEFAULT_LONG_POOL_TIMEOUT) - cancelableTask = syncTask.configureWith(params) - .callbackOn(TaskThread.SYNC) - .executeOn(TaskThread.SYNC) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: Unit) { - latch.countDown() + + cancelableTask = syncTask.configureWith(params) { + this.callbackThread = TaskThread.SYNC + this.executionThread = TaskThread.SYNC + this.callback = object : MatrixCallback { + + override fun onSuccess(data: Unit) { + latch.countDown() + } + + override fun onFailure(failure: Throwable) { + if (failure is Failure.NetworkConnection + && failure.cause is SocketTimeoutException) { + // Timeout are not critical + Timber.v("Timeout") + } else { + Timber.e(failure) } - override fun onFailure(failure: Throwable) { - if (failure is Failure.NetworkConnection - && failure.cause is SocketTimeoutException) { - // Timeout are not critical - Timber.v("Timeout") - } else { - Timber.e(failure) - } - - if (failure !is Failure.NetworkConnection - || failure.cause is JsonEncodingException) { - // Wait 10s before retrying - sleep(RETRY_WAIT_TIME_MS) - } - - if (failure is Failure.ServerError - && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { - // No token or invalid token, stop the thread - updateStateTo(SyncState.KILLING) - } - - latch.countDown() + if (failure !is Failure.NetworkConnection + || failure.cause is JsonEncodingException) { + // Wait 10s before retrying + sleep(RETRY_WAIT_TIME_MS) } - }) + if (failure is Failure.ServerError + && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { + // No token or invalid token, stop the thread + updateStateTo(SyncState.KILLING) + } + latch.countDown() + } + } + } .executeBy(taskExecutor) + latch.await() if (state is SyncState.RUNNING) { updateStateTo(SyncState.RUNNING(afterPause = false)) diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/user/DefaultUserService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/user/DefaultUserService.kt index 8d47d401a7..2925997347 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/user/DefaultUserService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/user/DefaultUserService.kt @@ -61,7 +61,7 @@ internal class DefaultUserService @Inject constructor(private val monarchy: Mona override fun getUser(userId: String): User? { val userEntity = monarchy.fetchCopied { UserEntity.where(it, userId).findFirst() } - ?: return null + ?: return null return userEntity.asDomain() } @@ -113,8 +113,9 @@ internal class DefaultUserService @Inject constructor(private val monarchy: Mona callback: MatrixCallback>): Cancelable { val params = SearchUserTask.Params(limit, search, excludedUserIds) return searchUserTask - .configureWith(params) - .dispatchTo(callback) + .configureWith(params) { + this.callback = callback + } .executeBy(taskExecutor) } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/ConfigurableTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/ConfigurableTask.kt index ec1313932a..3b760fb02a 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/ConfigurableTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/ConfigurableTask.kt @@ -18,54 +18,63 @@ package im.vector.matrix.android.internal.task import im.vector.matrix.android.api.MatrixCallback import im.vector.matrix.android.api.util.Cancelable +import java.util.* -internal fun Task.configureWith(params: PARAMS): ConfigurableTask { - return ConfigurableTask(this, params) +internal fun Task.configureWith(params: PARAMS, init: (ConfigurableTask.Builder.() -> Unit) = {}): ConfigurableTask { + return ConfigurableTask.Builder(this, params).apply(init).build() } -/** - * Convert a Task to a ConfigurableTask without parameter - */ -internal fun Task.toConfigurableTask(): ConfigurableTask { - return ConfigurableTask(this, Unit) +internal fun Task.configureWith(init: (ConfigurableTask.Builder.() -> Unit) = {}): ConfigurableTask { + return configureWith(Unit, init) } internal data class ConfigurableTask( val task: Task, val params: PARAMS, - val callbackThread: TaskThread = TaskThread.MAIN, - val executionThread: TaskThread = TaskThread.IO, - val retryCount: Int = 0, - val callback: MatrixCallback = object : MatrixCallback {} + val id: UUID, + val callbackThread: TaskThread, + val executionThread: TaskThread, + val constraints: TaskConstraints, + val retryCount: Int, + val callback: MatrixCallback + ) : Task { + class Builder( + private val task: Task, + private val params: PARAMS, + var id: UUID = UUID.randomUUID(), + var callbackThread: TaskThread = TaskThread.MAIN, + var executionThread: TaskThread = TaskThread.IO, + var constraints: TaskConstraints = TaskConstraints(), + var retryCount: Int = 0, + var callback: MatrixCallback = object : MatrixCallback {} + ) { + + fun build() = ConfigurableTask( + task = task, + params = params, + id = id, + callbackThread = callbackThread, + executionThread = executionThread, + constraints = constraints, + retryCount = retryCount, + callback = callback + ) + } + + override suspend fun execute(params: PARAMS): RESULT { return task.execute(params) } - fun callbackOn(thread: TaskThread): ConfigurableTask { - return copy(callbackThread = thread) - } - - fun executeOn(thread: TaskThread): ConfigurableTask { - return copy(executionThread = thread) - } - - fun dispatchTo(matrixCallback: MatrixCallback): ConfigurableTask { - return copy(callback = matrixCallback) - } - - fun enableRetry(retryCount: Int = Int.MAX_VALUE): ConfigurableTask { - return copy(retryCount = retryCount) - } - fun executeBy(taskExecutor: TaskExecutor): Cancelable { return taskExecutor.execute(this) } override fun toString(): String { - return task.javaClass.name + return "${task.javaClass.name} with ID: $id" } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskConstraints.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskConstraints.kt new file mode 100644 index 0000000000..18733d6ebf --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskConstraints.kt @@ -0,0 +1,22 @@ +/* + + * Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + + */ +package im.vector.matrix.android.internal.task + +data class TaskConstraints( + val connectedToNetwork: Boolean = false +) \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt index 6109798796..41905b5bcc 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt @@ -21,6 +21,7 @@ import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.api.util.CancelableBag import im.vector.matrix.android.internal.di.MatrixScope import im.vector.matrix.android.internal.extensions.foldToCallback +import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.util.CancelableCoroutine import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers import kotlinx.coroutines.GlobalScope @@ -32,7 +33,8 @@ import javax.inject.Inject import kotlin.coroutines.EmptyCoroutineContext @MatrixScope -internal class TaskExecutor @Inject constructor(private val coroutineDispatchers: MatrixCoroutineDispatchers) { +internal class TaskExecutor @Inject constructor(private val coroutineDispatchers: MatrixCoroutineDispatchers, + private val networkConnectivityChecker: NetworkConnectivityChecker) { private val cancelableBag = CancelableBag() @@ -41,8 +43,13 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers val job = GlobalScope.launch(task.callbackThread.toDispatcher()) { val resultOrFailure = runCatching { withContext(task.executionThread.toDispatcher()) { - Timber.v("Executing $task on ${Thread.currentThread().name}") + Timber.v("Enqueue task $task") retry(task.retryCount) { + if (task.constraints.connectedToNetwork) { + Timber.v("Waiting network for $task") + networkConnectivityChecker.waitUntilConnected() + } + Timber.v("Execute task $task on ${Thread.currentThread().name}") task.execute(task.params) } } @@ -74,6 +81,7 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers try { return block() } catch (e: Exception) { + Timber.v("Retry task after $currentDelay ms") delay(currentDelay) currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay) }