SyncTask now handles by itself the sync token

This commit is contained in:
Benoit Marty 2019-07-04 14:46:59 +02:00
parent 10bc2297d4
commit c0f085cdf8
5 changed files with 32 additions and 58 deletions

View File

@ -18,7 +18,6 @@ package im.vector.matrix.android.internal.session.sync
import arrow.core.Try import arrow.core.Try
import im.vector.matrix.android.internal.crypto.CryptoManager import im.vector.matrix.android.internal.crypto.CryptoManager
import im.vector.matrix.android.internal.session.SessionScope
import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject

View File

@ -29,9 +29,9 @@ import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.task.Task import im.vector.matrix.android.internal.task.Task
import javax.inject.Inject import javax.inject.Inject
internal interface SyncTask : Task<SyncTask.Params, SyncResponse> { internal interface SyncTask : Task<SyncTask.Params, Unit> {
data class Params(val token: String?, var timeout: Long = 30_000L) data class Params(var timeout: Long = 30_000L)
} }
@ -39,15 +39,17 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI,
private val credentials: Credentials, private val credentials: Credentials,
private val filterRepository: FilterRepository, private val filterRepository: FilterRepository,
private val syncResponseHandler: SyncResponseHandler, private val syncResponseHandler: SyncResponseHandler,
private val sessionParamsStore: SessionParamsStore private val sessionParamsStore: SessionParamsStore,
private val syncTokenStore: SyncTokenStore
) : SyncTask { ) : SyncTask {
override suspend fun execute(params: SyncTask.Params): Try<SyncResponse> { override suspend fun execute(params: SyncTask.Params): Try<Unit> {
val requestParams = HashMap<String, String>() val requestParams = HashMap<String, String>()
var timeout = 0L var timeout = 0L
if (params.token != null) { val token = syncTokenStore.getLastToken()
requestParams["since"] = params.token if (token != null) {
requestParams["since"] = token
timeout = params.timeout timeout = params.timeout
} }
requestParams["timeout"] = timeout.toString() requestParams["timeout"] = timeout.toString()
@ -65,9 +67,9 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI,
// Transmit the throwable // Transmit the throwable
throwable.failure() throwable.failure()
}.flatMap { syncResponse -> }.flatMap { syncResponse ->
syncResponseHandler.handleResponse(syncResponse, params.token, false) syncResponseHandler.handleResponse(syncResponse, token, false)
}.map {
syncTokenStore.saveToken(it.nextBatch)
} }
} }
} }

View File

@ -26,8 +26,6 @@ import im.vector.matrix.android.api.failure.MatrixError
import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.sync.SyncTask import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.TaskThread
import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.task.configureWith
@ -35,9 +33,6 @@ import timber.log.Timber
import java.net.SocketTimeoutException import java.net.SocketTimeoutException
import java.util.* import java.util.*
private const val DEFAULT_LONG_POOL_TIMEOUT = 10_000L
private const val BACKGROUND_LONG_POOL_TIMEOUT = 0L
/** /**
* Can execute periodic sync task. * Can execute periodic sync task.
* An IntentService is used in conjunction with the AlarmManager and a Broadcast Receiver * An IntentService is used in conjunction with the AlarmManager and a Broadcast Receiver
@ -49,7 +44,6 @@ open class SyncService : Service() {
private var mIsSelfDestroyed: Boolean = false private var mIsSelfDestroyed: Boolean = false
private var cancelableTask: Cancelable? = null private var cancelableTask: Cancelable? = null
private lateinit var syncTokenStore: SyncTokenStore
private lateinit var syncTask: SyncTask private lateinit var syncTask: SyncTask
private lateinit var networkConnectivityChecker: NetworkConnectivityChecker private lateinit var networkConnectivityChecker: NetworkConnectivityChecker
private lateinit var taskExecutor: TaskExecutor private lateinit var taskExecutor: TaskExecutor
@ -57,18 +51,12 @@ open class SyncService : Service() {
var timer = Timer() var timer = Timer()
var nextBatchDelay = 0L
var timeout = 10_000L
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
Timber.i("onStartCommand ${intent}") Timber.i("onStartCommand ${intent}")
nextBatchDelay = 60_000L
timeout = 0
intent?.let { intent?.let {
val userId = it.getStringExtra(EXTRA_USER_ID) val userId = it.getStringExtra(EXTRA_USER_ID)
val sessionComponent = Matrix.getInstance(applicationContext).sessionManager.getSessionComponent(userId) val sessionComponent = Matrix.getInstance(applicationContext).sessionManager.getSessionComponent(userId)
?: return@let ?: return@let
syncTokenStore = sessionComponent.syncTokenStore()
syncTask = sessionComponent.syncTask() syncTask = sessionComponent.syncTask()
networkConnectivityChecker = sessionComponent.networkConnectivityChecker() networkConnectivityChecker = sessionComponent.networkConnectivityChecker()
taskExecutor = sessionComponent.taskExecutor() taskExecutor = sessionComponent.taskExecutor()
@ -105,7 +93,6 @@ open class SyncService : Service() {
} }
fun doSync(once: Boolean = false) { fun doSync(once: Boolean = false) {
var nextBatch = syncTokenStore.getLastToken()
if (!networkConnectivityChecker.isConnected()) { if (!networkConnectivityChecker.isConnected()) {
Timber.v("Sync is Paused. Waiting...") Timber.v("Sync is Paused. Waiting...")
//TODO Retry in ? //TODO Retry in ?
@ -113,24 +100,22 @@ open class SyncService : Service() {
override fun run() { override fun run() {
doSync() doSync()
} }
}, 5_000L) }, NO_NETWORK_DELAY)
} else { } else {
Timber.v("Execute sync request with token $nextBatch and timeout $timeout") Timber.v("Execute sync request with timeout 0")
val params = SyncTask.Params(nextBatch, timeout) val params = SyncTask.Params(TIME_OUT)
cancelableTask = syncTask.configureWith(params) cancelableTask = syncTask.configureWith(params)
.callbackOn(TaskThread.CALLER) .callbackOn(TaskThread.CALLER)
.executeOn(TaskThread.CALLER) .executeOn(TaskThread.CALLER)
.dispatchTo(object : MatrixCallback<SyncResponse> { .dispatchTo(object : MatrixCallback<Unit> {
override fun onSuccess(data: SyncResponse) { override fun onSuccess(data: Unit) {
cancelableTask = null cancelableTask = null
nextBatch = data.nextBatch
syncTokenStore.saveToken(nextBatch)
if (!once) { if (!once) {
timer.schedule(object : TimerTask() { timer.schedule(object : TimerTask() {
override fun run() { override fun run() {
doSync() doSync()
} }
}, nextBatchDelay) }, NEXT_BATCH_DELAY)
} else { } else {
//stop //stop
stopMe() stopMe()
@ -180,6 +165,10 @@ open class SyncService : Service() {
companion object { companion object {
const val EXTRA_USER_ID = "EXTRA_USER_ID" const val EXTRA_USER_ID = "EXTRA_USER_ID"
const val TIME_OUT = 0L
const val NEXT_BATCH_DELAY = 60_000L
const val NO_NETWORK_DELAY = 5_000L
} }
} }

View File

@ -25,10 +25,7 @@ import im.vector.matrix.android.api.failure.MatrixError
import im.vector.matrix.android.api.session.sync.SyncState import im.vector.matrix.android.api.session.sync.SyncState
import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.SessionScope
import im.vector.matrix.android.internal.session.sync.SyncTask import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.TaskThread
import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.task.configureWith
@ -44,7 +41,6 @@ private const val DEFAULT_LONG_POOL_DELAY = 0L
internal class SyncThread @Inject constructor(private val syncTask: SyncTask, internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
private val networkConnectivityChecker: NetworkConnectivityChecker, private val networkConnectivityChecker: NetworkConnectivityChecker,
private val syncTokenStore: SyncTokenStore,
private val backgroundDetectionObserver: BackgroundDetectionObserver, private val backgroundDetectionObserver: BackgroundDetectionObserver,
private val taskExecutor: TaskExecutor private val taskExecutor: TaskExecutor
) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener { ) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener {
@ -52,7 +48,6 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
private var state: SyncState = SyncState.IDLE private var state: SyncState = SyncState.IDLE
private var liveState = MutableLiveData<SyncState>() private var liveState = MutableLiveData<SyncState>()
private val lock = Object() private val lock = Object()
private var nextBatch = syncTokenStore.getLastToken()
private var cancelableTask: Cancelable? = null private var cancelableTask: Cancelable? = null
init { init {
@ -62,8 +57,6 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
fun restart() = synchronized(lock) { fun restart() = synchronized(lock) {
if (state is SyncState.PAUSED) { if (state is SyncState.PAUSED) {
Timber.v("Resume sync...") Timber.v("Resume sync...")
// Retrieve the last token, it may have been deleted in case of a clear cache
nextBatch = syncTokenStore.getLastToken()
updateStateTo(SyncState.RUNNING(catchingUp = true)) updateStateTo(SyncState.RUNNING(catchingUp = true))
lock.notify() lock.notify()
} }
@ -100,22 +93,20 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
lock.wait() lock.wait()
} }
} else { } else {
Timber.v("Execute sync request with token $nextBatch and timeout $DEFAULT_LONG_POOL_TIMEOUT") Timber.v("Execute sync request with timeout $DEFAULT_LONG_POOL_TIMEOUT")
val latch = CountDownLatch(1) val latch = CountDownLatch(1)
val params = SyncTask.Params(nextBatch, DEFAULT_LONG_POOL_TIMEOUT) val params = SyncTask.Params(DEFAULT_LONG_POOL_TIMEOUT)
cancelableTask = syncTask.configureWith(params) cancelableTask = syncTask.configureWith(params)
.callbackOn(TaskThread.CALLER) .callbackOn(TaskThread.CALLER)
.executeOn(TaskThread.CALLER) .executeOn(TaskThread.CALLER)
.dispatchTo(object : MatrixCallback<SyncResponse> { .dispatchTo(object : MatrixCallback<Unit> {
override fun onSuccess(data: SyncResponse) { override fun onSuccess(data: Unit) {
nextBatch = data.nextBatch
syncTokenStore.saveToken(nextBatch)
latch.countDown() latch.countDown()
} }
override fun onFailure(failure: Throwable) { override fun onFailure(failure: Throwable) {
if (failure is Failure.NetworkConnection if (failure is Failure.NetworkConnection
&& failure.cause is SocketTimeoutException) { && failure.cause is SocketTimeoutException) {
// Timeout are not critical // Timeout are not critical
Timber.v("Timeout") Timber.v("Timeout")
} else { } else {
@ -123,13 +114,13 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
} }
if (failure !is Failure.NetworkConnection if (failure !is Failure.NetworkConnection
|| failure.cause is JsonEncodingException) { || failure.cause is JsonEncodingException) {
// Wait 10s before retrying // Wait 10s before retrying
sleep(RETRY_WAIT_TIME_MS) sleep(RETRY_WAIT_TIME_MS)
} }
if (failure is Failure.ServerError if (failure is Failure.ServerError
&& (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) {
// No token or invalid token, stop the thread // No token or invalid token, stop the thread
updateStateTo(SyncState.KILLING) updateStateTo(SyncState.KILLING)
} }
@ -140,7 +131,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
}) })
.executeBy(taskExecutor) .executeBy(taskExecutor)
latch.await() latch.await()
if (state is SyncState.RUNNING) { if (state is SyncState.RUNNING) {
updateStateTo(SyncState.RUNNING(catchingUp = false)) updateStateTo(SyncState.RUNNING(catchingUp = false))
} }

View File

@ -21,8 +21,6 @@ import com.squareup.moshi.JsonClass
import im.vector.matrix.android.api.MatrixCallback import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.session.sync.SyncTask import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.TaskThread
import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.task.configureWith
@ -47,8 +45,6 @@ internal class SyncWorker(context: Context,
val automaticallyRetry: Boolean = false val automaticallyRetry: Boolean = false
) )
@Inject
lateinit var syncTokenStore: SyncTokenStore
@Inject @Inject
lateinit var syncTask: SyncTask lateinit var syncTask: SyncTask
@Inject @Inject
@ -62,15 +58,12 @@ internal class SyncWorker(context: Context,
val latch = CountDownLatch(1) val latch = CountDownLatch(1)
val nextBatch = syncTokenStore.getLastToken() val taskParams = SyncTask.Params(0)
Timber.i("Sync work last token $nextBatch")
val taskParams = SyncTask.Params(nextBatch, 0)
cancelableTask = syncTask.configureWith(taskParams) cancelableTask = syncTask.configureWith(taskParams)
.callbackOn(TaskThread.CALLER) .callbackOn(TaskThread.CALLER)
.executeOn(TaskThread.CALLER) .executeOn(TaskThread.CALLER)
.dispatchTo(object : MatrixCallback<SyncResponse> { .dispatchTo(object : MatrixCallback<Unit> {
override fun onSuccess(data: SyncResponse) { override fun onSuccess(data: Unit) {
syncTokenStore.saveToken(nextBatch)
latch.countDown() latch.countDown()
} }