From f6b38d2b494cb18b4be6c32a5c57e1717bf88649 Mon Sep 17 00:00:00 2001 From: ariskotsomitopoulos Date: Mon, 21 Mar 2022 13:13:09 +0200 Subject: [PATCH] Add runBlocking when decrypt events to avoid thread switching when accessing the realm instance (thread local) --- .../database/helper/ThreadSummaryHelper.kt | 19 ++++-- .../sync/handler/room/RoomSyncHandler.kt | 66 ++++++++++--------- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt index 7087f07162..8d9304f61b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt @@ -20,6 +20,7 @@ import io.realm.Realm import io.realm.RealmQuery import io.realm.Sort import io.realm.kotlin.createObject +import kotlinx.coroutines.runBlocking import org.matrix.android.sdk.api.session.crypto.CryptoService import org.matrix.android.sdk.api.session.crypto.MXCryptoError import org.matrix.android.sdk.api.session.events.model.Event @@ -127,7 +128,7 @@ private fun EventEntity.toTimelineEventEntity(roomMemberContentsByUser: HashMap< return timelineEventEntity } -internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate( +internal fun ThreadSummaryEntity.Companion.createOrUpdate( threadSummaryType: ThreadSummaryUpdateType, realm: Realm, roomId: String, @@ -153,10 +154,18 @@ internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate( } val rootThreadEventEntity = createEventEntity(roomId, rootThreadEvent, realm).also { - decryptIfNeeded(cryptoService, it, roomId) + try { + decryptIfNeeded(cryptoService, it, roomId) + } catch (e: InterruptedException) { + Timber.i("Decryption got interrupted") + } } val latestThreadEventEntity = createLatestEventEntity(roomId, rootThreadEvent, roomMemberContentsByUser, realm)?.also { - decryptIfNeeded(cryptoService, it, roomId) + try { + decryptIfNeeded(cryptoService, it, roomId) + } catch (e: InterruptedException) { + Timber.i("Decryption got interrupted") + } } val isUserParticipating = rootThreadEvent.unsignedData.relations.latestThread.isUserParticipating == true || rootThreadEvent.senderId == userId roomMemberContentsByUser.addSenderState(realm, roomId, rootThreadEvent.senderId) @@ -204,8 +213,8 @@ internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate( } } -private suspend fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) { - cryptoService ?: return +private fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) = runBlocking { + cryptoService ?: return@runBlocking val event = eventEntity.asDomain() if (event.isEncrypted() && event.mxDecryptionResult == null && event.eventId != null) { try { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt index 8fe85f0d31..51e32b25cb 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt @@ -102,11 +102,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle data class LEFT(val data: Map) : HandlingStrategy() } - suspend fun handle(realm: Realm, - roomsSyncResponse: RoomsSyncResponse, - isInitialSync: Boolean, - aggregator: SyncResponsePostTreatmentAggregator, - reporter: ProgressReporter? = null) { + fun handle(realm: Realm, + roomsSyncResponse: RoomsSyncResponse, + isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator, + reporter: ProgressReporter? = null) { Timber.v("Execute transaction from $this") handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter) handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter) @@ -121,11 +121,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } // PRIVATE METHODS ***************************************************************************** - private suspend fun handleRoomSync(realm: Realm, - handlingStrategy: HandlingStrategy, - isInitialSync: Boolean, - aggregator: SyncResponsePostTreatmentAggregator, - reporter: ProgressReporter?) { + private fun handleRoomSync(realm: Realm, + handlingStrategy: HandlingStrategy, + isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator, + reporter: ProgressReporter?) { val insertType = if (isInitialSync) { EventInsertType.INITIAL_SYNC } else { @@ -158,11 +158,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle realm.insertOrUpdate(rooms) } - private suspend fun insertJoinRoomsFromInitSync(realm: Realm, - handlingStrategy: HandlingStrategy.JOINED, - syncLocalTimeStampMillis: Long, - aggregator: SyncResponsePostTreatmentAggregator, - reporter: ProgressReporter?) { + private fun insertJoinRoomsFromInitSync(realm: Realm, + handlingStrategy: HandlingStrategy.JOINED, + syncLocalTimeStampMillis: Long, + aggregator: SyncResponsePostTreatmentAggregator, + reporter: ProgressReporter?) { val bestChunkSize = computeBestChunkSize( listSize = handlingStrategy.data.keys.size, limit = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE @@ -200,12 +200,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } } - private suspend fun handleJoinedRoom(realm: Realm, - roomId: String, - roomSync: RoomSync, - insertType: EventInsertType, - syncLocalTimestampMillis: Long, - aggregator: SyncResponsePostTreatmentAggregator): RoomEntity { + private fun handleJoinedRoom(realm: Realm, + roomId: String, + roomSync: RoomSync, + insertType: EventInsertType, + syncLocalTimestampMillis: Long, + aggregator: SyncResponsePostTreatmentAggregator): RoomEntity { Timber.v("Handle join sync for room $roomId") val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed) @@ -351,15 +351,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle return roomEntity } - private suspend fun handleTimelineEvents(realm: Realm, - roomId: String, - roomEntity: RoomEntity, - eventList: List, - prevToken: String? = null, - isLimited: Boolean = true, - insertType: EventInsertType, - syncLocalTimestampMillis: Long, - aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity { + private fun handleTimelineEvents(realm: Realm, + roomId: String, + roomEntity: RoomEntity, + eventList: List, + prevToken: String? = null, + isLimited: Boolean = true, + insertType: EventInsertType, + syncLocalTimestampMillis: Long, + aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity { val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId) if (isLimited && lastChunk != null) { lastChunk.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = true) @@ -387,8 +387,10 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle val isInitialSync = insertType == EventInsertType.INITIAL_SYNC if (event.isEncrypted() && !isInitialSync) { - runBlocking { + try { decryptIfNeeded(event, roomId) + } catch (e: InterruptedException) { + Timber.i("Decryption got interrupted") } } var contentToInject: String? = null @@ -504,7 +506,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } } - private suspend fun decryptIfNeeded(event: Event, roomId: String) { + private fun decryptIfNeeded(event: Event, roomId: String) = runBlocking { try { // Event from sync does not have roomId, so add it to the event first val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "")