From c0f085cdf8c950e69b34655addd4f5dfa3efe0b0 Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Thu, 4 Jul 2019 14:46:59 +0200 Subject: [PATCH] SyncTask now handles by itself the sync token --- .../session/sync/SyncResponseHandler.kt | 1 - .../android/internal/session/sync/SyncTask.kt | 20 ++++++------ .../internal/session/sync/job/SyncService.kt | 31 ++++++------------- .../internal/session/sync/job/SyncThread.kt | 25 +++++---------- .../internal/session/sync/job/SyncWorker.kt | 13 ++------ 5 files changed, 32 insertions(+), 58 deletions(-) diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt index eab7b91e4d..d21b1de3b0 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt @@ -18,7 +18,6 @@ package im.vector.matrix.android.internal.session.sync import arrow.core.Try import im.vector.matrix.android.internal.crypto.CryptoManager -import im.vector.matrix.android.internal.session.SessionScope import im.vector.matrix.android.internal.session.sync.model.SyncResponse import timber.log.Timber import javax.inject.Inject diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt index 78f949de33..e29f2eeac5 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt @@ -29,9 +29,9 @@ import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.task.Task import javax.inject.Inject -internal interface SyncTask : Task { +internal interface SyncTask : Task { - data class Params(val token: String?, var timeout: Long = 30_000L) + data class Params(var timeout: Long = 30_000L) } @@ -39,15 +39,17 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI, private val credentials: Credentials, private val filterRepository: FilterRepository, private val syncResponseHandler: SyncResponseHandler, - private val sessionParamsStore: SessionParamsStore + private val sessionParamsStore: SessionParamsStore, + private val syncTokenStore: SyncTokenStore ) : SyncTask { - override suspend fun execute(params: SyncTask.Params): Try { + override suspend fun execute(params: SyncTask.Params): Try { val requestParams = HashMap() var timeout = 0L - if (params.token != null) { - requestParams["since"] = params.token + val token = syncTokenStore.getLastToken() + if (token != null) { + requestParams["since"] = token timeout = params.timeout } requestParams["timeout"] = timeout.toString() @@ -65,9 +67,9 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI, // Transmit the throwable throwable.failure() }.flatMap { syncResponse -> - syncResponseHandler.handleResponse(syncResponse, params.token, false) + syncResponseHandler.handleResponse(syncResponse, token, false) + }.map { + syncTokenStore.saveToken(it.nextBatch) } } - - } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt index 7aa38b472d..3fe850d675 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt @@ -26,8 +26,6 @@ import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.session.sync.SyncTask -import im.vector.matrix.android.internal.session.sync.SyncTokenStore -import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.configureWith @@ -35,9 +33,6 @@ import timber.log.Timber import java.net.SocketTimeoutException import java.util.* -private const val DEFAULT_LONG_POOL_TIMEOUT = 10_000L -private const val BACKGROUND_LONG_POOL_TIMEOUT = 0L - /** * Can execute periodic sync task. * An IntentService is used in conjunction with the AlarmManager and a Broadcast Receiver @@ -49,7 +44,6 @@ open class SyncService : Service() { private var mIsSelfDestroyed: Boolean = false private var cancelableTask: Cancelable? = null - private lateinit var syncTokenStore: SyncTokenStore private lateinit var syncTask: SyncTask private lateinit var networkConnectivityChecker: NetworkConnectivityChecker private lateinit var taskExecutor: TaskExecutor @@ -57,18 +51,12 @@ open class SyncService : Service() { var timer = Timer() - var nextBatchDelay = 0L - var timeout = 10_000L - override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { Timber.i("onStartCommand ${intent}") - nextBatchDelay = 60_000L - timeout = 0 intent?.let { val userId = it.getStringExtra(EXTRA_USER_ID) val sessionComponent = Matrix.getInstance(applicationContext).sessionManager.getSessionComponent(userId) ?: return@let - syncTokenStore = sessionComponent.syncTokenStore() syncTask = sessionComponent.syncTask() networkConnectivityChecker = sessionComponent.networkConnectivityChecker() taskExecutor = sessionComponent.taskExecutor() @@ -105,7 +93,6 @@ open class SyncService : Service() { } fun doSync(once: Boolean = false) { - var nextBatch = syncTokenStore.getLastToken() if (!networkConnectivityChecker.isConnected()) { Timber.v("Sync is Paused. Waiting...") //TODO Retry in ? @@ -113,24 +100,22 @@ open class SyncService : Service() { override fun run() { doSync() } - }, 5_000L) + }, NO_NETWORK_DELAY) } else { - Timber.v("Execute sync request with token $nextBatch and timeout $timeout") - val params = SyncTask.Params(nextBatch, timeout) + Timber.v("Execute sync request with timeout 0") + val params = SyncTask.Params(TIME_OUT) cancelableTask = syncTask.configureWith(params) .callbackOn(TaskThread.CALLER) .executeOn(TaskThread.CALLER) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: SyncResponse) { + .dispatchTo(object : MatrixCallback { + override fun onSuccess(data: Unit) { cancelableTask = null - nextBatch = data.nextBatch - syncTokenStore.saveToken(nextBatch) if (!once) { timer.schedule(object : TimerTask() { override fun run() { doSync() } - }, nextBatchDelay) + }, NEXT_BATCH_DELAY) } else { //stop stopMe() @@ -180,6 +165,10 @@ open class SyncService : Service() { companion object { const val EXTRA_USER_ID = "EXTRA_USER_ID" + + const val TIME_OUT = 0L + const val NEXT_BATCH_DELAY = 60_000L + const val NO_NETWORK_DELAY = 5_000L } } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt index ba4c009aa9..56d590d7ba 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt @@ -25,10 +25,7 @@ import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.api.session.sync.SyncState import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.internal.network.NetworkConnectivityChecker -import im.vector.matrix.android.internal.session.SessionScope import im.vector.matrix.android.internal.session.sync.SyncTask -import im.vector.matrix.android.internal.session.sync.SyncTokenStore -import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.configureWith @@ -44,7 +41,6 @@ private const val DEFAULT_LONG_POOL_DELAY = 0L internal class SyncThread @Inject constructor(private val syncTask: SyncTask, private val networkConnectivityChecker: NetworkConnectivityChecker, - private val syncTokenStore: SyncTokenStore, private val backgroundDetectionObserver: BackgroundDetectionObserver, private val taskExecutor: TaskExecutor ) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener { @@ -52,7 +48,6 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, private var state: SyncState = SyncState.IDLE private var liveState = MutableLiveData() private val lock = Object() - private var nextBatch = syncTokenStore.getLastToken() private var cancelableTask: Cancelable? = null init { @@ -62,8 +57,6 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, fun restart() = synchronized(lock) { if (state is SyncState.PAUSED) { Timber.v("Resume sync...") - // Retrieve the last token, it may have been deleted in case of a clear cache - nextBatch = syncTokenStore.getLastToken() updateStateTo(SyncState.RUNNING(catchingUp = true)) lock.notify() } @@ -100,22 +93,20 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, lock.wait() } } else { - Timber.v("Execute sync request with token $nextBatch and timeout $DEFAULT_LONG_POOL_TIMEOUT") + Timber.v("Execute sync request with timeout $DEFAULT_LONG_POOL_TIMEOUT") val latch = CountDownLatch(1) - val params = SyncTask.Params(nextBatch, DEFAULT_LONG_POOL_TIMEOUT) + val params = SyncTask.Params(DEFAULT_LONG_POOL_TIMEOUT) cancelableTask = syncTask.configureWith(params) .callbackOn(TaskThread.CALLER) .executeOn(TaskThread.CALLER) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: SyncResponse) { - nextBatch = data.nextBatch - syncTokenStore.saveToken(nextBatch) + .dispatchTo(object : MatrixCallback { + override fun onSuccess(data: Unit) { latch.countDown() } override fun onFailure(failure: Throwable) { if (failure is Failure.NetworkConnection - && failure.cause is SocketTimeoutException) { + && failure.cause is SocketTimeoutException) { // Timeout are not critical Timber.v("Timeout") } else { @@ -123,13 +114,13 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, } if (failure !is Failure.NetworkConnection - || failure.cause is JsonEncodingException) { + || failure.cause is JsonEncodingException) { // Wait 10s before retrying sleep(RETRY_WAIT_TIME_MS) } if (failure is Failure.ServerError - && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { + && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { // No token or invalid token, stop the thread updateStateTo(SyncState.KILLING) } @@ -140,7 +131,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, }) .executeBy(taskExecutor) - latch.await() + latch.await() if (state is SyncState.RUNNING) { updateStateTo(SyncState.RUNNING(catchingUp = false)) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt index d0c6118285..8a741eb04f 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt @@ -21,8 +21,6 @@ import com.squareup.moshi.JsonClass import im.vector.matrix.android.api.MatrixCallback import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.internal.session.sync.SyncTask -import im.vector.matrix.android.internal.session.sync.SyncTokenStore -import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.configureWith @@ -47,8 +45,6 @@ internal class SyncWorker(context: Context, val automaticallyRetry: Boolean = false ) - @Inject - lateinit var syncTokenStore: SyncTokenStore @Inject lateinit var syncTask: SyncTask @Inject @@ -62,15 +58,12 @@ internal class SyncWorker(context: Context, val latch = CountDownLatch(1) - val nextBatch = syncTokenStore.getLastToken() - Timber.i("Sync work last token $nextBatch") - val taskParams = SyncTask.Params(nextBatch, 0) + val taskParams = SyncTask.Params(0) cancelableTask = syncTask.configureWith(taskParams) .callbackOn(TaskThread.CALLER) .executeOn(TaskThread.CALLER) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: SyncResponse) { - syncTokenStore.saveToken(nextBatch) + .dispatchTo(object : MatrixCallback { + override fun onSuccess(data: Unit) { latch.countDown() }