diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt index 6b2af8fcfd..d0c6118285 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt @@ -18,18 +18,18 @@ package im.vector.matrix.android.internal.session.sync.job import android.content.Context import androidx.work.* import com.squareup.moshi.JsonClass -import im.vector.matrix.android.api.failure.Failure -import im.vector.matrix.android.api.failure.MatrixError -import im.vector.matrix.android.internal.auth.SessionParamsStore -import im.vector.matrix.android.internal.network.executeRequest -import im.vector.matrix.android.internal.session.filter.FilterRepository -import im.vector.matrix.android.internal.session.sync.SyncAPI -import im.vector.matrix.android.internal.session.sync.SyncResponseHandler +import im.vector.matrix.android.api.MatrixCallback +import im.vector.matrix.android.api.util.Cancelable +import im.vector.matrix.android.internal.session.sync.SyncTask import im.vector.matrix.android.internal.session.sync.SyncTokenStore import im.vector.matrix.android.internal.session.sync.model.SyncResponse +import im.vector.matrix.android.internal.task.TaskExecutor +import im.vector.matrix.android.internal.task.TaskThread +import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.getSessionComponent import timber.log.Timber +import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import javax.inject.Inject @@ -47,12 +47,12 @@ internal class SyncWorker(context: Context, val automaticallyRetry: Boolean = false ) - @Inject lateinit var syncAPI: SyncAPI - @Inject lateinit var filterRepository: FilterRepository - @Inject lateinit var syncResponseHandler: SyncResponseHandler - @Inject lateinit var sessionParamsStore: SessionParamsStore - @Inject lateinit var syncTokenStore: SyncTokenStore - + @Inject + lateinit var syncTokenStore: SyncTokenStore + @Inject + lateinit var syncTask: SyncTask + @Inject + lateinit var taskExecutor: TaskExecutor override suspend fun doWork(): Result { Timber.i("Sync work starting") @@ -60,37 +60,37 @@ internal class SyncWorker(context: Context, val sessionComponent = getSessionComponent(params.userId) ?: return Result.success() sessionComponent.inject(this) - val requestParams = HashMap() - requestParams["timeout"] = params.timeout.toString() - requestParams["filter"] = filterRepository.getFilter() - val token = syncTokenStore.getLastToken()?.also { requestParams["since"] = it } - Timber.i("Sync work last token $token") - return executeRequest { - apiCall = syncAPI.sync(requestParams) - }.fold( - { - if (it is Failure.ServerError - && it.error.code == MatrixError.UNKNOWN_TOKEN) { - sessionParamsStore.delete(params.userId) - Result.failure() - } else { - Timber.i("Sync work failed $it") - Result.retry() + val latch = CountDownLatch(1) + val nextBatch = syncTokenStore.getLastToken() + Timber.i("Sync work last token $nextBatch") + val taskParams = SyncTask.Params(nextBatch, 0) + cancelableTask = syncTask.configureWith(taskParams) + .callbackOn(TaskThread.CALLER) + .executeOn(TaskThread.CALLER) + .dispatchTo(object : MatrixCallback { + override fun onSuccess(data: SyncResponse) { + syncTokenStore.saveToken(nextBatch) + latch.countDown() } - }, - { - Timber.i("Sync work success next batch ${it.nextBatch}") - if (!isStopped) { - syncResponseHandler.handleResponse(it, token, false) - syncTokenStore.saveToken(it.nextBatch) + + override fun onFailure(failure: Throwable) { + Timber.e(failure) + latch.countDown() } - if (params.automaticallyRetry) Result.retry() else Result.success() - } - ) + + }) + .executeBy(taskExecutor) + + latch.await() + return Result.success() } companion object { + + + private var cancelableTask: Cancelable? = null + fun requireBackgroundSync(context: Context, userId: String, serverTimeout: Long = 0) { val data = WorkerParamsFactory.toData(Params(userId, serverTimeout, false)) val workRequest = OneTimeWorkRequestBuilder() @@ -116,6 +116,7 @@ internal class SyncWorker(context: Context, } fun stopAnyBackgroundSync(context: Context) { + cancelableTask?.cancel() WorkManager.getInstance(context).cancelUniqueWork("BG_SYNCP") } }