From e149be9e0fe66a5fb89e8aeddac69b05f025c378 Mon Sep 17 00:00:00 2001 From: Valere Date: Mon, 26 Oct 2020 11:24:01 +0100 Subject: [PATCH] Offload Incoming Gossip to dedicated thread --- .../crypto/IncomingGossipingRequestManager.kt | 68 +++++++++++-------- .../internal/crypto/store/IMXCryptoStore.kt | 1 + .../crypto/store/db/RealmCryptoStore.kt | 22 ++++++ 3 files changed, 63 insertions(+), 28 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/IncomingGossipingRequestManager.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/IncomingGossipingRequestManager.kt index 8869e73432..5882205fca 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/IncomingGossipingRequestManager.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/IncomingGossipingRequestManager.kt @@ -38,6 +38,7 @@ import org.matrix.android.sdk.internal.session.SessionScope import org.matrix.android.sdk.internal.util.MatrixCoroutineDispatchers import org.matrix.android.sdk.internal.worker.WorkerParamsFactory import timber.log.Timber +import java.util.concurrent.Executors import javax.inject.Inject @SessionScope @@ -52,6 +53,7 @@ internal class IncomingGossipingRequestManager @Inject constructor( private val coroutineDispatchers: MatrixCoroutineDispatchers, private val cryptoCoroutineScope: CoroutineScope) { + private val executor = Executors.newSingleThreadExecutor() // list of IncomingRoomKeyRequests/IncomingRoomKeyRequestCancellations // we received in the current sync. private val receivedGossipingRequests = ArrayList() @@ -108,8 +110,8 @@ internal class IncomingGossipingRequestManager @Inject constructor( // ignore, it was sent by me as * Timber.v("## GOSSIP onGossipingRequestEvent type ${event.type} ignore remote echo") } else { - // save in DB - cryptoStore.storeIncomingGossipingRequest(it, ageLocalTs) +// // save in DB +// cryptoStore.storeIncomingGossipingRequest(it, ageLocalTs) receivedGossipingRequests.add(it) } } @@ -119,7 +121,7 @@ internal class IncomingGossipingRequestManager @Inject constructor( // ignore, it was sent by me as * Timber.v("## GOSSIP onGossipingRequestEvent type ${event.type} ignore remote echo") } else { - cryptoStore.storeIncomingGossipingRequest(it, ageLocalTs) +// cryptoStore.storeIncomingGossipingRequest(it, ageLocalTs) receivedGossipingRequests.add(it) } } @@ -144,13 +146,8 @@ internal class IncomingGossipingRequestManager @Inject constructor( fun processReceivedGossipingRequests() { val roomKeyRequestsToProcess = receivedGossipingRequests.toList() receivedGossipingRequests.clear() - for (request in roomKeyRequestsToProcess) { - if (request is IncomingRoomKeyRequest) { - processIncomingRoomKeyRequest(request) - } else if (request is IncomingSecretShareRequest) { - processIncomingSecretShareRequest(request) - } - } + + Timber.v("## CRYPTO | GOSSIP processReceivedGossipingRequests() : ${roomKeyRequestsToProcess.size} request to process") var receivedRequestCancellations: List? = null @@ -161,27 +158,42 @@ internal class IncomingGossipingRequestManager @Inject constructor( } } - receivedRequestCancellations?.forEach { request -> - Timber.v("## CRYPTO | GOSSIP processReceivedGossipingRequests() : m.room_key_request cancellation $request") - // we should probably only notify the app of cancellations we told it - // about, but we don't currently have a record of that, so we just pass - // everything through. - if (request.userId == credentials.userId && request.deviceId == credentials.deviceId) { - // ignore remote echo - return@forEach - } - val matchingIncoming = cryptoStore.getIncomingRoomKeyRequest(request.userId ?: "", request.deviceId ?: "", request.requestId ?: "") - if (matchingIncoming == null) { - // ignore that? - return@forEach - } else { - // If it was accepted from this device, keep the information, do not mark as cancelled - if (matchingIncoming.state != GossipingRequestState.ACCEPTED) { - onRoomKeyRequestCancellation(request) - cryptoStore.updateGossipingRequestState(request, GossipingRequestState.CANCELLED_BY_REQUESTER) + + + executor.execute { + cryptoStore.storeIncomingGossipingRequests(roomKeyRequestsToProcess) + for (request in roomKeyRequestsToProcess) { + if (request is IncomingRoomKeyRequest) { + processIncomingRoomKeyRequest(request) + } else if (request is IncomingSecretShareRequest) { + processIncomingSecretShareRequest(request) } } + + receivedRequestCancellations?.forEach { request -> + Timber.v("## CRYPTO | GOSSIP processReceivedGossipingRequests() : m.room_key_request cancellation $request") + // we should probably only notify the app of cancellations we told it + // about, but we don't currently have a record of that, so we just pass + // everything through. + if (request.userId == credentials.userId && request.deviceId == credentials.deviceId) { + // ignore remote echo + return@forEach + } + val matchingIncoming = cryptoStore.getIncomingRoomKeyRequest(request.userId ?: "", request.deviceId ?: "", request.requestId ?: "") + if (matchingIncoming == null) { + // ignore that? + return@forEach + } else { + // If it was accepted from this device, keep the information, do not mark as cancelled + if (matchingIncoming.state != GossipingRequestState.ACCEPTED) { + onRoomKeyRequestCancellation(request) + cryptoStore.updateGossipingRequestState(request, GossipingRequestState.CANCELLED_BY_REQUESTER) + } + } + } + } + } private fun processIncomingRoomKeyRequest(request: IncomingRoomKeyRequest) { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/store/IMXCryptoStore.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/store/IMXCryptoStore.kt index 0ae1e69124..a43faa2cd8 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/store/IMXCryptoStore.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/store/IMXCryptoStore.kt @@ -127,6 +127,7 @@ internal interface IMXCryptoStore { fun getPendingIncomingGossipingRequests(): List fun storeIncomingGossipingRequest(request: IncomingShareRequestCommon, ageLocalTS: Long?) + fun storeIncomingGossipingRequests(request: List) // fun getPendingIncomingSecretShareRequests(): List /** diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/store/db/RealmCryptoStore.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/store/db/RealmCryptoStore.kt index b25349cba9..c0b538963d 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/store/db/RealmCryptoStore.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/store/db/RealmCryptoStore.kt @@ -1284,6 +1284,28 @@ internal class RealmCryptoStore @Inject constructor( } } + override fun storeIncomingGossipingRequests(requests: List) { + doRealmTransactionAsync(realmConfiguration) { realm -> + requests.forEach { request -> + // After a clear cache, we might have a + realm.createObject(IncomingGossipingRequestEntity::class.java).let { + it.otherDeviceId = request.deviceId + it.otherUserId = request.userId + it.requestId = request.requestId ?: "" + it.requestState = GossipingRequestState.PENDING + it.localCreationTimestamp = request.localCreationTimestamp ?: System.currentTimeMillis() + if (request is IncomingSecretShareRequest) { + it.type = GossipRequestType.SECRET + it.requestedInfoStr = request.secretName + } else if (request is IncomingRoomKeyRequest) { + it.type = GossipRequestType.KEY + it.requestedInfoStr = request.requestBody?.toJson() + } + } + } + } + } + // override fun getPendingIncomingSecretShareRequests(): List { // return doRealmQueryAndCopyList(realmConfiguration) { // it.where()