Sync: use the CoroutineSequencer but need more tests

This commit is contained in:
ganfra 2019-12-13 15:30:57 +01:00
parent eab94b4f03
commit fe2be90002
12 changed files with 68 additions and 45 deletions

View File

@ -38,8 +38,7 @@ internal object MatrixModule {
return MatrixCoroutineDispatchers(io = Dispatchers.IO, return MatrixCoroutineDispatchers(io = Dispatchers.IO,
computation = Dispatchers.Default, computation = Dispatchers.Default,
main = Dispatchers.Main, main = Dispatchers.Main,
crypto = createBackgroundHandler("Crypto_Thread").asCoroutineDispatcher(), crypto = createBackgroundHandler("Crypto_Thread").asCoroutineDispatcher()
sync = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
) )
} }

View File

@ -44,6 +44,7 @@ import im.vector.matrix.android.api.session.sync.SyncState
import im.vector.matrix.android.api.session.user.UserService import im.vector.matrix.android.api.session.user.UserService
import im.vector.matrix.android.internal.crypto.DefaultCryptoService import im.vector.matrix.android.internal.crypto.DefaultCryptoService
import im.vector.matrix.android.internal.database.LiveEntityObserver import im.vector.matrix.android.internal.database.LiveEntityObserver
import im.vector.matrix.android.internal.session.sync.SyncTaskSequencer
import im.vector.matrix.android.internal.session.sync.SyncTokenStore import im.vector.matrix.android.internal.session.sync.SyncTokenStore
import im.vector.matrix.android.internal.session.sync.job.SyncThread import im.vector.matrix.android.internal.session.sync.job.SyncThread
import im.vector.matrix.android.internal.session.sync.job.SyncWorker import im.vector.matrix.android.internal.session.sync.job.SyncWorker
@ -74,6 +75,7 @@ internal class DefaultSession @Inject constructor(override val sessionParams: Se
private val syncThreadProvider: Provider<SyncThread>, private val syncThreadProvider: Provider<SyncThread>,
private val contentUrlResolver: ContentUrlResolver, private val contentUrlResolver: ContentUrlResolver,
private val syncTokenStore: SyncTokenStore, private val syncTokenStore: SyncTokenStore,
private val syncTaskSequencer: SyncTaskSequencer,
private val contentUploadProgressTracker: ContentUploadStateTracker, private val contentUploadProgressTracker: ContentUploadStateTracker,
private val initialSyncProgressService: Lazy<InitialSyncProgressService>, private val initialSyncProgressService: Lazy<InitialSyncProgressService>,
private val homeServerCapabilitiesService: Lazy<HomeServerCapabilitiesService>) private val homeServerCapabilitiesService: Lazy<HomeServerCapabilitiesService>)
@ -143,6 +145,7 @@ internal class DefaultSession @Inject constructor(override val sessionParams: Se
cryptoService.get().close() cryptoService.get().close()
isOpen = false isOpen = false
EventBus.getDefault().unregister(this) EventBus.getDefault().unregister(this)
syncTaskSequencer.close()
} }
override fun syncState(): LiveData<SyncState> { override fun syncState(): LiveData<SyncState> {

View File

@ -45,10 +45,15 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI,
private val initialSyncProgressService: DefaultInitialSyncProgressService, private val initialSyncProgressService: DefaultInitialSyncProgressService,
private val syncTokenStore: SyncTokenStore, private val syncTokenStore: SyncTokenStore,
private val getHomeServerCapabilitiesTask: GetHomeServerCapabilitiesTask, private val getHomeServerCapabilitiesTask: GetHomeServerCapabilitiesTask,
private val userStore: UserStore private val userStore: UserStore,
private val syncTaskSequencer: SyncTaskSequencer
) : SyncTask { ) : SyncTask {
override suspend fun execute(params: SyncTask.Params) { override suspend fun execute(params: SyncTask.Params) = syncTaskSequencer.post {
doSync(params)
}
private suspend fun doSync(params: SyncTask.Params) {
Timber.v("Sync task started on Thread: ${Thread.currentThread().name}") Timber.v("Sync task started on Thread: ${Thread.currentThread().name}")
// Maybe refresh the home server capabilities data we know // Maybe refresh the home server capabilities data we know
getHomeServerCapabilitiesTask.execute(Unit) getHomeServerCapabilitiesTask.execute(Unit)

View File

@ -0,0 +1,24 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.sync
import im.vector.matrix.android.internal.session.SessionScope
import im.vector.matrix.android.internal.task.ChannelCoroutineSequencer
import javax.inject.Inject
@SessionScope
internal class SyncTaskSequencer @Inject constructor() : ChannelCoroutineSequencer<Unit>()

View File

@ -63,7 +63,7 @@ abstract class SyncService : Service() {
Timber.i("Received a start while was already syncing... ignore") Timber.i("Received a start while was already syncing... ignore")
} else { } else {
isRunning.set(true) isRunning.set(true)
serviceScope.launch(coroutineDispatchers.sync) { serviceScope.launch(coroutineDispatchers.io) {
doSync() doSync()
} }
} }

View File

@ -30,8 +30,7 @@ import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.TaskThread
import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import kotlinx.coroutines.delay import kotlinx.coroutines.*
import kotlinx.coroutines.runBlocking
import timber.log.Timber import timber.log.Timber
import java.net.SocketTimeoutException import java.net.SocketTimeoutException
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
@ -42,14 +41,13 @@ private const val DEFAULT_LONG_POOL_TIMEOUT = 30_000L
internal class SyncThread @Inject constructor(private val syncTask: SyncTask, internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
private val networkConnectivityChecker: NetworkConnectivityChecker, private val networkConnectivityChecker: NetworkConnectivityChecker,
private val backgroundDetectionObserver: BackgroundDetectionObserver, private val backgroundDetectionObserver: BackgroundDetectionObserver)
private val taskExecutor: TaskExecutor : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener {
) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener {
private var state: SyncState = SyncState.IDLE private var state: SyncState = SyncState.IDLE
private var liveState = MutableLiveData<SyncState>() private var liveState = MutableLiveData<SyncState>()
private val lock = Object() private val lock = Object()
private var cancelableTask: Cancelable? = null private val syncScope = CoroutineScope(SupervisorJob())
private var isStarted = false private var isStarted = false
@ -74,14 +72,14 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
if (isStarted) { if (isStarted) {
Timber.v("Pause sync...") Timber.v("Pause sync...")
isStarted = false isStarted = false
cancelableTask?.cancel() syncScope.coroutineContext.cancelChildren()
} }
} }
fun kill() = synchronized(lock) { fun kill() = synchronized(lock) {
Timber.v("Kill sync...") Timber.v("Kill sync...")
updateStateTo(SyncState.KILLING) updateStateTo(SyncState.KILLING)
cancelableTask?.cancel() syncScope.coroutineContext.cancelChildren()
lock.notify() lock.notify()
} }
@ -101,7 +99,6 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
isStarted = true isStarted = true
networkConnectivityChecker.register(this) networkConnectivityChecker.register(this)
backgroundDetectionObserver.register(this) backgroundDetectionObserver.register(this)
while (state != SyncState.KILLING) { while (state != SyncState.KILLING) {
Timber.v("Entering loop, state: $state") Timber.v("Entering loop, state: $state")
if (!networkConnectivityChecker.hasInternetAccess()) { if (!networkConnectivityChecker.hasInternetAccess()) {
@ -122,9 +119,12 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
val timeout = state.let { if (it is SyncState.RUNNING && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT } val timeout = state.let { if (it is SyncState.RUNNING && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT }
Timber.v("Execute sync request with timeout $timeout") Timber.v("Execute sync request with timeout $timeout")
val params = SyncTask.Params(timeout) val params = SyncTask.Params(timeout)
runBlocking { val sync = syncScope.launch {
doSync(params) doSync(params)
} }
runBlocking {
sync.join()
}
Timber.v("...Continue") Timber.v("...Continue")
} }
} }

View File

@ -18,19 +18,14 @@ package im.vector.matrix.android.internal.session.sync.job
import android.content.Context import android.content.Context
import androidx.work.* import androidx.work.*
import com.squareup.moshi.JsonClass import com.squareup.moshi.JsonClass
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.failure.MatrixError
import im.vector.matrix.android.api.failure.isTokenError import im.vector.matrix.android.api.failure.isTokenError
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.sync.SyncTask import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import im.vector.matrix.android.internal.worker.WorkManagerUtil import im.vector.matrix.android.internal.worker.WorkManagerUtil
import im.vector.matrix.android.internal.worker.WorkManagerUtil.matrixOneTimeWorkRequestBuilder import im.vector.matrix.android.internal.worker.WorkManagerUtil.matrixOneTimeWorkRequestBuilder
import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import im.vector.matrix.android.internal.worker.getSessionComponent import im.vector.matrix.android.internal.worker.getSessionComponent
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import timber.log.Timber import timber.log.Timber
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import javax.inject.Inject import javax.inject.Inject
@ -50,7 +45,6 @@ internal class SyncWorker(context: Context,
@Inject lateinit var syncTask: SyncTask @Inject lateinit var syncTask: SyncTask
@Inject lateinit var taskExecutor: TaskExecutor @Inject lateinit var taskExecutor: TaskExecutor
@Inject lateinit var coroutineDispatchers: MatrixCoroutineDispatchers
@Inject lateinit var networkConnectivityChecker: NetworkConnectivityChecker @Inject lateinit var networkConnectivityChecker: NetworkConnectivityChecker
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
@ -72,7 +66,7 @@ internal class SyncWorker(context: Context,
) )
} }
private suspend fun doSync(timeout: Long) = withContext(coroutineDispatchers.sync) { private suspend fun doSync(timeout: Long) {
val taskParams = SyncTask.Params(timeout) val taskParams = SyncTask.Params(timeout)
syncTask.execute(taskParams) syncTask.execute(taskParams)
} }

View File

@ -16,32 +16,36 @@
package im.vector.matrix.android.internal.task package im.vector.matrix.android.internal.task
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import java.util.concurrent.Executors import java.util.concurrent.Executors
/**
* This class intends to be used for ensure suspendable methods are played sequentially all the way long.
*/
internal interface CoroutineSequencer<T> { internal interface CoroutineSequencer<T> {
/**
* @param block the suspendable block to execute
* @return the result of the block
*/
suspend fun post(block: suspend () -> T): T suspend fun post(block: suspend () -> T): T
fun cancel()
/**
* Cancel all and close, so you won't be able to post anything else after
*/
fun close() fun close()
} }
internal class ChannelCoroutineSequencer<T> : CoroutineSequencer<T> { internal open class ChannelCoroutineSequencer<T> : CoroutineSequencer<T> {
private data class Message<T>( private data class Message<T>(
val block: suspend () -> T, val block: suspend () -> T,
val deferred: CompletableDeferred<T> val deferred: CompletableDeferred<T>
) )
private val messageChannel: Channel<Message<T>> = Channel() private var messageChannel: Channel<Message<T>> = Channel()
private val coroutineScope = CoroutineScope(SupervisorJob()) private val coroutineScope = CoroutineScope(SupervisorJob())
// This will ensure
private val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() private val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
init { init {
@ -62,13 +66,8 @@ internal class ChannelCoroutineSequencer<T> : CoroutineSequencer<T> {
} }
override fun close() { override fun close() {
messageChannel.cancel()
coroutineScope.coroutineContext.cancelChildren() coroutineScope.coroutineContext.cancelChildren()
} messageChannel.close()
override fun cancel() {
close()
launchCoroutine()
} }
override suspend fun post(block: suspend () -> T): T { override suspend fun post(block: suspend () -> T): T {
@ -78,6 +77,8 @@ internal class ChannelCoroutineSequencer<T> : CoroutineSequencer<T> {
return try { return try {
deferred.await() deferred.await()
} catch (cancellation: CancellationException) { } catch (cancellation: CancellationException) {
// In case of cancellation, we stop the current coroutine context
// and relaunch one to consume next messages
coroutineScope.coroutineContext.cancelChildren() coroutineScope.coroutineContext.cancelChildren()
launchCoroutine() launchCoroutine()
throw cancellation throw cancellation

View File

@ -85,6 +85,5 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers
TaskThread.IO -> coroutineDispatchers.io TaskThread.IO -> coroutineDispatchers.io
TaskThread.CALLER -> EmptyCoroutineContext TaskThread.CALLER -> EmptyCoroutineContext
TaskThread.CRYPTO -> coroutineDispatchers.crypto TaskThread.CRYPTO -> coroutineDispatchers.crypto
TaskThread.SYNC -> coroutineDispatchers.sync
} }
} }

View File

@ -21,6 +21,5 @@ internal enum class TaskThread {
COMPUTATION, COMPUTATION,
IO, IO,
CALLER, CALLER,
CRYPTO, CRYPTO
SYNC
} }

View File

@ -22,6 +22,5 @@ internal data class MatrixCoroutineDispatchers(
val io: CoroutineDispatcher, val io: CoroutineDispatcher,
val computation: CoroutineDispatcher, val computation: CoroutineDispatcher,
val main: CoroutineDispatcher, val main: CoroutineDispatcher,
val crypto: CoroutineDispatcher, val crypto: CoroutineDispatcher
val sync: CoroutineDispatcher
) )

View File

@ -26,7 +26,7 @@ import org.junit.Assert.assertEquals
import org.junit.Test import org.junit.Test
import java.util.concurrent.Executors import java.util.concurrent.Executors
class MatrixCoroutineSequencersTest { class CoroutineSequencersTest {
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()