From 5dd46e82d75b5816df11c93883cfbb78acd40a5f Mon Sep 17 00:00:00 2001 From: ganfra Date: Fri, 13 Dec 2019 18:21:44 +0100 Subject: [PATCH] Sync: make only one big transaction to avoid having bad states --- .../android/internal/di/MatrixModule.kt | 1 - .../internal/session/sync/GroupSyncHandler.kt | 18 +- .../internal/session/sync/RoomSyncHandler.kt | 48 ++--- .../session/sync/SyncResponseHandler.kt | 72 +++++-- .../android/internal/session/sync/SyncTask.kt | 2 - .../internal/session/sync/SyncTokenStore.kt | 9 +- .../sync/UserAccountDataSyncHandler.kt | 193 +++++++++++------- .../internal/session/sync/job/SyncThread.kt | 6 - .../internal/task/CoroutineSequencer.kt | 2 - .../internal/task/CoroutineSequencersTest.kt | 2 - .../vector/riotx/core/extensions/Session.kt | 2 - .../im/vector/riotx/features/MainActivity.kt | 1 - 12 files changed, 196 insertions(+), 160 deletions(-) diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/di/MatrixModule.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/di/MatrixModule.kt index 8b37d01b70..04b8565546 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/di/MatrixModule.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/di/MatrixModule.kt @@ -26,7 +26,6 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.android.asCoroutineDispatcher import kotlinx.coroutines.asCoroutineDispatcher import org.matrix.olm.OlmManager -import java.util.concurrent.Executors @Module internal object MatrixModule { diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/GroupSyncHandler.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/GroupSyncHandler.kt index 2ca9b6cccc..392db0a73c 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/GroupSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/GroupSyncHandler.kt @@ -16,7 +16,6 @@ package im.vector.matrix.android.internal.session.sync -import com.zhuinden.monarchy.Monarchy import im.vector.matrix.android.R import im.vector.matrix.android.api.session.room.model.Membership import im.vector.matrix.android.internal.database.model.GroupEntity @@ -25,11 +24,10 @@ import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressServi import im.vector.matrix.android.internal.session.mapWithProgress import im.vector.matrix.android.internal.session.sync.model.GroupsSyncResponse import im.vector.matrix.android.internal.session.sync.model.InvitedGroupSync -import im.vector.matrix.android.internal.util.awaitTransaction import io.realm.Realm import javax.inject.Inject -internal class GroupSyncHandler @Inject constructor(private val monarchy: Monarchy) { +internal class GroupSyncHandler @Inject constructor() { sealed class HandlingStrategy { data class JOINED(val data: Map) : HandlingStrategy() @@ -37,12 +35,14 @@ internal class GroupSyncHandler @Inject constructor(private val monarchy: Monarc data class LEFT(val data: Map) : HandlingStrategy() } - suspend fun handle(roomsSyncResponse: GroupsSyncResponse, reporter: DefaultInitialSyncProgressService? = null) { - monarchy.awaitTransaction { realm -> - handleGroupSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), reporter) - handleGroupSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), reporter) - handleGroupSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), reporter) - } + fun handle( + realm: Realm, + roomsSyncResponse: GroupsSyncResponse, + reporter: DefaultInitialSyncProgressService? = null + ) { + handleGroupSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), reporter) + handleGroupSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), reporter) + handleGroupSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), reporter) } // PRIVATE METHODS ***************************************************************************** diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt index 4a003eb7d9..a080a5158e 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt @@ -16,9 +16,7 @@ package im.vector.matrix.android.internal.session.sync -import com.zhuinden.monarchy.Monarchy import im.vector.matrix.android.R -import im.vector.matrix.android.api.pushrules.RuleScope import im.vector.matrix.android.api.session.events.model.Event import im.vector.matrix.android.api.session.events.model.EventType import im.vector.matrix.android.api.session.events.model.toModel @@ -34,31 +32,21 @@ import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoo import im.vector.matrix.android.internal.database.query.where import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService import im.vector.matrix.android.internal.session.mapWithProgress -import im.vector.matrix.android.internal.session.notification.DefaultPushRuleService -import im.vector.matrix.android.internal.session.notification.ProcessEventForPushTask import im.vector.matrix.android.internal.session.room.RoomSummaryUpdater import im.vector.matrix.android.internal.session.room.read.FullyReadContent import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection import im.vector.matrix.android.internal.session.sync.model.* import im.vector.matrix.android.internal.session.user.UserEntityFactory -import im.vector.matrix.android.internal.task.TaskExecutor -import im.vector.matrix.android.internal.task.configureWith -import im.vector.matrix.android.internal.util.awaitTransaction import io.realm.Realm import io.realm.kotlin.createObject import timber.log.Timber import javax.inject.Inject -internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarchy, - private val readReceiptHandler: ReadReceiptHandler, +internal class RoomSyncHandler @Inject constructor(private val readReceiptHandler: ReadReceiptHandler, private val roomSummaryUpdater: RoomSummaryUpdater, private val roomTagHandler: RoomTagHandler, private val roomFullyReadHandler: RoomFullyReadHandler, - private val cryptoService: DefaultCryptoService, - private val tokenStore: SyncTokenStore, - private val pushRuleService: DefaultPushRuleService, - private val processForPushTask: ProcessEventForPushTask, - private val taskExecutor: TaskExecutor) { + private val cryptoService: DefaultCryptoService) { sealed class HandlingStrategy { data class JOINED(val data: Map) : HandlingStrategy() @@ -66,28 +54,16 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch data class LEFT(val data: Map) : HandlingStrategy() } - suspend fun handle(roomsSyncResponse: RoomsSyncResponse, isInitialSync: Boolean, reporter: DefaultInitialSyncProgressService? = null) { + fun handle( + realm: Realm, + roomsSyncResponse: RoomsSyncResponse, + isInitialSync: Boolean, + reporter: DefaultInitialSyncProgressService? = null + ) { Timber.v("Execute transaction from $this") - monarchy.awaitTransaction { realm -> - handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter) - handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter) - handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter) - } - // handle event for bing rule checks - checkPushRules(roomsSyncResponse) - } - - private fun checkPushRules(roomsSyncResponse: RoomsSyncResponse) { - Timber.v("[PushRules] --> checkPushRules") - if (tokenStore.getLastToken() == null) { - Timber.v("[PushRules] <-- No push rule check on initial sync") - return - } // nothing on initial sync - - val rules = pushRuleService.getPushRules(RuleScope.GLOBAL) - processForPushTask.configureWith(ProcessEventForPushTask.Params(roomsSyncResponse, rules)) - .executeBy(taskExecutor) - Timber.v("[PushRules] <-- Push task scheduled") + handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter) + handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter) + handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter) } // PRIVATE METHODS ***************************************************************************** @@ -137,7 +113,7 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch if (roomSync.state != null && roomSync.state.events.isNotEmpty()) { val minStateIndex = roomEntity.untimelinedStateEvents.where().min(EventEntityFields.STATE_INDEX)?.toInt() - ?: Int.MIN_VALUE + ?: Int.MIN_VALUE val untimelinedStateIndex = minStateIndex + 1 roomSync.state.events.forEach { event -> roomEntity.addStateEvent(event, filterDuplicates = true, stateIndex = untimelinedStateIndex) diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt index 1ae185b073..1454fdae7d 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt @@ -16,20 +16,30 @@ package im.vector.matrix.android.internal.session.sync +import com.zhuinden.monarchy.Monarchy import im.vector.matrix.android.R +import im.vector.matrix.android.api.pushrules.PushRuleService +import im.vector.matrix.android.api.pushrules.RuleScope import im.vector.matrix.android.internal.crypto.DefaultCryptoService import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService +import im.vector.matrix.android.internal.session.notification.ProcessEventForPushTask import im.vector.matrix.android.internal.session.reportSubtask +import im.vector.matrix.android.internal.session.sync.model.RoomsSyncResponse import im.vector.matrix.android.internal.session.sync.model.SyncResponse +import im.vector.matrix.android.internal.util.awaitTransaction import timber.log.Timber import javax.inject.Inject import kotlin.system.measureTimeMillis -internal class SyncResponseHandler @Inject constructor(private val roomSyncHandler: RoomSyncHandler, +internal class SyncResponseHandler @Inject constructor(private val monarchy: Monarchy, + private val roomSyncHandler: RoomSyncHandler, private val userAccountDataSyncHandler: UserAccountDataSyncHandler, private val groupSyncHandler: GroupSyncHandler, private val cryptoSyncHandler: CryptoSyncHandler, private val cryptoService: DefaultCryptoService, + private val tokenStore: SyncTokenStore, + private val processEventForPushTask: ProcessEventForPushTask, + private val pushRuleService: PushRuleService, private val initialSyncProgressService: DefaultInitialSyncProgressService) { suspend fun handleResponse(syncResponse: SyncResponse, fromToken: String?) { @@ -45,26 +55,27 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl }.also { Timber.v("Finish handling start cryptoService in $it ms") } - val measure = measureTimeMillis { - // Handle the to device events before the room ones - // to ensure to decrypt them properly - measureTimeMillis { - Timber.v("Handle toDevice") - reportSubtask(reporter, R.string.initial_sync_start_importing_account_crypto, 100, 0.1f) { - if (syncResponse.toDevice != null) { - cryptoSyncHandler.handleToDevice(syncResponse.toDevice, reporter) - } - } - }.also { - Timber.v("Finish handling toDevice in $it ms") - } + // Handle the to device events before the room ones + // to ensure to decrypt them properly + measureTimeMillis { + Timber.v("Handle toDevice") + reportSubtask(reporter, R.string.initial_sync_start_importing_account_crypto, 100, 0.1f) { + if (syncResponse.toDevice != null) { + cryptoSyncHandler.handleToDevice(syncResponse.toDevice, reporter) + } + } + }.also { + Timber.v("Finish handling toDevice in $it ms") + } + + // Start one big transaction + monarchy.awaitTransaction { realm -> measureTimeMillis { Timber.v("Handle rooms") - reportSubtask(reporter, R.string.initial_sync_start_importing_account_rooms, 100, 0.7f) { if (syncResponse.rooms != null) { - roomSyncHandler.handle(syncResponse.rooms, isInitialSync, reporter) + roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, reporter) } } }.also { @@ -75,7 +86,7 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl reportSubtask(reporter, R.string.initial_sync_start_importing_account_groups, 100, 0.1f) { Timber.v("Handle groups") if (syncResponse.groups != null) { - groupSyncHandler.handle(syncResponse.groups, reporter) + groupSyncHandler.handle(realm, syncResponse.groups, reporter) } } }.also { @@ -85,15 +96,32 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl measureTimeMillis { reportSubtask(reporter, R.string.initial_sync_start_importing_account_data, 100, 0.1f) { Timber.v("Handle accountData") - userAccountDataSyncHandler.handle(syncResponse.accountData, syncResponse.rooms?.invite) + userAccountDataSyncHandler.handle(realm, syncResponse.accountData) } }.also { Timber.v("Finish handling accountData in $it ms") } - - Timber.v("On sync completed") - cryptoSyncHandler.onSyncCompleted(syncResponse) + tokenStore.saveToken(realm, syncResponse.nextBatch) } - Timber.v("Finish handling sync in $measure ms") + + // Everything else we need to do outside the transaction + syncResponse.rooms?.also { + checkPushRules(it, isInitialSync) + userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite) + } + Timber.v("On sync completed") + cryptoSyncHandler.onSyncCompleted(syncResponse) + } + + private suspend fun checkPushRules(roomsSyncResponse: RoomsSyncResponse, isInitialSync: Boolean) { + Timber.v("[PushRules] --> checkPushRules") + if (isInitialSync) { + Timber.v("[PushRules] <-- No push rule check on initial sync") + return + } // nothing on initial sync + + val rules = pushRuleService.getPushRules(RuleScope.GLOBAL) + processEventForPushTask.execute(ProcessEventForPushTask.Params(roomsSyncResponse, rules)) + Timber.v("[PushRules] <-- Push task scheduled") } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt index 688f159fb0..fbc419d3b9 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt @@ -21,7 +21,6 @@ import im.vector.matrix.android.api.failure.Failure import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.internal.auth.SessionParamsStore import im.vector.matrix.android.internal.di.UserId -import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.network.executeRequest import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService import im.vector.matrix.android.internal.session.filter.FilterRepository @@ -88,7 +87,6 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI, throw throwable } syncResponseHandler.handleResponse(syncResponse, token) - syncTokenStore.saveToken(syncResponse.nextBatch) if (isInitialSync) { initialSyncProgressService.endAll() } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTokenStore.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTokenStore.kt index 350f2a1d83..d0af7d3ff5 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTokenStore.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTokenStore.kt @@ -18,7 +18,6 @@ package im.vector.matrix.android.internal.session.sync import com.zhuinden.monarchy.Monarchy import im.vector.matrix.android.internal.database.model.SyncEntity -import im.vector.matrix.android.internal.util.awaitTransaction import io.realm.Realm import javax.inject.Inject @@ -30,10 +29,8 @@ internal class SyncTokenStore @Inject constructor(private val monarchy: Monarchy } } - suspend fun saveToken(token: String?) { - monarchy.awaitTransaction { - val sync = SyncEntity(token) - it.insertOrUpdate(sync) - } + fun saveToken(realm: Realm, token: String?) { + val sync = SyncEntity(token) + realm.insertOrUpdate(sync) } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/UserAccountDataSyncHandler.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/UserAccountDataSyncHandler.kt index 9cc3a5a3c6..3deb612756 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/UserAccountDataSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/UserAccountDataSyncHandler.kt @@ -17,44 +17,39 @@ package im.vector.matrix.android.internal.session.sync import com.zhuinden.monarchy.Monarchy +import im.vector.matrix.android.api.pushrules.RuleScope +import im.vector.matrix.android.api.pushrules.RuleSetKey import im.vector.matrix.android.api.session.events.model.toModel import im.vector.matrix.android.api.session.room.model.RoomMember +import im.vector.matrix.android.internal.database.mapper.PushRulesMapper import im.vector.matrix.android.internal.database.mapper.asDomain -import im.vector.matrix.android.internal.database.model.RoomSummaryEntity +import im.vector.matrix.android.internal.database.model.* import im.vector.matrix.android.internal.database.query.getDirectRooms +import im.vector.matrix.android.internal.database.query.getOrCreate import im.vector.matrix.android.internal.database.query.where import im.vector.matrix.android.internal.di.UserId -import im.vector.matrix.android.internal.session.pushers.SavePushRulesTask import im.vector.matrix.android.internal.session.room.membership.RoomMembers import im.vector.matrix.android.internal.session.sync.model.InvitedRoomSync import im.vector.matrix.android.internal.session.sync.model.accountdata.* import im.vector.matrix.android.internal.session.user.accountdata.DirectChatsHelper -import im.vector.matrix.android.internal.session.user.accountdata.SaveBreadcrumbsTask -import im.vector.matrix.android.internal.session.user.accountdata.SaveIgnoredUsersTask import im.vector.matrix.android.internal.session.user.accountdata.UpdateUserAccountDataTask -import im.vector.matrix.android.internal.task.TaskExecutor -import im.vector.matrix.android.internal.task.configureWith -import im.vector.matrix.android.internal.util.awaitTransaction import io.realm.Realm +import io.realm.RealmList import timber.log.Timber import javax.inject.Inject internal class UserAccountDataSyncHandler @Inject constructor(private val monarchy: Monarchy, @UserId private val userId: String, private val directChatsHelper: DirectChatsHelper, - private val updateUserAccountDataTask: UpdateUserAccountDataTask, - private val savePushRulesTask: SavePushRulesTask, - private val saveIgnoredUsersTask: SaveIgnoredUsersTask, - private val saveBreadcrumbsTask: SaveBreadcrumbsTask, - private val taskExecutor: TaskExecutor) { + private val updateUserAccountDataTask: UpdateUserAccountDataTask) { - suspend fun handle(accountData: UserAccountDataSync?, invites: Map?) { + fun handle(realm: Realm, accountData: UserAccountDataSync?) { accountData?.list?.forEach { when (it) { - is UserAccountDataDirectMessages -> handleDirectChatRooms(it) - is UserAccountDataPushRules -> handlePushRules(it) - is UserAccountDataIgnoredUsers -> handleIgnoredUsers(it) - is UserAccountDataBreadcrumbs -> handleBreadcrumbs(it) + is UserAccountDataDirectMessages -> handleDirectChatRooms(realm, it) + is UserAccountDataPushRules -> handlePushRules(realm, it) + is UserAccountDataIgnoredUsers -> handleIgnoredUsers(realm, it) + is UserAccountDataBreadcrumbs -> handleBreadcrumbs(realm, it) is UserAccountDataFallback -> Timber.d("Receive account data of unhandled type ${it.type}") else -> error("Missing code here!") } @@ -65,78 +60,134 @@ internal class UserAccountDataSyncHandler @Inject constructor(private val monarc // it.toString() // MoshiProvider.providesMoshi() // } - - monarchy.doWithRealm { realm -> - synchronizeWithServerIfNeeded(realm, invites) - } - } - - private suspend fun handlePushRules(userAccountDataPushRules: UserAccountDataPushRules) { - savePushRulesTask.execute(SavePushRulesTask.Params(userAccountDataPushRules.content)) - } - - private suspend fun handleDirectChatRooms(directMessages: UserAccountDataDirectMessages) { - monarchy.awaitTransaction { realm -> - val oldDirectRooms = RoomSummaryEntity.getDirectRooms(realm) - oldDirectRooms.forEach { - it.isDirect = false - it.directUserId = null - } - directMessages.content.forEach { - val userId = it.key - it.value.forEach { roomId -> - val roomSummaryEntity = RoomSummaryEntity.where(realm, roomId).findFirst() - if (roomSummaryEntity != null) { - roomSummaryEntity.isDirect = true - roomSummaryEntity.directUserId = userId - realm.insertOrUpdate(roomSummaryEntity) - } - } - } - } } // If we get some direct chat invites, we synchronize the user account data including those. - private fun synchronizeWithServerIfNeeded(realm: Realm, invites: Map?) { + suspend fun synchronizeWithServerIfNeeded(invites: Map) { if (invites.isNullOrEmpty()) return val directChats = directChatsHelper.getLocalUserAccount() var hasUpdate = false - invites.forEach { (roomId, _) -> - val myUserStateEvent = RoomMembers(realm, roomId).getStateEvent(userId) - val inviterId = myUserStateEvent?.sender - val myUserRoomMember: RoomMember? = myUserStateEvent?.let { it.asDomain().content?.toModel() } - val isDirect = myUserRoomMember?.isDirect - if (inviterId != null && inviterId != userId && isDirect == true) { - directChats - .getOrPut(inviterId, { arrayListOf() }) - .apply { - if (contains(roomId)) { - Timber.v("Direct chats already include room $roomId with user $inviterId") - } else { - add(roomId) - hasUpdate = true + monarchy.doWithRealm { realm -> + invites.forEach { (roomId, _) -> + val myUserStateEvent = RoomMembers(realm, roomId).getStateEvent(userId) + val inviterId = myUserStateEvent?.sender + val myUserRoomMember: RoomMember? = myUserStateEvent?.let { it.asDomain().content?.toModel() } + val isDirect = myUserRoomMember?.isDirect + if (inviterId != null && inviterId != userId && isDirect == true) { + directChats + .getOrPut(inviterId, { arrayListOf() }) + .apply { + if (contains(roomId)) { + Timber.v("Direct chats already include room $roomId with user $inviterId") + } else { + add(roomId) + hasUpdate = true + } } - } + } } } if (hasUpdate) { val updateUserAccountParams = UpdateUserAccountDataTask.DirectChatParams( directMessages = directChats ) - updateUserAccountDataTask.configureWith(updateUserAccountParams).executeBy(taskExecutor) + updateUserAccountDataTask.execute(updateUserAccountParams) } } - private fun handleIgnoredUsers(userAccountDataIgnoredUsers: UserAccountDataIgnoredUsers) { - saveIgnoredUsersTask - .configureWith(SaveIgnoredUsersTask.Params(userAccountDataIgnoredUsers.content.ignoredUsers.keys.toList())) - .executeBy(taskExecutor) + + private fun handlePushRules(realm: Realm, userAccountDataPushRules: UserAccountDataPushRules) { + val pushRules = userAccountDataPushRules.content + realm.where(PushRulesEntity::class.java) + .findAll() + .deleteAllFromRealm() + + // Save only global rules for the moment + val globalRules = pushRules.global + + val content = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.CONTENT } + globalRules.content?.forEach { rule -> + content.pushRules.add(PushRulesMapper.map(rule)) + } + realm.insertOrUpdate(content) + + val override = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.OVERRIDE } + globalRules.override?.forEach { rule -> + PushRulesMapper.map(rule).also { + override.pushRules.add(it) + } + } + realm.insertOrUpdate(override) + + val rooms = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.ROOM } + globalRules.room?.forEach { rule -> + rooms.pushRules.add(PushRulesMapper.map(rule)) + } + realm.insertOrUpdate(rooms) + + val senders = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.SENDER } + globalRules.sender?.forEach { rule -> + senders.pushRules.add(PushRulesMapper.map(rule)) + } + realm.insertOrUpdate(senders) + + val underrides = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.UNDERRIDE } + globalRules.underride?.forEach { rule -> + underrides.pushRules.add(PushRulesMapper.map(rule)) + } + realm.insertOrUpdate(underrides) + } + + private fun handleDirectChatRooms(realm: Realm, directMessages: UserAccountDataDirectMessages) { + val oldDirectRooms = RoomSummaryEntity.getDirectRooms(realm) + oldDirectRooms.forEach { + it.isDirect = false + it.directUserId = null + } + directMessages.content.forEach { + val userId = it.key + it.value.forEach { roomId -> + val roomSummaryEntity = RoomSummaryEntity.where(realm, roomId).findFirst() + if (roomSummaryEntity != null) { + roomSummaryEntity.isDirect = true + roomSummaryEntity.directUserId = userId + realm.insertOrUpdate(roomSummaryEntity) + } + } + } + } + + private fun handleIgnoredUsers(realm: Realm, userAccountDataIgnoredUsers: UserAccountDataIgnoredUsers) { + val userIds = userAccountDataIgnoredUsers.content.ignoredUsers.keys + realm.where(IgnoredUserEntity::class.java) + .findAll() + .deleteAllFromRealm() + // And save the new received list + userIds.forEach { realm.createObject(IgnoredUserEntity::class.java).apply { userId = it } } // TODO If not initial sync, we should execute a init sync } - private fun handleBreadcrumbs(userAccountDataBreadcrumbs: UserAccountDataBreadcrumbs) { - saveBreadcrumbsTask - .configureWith(SaveBreadcrumbsTask.Params(userAccountDataBreadcrumbs.content.recentRoomIds)) - .executeBy(taskExecutor) + private fun handleBreadcrumbs(realm: Realm, userAccountDataBreadcrumbs: UserAccountDataBreadcrumbs) { + val recentRoomIds = userAccountDataBreadcrumbs.content.recentRoomIds + val entity = BreadcrumbsEntity.getOrCreate(realm) + + // And save the new received list + entity.recentRoomIds = RealmList().apply { addAll(recentRoomIds) } + + // Update the room summaries + // Reset all the indexes... + RoomSummaryEntity.where(realm) + .greaterThan(RoomSummaryEntityFields.BREADCRUMBS_INDEX, RoomSummaryEntity.NOT_IN_BREADCRUMBS) + .findAll() + .forEach { + it.breadcrumbsIndex = RoomSummaryEntity.NOT_IN_BREADCRUMBS + } + + // ...and apply new indexes + recentRoomIds.forEachIndexed { index, roomId -> + RoomSummaryEntity.where(realm, roomId) + .findFirst() + ?.breadcrumbsIndex = index + } } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt index 1acb7dbb4a..cf7d3ce63d 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt @@ -19,21 +19,15 @@ package im.vector.matrix.android.internal.session.sync.job import androidx.lifecycle.LiveData import androidx.lifecycle.MutableLiveData import com.squareup.moshi.JsonEncodingException -import im.vector.matrix.android.api.MatrixCallback import im.vector.matrix.android.api.failure.Failure import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.api.session.sync.SyncState -import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.session.sync.SyncTask -import im.vector.matrix.android.internal.task.TaskExecutor -import im.vector.matrix.android.internal.task.TaskThread -import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.util.BackgroundDetectionObserver import kotlinx.coroutines.* import timber.log.Timber import java.net.SocketTimeoutException -import java.util.concurrent.CountDownLatch import javax.inject.Inject private const val RETRY_WAIT_TIME_MS = 10_000L diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt index 1df0dae901..7062c63816 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt @@ -84,6 +84,4 @@ internal open class ChannelCoroutineSequencer : CoroutineSequencer { throw cancellation } } - } - diff --git a/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/CoroutineSequencersTest.kt b/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/CoroutineSequencersTest.kt index a367632d9f..9591feaa32 100644 --- a/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/CoroutineSequencersTest.kt +++ b/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/CoroutineSequencersTest.kt @@ -99,7 +99,6 @@ class CoroutineSequencersTest { sequencer.post { suspendingMethod("#1") }.also { results.add(it) } - }, GlobalScope.launch(dispatcher) { val result = sequencer.post { suspendingMethod("#2") }.also { @@ -127,5 +126,4 @@ class CoroutineSequencersTest { println("BLOCKING METHOD $name ENDS on ${Thread.currentThread().name}") return name } - } diff --git a/vector/src/main/java/im/vector/riotx/core/extensions/Session.kt b/vector/src/main/java/im/vector/riotx/core/extensions/Session.kt index 9a6905e4c7..3c201542e4 100644 --- a/vector/src/main/java/im/vector/riotx/core/extensions/Session.kt +++ b/vector/src/main/java/im/vector/riotx/core/extensions/Session.kt @@ -17,13 +17,11 @@ package im.vector.riotx.core.extensions import android.content.Context -import android.content.Intent import androidx.core.content.ContextCompat import androidx.lifecycle.Lifecycle import androidx.lifecycle.ProcessLifecycleOwner import im.vector.matrix.android.api.session.Session import im.vector.matrix.android.api.session.sync.FilterService -import im.vector.matrix.android.internal.session.sync.job.SyncService import im.vector.riotx.core.services.VectorSyncService import im.vector.riotx.features.notifications.PushRuleTriggerListener import im.vector.riotx.features.session.SessionListener diff --git a/vector/src/main/java/im/vector/riotx/features/MainActivity.kt b/vector/src/main/java/im/vector/riotx/features/MainActivity.kt index b95eddd011..20a71803d8 100644 --- a/vector/src/main/java/im/vector/riotx/features/MainActivity.kt +++ b/vector/src/main/java/im/vector/riotx/features/MainActivity.kt @@ -26,7 +26,6 @@ import im.vector.riotx.R import im.vector.riotx.core.di.ActiveSessionHolder import im.vector.riotx.core.di.ScreenComponent import im.vector.riotx.core.error.ErrorFormatter -import im.vector.riotx.core.extensions.configureAndStart import im.vector.riotx.core.extensions.startSyncing import im.vector.riotx.core.platform.VectorBaseActivity import im.vector.riotx.core.utils.deleteAllFiles