Merge pull request #3003 from vector-im/feature/bma/split_key_request
Split key request
This commit is contained in:
commit
0a854918c2
|
@ -5,6 +5,7 @@ Features ✨:
|
||||||
-
|
-
|
||||||
|
|
||||||
Improvements 🙌:
|
Improvements 🙌:
|
||||||
|
- Split network request `/keys/query` into smaller requests (250 users max) (#2925)
|
||||||
- Crypto improvement | Bulk send NO_OLM withheld code
|
- Crypto improvement | Bulk send NO_OLM withheld code
|
||||||
- Display the room shield in all room setting screens
|
- Display the room shield in all room setting screens
|
||||||
|
|
||||||
|
|
|
@ -16,17 +16,25 @@
|
||||||
|
|
||||||
package org.matrix.android.sdk.internal.crypto.tasks
|
package org.matrix.android.sdk.internal.crypto.tasks
|
||||||
|
|
||||||
|
import kotlinx.coroutines.async
|
||||||
|
import kotlinx.coroutines.coroutineScope
|
||||||
|
import kotlinx.coroutines.joinAll
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
import org.matrix.android.sdk.internal.crypto.api.CryptoApi
|
import org.matrix.android.sdk.internal.crypto.api.CryptoApi
|
||||||
|
import org.matrix.android.sdk.internal.crypto.model.rest.DeviceKeysWithUnsigned
|
||||||
import org.matrix.android.sdk.internal.crypto.model.rest.KeysQueryBody
|
import org.matrix.android.sdk.internal.crypto.model.rest.KeysQueryBody
|
||||||
import org.matrix.android.sdk.internal.crypto.model.rest.KeysQueryResponse
|
import org.matrix.android.sdk.internal.crypto.model.rest.KeysQueryResponse
|
||||||
|
import org.matrix.android.sdk.internal.crypto.model.rest.RestKeyInfo
|
||||||
import org.matrix.android.sdk.internal.network.GlobalErrorReceiver
|
import org.matrix.android.sdk.internal.network.GlobalErrorReceiver
|
||||||
import org.matrix.android.sdk.internal.network.executeRequest
|
import org.matrix.android.sdk.internal.network.executeRequest
|
||||||
import org.matrix.android.sdk.internal.task.Task
|
import org.matrix.android.sdk.internal.task.Task
|
||||||
|
import org.matrix.android.sdk.internal.util.computeBestChunkSize
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
|
||||||
internal interface DownloadKeysForUsersTask : Task<DownloadKeysForUsersTask.Params, KeysQueryResponse> {
|
internal interface DownloadKeysForUsersTask : Task<DownloadKeysForUsersTask.Params, KeysQueryResponse> {
|
||||||
data class Params(
|
data class Params(
|
||||||
// the list of users to get keys for.
|
// the list of users to get keys for. The list MUST NOT be empty
|
||||||
val userIds: List<String>,
|
val userIds: List<String>,
|
||||||
// the up-to token
|
// the up-to token
|
||||||
val token: String?
|
val token: String?
|
||||||
|
@ -39,15 +47,68 @@ internal class DefaultDownloadKeysForUsers @Inject constructor(
|
||||||
) : DownloadKeysForUsersTask {
|
) : DownloadKeysForUsersTask {
|
||||||
|
|
||||||
override suspend fun execute(params: DownloadKeysForUsersTask.Params): KeysQueryResponse {
|
override suspend fun execute(params: DownloadKeysForUsersTask.Params): KeysQueryResponse {
|
||||||
val downloadQuery = params.userIds.associateWith { emptyList<String>() }
|
val bestChunkSize = computeBestChunkSize(params.userIds.size, LIMIT)
|
||||||
|
val token = params.token?.takeIf { token -> token.isNotEmpty() }
|
||||||
|
|
||||||
val body = KeysQueryBody(
|
return if (bestChunkSize.shouldChunk()) {
|
||||||
deviceKeys = downloadQuery,
|
// Store server results in these mutable maps
|
||||||
token = params.token?.takeIf { it.isNotEmpty() }
|
val deviceKeys = mutableMapOf<String, Map<String, DeviceKeysWithUnsigned>>()
|
||||||
)
|
val failures = mutableMapOf<String, Map<String, Any>>()
|
||||||
|
val masterKeys = mutableMapOf<String, RestKeyInfo?>()
|
||||||
|
val selfSigningKeys = mutableMapOf<String, RestKeyInfo?>()
|
||||||
|
val userSigningKeys = mutableMapOf<String, RestKeyInfo?>()
|
||||||
|
|
||||||
return executeRequest(globalErrorReceiver) {
|
val mutex = Mutex()
|
||||||
apiCall = cryptoApi.downloadKeysForUsers(body)
|
|
||||||
|
// Split network request into smaller request (#2925)
|
||||||
|
coroutineScope {
|
||||||
|
params.userIds
|
||||||
|
.chunked(bestChunkSize.chunkSize)
|
||||||
|
.map {
|
||||||
|
KeysQueryBody(
|
||||||
|
deviceKeys = it.associateWith { emptyList() },
|
||||||
|
token = token
|
||||||
|
)
|
||||||
|
}
|
||||||
|
.map { body ->
|
||||||
|
async {
|
||||||
|
val result = executeRequest<KeysQueryResponse>(globalErrorReceiver) {
|
||||||
|
apiCall = cryptoApi.downloadKeysForUsers(body)
|
||||||
|
}
|
||||||
|
|
||||||
|
mutex.withLock {
|
||||||
|
deviceKeys.putAll(result.deviceKeys.orEmpty())
|
||||||
|
failures.putAll(result.failures.orEmpty())
|
||||||
|
masterKeys.putAll(result.masterKeys.orEmpty())
|
||||||
|
selfSigningKeys.putAll(result.selfSigningKeys.orEmpty())
|
||||||
|
userSigningKeys.putAll(result.userSigningKeys.orEmpty())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.joinAll()
|
||||||
|
}
|
||||||
|
|
||||||
|
KeysQueryResponse(
|
||||||
|
deviceKeys = deviceKeys,
|
||||||
|
failures = failures,
|
||||||
|
masterKeys = masterKeys,
|
||||||
|
selfSigningKeys = selfSigningKeys,
|
||||||
|
userSigningKeys = userSigningKeys
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// No need to chunk, direct request
|
||||||
|
executeRequest(globalErrorReceiver) {
|
||||||
|
apiCall = cryptoApi.downloadKeysForUsers(
|
||||||
|
KeysQueryBody(
|
||||||
|
deviceKeys = params.userIds.associateWith { emptyList() },
|
||||||
|
token = token
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
const val LIMIT = 250
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,9 +64,9 @@ import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
|
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData
|
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
|
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
|
||||||
|
import org.matrix.android.sdk.internal.util.computeBestChunkSize
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
import kotlin.math.ceil
|
|
||||||
|
|
||||||
internal class RoomSyncHandler @Inject constructor(private val readReceiptHandler: ReadReceiptHandler,
|
internal class RoomSyncHandler @Inject constructor(private val readReceiptHandler: ReadReceiptHandler,
|
||||||
private val roomSummaryUpdater: RoomSummaryUpdater,
|
private val roomSummaryUpdater: RoomSummaryUpdater,
|
||||||
|
@ -140,17 +140,17 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
syncLocalTimeStampMillis: Long,
|
syncLocalTimeStampMillis: Long,
|
||||||
aggregator: SyncResponsePostTreatmentAggregator,
|
aggregator: SyncResponsePostTreatmentAggregator,
|
||||||
reporter: ProgressReporter?) {
|
reporter: ProgressReporter?) {
|
||||||
val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
|
val bestChunkSize = computeBestChunkSize(
|
||||||
val listSize = handlingStrategy.data.keys.size
|
listSize = handlingStrategy.data.keys.size,
|
||||||
val numberOfChunks = ceil(listSize / maxSize.toDouble()).toInt()
|
limit = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
|
||||||
|
)
|
||||||
|
|
||||||
if (numberOfChunks > 1) {
|
if (bestChunkSize.shouldChunk()) {
|
||||||
reportSubtask(reporter, InitSyncStep.ImportingAccountJoinedRooms, numberOfChunks, 0.6f) {
|
reportSubtask(reporter, InitSyncStep.ImportingAccountJoinedRooms, bestChunkSize.numberOfChunks, 0.6f) {
|
||||||
val chunkSize = listSize / numberOfChunks
|
Timber.d("INIT_SYNC ${handlingStrategy.data.keys.size} rooms to insert, split with $bestChunkSize")
|
||||||
Timber.d("INIT_SYNC $listSize rooms to insert, split into $numberOfChunks sublists of $chunkSize items")
|
|
||||||
// I cannot find a better way to chunk a map, so chunk the keys and then create new maps
|
// I cannot find a better way to chunk a map, so chunk the keys and then create new maps
|
||||||
handlingStrategy.data.keys
|
handlingStrategy.data.keys
|
||||||
.chunked(chunkSize)
|
.chunked(bestChunkSize.chunkSize)
|
||||||
.forEachIndexed { index, roomIds ->
|
.forEachIndexed { index, roomIds ->
|
||||||
val roomEntities = roomIds
|
val roomEntities = roomIds
|
||||||
.also { Timber.d("INIT_SYNC insert ${roomIds.size} rooms") }
|
.also { Timber.d("INIT_SYNC insert ${roomIds.size} rooms") }
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.matrix.android.sdk.internal.util
|
||||||
|
|
||||||
|
import kotlin.math.ceil
|
||||||
|
|
||||||
|
internal data class BestChunkSize(
|
||||||
|
val numberOfChunks: Int,
|
||||||
|
val chunkSize: Int
|
||||||
|
) {
|
||||||
|
fun shouldChunk() = numberOfChunks > 1
|
||||||
|
}
|
||||||
|
|
||||||
|
internal fun computeBestChunkSize(listSize: Int, limit: Int): BestChunkSize {
|
||||||
|
return if (listSize <= limit) {
|
||||||
|
BestChunkSize(
|
||||||
|
numberOfChunks = 1,
|
||||||
|
chunkSize = listSize
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
val numberOfChunks = ceil(listSize / limit.toDouble()).toInt()
|
||||||
|
// Round on next Int
|
||||||
|
val chunkSize = ceil(listSize / numberOfChunks.toDouble()).toInt()
|
||||||
|
|
||||||
|
BestChunkSize(
|
||||||
|
numberOfChunks = numberOfChunks,
|
||||||
|
chunkSize = chunkSize
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.matrix.android.sdk.internal.util
|
||||||
|
|
||||||
|
import org.amshove.kluent.shouldBeEqualTo
|
||||||
|
import org.amshove.kluent.shouldHaveSize
|
||||||
|
import org.junit.FixMethodOrder
|
||||||
|
import org.junit.Test
|
||||||
|
import org.junit.runners.MethodSorters
|
||||||
|
import org.matrix.android.sdk.MatrixTest
|
||||||
|
|
||||||
|
@FixMethodOrder(MethodSorters.JVM)
|
||||||
|
class MathUtilTest : MatrixTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testComputeBestChunkSize0() = doTest(0, 100, 1, 0)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testComputeBestChunkSize1to99() {
|
||||||
|
for (i in 1..99) {
|
||||||
|
doTest(i, 100, 1, i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testComputeBestChunkSize100() = doTest(100, 100, 1, 100)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testComputeBestChunkSize101() = doTest(101, 100, 2, 51)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testComputeBestChunkSize199() = doTest(199, 100, 2, 100)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testComputeBestChunkSize200() = doTest(200, 100, 2, 100)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testComputeBestChunkSize201() = doTest(201, 100, 3, 67)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testComputeBestChunkSize240() = doTest(240, 100, 3, 80)
|
||||||
|
|
||||||
|
private fun doTest(listSize: Int, limit: Int, expectedNumberOfChunks: Int, expectedChunkSize: Int) {
|
||||||
|
val result = computeBestChunkSize(listSize, limit)
|
||||||
|
|
||||||
|
result.numberOfChunks shouldBeEqualTo expectedNumberOfChunks
|
||||||
|
result.chunkSize shouldBeEqualTo expectedChunkSize
|
||||||
|
|
||||||
|
// Test that the result make sense, when we use chunked()
|
||||||
|
if (result.chunkSize > 0) {
|
||||||
|
generateSequence { "a" }
|
||||||
|
.take(listSize)
|
||||||
|
.chunked(result.chunkSize)
|
||||||
|
.shouldHaveSize(result.numberOfChunks)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue