diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt index 67169cdc3c..01923b103f 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt @@ -30,6 +30,8 @@ import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.util.BackgroundDetectionObserver +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking import timber.log.Timber import java.net.SocketTimeoutException import java.util.concurrent.CountDownLatch @@ -99,9 +101,9 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, isStarted = true networkConnectivityChecker.register(this) backgroundDetectionObserver.register(this) + while (state != SyncState.KILLING) { Timber.v("Entering loop, state: $state") - if (!networkConnectivityChecker.hasInternetAccess()) { Timber.v("No network. Waiting...") updateStateTo(SyncState.NO_NETWORK) @@ -116,57 +118,13 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, if (state !is SyncState.RUNNING) { updateStateTo(SyncState.RUNNING(afterPause = true)) } - // No timeout after a pause val timeout = state.let { if (it is SyncState.RUNNING && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT } - Timber.v("Execute sync request with timeout $timeout") - val latch = CountDownLatch(1) val params = SyncTask.Params(timeout) - - cancelableTask = syncTask.configureWith(params) { - this.callbackThread = TaskThread.SYNC - this.executionThread = TaskThread.SYNC - this.callback = object : MatrixCallback { - override fun onSuccess(data: Unit) { - Timber.v("onSuccess") - latch.countDown() - } - - override fun onFailure(failure: Throwable) { - if (failure is Failure.NetworkConnection && failure.cause is SocketTimeoutException) { - // Timeout are not critical - Timber.v("Timeout") - } else if (failure is Failure.Cancelled) { - Timber.v("Cancelled") - } else if (failure is Failure.ServerError - && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { - // No token or invalid token, stop the thread - Timber.w(failure) - updateStateTo(SyncState.KILLING) - } else { - Timber.e(failure) - - if (failure !is Failure.NetworkConnection || failure.cause is JsonEncodingException) { - // Wait 10s before retrying - Timber.v("Wait 10s") - sleep(RETRY_WAIT_TIME_MS) - } - } - - latch.countDown() - } - } + runBlocking { + doSync(params) } - .executeBy(taskExecutor) - - latch.await() - state.let { - if (it is SyncState.RUNNING && it.afterPause) { - updateStateTo(SyncState.RUNNING(afterPause = false)) - } - } - Timber.v("...Continue") } } @@ -176,6 +134,37 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, networkConnectivityChecker.unregister(this) } + private suspend fun doSync(params: SyncTask.Params) { + try { + syncTask.execute(params) + } catch (failure: Throwable) { + if (failure is Failure.NetworkConnection && failure.cause is SocketTimeoutException) { + // Timeout are not critical + Timber.v("Timeout") + } else if (failure is Failure.Cancelled) { + Timber.v("Cancelled") + } else if (failure is Failure.ServerError + && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { + // No token or invalid token, stop the thread + Timber.w(failure) + updateStateTo(SyncState.KILLING) + } else { + Timber.e(failure) + if (failure !is Failure.NetworkConnection || failure.cause is JsonEncodingException) { + // Wait 10s before retrying + Timber.v("Wait 10s") + delay(RETRY_WAIT_TIME_MS) + } + } + } finally { + state.let { + if (it is SyncState.RUNNING && it.afterPause) { + updateStateTo(SyncState.RUNNING(afterPause = false)) + } + } + } + } + private fun updateStateTo(newState: SyncState) { Timber.v("Update state from $state to $newState") state = newState diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt new file mode 100644 index 0000000000..c0802f9fae --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt @@ -0,0 +1,93 @@ +/* + * Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package im.vector.matrix.android.internal.task + +import im.vector.matrix.android.internal.di.MatrixScope +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import java.util.concurrent.Executors +import javax.inject.Inject + + +@MatrixScope +internal class MatrixCoroutineSequencers @Inject constructor() { + + private val sequencers = HashMap() + + suspend fun post(name: String, block: suspend CoroutineScope.() -> Any): Any { + val sequencer = sequencers.getOrPut(name) { + ChannelCoroutineSequencer() + } + return sequencer.post(block) + } + + fun cancel(name: String) { + sequencers.remove(name)?.cancel() + } + + fun cancelAll() { + sequencers.values.forEach { + it.cancel() + } + sequencers.clear() + } + +} + +internal interface CoroutineSequencer { + suspend fun post(block: suspend CoroutineScope.() -> Any): Any + fun cancel() +} + +internal class ChannelCoroutineSequencer : CoroutineSequencer { + + private data class Message( + val block: suspend CoroutineScope.() -> Any, + val deferred: CompletableDeferred + ) + + private val messageChannel: Channel = Channel() + private val coroutineScope = CoroutineScope(SupervisorJob()) + private val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + + init { + coroutineScope.launch(singleDispatcher) { + for (message in messageChannel) { + try { + val result = message.block(this) + message.deferred.complete(result) + } catch (exception: Throwable) { + message.deferred.completeExceptionally(exception) + } + } + } + } + + override fun cancel() { + messageChannel.cancel() + coroutineScope.coroutineContext.cancelChildren() + } + + override suspend fun post(block: suspend CoroutineScope.() -> Any): Any { + val deferred = CompletableDeferred() + val message = Message(block, deferred) + messageChannel.send(message) + return deferred.await() + } + +} + diff --git a/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/MatrixCoroutineSequencersTest.kt b/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/MatrixCoroutineSequencersTest.kt new file mode 100644 index 0000000000..10cfd92fc5 --- /dev/null +++ b/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/MatrixCoroutineSequencersTest.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package im.vector.matrix.android.internal.task + +import kotlinx.coroutines.* +import org.junit.Test +import java.util.concurrent.Executors + +class MatrixCoroutineSequencersTest { + + @Test + fun sequencer_should_run_sequential() { + val sequencer = MatrixCoroutineSequencers() + val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + + val jobs = listOf( + GlobalScope.launch(dispatcher) { + sequencer.post("Sequencer1") { suspendingMethod("#3") } + }, + GlobalScope.launch(dispatcher) { + sequencer.post("Sequencer1") { suspendingMethod("#4") } + }, + GlobalScope.launch(dispatcher) { + sequencer.post("Sequencer2") { suspendingMethod("#5") } + }, + GlobalScope.launch(dispatcher) { + sequencer.post("Sequencer2") { suspendingMethod("#6") } + }, + GlobalScope.launch(dispatcher) { + sequencer.post("Sequencer2") { suspendingMethod("#7") } + } + ) + Thread.sleep(5500) + sequencer.cancelAll() + runBlocking { + jobs.joinAll() + } + } + + private suspend fun suspendingMethod(name: String): String = withContext(Dispatchers.Default) { + println("BLOCKING METHOD $name STARTS") + delay(3000) + println("BLOCKING METHOD $name ENDS") + name + } + +}