Merge pull request #54 from ouchadam/message-tests

Separating initial sync vs cached sync loading states
This commit is contained in:
Adam Brown 2022-06-13 19:04:00 +01:00 committed by GitHub
commit 45f773e986
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 178 additions and 104 deletions

View File

@ -1,7 +1,6 @@
plugins { id 'kotlin' }
applyAndroidLibraryModule(project)
dependencies {
compileOnly project(":domains:android:stub")
implementation project(':core')
implementation project(':domains:android:core')
}

View File

@ -8,9 +8,7 @@ import app.dapk.st.matrix.message.MessageService
import app.dapk.st.matrix.room.RoomService
import app.dapk.st.matrix.sync.*
import app.dapk.st.matrix.sync.SyncService.SyncEvent.Typing
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.*
@JvmInline
value class UnreadCount(val value: Int)
@ -32,23 +30,34 @@ class DirectoryUseCase(
) {
fun state(): Flow<DirectoryState> {
return combine(
syncService.startSyncing().map { credentialsStore.credentials()!!.userId },
syncService.overview(),
messageService.localEchos(),
roomStore.observeUnreadCountById(),
syncService.events()
) { userId, overviewState, localEchos, unread, events ->
overviewState.mergeWithLocalEchos(localEchos, userId).map { roomOverview ->
RoomFoo(
overview = roomOverview,
unreadCount = UnreadCount(unread[roomOverview.roomId] ?: 0),
typing = events.filterIsInstance<Typing>().firstOrNull { it.roomId == roomOverview.roomId }
)
return flow { emit(credentialsStore.credentials()!!.userId) }.flatMapMerge { userId ->
combine(
overviewDatasource(),
messageService.localEchos(),
roomStore.observeUnreadCountById(),
syncService.events()
) { overviewState, localEchos, unread, events ->
overviewState.mergeWithLocalEchos(localEchos, userId).map { roomOverview ->
RoomFoo(
overview = roomOverview,
unreadCount = UnreadCount(unread[roomOverview.roomId] ?: 0),
typing = events.filterIsInstance<Typing>().firstOrNull { it.roomId == roomOverview.roomId }
)
}
}
}
}
private fun overviewDatasource() = combine(
syncService.startSyncing().map { false }.onStart { emit(true) },
syncService.overview()
) { isFirstLoad, overview ->
when {
isFirstLoad && overview.isEmpty() -> null
else -> overview
}
}.filterNotNull()
private suspend fun OverviewState.mergeWithLocalEchos(localEchos: Map<RoomId, List<MessageService.LocalEcho>>, userId: UserId): OverviewState {
return when {
localEchos.isEmpty() -> this

View File

@ -9,6 +9,8 @@ import app.dapk.st.matrix.sync.RoomState
import app.dapk.st.matrix.sync.SyncService
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
internal typealias ObserveTimelineUseCase = (RoomId, UserId) -> Flow<MessengerState>
@ -21,11 +23,10 @@ internal class TimelineUseCaseImpl(
override fun invoke(roomId: RoomId, userId: UserId): Flow<MessengerState> {
return combine(
syncService.startSyncing(),
syncService.room(roomId),
roomDatasource(roomId),
messageService.localEchos(roomId),
syncService.events()
) { _, roomState, localEchos, events ->
) { roomState, localEchos, events ->
MessengerState(
roomState = when {
localEchos.isEmpty() -> roomState
@ -43,6 +44,10 @@ internal class TimelineUseCaseImpl(
}
}
private fun roomDatasource(roomId: RoomId) = combine(
syncService.startSyncing().map { false }.onStart { emit(true) },
syncService.room(roomId)
) { _, room -> room }
}
private fun UserId.toFallbackMember() = RoomMember(this, displayName = null, avatarUrl = null)

View File

@ -91,7 +91,7 @@ class TimelineUseCaseTest {
echos: List<MessageService.LocalEcho> = emptyList(),
events: List<SyncService.SyncEvent> = emptyList()
) {
fakeSyncService.givenSyncs()
fakeSyncService.givenStartsSyncing()
fakeSyncService.givenRoom(A_ROOM_ID).returns(flowOf(roomState))
fakeMessageService.givenEchos(A_ROOM_ID).returns(flowOf(echos))
fakeSyncService.givenEvents(A_ROOM_ID).returns(flowOf(events))

View File

@ -11,9 +11,7 @@ import app.dapk.st.work.WorkScheduler
import com.google.firebase.messaging.FirebaseMessagingService
import com.google.firebase.messaging.RemoteMessage
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.*
private var previousJob: Job? = null
@ -72,7 +70,7 @@ class PushAndroidService : FirebaseMessagingService() {
private suspend fun waitForEvent(timeout: Long, eventId: EventId): EventId? {
return withTimeoutOrNull(timeout) {
combine(module.syncService().startSyncing(), module.syncService().observeEvent(eventId)) { _, event -> event }
combine(module.syncService().startSyncing().startInstantly(), module.syncService().observeEvent(eventId)) { _, event -> event }
.firstOrNull {
it == eventId
}
@ -81,10 +79,12 @@ class PushAndroidService : FirebaseMessagingService() {
private suspend fun waitForUnreadChange(timeout: Long): String? {
return withTimeoutOrNull(timeout) {
combine(module.syncService().startSyncing(), module.roomStore().observeUnread()) { _, unread -> unread }
combine(module.syncService().startSyncing().startInstantly(), module.roomStore().observeUnread()) { _, unread -> unread }
.first()
"ignored"
}
}
}
private fun Flow<Unit>.startInstantly() = this.onStart { emit(Unit) }

View File

@ -95,7 +95,6 @@ internal class DefaultSyncService(
}
}
.onStart {
emit(Unit)
val subscriptions = syncSubscriptionCount.incrementAndGet()
logger.matrixLog(MatrixLogTag.SYNC, "flow onStart - count: $subscriptions")
}

View File

@ -6,15 +6,16 @@ import app.dapk.st.matrix.common.matrixLog
import kotlinx.coroutines.*
internal class SideEffectFlowIterator(private val logger: MatrixLogger) {
suspend fun <T> loop(initial: T?, action: suspend (T?) -> T?) {
suspend fun <T> loop(initial: T?, onPost: suspend () -> Unit, onIteration: suspend (T?) -> T?) {
var previousState = initial
while (currentCoroutineContext().isActive) {
logger.matrixLog(SYNC, "loop iteration")
try {
previousState = withContext(NonCancellable) {
action(previousState)
onIteration(previousState)
}
onPost()
} catch (error: Throwable) {
logger.matrixLog(SYNC, "on loop error: ${error.message}")
error.printStackTrace()

View File

@ -8,12 +8,9 @@ import app.dapk.st.matrix.sync.internal.SideEffectFlowIterator
import app.dapk.st.matrix.sync.internal.overview.ReducedSyncFilterUseCase
import app.dapk.st.matrix.sync.internal.request.syncRequest
import app.dapk.st.matrix.sync.internal.room.SyncSideEffects
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.isActive
import kotlinx.coroutines.isActive
internal class SyncUseCase(
private val persistence: OverviewStore,
@ -29,50 +26,55 @@ internal class SyncUseCase(
) {
fun sync(): Flow<Unit> {
return flow<Unit> {
return flow {
val credentials = credentialsStore.credentials()!!
val filterId = filterUseCase.reducedFilter(credentials.userId)
with(flowIterator) {
loop<OverviewState>(initial = null) { previousState ->
val syncToken = syncStore.read(key = SyncStore.SyncKey.Overview)
val response = doSyncRequest(filterId, syncToken)
if (credentialsStore.isSignedIn()) {
logger.logP("sync processing") {
syncStore.store(key = SyncStore.SyncKey.Overview, syncToken = response.nextBatch)
val sideEffects = logger.logP("side effects processing") {
syncSideEffects.blockingSideEffects(credentials.userId, response, syncToken)
}
val isInitialSync = syncToken == null
val nextState = logger.logP("reducing") { syncReducer.reduce(isInitialSync, sideEffects, response, credentials) }
val overview = nextState.roomState.map { it.roomOverview }
if (nextState.roomsLeft.isNotEmpty()) {
persistence.removeRooms(nextState.roomsLeft)
}
if (nextState.invites.isNotEmpty()) {
persistence.persistInvites(nextState.invites)
}
if (nextState.newRoomsJoined.isNotEmpty()) {
persistence.removeInvites(nextState.newRoomsJoined)
}
when {
previousState == overview -> previousState.also { logger.matrixLog(SYNC, "no changes, not persisting new state") }
overview.isNotEmpty() -> overview.also { persistence.persist(overview) }
else -> previousState.also { logger.matrixLog(SYNC, "nothing to do") }
}
}
} else {
logger.matrixLog(SYNC, "sync processing skipped due to being signed out")
null
}
}
loop<OverviewState>(
initial = null,
onPost = { emit(Unit) },
onIteration = { onEachSyncIteration(filterId, credentials, previousState = it) }
)
}
}.cancellable()
}
private suspend fun onEachSyncIteration(filterId: SyncService.FilterId, credentials: UserCredentials, previousState: OverviewState?): OverviewState? {
val syncToken = syncStore.read(key = SyncStore.SyncKey.Overview)
val response = doSyncRequest(filterId, syncToken)
return if (credentialsStore.isSignedIn()) {
logger.logP("sync processing") {
syncStore.store(key = SyncStore.SyncKey.Overview, syncToken = response.nextBatch)
val sideEffects = logger.logP("side effects processing") {
syncSideEffects.blockingSideEffects(credentials.userId, response, syncToken)
}
val isInitialSync = syncToken == null
val nextState = logger.logP("reducing") { syncReducer.reduce(isInitialSync, sideEffects, response, credentials) }
val overview = nextState.roomState.map { it.roomOverview }
if (nextState.roomsLeft.isNotEmpty()) {
persistence.removeRooms(nextState.roomsLeft)
}
if (nextState.invites.isNotEmpty()) {
persistence.persistInvites(nextState.invites)
}
if (nextState.newRoomsJoined.isNotEmpty()) {
persistence.removeInvites(nextState.newRoomsJoined)
}
when {
previousState == overview -> previousState.also { logger.matrixLog(SYNC, "no changes, not persisting new state") }
overview.isNotEmpty() -> overview.also { persistence.persist(overview) }
else -> previousState.also { logger.matrixLog(SYNC, "nothing to do") }
}
}
} else {
logger.matrixLog(SYNC, "sync processing skipped due to being signed out")
null
}
}
private suspend fun doSyncRequest(filterId: SyncService.FilterId, syncToken: SyncToken?) = logger.logP("sync api") {
client.execute(
syncRequest(

View File

@ -4,12 +4,12 @@ import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.sync.SyncService
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.emptyFlow
import test.delegateReturn
class FakeSyncService : SyncService by mockk() {
fun givenSyncs() {
every { startSyncing() }.returns(flowOf(Unit))
fun givenStartsSyncing() {
every { startSyncing() }.returns(emptyFlow())
}
fun givenRoom(roomId: RoomId) = every { room(roomId) }.delegateReturn()

View File

@ -65,31 +65,14 @@ class SmokeTest {
@Test
@Order(4)
fun `can send and receive encrypted messages`() = testAfterInitialSync { alice, bob ->
val message = "from alice to bob : ${System.currentTimeMillis()}".from(SharedState.alice.roomMember)
alice.sendEncryptedMessage(SharedState.sharedRoom, message.content)
bob.expectMessage(SharedState.sharedRoom, message)
val message2 = "from bob to alice : ${System.currentTimeMillis()}".from(SharedState.bob.roomMember)
bob.sendEncryptedMessage(SharedState.sharedRoom, message2.content)
alice.expectMessage(SharedState.sharedRoom, message2)
val aliceSecondDevice = TestMatrix(SharedState.alice).also { it.newlogin() }
aliceSecondDevice.client.syncService().startSyncing().collectAsync {
val message3 = "from alice to bob and alice's second device : ${System.currentTimeMillis()}".from(SharedState.alice.roomMember)
alice.sendEncryptedMessage(SharedState.sharedRoom, message3.content)
aliceSecondDevice.expectMessage(SharedState.sharedRoom, message3)
bob.expectMessage(SharedState.sharedRoom, message3)
val message4 = "from alice's second device to bob and alice's first device : ${System.currentTimeMillis()}".from(SharedState.alice.roomMember)
aliceSecondDevice.sendEncryptedMessage(SharedState.sharedRoom, message4.content)
alice.expectMessage(SharedState.sharedRoom, message4)
bob.expectMessage(SharedState.sharedRoom, message4)
}
}
fun `can send and receive clear text messages`() = testTextMessaging(isEncrypted = false)
@Test
@Order(5)
fun `can send and receive encrypted text messages`() = testTextMessaging(isEncrypted = true)
@Test
@Order(6)
fun `can request and verify devices`() = testAfterInitialSync { alice, bob ->
alice.client.cryptoService().verificationAction(Verification.Action.Request(bob.userId(), bob.deviceId()))
alice.client.cryptoService().verificationState().automaticVerification(alice).expectAsync { it == Verification.State.Done }
@ -110,6 +93,30 @@ class SmokeTest {
result shouldBeEqualTo listOf(RoomId(value = "!qOSENTtFUuCEKJSVzl:matrix.org"))
}
private fun testTextMessaging(isEncrypted: Boolean) = testAfterInitialSync { alice, bob ->
val message = "from alice to bob".from(SharedState.alice.roomMember)
alice.sendTextMessage(SharedState.sharedRoom, message.content, isEncrypted)
bob.expectTextMessage(SharedState.sharedRoom, message)
val message2 = "from bob to alice".from(SharedState.bob.roomMember)
bob.sendTextMessage(SharedState.sharedRoom, message2.content, isEncrypted)
alice.expectTextMessage(SharedState.sharedRoom, message2)
// Needs investigation
// val aliceSecondDevice = testMatrix(SharedState.alice, isTemp = true, withLogging = true).also { it.newlogin() }
// aliceSecondDevice.client.syncService().startSyncing().collectAsync {
// val message3 = "from alice to bob and alice's second device".from(SharedState.alice.roomMember)
// alice.sendTextMessage(SharedState.sharedRoom, message3.content, isEncrypted)
// aliceSecondDevice.expectTextMessage(SharedState.sharedRoom, message3)
// bob.expectTextMessage(SharedState.sharedRoom, message3)
//
// val message4 = "from alice's second device to bob and alice's first device".from(SharedState.alice.roomMember)
// aliceSecondDevice.sendTextMessage(SharedState.sharedRoom, message4.content, isEncrypted)
// alice.expectTextMessage(SharedState.sharedRoom, message4)
// bob.expectTextMessage(SharedState.sharedRoom, message4)
// }
}
}
private suspend fun createAndRegisterAccount(): TestUser {
@ -135,7 +142,6 @@ private suspend fun login(user: TestUser) {
.authService()
.login(AuthService.LoginRequest(userName = user.roomMember.id.value, password = user.password, serverUrl = null))
result shouldBeInstanceOf AuthService.LoginResult.Success::class.java
(result as AuthService.LoginResult.Success).userCredentials.let { credentials ->
credentials.accessToken shouldNotBeEqualTo null
@ -163,7 +169,7 @@ object SharedState {
data class TestUser(val password: String, val roomMember: RoomMember, val homeServer: String)
data class TestMessage(val content: String, val author: RoomMember)
fun String.from(roomMember: RoomMember) = TestMessage(this, roomMember)
fun String.from(roomMember: RoomMember) = TestMessage("$this - ${UUID.randomUUID()}", roomMember)
fun testAfterInitialSync(block: suspend MatrixTestScope.(TestMatrix, TestMatrix) -> Unit) {
restoreLoginAndInitialSync(TestMatrix(SharedState.alice, includeLogging = false), TestMatrix(SharedState.bob, includeLogging = false), block)

View File

@ -3,6 +3,7 @@
package test
import TestMessage
import TestUser
import app.dapk.st.core.extensions.ifNull
import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.message.MessageService
@ -31,6 +32,8 @@ fun restoreLoginAndInitialSync(m1: TestMatrix, m2: TestMatrix, testBody: suspend
println("restore login 2")
m2.restoreLogin()
val testHelper = MatrixTestScope(this)
testHelper.testMatrix(m1)
testHelper.testMatrix(m2)
with(testHelper) {
combine(m1.client.syncService().startSyncing(), m2.client.syncService().startSyncing()) { _, _ -> }.collectAsync {
m1.client.syncService().overview().first()
@ -38,6 +41,7 @@ fun restoreLoginAndInitialSync(m1: TestMatrix, m2: TestMatrix, testBody: suspend
testBody(testHelper, m1, m2)
}
}
testHelper.release()
}
}
@ -54,6 +58,7 @@ suspend fun <T> Flow<T>.collectAsync(scope: CoroutineScope, block: suspend () ->
class MatrixTestScope(private val testScope: TestScope) {
private val inProgressExpects = mutableListOf<Deferred<*>>()
private val inProgressInstances = mutableListOf<TestMatrix>()
suspend fun <T> Flow<T>.collectAsync(block: suspend () -> Unit) {
collectAsync(testScope, block)
@ -99,6 +104,7 @@ class MatrixTestScope(private val testScope: TestScope) {
val collected = mutableListOf<T>()
val work = testScope.async {
flow.onEach {
println("found: $it")
collected.add(it)
}.first { it == expected }
}
@ -118,18 +124,20 @@ class MatrixTestScope(private val testScope: TestScope) {
.expect { it.any { it.roomId == roomId } }
}
suspend fun TestMatrix.expectMessage(roomId: RoomId, message: TestMessage) {
suspend fun TestMatrix.expectTextMessage(roomId: RoomId, message: TestMessage) {
println("expecting ${message.content}")
this.client.syncService().room(roomId)
.map { it.events.filterIsInstance<RoomEvent.Message>().map { TestMessage(it.content, it.author) }.firstOrNull() }
.assert(message)
}
suspend fun TestMatrix.sendEncryptedMessage(roomId: RoomId, content: String) {
suspend fun TestMatrix.sendTextMessage(roomId: RoomId, content: String, isEncrypted: Boolean) {
println("sending $content")
this.client.messageService().scheduleMessage(
MessageService.Message.TextMessage(
content = MessageService.Message.Content.TextContent(body = content),
roomId = roomId,
sendEncrypted = true,
sendEncrypted = isEncrypted,
localId = "local.${UUID.randomUUID()}",
timestampUtc = System.currentTimeMillis(),
)
@ -143,4 +151,16 @@ class MatrixTestScope(private val testScope: TestScope) {
}
}
fun testMatrix(user: TestUser, isTemp: Boolean, withLogging: Boolean = false) = TestMatrix(
user,
temporaryDatabase = isTemp,
includeLogging = withLogging
).also { inProgressInstances.add(it) }
fun testMatrix(testMatrix: TestMatrix) = inProgressInstances.add(testMatrix)
suspend fun release() {
inProgressInstances.forEach { it.release() }
}
}

View File

@ -33,10 +33,10 @@ import app.dapk.st.matrix.sync.internal.room.MessageDecrypter
import app.dapk.st.olm.DeviceKeyFactory
import app.dapk.st.olm.OlmPersistenceWrapper
import app.dapk.st.olm.OlmWrapper
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.*
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.amshove.kluent.fail
import test.impl.InMemoryDatabase
import test.impl.InMemoryPreferences
import test.impl.InstantScheduler
@ -45,6 +45,7 @@ import java.time.Clock
class TestMatrix(
private val user: TestUser,
temporaryDatabase: Boolean = false,
includeHttpLogging: Boolean = false,
includeLogging: Boolean = false,
) {
@ -57,7 +58,10 @@ class TestMatrix(
}
private val preferences = InMemoryPreferences()
private val database = InMemoryDatabase.realInstance(user.roomMember.id.value)
private val database = when (temporaryDatabase) {
true -> InMemoryDatabase.temp()
false -> InMemoryDatabase.realInstance(user.roomMember.id.value)
}
private val coroutineDispatchers = CoroutineDispatchers(
Dispatchers.Unconfined,
main = Dispatchers.Unconfined,
@ -258,8 +262,12 @@ class TestMatrix(
}
suspend fun newlogin() {
client.authService()
val result = client.authService()
.login(AuthService.LoginRequest(user.roomMember.id.value, user.password, null))
if (result !is AuthService.LoginResult.Success) {
fail("Login failed: $result")
}
}
suspend fun restoreLogin() {
@ -277,6 +285,24 @@ class TestMatrix(
suspend fun deviceId() = storeModule.credentialsStore().credentials()!!.deviceId
suspend fun userId() = storeModule.credentialsStore().credentials()!!.userId
suspend fun release() {
coroutineDispatchers.global.waitForCancel()
coroutineDispatchers.io.waitForCancel()
coroutineDispatchers.main.waitForCancel()
}
}
private suspend fun CoroutineDispatcher.waitForCancel() {
if (this.isActive) {
this.job.cancelAndJoin()
}
}
private suspend fun CoroutineScope.waitForCancel() {
if (this.isActive) {
this.coroutineContext.job.cancelAndJoin()
}
}
class JavaBase64 : Base64 {

View File

@ -19,4 +19,11 @@ object InMemoryDatabase {
})
}
fun temp(): DapkDb {
val driver = JdbcSqliteDriver(JdbcSqliteDriver.IN_MEMORY)
return DapkDb(driver).also {
DapkDb.Schema.create(driver)
}
}
}