diff --git a/matrix-sdk-android/src/kotlinCrypto/java/org/matrix/android/sdk/internal/crypto/DefaultCryptoService.kt b/matrix-sdk-android/src/kotlinCrypto/java/org/matrix/android/sdk/internal/crypto/DefaultCryptoService.kt index cfbfd45ef4..3661ddf889 100755 --- a/matrix-sdk-android/src/kotlinCrypto/java/org/matrix/android/sdk/internal/crypto/DefaultCryptoService.kt +++ b/matrix-sdk-android/src/kotlinCrypto/java/org/matrix/android/sdk/internal/crypto/DefaultCryptoService.kt @@ -193,7 +193,7 @@ internal class DefaultCryptoService @Inject constructor( private val isStarting = AtomicBoolean(false) private val isStarted = AtomicBoolean(false) - override fun onStateEvent(roomId: String, event: Event) { + override suspend fun onStateEvent(roomId: String, event: Event) { when (event.type) { EventType.STATE_ROOM_ENCRYPTION -> onRoomEncryptionEvent(roomId, event) EventType.STATE_ROOM_MEMBER -> onRoomMembershipEvent(roomId, event) @@ -201,7 +201,7 @@ internal class DefaultCryptoService @Inject constructor( } } - override fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) { + override suspend fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) { // handle state events if (event.isStateEvent()) { when (event.type) { @@ -214,7 +214,7 @@ internal class DefaultCryptoService @Inject constructor( // handle verification if (!initialSync) { if (event.type != null && verificationMessageProcessor.shouldProcess(event.type)) { - cryptoCoroutineScope.launch(coroutineDispatchers.dmVerif) { + withContext(coroutineDispatchers.dmVerif) { verificationMessageProcessor.process(roomId, event) } } @@ -788,7 +788,7 @@ internal class DefaultCryptoService @Inject constructor( */ @Throws(MXCryptoError::class) private suspend fun internalDecryptEvent(event: Event, timeline: String): MXEventDecryptionResult { - return eventDecryptor.decryptEvent(event, timeline) + return withContext(coroutineDispatchers.crypto) { eventDecryptor.decryptEvent(event, timeline) } } /** @@ -937,13 +937,13 @@ internal class DefaultCryptoService @Inject constructor( * @param roomId the room Id * @param event the encryption event. */ - private fun onRoomEncryptionEvent(roomId: String, event: Event) { + private suspend fun onRoomEncryptionEvent(roomId: String, event: Event) { if (!event.isStateEvent()) { // Ignore Timber.tag(loggerTag.value).w("Invalid encryption event") return } - cryptoCoroutineScope.launch(coroutineDispatchers.crypto) { + withContext(coroutineDispatchers.io) { val userIds = getRoomUserIds(roomId) setEncryptionInRoom(roomId, event.content?.get("algorithm")?.toString(), true, userIds) } @@ -961,7 +961,7 @@ internal class DefaultCryptoService @Inject constructor( * @param roomId the room Id * @param event the membership event causing the change */ - private fun onRoomMembershipEvent(roomId: String, event: Event) { + private suspend fun onRoomMembershipEvent(roomId: String, event: Event) { // because the encryption event can be after the join/invite in the same batch event.stateKey?.let { _ -> val roomMember: RoomMemberContent? = event.content.toModel() @@ -970,37 +970,39 @@ internal class DefaultCryptoService @Inject constructor( unrequestedForwardManager.onInviteReceived(roomId, event.senderId.orEmpty(), clock.epochMillis()) } } - roomEncryptorsStore.get(roomId) ?: /* No encrypting in this room */ return - - event.stateKey?.let { userId -> - val roomMember: RoomMemberContent? = event.content.toModel() - val membership = roomMember?.membership - if (membership == Membership.JOIN) { - // make sure we are tracking the deviceList for this user. - deviceListManager.startTrackingDeviceList(listOf(userId)) - } else if (membership == Membership.INVITE && - shouldEncryptForInvitedMembers(roomId) && - isEncryptionEnabledForInvitedUser()) { - // track the deviceList for this invited user. - // Caution: there's a big edge case here in that federated servers do not - // know what other servers are in the room at the time they've been invited. - // They therefore will not send device updates if a user logs in whilst - // their state is invite. - deviceListManager.startTrackingDeviceList(listOf(userId)) + withContext(coroutineDispatchers.io) { + event.stateKey?.let { userId -> + val roomMember: RoomMemberContent? = event.content.toModel() + val membership = roomMember?.membership + if (membership == Membership.JOIN) { + // make sure we are tracking the deviceList for this user. + deviceListManager.startTrackingDeviceList(listOf(userId)) + } else if (membership == Membership.INVITE && + shouldEncryptForInvitedMembers(roomId) && + isEncryptionEnabledForInvitedUser()) { + // track the deviceList for this invited user. + // Caution: there's a big edge case here in that federated servers do not + // know what other servers are in the room at the time they've been invited. + // They therefore will not send device updates if a user logs in whilst + // their state is invite. + deviceListManager.startTrackingDeviceList(listOf(userId)) + } } } } - private fun onRoomHistoryVisibilityEvent(roomId: String, event: Event) { + private suspend fun onRoomHistoryVisibilityEvent(roomId: String, event: Event) { if (!event.isStateEvent()) return val eventContent = event.content.toModel() val historyVisibility = eventContent?.historyVisibility - if (historyVisibility == null) { - cryptoStore.setShouldShareHistory(roomId, false) - } else { - cryptoStore.setShouldEncryptForInvitedMembers(roomId, historyVisibility != RoomHistoryVisibility.JOINED) - cryptoStore.setShouldShareHistory(roomId, historyVisibility.shouldShareHistory()) + withContext(coroutineDispatchers.io) { + if (historyVisibility == null) { + cryptoStore.setShouldShareHistory(roomId, false) + } else { + cryptoStore.setShouldEncryptForInvitedMembers(roomId, historyVisibility != RoomHistoryVisibility.JOINED) + cryptoStore.setShouldShareHistory(roomId, historyVisibility.shouldShareHistory()) + } } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/crypto/CryptoService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/crypto/CryptoService.kt index c6ee3aad30..006cd72142 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/crypto/CryptoService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/crypto/CryptoService.kt @@ -222,8 +222,8 @@ interface CryptoService { suspend fun onSyncWillProcess(isInitialSync: Boolean) fun isStarted(): Boolean suspend fun receiveSyncChanges(toDevice: ToDeviceSyncResponse?, deviceChanges: DeviceListResponse?, keyCounts: DeviceOneTimeKeysCountSyncResponse?) - fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) - fun onStateEvent(roomId: String, event: Event) {} + suspend fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) + suspend fun onStateEvent(roomId: String, event: Event) {} suspend fun onSyncCompleted(syncResponse: SyncResponse) fun logDbUsageInfo() suspend fun setRoomUnBlacklistUnverifiedDevices(roomId: String) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/create/CreateLocalRoomTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/create/CreateLocalRoomTask.kt index d6defc6d9d..1e165708d0 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/create/CreateLocalRoomTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/create/CreateLocalRoomTask.kt @@ -179,7 +179,9 @@ internal class DefaultCreateLocalRoomTask @Inject constructor( } // Give info to crypto module - cryptoService.onStateEvent(roomId, event) + runBlocking { + cryptoService.onStateEvent(roomId, event) + } } roomMemberContentsByUser.getOrPut(event.senderId) { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt index f995e29cef..97f83f2c77 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt @@ -23,6 +23,9 @@ import org.matrix.android.sdk.api.extensions.measureMetric import org.matrix.android.sdk.api.extensions.measureSpan import org.matrix.android.sdk.api.metrics.SyncDurationMetricPlugin import org.matrix.android.sdk.api.session.crypto.CryptoService +import org.matrix.android.sdk.api.session.crypto.MXCryptoError +import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult +import org.matrix.android.sdk.api.session.events.model.Event import org.matrix.android.sdk.api.session.pushrules.PushRuleService import org.matrix.android.sdk.api.session.pushrules.RuleScope import org.matrix.android.sdk.api.session.sync.InitialSyncStep @@ -39,6 +42,7 @@ import org.matrix.android.sdk.internal.session.sync.handler.SyncResponsePostTrea import org.matrix.android.sdk.internal.session.sync.handler.UserAccountDataSyncHandler import org.matrix.android.sdk.internal.session.sync.handler.room.RoomSyncHandler import org.matrix.android.sdk.internal.util.awaitTransaction +import org.matrix.android.sdk.internal.util.time.Clock import timber.log.Timber import javax.inject.Inject import kotlin.system.measureTimeMillis @@ -56,6 +60,7 @@ internal class SyncResponseHandler @Inject constructor( private val processEventForPushTask: ProcessEventForPushTask, private val pushRuleService: PushRuleService, private val presenceSyncHandler: PresenceSyncHandler, + private val clock: Clock, matrixConfiguration: MatrixConfiguration, ) { @@ -67,7 +72,6 @@ internal class SyncResponseHandler @Inject constructor( reporter: ProgressReporter? ) { val isInitialSync = fromToken == null - Timber.v("Start handling sync, is InitialSync: $isInitialSync") relevantPlugins.measureMetric { startCryptoService(isInitialSync) @@ -78,6 +82,25 @@ internal class SyncResponseHandler @Inject constructor( relevantPlugins.measureSpan("task", "handle_to_device") { handleToDevice(syncResponse) } + val syncLocalTimestampMillis = clock.epochMillis() + + // pass live state/crypto related event to crypto + syncResponse.rooms?.join?.entries?.map { (roomId, roomSync) -> + roomSync.state + ?.events + ?.filter { it.isStateEvent() } + ?.forEach { + cryptoService.onStateEvent(roomId, it) + } + + roomSync.timeline?.events?.forEach { + if (it.isEncrypted() && !isInitialSync) { + decryptIfNeeded(it, roomId) + } + it.ageLocalTs = syncLocalTimestampMillis - (it.unsignedData?.age ?: 0) + cryptoService.onLiveEvent(roomId, it, isInitialSync) + } + } val aggregator = SyncResponsePostTreatmentAggregator() @@ -101,6 +124,32 @@ internal class SyncResponseHandler @Inject constructor( } } + private suspend fun decryptIfNeeded(event: Event, roomId: String) { + try { + val timelineId = generateTimelineId(roomId) + // Event from sync does not have roomId, so add it to the event first + // note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching + val result = cryptoService.decryptEvent(event.copy(roomId = roomId), timelineId) + event.mxDecryptionResult = OlmDecryptionResult( + payload = result.clearEvent, + senderKey = result.senderCurve25519Key, + keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) }, + forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain, + isSafe = result.isSafe + ) + } catch (e: MXCryptoError) { + Timber.v(e, "Failed to decrypt $roomId") + if (e is MXCryptoError.Base) { + event.mCryptoError = e.errorType + event.mCryptoErrorReason = e.technicalMessage.takeIf { it.isNotEmpty() } ?: e.detailedErrorDescription + } + } + } + + private fun generateTimelineId(roomId: String): String { + return "RoomSyncHandler$roomId" + } + private suspend fun startCryptoService(isInitialSync: Boolean) { relevantPlugins.measureSpan("task", "start_crypto_service") { measureTimeMillis { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt index 0c51f85730..263505ce37 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt @@ -19,10 +19,7 @@ package org.matrix.android.sdk.internal.session.sync.handler.room import dagger.Lazy import io.realm.Realm import io.realm.kotlin.createObject -import kotlinx.coroutines.runBlocking import org.matrix.android.sdk.api.crypto.MXCRYPTO_ALGORITHM_MEGOLM -import org.matrix.android.sdk.api.session.crypto.CryptoService -import org.matrix.android.sdk.api.session.crypto.MXCryptoError import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult import org.matrix.android.sdk.api.session.events.model.Event import org.matrix.android.sdk.api.session.events.model.EventType @@ -92,7 +89,6 @@ internal class RoomSyncHandler @Inject constructor( private val readReceiptHandler: ReadReceiptHandler, private val roomSummaryUpdater: RoomSummaryUpdater, private val roomAccountDataHandler: RoomSyncAccountDataHandler, - private val cryptoService: CryptoService, private val roomMemberEventHandler: RoomMemberEventHandler, private val roomTypingUsersHandler: RoomTypingUsersHandler, private val threadsAwarenessHandler: ThreadsAwarenessHandler, @@ -258,7 +254,6 @@ internal class RoomSyncHandler @Inject constructor( root = eventEntity } // Give info to crypto module - cryptoService.onStateEvent(roomId, event) roomMemberEventHandler.handle(realm, roomId, event, isInitialSync, aggregator) } } @@ -423,15 +418,9 @@ internal class RoomSyncHandler @Inject constructor( val isInitialSync = insertType == EventInsertType.INITIAL_SYNC eventIds.add(event.eventId) + liveEventService.get().dispatchLiveEventReceived(event, roomId, isInitialSync) - if (event.isEncrypted() && !isInitialSync) { - try { - decryptIfNeeded(event, roomId) - } catch (e: InterruptedException) { - Timber.i("Decryption got interrupted") - } - } var contentToInject: String? = null if (!isInitialSync) { contentToInject = threadsAwarenessHandler.makeEventThreadAware(realm, roomId, event) @@ -486,7 +475,9 @@ internal class RoomSyncHandler @Inject constructor( } } // Give info to crypto module - cryptoService.onLiveEvent(roomEntity.roomId, event, isInitialSync) +// runBlocking { +// cryptoService.onLiveEvent(roomEntity.roomId, event, isInitialSync) +// } // Try to remove local echo event.unsignedData?.transactionId?.also { txId -> @@ -567,31 +558,6 @@ internal class RoomSyncHandler @Inject constructor( } } - private fun decryptIfNeeded(event: Event, roomId: String) { - try { - val timelineId = generateTimelineId(roomId) - // Event from sync does not have roomId, so add it to the event first - // note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching - val result = runBlocking { cryptoService.decryptEvent(event.copy(roomId = roomId), timelineId) } - event.mxDecryptionResult = OlmDecryptionResult( - payload = result.clearEvent, - senderKey = result.senderCurve25519Key, - keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) }, - forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain, - isSafe = result.isSafe - ) - } catch (e: MXCryptoError) { - if (e is MXCryptoError.Base) { - event.mCryptoError = e.errorType - event.mCryptoErrorReason = e.technicalMessage.takeIf { it.isNotEmpty() } ?: e.detailedErrorDescription - } - } - } - - private fun generateTimelineId(roomId: String): String { - return "RoomSyncHandler$roomId" - } - data class EphemeralResult( val typingUserIds: List = emptyList() ) diff --git a/matrix-sdk-android/src/rustCrypto/java/org/matrix/android/sdk/internal/crypto/RustCryptoService.kt b/matrix-sdk-android/src/rustCrypto/java/org/matrix/android/sdk/internal/crypto/RustCryptoService.kt index dc7159ae8e..836f4159d1 100755 --- a/matrix-sdk-android/src/rustCrypto/java/org/matrix/android/sdk/internal/crypto/RustCryptoService.kt +++ b/matrix-sdk-android/src/rustCrypto/java/org/matrix/android/sdk/internal/crypto/RustCryptoService.kt @@ -137,7 +137,7 @@ internal class RustCryptoService @Inject constructor( private val isStarting = AtomicBoolean(false) private val isStarted = AtomicBoolean(false) - override fun onStateEvent(roomId: String, event: Event) { + override suspend fun onStateEvent(roomId: String, event: Event) { when (event.type) { EventType.STATE_ROOM_ENCRYPTION -> onRoomEncryptionEvent(roomId, event) EventType.STATE_ROOM_MEMBER -> onRoomMembershipEvent(roomId, event) @@ -145,7 +145,7 @@ internal class RustCryptoService @Inject constructor( } } - override fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) { + override suspend fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) { if (event.isStateEvent()) { when (event.getClearType()) { EventType.STATE_ROOM_ENCRYPTION -> onRoomEncryptionEvent(roomId, event) @@ -153,9 +153,7 @@ internal class RustCryptoService @Inject constructor( EventType.STATE_ROOM_HISTORY_VISIBILITY -> onRoomHistoryVisibilityEvent(roomId, event) } } else { - cryptoCoroutineScope.launch { - verificationService.onEvent(roomId, event) - } + verificationService.onEvent(roomId, event) } } @@ -487,7 +485,7 @@ internal class RustCryptoService @Inject constructor( * * @param event the encryption event. */ - private fun onRoomEncryptionEvent(roomId: String, event: Event) { + private suspend fun onRoomEncryptionEvent(roomId: String, event: Event) { if (!event.isStateEvent()) { // Ignore Timber.tag(loggerTag.value).w("Invalid encryption event") @@ -495,7 +493,7 @@ internal class RustCryptoService @Inject constructor( } // Do not load members here, would defeat lazy loading - cryptoCoroutineScope.launch(coroutineDispatchers.crypto) { +// cryptoCoroutineScope.launch(coroutineDispatchers.crypto) { // val params = LoadRoomMembersTask.Params(roomId) // try { // loadRoomMembersTask.execute(params) @@ -505,7 +503,7 @@ internal class RustCryptoService @Inject constructor( val userIds = getRoomUserIds(roomId) setEncryptionInRoom(roomId, event.content?.get("algorithm")?.toString(), userIds) // } - } +// } } override fun onE2ERoomMemberLoadedFromServer(roomId: String) { @@ -530,7 +528,7 @@ internal class RustCryptoService @Inject constructor( * * @param event the membership event causing the change */ - private fun onRoomMembershipEvent(roomId: String, event: Event) { + private suspend fun onRoomMembershipEvent(roomId: String, event: Event) { // We only care about the memberships if this room is encrypted if (!isRoomEncrypted(roomId)) { return @@ -551,9 +549,7 @@ internal class RustCryptoService @Inject constructor( // know what other servers are in the room at the time they've been invited. // They therefore will not send device updates if a user logs in whilst // their state is invite. - cryptoCoroutineScope.launch { olmMachine.updateTrackedUsers(listOf(userId)) - } } else { // nop } @@ -699,7 +695,9 @@ internal class RustCryptoService @Inject constructor( } override fun enableShareKeyOnInvite(enable: Boolean) { - TODO("Enable share key on invite not implemented") + if (enable) { + TODO("Enable share key on invite not implemented") + } } override fun isShareKeysOnInviteEnabled(): Boolean {