Split network request `/keys/query` into smaller requests (250 users max) (#2925)
This commit is contained in:
parent
3078adf0da
commit
9946ba8aa4
|
@ -5,7 +5,7 @@ Features ✨:
|
|||
-
|
||||
|
||||
Improvements 🙌:
|
||||
-
|
||||
- Split network request `/keys/query` into smaller requests (250 users max) (#2925)
|
||||
|
||||
Bugfix 🐛:
|
||||
-
|
||||
|
|
|
@ -16,13 +16,21 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.crypto.tasks
|
||||
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.joinAll
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import org.matrix.android.sdk.internal.crypto.api.CryptoApi
|
||||
import org.matrix.android.sdk.internal.crypto.model.rest.DeviceKeysWithUnsigned
|
||||
import org.matrix.android.sdk.internal.crypto.model.rest.KeysQueryBody
|
||||
import org.matrix.android.sdk.internal.crypto.model.rest.KeysQueryResponse
|
||||
import org.matrix.android.sdk.internal.crypto.model.rest.RestKeyInfo
|
||||
import org.matrix.android.sdk.internal.network.GlobalErrorReceiver
|
||||
import org.matrix.android.sdk.internal.network.executeRequest
|
||||
import org.matrix.android.sdk.internal.task.Task
|
||||
import javax.inject.Inject
|
||||
import kotlin.math.ceil
|
||||
|
||||
internal interface DownloadKeysForUsersTask : Task<DownloadKeysForUsersTask.Params, KeysQueryResponse> {
|
||||
data class Params(
|
||||
|
@ -39,15 +47,56 @@ internal class DefaultDownloadKeysForUsers @Inject constructor(
|
|||
) : DownloadKeysForUsersTask {
|
||||
|
||||
override suspend fun execute(params: DownloadKeysForUsersTask.Params): KeysQueryResponse {
|
||||
val downloadQuery = params.userIds.associateWith { emptyList<String>() }
|
||||
val numberOfChunks = ceil(params.userIds.size / LIMIT.toDouble()).toInt().coerceAtLeast(1)
|
||||
val chunkSize = params.userIds.size / numberOfChunks
|
||||
|
||||
val body = KeysQueryBody(
|
||||
deviceKeys = downloadQuery,
|
||||
token = params.token?.takeIf { it.isNotEmpty() }
|
||||
)
|
||||
// Store server results in these mutable maps
|
||||
val deviceKeys = mutableMapOf<String, Map<String, DeviceKeysWithUnsigned>>()
|
||||
val failures = mutableMapOf<String, Map<String, Any>>()
|
||||
val masterKeys = mutableMapOf<String, RestKeyInfo?>()
|
||||
val selfSigningKeys = mutableMapOf<String, RestKeyInfo?>()
|
||||
val userSigningKeys = mutableMapOf<String, RestKeyInfo?>()
|
||||
|
||||
return executeRequest(globalErrorReceiver) {
|
||||
apiCall = cryptoApi.downloadKeysForUsers(body)
|
||||
val mutex = Mutex()
|
||||
|
||||
// Split network request into smaller request (#2925)
|
||||
coroutineScope {
|
||||
params.userIds
|
||||
.chunked(chunkSize)
|
||||
.map {
|
||||
KeysQueryBody(
|
||||
deviceKeys = it.associateWith { emptyList() },
|
||||
token = params.token?.takeIf { token -> token.isNotEmpty() }
|
||||
)
|
||||
}
|
||||
.map { body ->
|
||||
async {
|
||||
val result = executeRequest<KeysQueryResponse>(globalErrorReceiver) {
|
||||
apiCall = cryptoApi.downloadKeysForUsers(body)
|
||||
}
|
||||
|
||||
mutex.withLock {
|
||||
deviceKeys.putAll(result.deviceKeys.orEmpty())
|
||||
failures.putAll(result.failures.orEmpty())
|
||||
masterKeys.putAll(result.masterKeys.orEmpty())
|
||||
selfSigningKeys.putAll(result.selfSigningKeys.orEmpty())
|
||||
userSigningKeys.putAll(result.userSigningKeys.orEmpty())
|
||||
}
|
||||
}
|
||||
}
|
||||
.joinAll()
|
||||
}
|
||||
|
||||
return KeysQueryResponse(
|
||||
deviceKeys = deviceKeys,
|
||||
failures = failures,
|
||||
masterKeys = masterKeys,
|
||||
selfSigningKeys = selfSigningKeys,
|
||||
userSigningKeys = userSigningKeys
|
||||
)
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val LIMIT = 250
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue