performance room key inserts in transactions to help improve performance

- adds explicit dispatcher contexts when interacting with the olm store
This commit is contained in:
Adam Brown 2022-09-03 13:51:14 +01:00
parent a860b4937b
commit d406586afa
7 changed files with 94 additions and 48 deletions

View File

@ -47,6 +47,10 @@ class OlmPersistenceWrapper(
olmPersistence.persist(sessionId, SerializedObject(inboundGroupSession.serialize()))
}
override suspend fun transaction(action: suspend () -> Unit) {
olmPersistence.startTransaction { action() }
}
override suspend fun readInbound(sessionId: SessionId): OlmInboundGroupSession? {
return olmPersistence.readInbound(sessionId)?.value?.deserialize()
}

View File

@ -12,6 +12,7 @@ interface OlmStore {
suspend fun read(): OlmAccount?
suspend fun persist(olmAccount: OlmAccount)
suspend fun transaction(action: suspend () -> Unit)
suspend fun readOutbound(roomId: RoomId): Pair<Long, OlmOutboundGroupSession>?
suspend fun persistOutbound(roomId: RoomId, creationTimestampUtc: Long, outboundGroupSession: OlmOutboundGroupSession)
suspend fun persistSession(identity: Curve25519, sessionId: SessionId, olmSession: OlmSession)

View File

@ -46,13 +46,16 @@ class OlmWrapper(
override suspend fun import(keys: List<SharedRoomKey>) {
interactWithOlm()
keys.forEach {
val inBound = when (it.isExported) {
true -> OlmInboundGroupSession.importSession(it.sessionKey)
false -> OlmInboundGroupSession(it.sessionKey)
olmStore.transaction {
keys.forEach {
val inBound = when (it.isExported) {
true -> OlmInboundGroupSession.importSession(it.sessionKey)
false -> OlmInboundGroupSession(it.sessionKey)
}
logger.crypto("import megolm ${it.sessionKey}")
olmStore.persist(it.sessionId, inBound)
}
logger.crypto("import megolm ${it.sessionKey}")
olmStore.persist(it.sessionId, inBound)
}
}

View File

@ -4,79 +4,113 @@ import app.dapk.db.DapkDb
import app.dapk.db.model.DbCryptoAccount
import app.dapk.db.model.DbCryptoMegolmInbound
import app.dapk.db.model.DbCryptoMegolmOutbound
import app.dapk.st.core.CoroutineDispatchers
import app.dapk.st.core.withIoContext
import app.dapk.st.matrix.common.CredentialsStore
import app.dapk.st.matrix.common.Curve25519
import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.common.SessionId
import com.squareup.sqldelight.TransactionWithoutReturn
import kotlinx.coroutines.withContext
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
class OlmPersistence(
private val database: DapkDb,
private val credentialsStore: CredentialsStore,
private val dispatchers: CoroutineDispatchers,
) {
suspend fun read(): String? {
return database.cryptoQueries
.selectAccount(credentialsStore.credentials()!!.userId.value)
.executeAsOneOrNull()
return dispatchers.withIoContext {
database.cryptoQueries
.selectAccount(credentialsStore.credentials()!!.userId.value)
.executeAsOneOrNull()
}
}
suspend fun persist(olmAccount: SerializedObject) {
database.cryptoQueries.insertAccount(
DbCryptoAccount(
user_id = credentialsStore.credentials()!!.userId.value,
blob = olmAccount.value
dispatchers.withIoContext {
database.cryptoQueries.insertAccount(
DbCryptoAccount(
user_id = credentialsStore.credentials()!!.userId.value,
blob = olmAccount.value
)
)
)
}
}
suspend fun readOutbound(roomId: RoomId): Pair<Long, String>? {
return database.cryptoQueries
.selectMegolmOutbound(roomId.value)
.executeAsOneOrNull()?.let {
it.utcEpochMillis to it.blob
}
return dispatchers.withIoContext {
database.cryptoQueries
.selectMegolmOutbound(roomId.value)
.executeAsOneOrNull()?.let {
it.utcEpochMillis to it.blob
}
}
}
suspend fun persistOutbound(roomId: RoomId, creationTimestampUtc: Long, outboundGroupSession: SerializedObject) {
database.cryptoQueries.insertMegolmOutbound(
DbCryptoMegolmOutbound(
room_id = roomId.value,
blob = outboundGroupSession.value,
utcEpochMillis = creationTimestampUtc,
dispatchers.withIoContext {
database.cryptoQueries.insertMegolmOutbound(
DbCryptoMegolmOutbound(
room_id = roomId.value,
blob = outboundGroupSession.value,
utcEpochMillis = creationTimestampUtc,
)
)
)
}
}
suspend fun persistSession(identity: Curve25519, sessionId: SessionId, olmSession: SerializedObject) {
database.cryptoQueries.insertOlmSession(
identity_key = identity.value,
session_id = sessionId.value,
blob = olmSession.value,
)
withContext(dispatchers.io) {
database.cryptoQueries.insertOlmSession(
identity_key = identity.value,
session_id = sessionId.value,
blob = olmSession.value,
)
}
}
suspend fun readSessions(identities: List<Curve25519>): List<Pair<Curve25519, String>>? {
return database.cryptoQueries
.selectOlmSession(identities.map { it.value })
.executeAsList()
.map { Curve25519(it.identity_key) to it.blob }
.takeIf { it.isNotEmpty() }
return withContext(dispatchers.io) {
database.cryptoQueries
.selectOlmSession(identities.map { it.value })
.executeAsList()
.map { Curve25519(it.identity_key) to it.blob }
.takeIf { it.isNotEmpty() }
}
}
suspend fun startTransaction(action: suspend TransactionWithoutReturn.() -> Unit) {
val transaction = suspendCoroutine {
database.cryptoQueries.transaction(false) {
it.resume(this)
}
}
dispatchers.withIoContext {
action(transaction)
}
}
suspend fun persist(sessionId: SessionId, inboundGroupSession: SerializedObject) {
database.cryptoQueries.insertMegolmInbound(
DbCryptoMegolmInbound(
session_id = sessionId.value,
blob = inboundGroupSession.value
withContext(dispatchers.io) {
database.cryptoQueries.insertMegolmInbound(
DbCryptoMegolmInbound(
session_id = sessionId.value,
blob = inboundGroupSession.value
)
)
)
}
}
suspend fun readInbound(sessionId: SessionId): SerializedObject? {
return database.cryptoQueries
.selectMegolmInbound(sessionId.value)
.executeAsOneOrNull()
?.let { SerializedObject((it)) }
return withContext(dispatchers.io) {
database.cryptoQueries
.selectMegolmInbound(sessionId.value)
.executeAsOneOrNull()
?.let { SerializedObject((it)) }
}
}
}

View File

@ -39,7 +39,7 @@ class StoreModule(
fun applicationStore() = ApplicationPreferences(preferences)
fun olmStore() = OlmPersistence(database, credentialsStore())
fun olmStore() = OlmPersistence(database, credentialsStore(), coroutineDispatchers)
fun knownDevicesStore() = DevicePersistence(database, KnownDevicesCache(), coroutineDispatchers)
fun profileStore(): ProfileStore = ProfilePersistence(preferences)

View File

@ -1,5 +1,6 @@
package app.dapk.st.matrix.crypto.internal
import app.dapk.st.core.logP
import app.dapk.st.matrix.common.*
import app.dapk.st.matrix.crypto.Crypto
import app.dapk.st.matrix.crypto.CryptoService
@ -48,8 +49,12 @@ internal class DefaultCryptoService(
}
override suspend fun InputStream.importRoomKeys(password: String): List<RoomId> {
return with(roomKeyImporter) {
importRoomKeys(password) { importRoomKeys(it) }
return logP("import room keys") {
with(roomKeyImporter) {
importRoomKeys(password) {
importRoomKeys(it)
}
}
}
}
}

View File

@ -150,7 +150,6 @@ const incrementVersionFile = async (github, branchName) => {
name: updatedVersionName,
}
const encodedContentUpdate = Buffer.from(JSON.stringify(updatedVersionFile, null, 2)).toString('base64')
await github.rest.repos.createOrUpdateFileContents({
owner: config.owner,