From 894592870bc6e58676685d7f388b49fc0aea1249 Mon Sep 17 00:00:00 2001 From: Adam Brown Date: Tue, 27 Sep 2022 19:05:07 +0100 Subject: [PATCH 1/4] using correct data param when creating device crypto --- domains/olm/src/main/kotlin/app/dapk/st/olm/OlmWrapper.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/domains/olm/src/main/kotlin/app/dapk/st/olm/OlmWrapper.kt b/domains/olm/src/main/kotlin/app/dapk/st/olm/OlmWrapper.kt index ad6a6e6..50c6548 100644 --- a/domains/olm/src/main/kotlin/app/dapk/st/olm/OlmWrapper.kt +++ b/domains/olm/src/main/kotlin/app/dapk/st/olm/OlmWrapper.kt @@ -170,6 +170,8 @@ class OlmWrapper( val inBound = OlmInboundGroupSession(roomCryptoSession.key) olmStore.persist(roomCryptoSession.id, inBound) + logger.crypto("Creating megolm: ${roomCryptoSession.id}") + return roomCryptoSession } @@ -181,7 +183,7 @@ class OlmWrapper( private suspend fun deviceCrypto(input: OlmSessionInput): DeviceCryptoSession? { return olmStore.readSessions(listOf(input.identity))?.let { DeviceCryptoSession( - input.deviceId, input.userId, input.identity, input.fingerprint, it + input.deviceId, input.userId, input.identity, input.fingerprint, it.map { it.second } ) } } From 1c3766748568749c2f300ee60914540384403bd7 Mon Sep 17 00:00:00 2001 From: Adam Brown Date: Tue, 27 Sep 2022 19:09:21 +0100 Subject: [PATCH 2/4] using a single sync flow instance to reduce memory and risk of concurrent flows --- .../matrix/sync/internal/sync/SyncUseCase.kt | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/matrix/services/sync/src/main/kotlin/app/dapk/st/matrix/sync/internal/sync/SyncUseCase.kt b/matrix/services/sync/src/main/kotlin/app/dapk/st/matrix/sync/internal/sync/SyncUseCase.kt index dd4301b..42a3b69 100644 --- a/matrix/services/sync/src/main/kotlin/app/dapk/st/matrix/sync/internal/sync/SyncUseCase.kt +++ b/matrix/services/sync/src/main/kotlin/app/dapk/st/matrix/sync/internal/sync/SyncUseCase.kt @@ -8,7 +8,6 @@ import app.dapk.st.matrix.sync.internal.SideEffectFlowIterator import app.dapk.st.matrix.sync.internal.overview.ReducedSyncFilterUseCase import app.dapk.st.matrix.sync.internal.request.syncRequest import app.dapk.st.matrix.sync.internal.room.SyncSideEffects -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.cancellable import kotlinx.coroutines.flow.flow @@ -25,19 +24,17 @@ internal class SyncUseCase( private val syncConfig: SyncConfig, ) { - fun sync(): Flow { - return flow { - val credentials = credentialsStore.credentials()!! - val filterId = filterUseCase.reducedFilter(credentials.userId) - with(flowIterator) { - loop( - initial = null, - onPost = { emit(Unit) }, - onIteration = { onEachSyncIteration(filterId, credentials, previousState = it) } - ) - } - }.cancellable() - } + private val _flow = flow { + val credentials = credentialsStore.credentials()!! + val filterId = filterUseCase.reducedFilter(credentials.userId) + with(flowIterator) { + loop( + initial = null, + onPost = { emit(Unit) }, + onIteration = { onEachSyncIteration(filterId, credentials, previousState = it) } + ) + } + }.cancellable() private suspend fun onEachSyncIteration(filterId: SyncService.FilterId, credentials: UserCredentials, previousState: OverviewState?): OverviewState? { val syncToken = syncStore.read(key = SyncStore.SyncKey.Overview) @@ -85,4 +82,6 @@ internal class SyncUseCase( ) } + fun sync() = _flow + } From ef41f13a7b1224b673d4470694650a1074242ebb Mon Sep 17 00:00:00 2001 From: Adam Brown Date: Tue, 27 Sep 2022 20:41:32 +0100 Subject: [PATCH 3/4] adding entry point for extending services --- .../kotlin/app/dapk/st/matrix/MatrixClient.kt | 6 +++- .../app/dapk/st/matrix/ServiceInstaller.kt | 28 +++++++++++++++---- .../app/dapk/st/matrix/auth/AuthService.kt | 5 ++-- .../dapk/st/matrix/crypto/CryptoService.kt | 9 ++---- .../dapk/st/matrix/device/DeviceService.kt | 5 ++-- .../dapk/st/matrix/message/MessageService.kt | 9 ++---- .../app/dapk/st/matrix/room/ProfileService.kt | 5 ++-- .../app/dapk/st/matrix/push/PushService.kt | 5 ++-- .../app/dapk/st/matrix/room/RoomService.kt | 9 ++---- .../app/dapk/st/matrix/sync/SyncService.kt | 9 ++---- 10 files changed, 51 insertions(+), 39 deletions(-) diff --git a/matrix/matrix/src/main/kotlin/app/dapk/st/matrix/MatrixClient.kt b/matrix/matrix/src/main/kotlin/app/dapk/st/matrix/MatrixClient.kt index 24294eb..695dff4 100644 --- a/matrix/matrix/src/main/kotlin/app/dapk/st/matrix/MatrixClient.kt +++ b/matrix/matrix/src/main/kotlin/app/dapk/st/matrix/MatrixClient.kt @@ -43,7 +43,11 @@ data class ServiceDependencies( interface MatrixServiceInstaller { fun serializers(builder: SerializersModuleBuilder.() -> Unit) - fun install(factory: MatrixService.Factory) + fun install(factory: MatrixService.Factory): InstallExtender +} + +interface InstallExtender { + fun proxy(proxy: (T) -> T) } interface MatrixServiceProvider { diff --git a/matrix/matrix/src/main/kotlin/app/dapk/st/matrix/ServiceInstaller.kt b/matrix/matrix/src/main/kotlin/app/dapk/st/matrix/ServiceInstaller.kt index 2e59c9c..a2d6a1c 100644 --- a/matrix/matrix/src/main/kotlin/app/dapk/st/matrix/ServiceInstaller.kt +++ b/matrix/matrix/src/main/kotlin/app/dapk/st/matrix/ServiceInstaller.kt @@ -11,15 +11,22 @@ internal class ServiceInstaller { private val services = mutableMapOf() private val serviceInstaller = object : MatrixServiceInstaller { - val serviceCollector = mutableListOf() + val serviceCollector = mutableListOf MatrixService>>() val serializers = mutableListOf Unit>() override fun serializers(builder: SerializersModuleBuilder.() -> Unit) { serializers.add(builder) } - override fun install(factory: MatrixService.Factory) { - serviceCollector.add(factory) + override fun install(factory: MatrixService.Factory): InstallExtender { + val mutableProxy = MutableProxy() + return object : InstallExtender { + override fun proxy(proxy: (T) -> T) { + mutableProxy.value = proxy + } + }.also { + serviceCollector.add(factory to mutableProxy) + } } } @@ -39,9 +46,9 @@ internal class ServiceInstaller { val serviceProvider = object : MatrixServiceProvider { override fun getService(key: ServiceKey) = this@ServiceInstaller.getService(key) } - serviceInstaller.serviceCollector.forEach { - val (key, service) = it.create(ServiceDependencies(httpClient, json, serviceProvider, logger)) - services[key] = service + serviceInstaller.serviceCollector.forEach { (factory, extender) -> + val (key, service) = factory.create(ServiceDependencies(httpClient, json, serviceProvider, logger)) + services[key] = extender(service) } } @@ -57,4 +64,13 @@ internal class ServiceInstaller { ?: throw IllegalArgumentException("No service available to handle ${task.type}") } +} + +internal class MutableProxy : (MatrixService) -> MatrixService { + + var value: (T) -> T = { it } + + @Suppress("UNCHECKED_CAST") + override fun invoke(service: MatrixService) = value(service as T) + } \ No newline at end of file diff --git a/matrix/services/auth/src/main/kotlin/app/dapk/st/matrix/auth/AuthService.kt b/matrix/services/auth/src/main/kotlin/app/dapk/st/matrix/auth/AuthService.kt index 95db7f4..348df96 100644 --- a/matrix/services/auth/src/main/kotlin/app/dapk/st/matrix/auth/AuthService.kt +++ b/matrix/services/auth/src/main/kotlin/app/dapk/st/matrix/auth/AuthService.kt @@ -1,5 +1,6 @@ package app.dapk.st.matrix.auth +import app.dapk.st.matrix.InstallExtender import app.dapk.st.matrix.MatrixClient import app.dapk.st.matrix.MatrixService import app.dapk.st.matrix.MatrixServiceInstaller @@ -25,8 +26,8 @@ interface AuthService : MatrixService { fun MatrixServiceInstaller.installAuthService( credentialsStore: CredentialsStore, -) { - this.install { (httpClient, json) -> +): InstallExtender { + return this.install { (httpClient, json) -> SERVICE_KEY to DefaultAuthService(httpClient, credentialsStore, json) } } diff --git a/matrix/services/crypto/src/main/kotlin/app/dapk/st/matrix/crypto/CryptoService.kt b/matrix/services/crypto/src/main/kotlin/app/dapk/st/matrix/crypto/CryptoService.kt index becaf3b..ba5936b 100644 --- a/matrix/services/crypto/src/main/kotlin/app/dapk/st/matrix/crypto/CryptoService.kt +++ b/matrix/services/crypto/src/main/kotlin/app/dapk/st/matrix/crypto/CryptoService.kt @@ -2,10 +2,7 @@ package app.dapk.st.matrix.crypto import app.dapk.st.core.Base64 import app.dapk.st.core.CoroutineDispatchers -import app.dapk.st.matrix.MatrixService -import app.dapk.st.matrix.MatrixServiceInstaller -import app.dapk.st.matrix.MatrixServiceProvider -import app.dapk.st.matrix.ServiceDepFactory +import app.dapk.st.matrix.* import app.dapk.st.matrix.common.* import app.dapk.st.matrix.crypto.internal.* import app.dapk.st.matrix.device.deviceService @@ -136,8 +133,8 @@ fun MatrixServiceInstaller.installCryptoService( roomMembersProvider: ServiceDepFactory, base64: Base64, coroutineDispatchers: CoroutineDispatchers, -) { - this.install { (_, _, services, logger) -> +): InstallExtender { + return this.install { (_, _, services, logger) -> val deviceService = services.deviceService() val accountCryptoUseCase = FetchAccountCryptoUseCaseImpl(credentialsStore, olm, deviceService) diff --git a/matrix/services/device/src/main/kotlin/app/dapk/st/matrix/device/DeviceService.kt b/matrix/services/device/src/main/kotlin/app/dapk/st/matrix/device/DeviceService.kt index c69d8d4..1244c65 100644 --- a/matrix/services/device/src/main/kotlin/app/dapk/st/matrix/device/DeviceService.kt +++ b/matrix/services/device/src/main/kotlin/app/dapk/st/matrix/device/DeviceService.kt @@ -1,5 +1,6 @@ package app.dapk.st.matrix.device +import app.dapk.st.matrix.InstallExtender import app.dapk.st.matrix.MatrixService import app.dapk.st.matrix.MatrixServiceInstaller import app.dapk.st.matrix.MatrixServiceProvider @@ -122,8 +123,8 @@ sealed class ToDevicePayload { sealed interface VerificationPayload } -fun MatrixServiceInstaller.installEncryptionService(knownDeviceStore: KnownDeviceStore) { - this.install { (httpClient, _, _, logger) -> +fun MatrixServiceInstaller.installEncryptionService(knownDeviceStore: KnownDeviceStore): InstallExtender { + return this.install { (httpClient, _, _, logger) -> SERVICE_KEY to DefaultDeviceService(httpClient, logger, knownDeviceStore) } } diff --git a/matrix/services/message/src/main/kotlin/app/dapk/st/matrix/message/MessageService.kt b/matrix/services/message/src/main/kotlin/app/dapk/st/matrix/message/MessageService.kt index 23fdb98..35b6297 100644 --- a/matrix/services/message/src/main/kotlin/app/dapk/st/matrix/message/MessageService.kt +++ b/matrix/services/message/src/main/kotlin/app/dapk/st/matrix/message/MessageService.kt @@ -1,10 +1,7 @@ package app.dapk.st.matrix.message import app.dapk.st.core.Base64 -import app.dapk.st.matrix.MatrixService -import app.dapk.st.matrix.MatrixServiceInstaller -import app.dapk.st.matrix.MatrixServiceProvider -import app.dapk.st.matrix.ServiceDepFactory +import app.dapk.st.matrix.* import app.dapk.st.matrix.common.AlgorithmName import app.dapk.st.matrix.common.EventId import app.dapk.st.matrix.common.MessageType @@ -132,8 +129,8 @@ fun MatrixServiceInstaller.installMessageService( imageContentReader: ImageContentReader, messageEncrypter: ServiceDepFactory = ServiceDepFactory { MissingMessageEncrypter }, mediaEncrypter: ServiceDepFactory = ServiceDepFactory { MissingMediaEncrypter }, -) { - this.install { (httpClient, _, installedServices) -> +): InstallExtender { + return this.install { (httpClient, _, installedServices) -> SERVICE_KEY to DefaultMessageService( httpClient, localEchoStore, diff --git a/matrix/services/profile/src/main/kotlin/app/dapk/st/matrix/room/ProfileService.kt b/matrix/services/profile/src/main/kotlin/app/dapk/st/matrix/room/ProfileService.kt index 28ba329..73e4768 100644 --- a/matrix/services/profile/src/main/kotlin/app/dapk/st/matrix/room/ProfileService.kt +++ b/matrix/services/profile/src/main/kotlin/app/dapk/st/matrix/room/ProfileService.kt @@ -1,6 +1,7 @@ package app.dapk.st.matrix.room import app.dapk.st.core.SingletonFlows +import app.dapk.st.matrix.InstallExtender import app.dapk.st.matrix.MatrixService import app.dapk.st.matrix.MatrixServiceInstaller import app.dapk.st.matrix.MatrixServiceProvider @@ -29,8 +30,8 @@ fun MatrixServiceInstaller.installProfileService( profileStore: ProfileStore, singletonFlows: SingletonFlows, credentialsStore: CredentialsStore, -) { - this.install { (httpClient, _, _, logger) -> +): InstallExtender { + return this.install { (httpClient, _, _, logger) -> SERVICE_KEY to DefaultProfileService(httpClient, logger, profileStore, singletonFlows, credentialsStore) } } diff --git a/matrix/services/push/src/main/kotlin/app/dapk/st/matrix/push/PushService.kt b/matrix/services/push/src/main/kotlin/app/dapk/st/matrix/push/PushService.kt index 5402ed3..34026a6 100644 --- a/matrix/services/push/src/main/kotlin/app/dapk/st/matrix/push/PushService.kt +++ b/matrix/services/push/src/main/kotlin/app/dapk/st/matrix/push/PushService.kt @@ -1,5 +1,6 @@ package app.dapk.st.matrix.push +import app.dapk.st.matrix.InstallExtender import app.dapk.st.matrix.MatrixClient import app.dapk.st.matrix.MatrixService import app.dapk.st.matrix.MatrixServiceInstaller @@ -38,8 +39,8 @@ interface PushService : MatrixService { fun MatrixServiceInstaller.installPushService( credentialsStore: CredentialsStore, -) { - this.install { (httpClient, _, _, logger) -> +): InstallExtender { + return this.install { (httpClient, _, _, logger) -> SERVICE_KEY to DefaultPushService(httpClient, credentialsStore, logger) } } diff --git a/matrix/services/room/src/main/kotlin/app/dapk/st/matrix/room/RoomService.kt b/matrix/services/room/src/main/kotlin/app/dapk/st/matrix/room/RoomService.kt index 1f933a9..56ba0a1 100644 --- a/matrix/services/room/src/main/kotlin/app/dapk/st/matrix/room/RoomService.kt +++ b/matrix/services/room/src/main/kotlin/app/dapk/st/matrix/room/RoomService.kt @@ -1,9 +1,6 @@ package app.dapk.st.matrix.room -import app.dapk.st.matrix.MatrixService -import app.dapk.st.matrix.MatrixServiceInstaller -import app.dapk.st.matrix.MatrixServiceProvider -import app.dapk.st.matrix.ServiceDepFactory +import app.dapk.st.matrix.* import app.dapk.st.matrix.common.EventId import app.dapk.st.matrix.common.RoomId import app.dapk.st.matrix.common.RoomMember @@ -42,8 +39,8 @@ fun MatrixServiceInstaller.installRoomService( memberStore: MemberStore, roomMessenger: ServiceDepFactory, roomInviteRemover: RoomInviteRemover, -) { - this.install { (httpClient, _, services, logger) -> +): InstallExtender { + return this.install { (httpClient, _, services, logger) -> SERVICE_KEY to DefaultRoomService( httpClient, logger, diff --git a/matrix/services/sync/src/main/kotlin/app/dapk/st/matrix/sync/SyncService.kt b/matrix/services/sync/src/main/kotlin/app/dapk/st/matrix/sync/SyncService.kt index f0c8530..d0487d3 100644 --- a/matrix/services/sync/src/main/kotlin/app/dapk/st/matrix/sync/SyncService.kt +++ b/matrix/services/sync/src/main/kotlin/app/dapk/st/matrix/sync/SyncService.kt @@ -2,10 +2,7 @@ package app.dapk.st.matrix.sync import app.dapk.st.core.CoroutineDispatchers import app.dapk.st.core.extensions.ErrorTracker -import app.dapk.st.matrix.MatrixClient -import app.dapk.st.matrix.MatrixService -import app.dapk.st.matrix.MatrixServiceInstaller -import app.dapk.st.matrix.ServiceDepFactory +import app.dapk.st.matrix.* import app.dapk.st.matrix.common.* import app.dapk.st.matrix.sync.internal.DefaultSyncService import app.dapk.st.matrix.sync.internal.request.* @@ -49,7 +46,7 @@ fun MatrixServiceInstaller.installSyncService( errorTracker: ErrorTracker, coroutineDispatchers: CoroutineDispatchers, syncConfig: SyncConfig = SyncConfig(), -) { +): InstallExtender { this.serializers { polymorphicDefault(ApiTimelineEvent::class) { ApiTimelineEvent.Ignored.serializer() @@ -71,7 +68,7 @@ fun MatrixServiceInstaller.installSyncService( } } - this.install { (httpClient, json, services, logger) -> + return this.install { (httpClient, json, services, logger) -> SERVICE_KEY to DefaultSyncService( httpClient = httpClient, syncStore = syncStore, From 69e7dfd90a81fbcd7667d266bdffda80cd6dcd1a Mon Sep 17 00:00:00 2001 From: Adam Brown Date: Tue, 27 Sep 2022 21:13:21 +0100 Subject: [PATCH 4/4] using proxy to allow the encryption tests to wait for keys to be uploaded, re-enabling the 2nd device user tests --- test-harness/src/test/kotlin/SmokeTest.kt | 26 +++++++++--------- .../src/test/kotlin/test/TestMatrix.kt | 27 +++++++++++++++++-- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/test-harness/src/test/kotlin/SmokeTest.kt b/test-harness/src/test/kotlin/SmokeTest.kt index cccb5c8..488680c 100644 --- a/test-harness/src/test/kotlin/SmokeTest.kt +++ b/test-harness/src/test/kotlin/SmokeTest.kt @@ -123,18 +123,20 @@ class SmokeTest { alice.expectTextMessage(SharedState.sharedRoom, message2) // Needs investigation -// val aliceSecondDevice = testMatrix(SharedState.alice, isTemp = true, withLogging = true).also { it.newlogin() } -// aliceSecondDevice.client.syncService().startSyncing().collectAsync { -// val message3 = "from alice to bob and alice's second device".from(SharedState.alice.roomMember) -// alice.sendTextMessage(SharedState.sharedRoom, message3.content, isEncrypted) -// aliceSecondDevice.expectTextMessage(SharedState.sharedRoom, message3) -// bob.expectTextMessage(SharedState.sharedRoom, message3) -// -// val message4 = "from alice's second device to bob and alice's first device".from(SharedState.alice.roomMember) -// aliceSecondDevice.sendTextMessage(SharedState.sharedRoom, message4.content, isEncrypted) -// alice.expectTextMessage(SharedState.sharedRoom, message4) -// bob.expectTextMessage(SharedState.sharedRoom, message4) -// } + val aliceSecondDevice = testMatrix(SharedState.alice, isTemp = true, withLogging = true).also { it.newlogin() } + aliceSecondDevice.client.syncService().startSyncing().collectAsync { + aliceSecondDevice.client.proxyDeviceService().waitForOneTimeKeysToBeUploaded() + + val message3 = "from alice to bob and alice's second device".from(SharedState.alice.roomMember) + alice.sendTextMessage(SharedState.sharedRoom, message3.content, isEncrypted) + aliceSecondDevice.expectTextMessage(SharedState.sharedRoom, message3) + bob.expectTextMessage(SharedState.sharedRoom, message3) + + val message4 = "from alice's second device to bob and alice's first device".from(SharedState.alice.roomMember) + aliceSecondDevice.sendTextMessage(SharedState.sharedRoom, message4.content, isEncrypted) + alice.expectTextMessage(SharedState.sharedRoom, message4) + bob.expectTextMessage(SharedState.sharedRoom, message4) + } } } diff --git a/test-harness/src/test/kotlin/test/TestMatrix.kt b/test-harness/src/test/kotlin/test/TestMatrix.kt index 023a3b1..fce9c4f 100644 --- a/test-harness/src/test/kotlin/test/TestMatrix.kt +++ b/test-harness/src/test/kotlin/test/TestMatrix.kt @@ -14,6 +14,7 @@ import app.dapk.st.matrix.crypto.RoomMembersProvider import app.dapk.st.matrix.crypto.Verification import app.dapk.st.matrix.crypto.cryptoService import app.dapk.st.matrix.crypto.installCryptoService +import app.dapk.st.matrix.device.DeviceService import app.dapk.st.matrix.device.deviceService import app.dapk.st.matrix.device.installEncryptionService import app.dapk.st.matrix.http.ktor.KtorMatrixHttpClientFactory @@ -39,6 +40,7 @@ import test.impl.PrintingErrorTracking import java.io.File import java.time.Clock import javax.imageio.ImageIO +import kotlin.coroutines.resume object TestUsers { @@ -93,7 +95,9 @@ class TestMatrix( ).also { it.install { installAuthService(storeModule.credentialsStore()) - installEncryptionService(storeModule.knownDevicesStore()) + installEncryptionService(storeModule.knownDevicesStore()).proxy { + ProxyDeviceService(it) + } val olmAccountStore = OlmPersistenceWrapper(storeModule.olmStore(), base64) val olm = OlmWrapper( @@ -355,4 +359,23 @@ class JavaImageContentReader : ImageContentReader { override fun inputStream(uri: String) = File(uri).inputStream() -} \ No newline at end of file +} + +class ProxyDeviceService(private val deviceService: DeviceService) : DeviceService by deviceService { + + private var oneTimeKeysContinuation: (() -> Unit)? = null + + override suspend fun uploadOneTimeKeys(oneTimeKeys: DeviceService.OneTimeKeys) { + deviceService.uploadOneTimeKeys(oneTimeKeys) + oneTimeKeysContinuation?.invoke()?.also { oneTimeKeysContinuation = null } + } + + suspend fun waitForOneTimeKeysToBeUploaded() { + suspendCancellableCoroutine { continuation -> + oneTimeKeysContinuation = { continuation.resume(Unit) } + } + } + +} + +fun MatrixClient.proxyDeviceService() = this.deviceService() as ProxyDeviceService \ No newline at end of file