Fix / Push worker could launch concurrent syncs

This commit is contained in:
Valere 2019-07-04 14:04:36 +02:00
parent a79a6443e7
commit 10bc2297d4

View File

@ -18,18 +18,18 @@ package im.vector.matrix.android.internal.session.sync.job
import android.content.Context import android.content.Context
import androidx.work.* import androidx.work.*
import com.squareup.moshi.JsonClass import com.squareup.moshi.JsonClass
import im.vector.matrix.android.api.failure.Failure import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.auth.SessionParamsStore import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.filter.FilterRepository
import im.vector.matrix.android.internal.session.sync.SyncAPI
import im.vector.matrix.android.internal.session.sync.SyncResponseHandler
import im.vector.matrix.android.internal.session.sync.SyncTokenStore import im.vector.matrix.android.internal.session.sync.SyncTokenStore
import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.TaskThread
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import im.vector.matrix.android.internal.worker.getSessionComponent import im.vector.matrix.android.internal.worker.getSessionComponent
import timber.log.Timber import timber.log.Timber
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import javax.inject.Inject import javax.inject.Inject
@ -47,12 +47,12 @@ internal class SyncWorker(context: Context,
val automaticallyRetry: Boolean = false val automaticallyRetry: Boolean = false
) )
@Inject lateinit var syncAPI: SyncAPI @Inject
@Inject lateinit var filterRepository: FilterRepository lateinit var syncTokenStore: SyncTokenStore
@Inject lateinit var syncResponseHandler: SyncResponseHandler @Inject
@Inject lateinit var sessionParamsStore: SessionParamsStore lateinit var syncTask: SyncTask
@Inject lateinit var syncTokenStore: SyncTokenStore @Inject
lateinit var taskExecutor: TaskExecutor
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
Timber.i("Sync work starting") Timber.i("Sync work starting")
@ -60,37 +60,37 @@ internal class SyncWorker(context: Context,
val sessionComponent = getSessionComponent(params.userId) ?: return Result.success() val sessionComponent = getSessionComponent(params.userId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)
val requestParams = HashMap<String, String>()
requestParams["timeout"] = params.timeout.toString()
requestParams["filter"] = filterRepository.getFilter()
val token = syncTokenStore.getLastToken()?.also { requestParams["since"] = it }
Timber.i("Sync work last token $token")
return executeRequest<SyncResponse> { val latch = CountDownLatch(1)
apiCall = syncAPI.sync(requestParams) val nextBatch = syncTokenStore.getLastToken()
}.fold( Timber.i("Sync work last token $nextBatch")
{ val taskParams = SyncTask.Params(nextBatch, 0)
if (it is Failure.ServerError cancelableTask = syncTask.configureWith(taskParams)
&& it.error.code == MatrixError.UNKNOWN_TOKEN) { .callbackOn(TaskThread.CALLER)
sessionParamsStore.delete(params.userId) .executeOn(TaskThread.CALLER)
Result.failure() .dispatchTo(object : MatrixCallback<SyncResponse> {
} else { override fun onSuccess(data: SyncResponse) {
Timber.i("Sync work failed $it") syncTokenStore.saveToken(nextBatch)
Result.retry() latch.countDown()
} }
},
{ override fun onFailure(failure: Throwable) {
Timber.i("Sync work success next batch ${it.nextBatch}") Timber.e(failure)
if (!isStopped) { latch.countDown()
syncResponseHandler.handleResponse(it, token, false)
syncTokenStore.saveToken(it.nextBatch)
} }
if (params.automaticallyRetry) Result.retry() else Result.success()
} })
) .executeBy(taskExecutor)
latch.await()
return Result.success()
} }
companion object { companion object {
private var cancelableTask: Cancelable? = null
fun requireBackgroundSync(context: Context, userId: String, serverTimeout: Long = 0) { fun requireBackgroundSync(context: Context, userId: String, serverTimeout: Long = 0) {
val data = WorkerParamsFactory.toData(Params(userId, serverTimeout, false)) val data = WorkerParamsFactory.toData(Params(userId, serverTimeout, false))
val workRequest = OneTimeWorkRequestBuilder<SyncWorker>() val workRequest = OneTimeWorkRequestBuilder<SyncWorker>()
@ -116,6 +116,7 @@ internal class SyncWorker(context: Context,
} }
fun stopAnyBackgroundSync(context: Context) { fun stopAnyBackgroundSync(context: Context) {
cancelableTask?.cancel()
WorkManager.getInstance(context).cancelUniqueWork("BG_SYNCP") WorkManager.getInstance(context).cancelUniqueWork("BG_SYNCP")
} }
} }