emitting events each time the sync completes so that we can know if the first sync is happening

This commit is contained in:
Adam Brown 2022-06-11 22:46:41 +01:00
parent 482e5c15b7
commit b82397d965
5 changed files with 54 additions and 50 deletions

View File

@ -95,7 +95,6 @@ internal class DefaultSyncService(
}
}
.onStart {
emit(Unit)
val subscriptions = syncSubscriptionCount.incrementAndGet()
logger.matrixLog(MatrixLogTag.SYNC, "flow onStart - count: $subscriptions")
}

View File

@ -6,15 +6,16 @@ import app.dapk.st.matrix.common.matrixLog
import kotlinx.coroutines.*
internal class SideEffectFlowIterator(private val logger: MatrixLogger) {
suspend fun <T> loop(initial: T?, action: suspend (T?) -> T?) {
suspend fun <T> loop(initial: T?, onPost: suspend () -> Unit, onIteration: suspend (T?) -> T?) {
var previousState = initial
while (currentCoroutineContext().isActive) {
logger.matrixLog(SYNC, "loop iteration")
try {
previousState = withContext(NonCancellable) {
action(previousState)
onIteration(previousState)
}
onPost()
} catch (error: Throwable) {
logger.matrixLog(SYNC, "on loop error: ${error.message}")
error.printStackTrace()

View File

@ -8,12 +8,9 @@ import app.dapk.st.matrix.sync.internal.SideEffectFlowIterator
import app.dapk.st.matrix.sync.internal.overview.ReducedSyncFilterUseCase
import app.dapk.st.matrix.sync.internal.request.syncRequest
import app.dapk.st.matrix.sync.internal.room.SyncSideEffects
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.isActive
import kotlinx.coroutines.isActive
internal class SyncUseCase(
private val persistence: OverviewStore,
@ -29,50 +26,55 @@ internal class SyncUseCase(
) {
fun sync(): Flow<Unit> {
return flow<Unit> {
return flow {
val credentials = credentialsStore.credentials()!!
val filterId = filterUseCase.reducedFilter(credentials.userId)
with(flowIterator) {
loop<OverviewState>(initial = null) { previousState ->
val syncToken = syncStore.read(key = SyncStore.SyncKey.Overview)
val response = doSyncRequest(filterId, syncToken)
if (credentialsStore.isSignedIn()) {
logger.logP("sync processing") {
syncStore.store(key = SyncStore.SyncKey.Overview, syncToken = response.nextBatch)
val sideEffects = logger.logP("side effects processing") {
syncSideEffects.blockingSideEffects(credentials.userId, response, syncToken)
}
val isInitialSync = syncToken == null
val nextState = logger.logP("reducing") { syncReducer.reduce(isInitialSync, sideEffects, response, credentials) }
val overview = nextState.roomState.map { it.roomOverview }
if (nextState.roomsLeft.isNotEmpty()) {
persistence.removeRooms(nextState.roomsLeft)
}
if (nextState.invites.isNotEmpty()) {
persistence.persistInvites(nextState.invites)
}
if (nextState.newRoomsJoined.isNotEmpty()) {
persistence.removeInvites(nextState.newRoomsJoined)
}
when {
previousState == overview -> previousState.also { logger.matrixLog(SYNC, "no changes, not persisting new state") }
overview.isNotEmpty() -> overview.also { persistence.persist(overview) }
else -> previousState.also { logger.matrixLog(SYNC, "nothing to do") }
}
}
} else {
logger.matrixLog(SYNC, "sync processing skipped due to being signed out")
null
}
}
loop<OverviewState>(
initial = null,
onPost = { emit(Unit) },
onIteration = { onEachSyncIteration(filterId, credentials, previousState = it) }
)
}
}.cancellable()
}
private suspend fun onEachSyncIteration(filterId: SyncService.FilterId, credentials: UserCredentials, previousState: OverviewState?): OverviewState? {
val syncToken = syncStore.read(key = SyncStore.SyncKey.Overview)
val response = doSyncRequest(filterId, syncToken)
return if (credentialsStore.isSignedIn()) {
logger.logP("sync processing") {
syncStore.store(key = SyncStore.SyncKey.Overview, syncToken = response.nextBatch)
val sideEffects = logger.logP("side effects processing") {
syncSideEffects.blockingSideEffects(credentials.userId, response, syncToken)
}
val isInitialSync = syncToken == null
val nextState = logger.logP("reducing") { syncReducer.reduce(isInitialSync, sideEffects, response, credentials) }
val overview = nextState.roomState.map { it.roomOverview }
if (nextState.roomsLeft.isNotEmpty()) {
persistence.removeRooms(nextState.roomsLeft)
}
if (nextState.invites.isNotEmpty()) {
persistence.persistInvites(nextState.invites)
}
if (nextState.newRoomsJoined.isNotEmpty()) {
persistence.removeInvites(nextState.newRoomsJoined)
}
when {
previousState == overview -> previousState.also { logger.matrixLog(SYNC, "no changes, not persisting new state") }
overview.isNotEmpty() -> overview.also { persistence.persist(overview) }
else -> previousState.also { logger.matrixLog(SYNC, "nothing to do") }
}
}
} else {
logger.matrixLog(SYNC, "sync processing skipped due to being signed out")
null
}
}
private suspend fun doSyncRequest(filterId: SyncService.FilterId, syncToken: SyncToken?) = logger.logP("sync api") {
client.execute(
syncRequest(

View File

@ -67,9 +67,9 @@ class SmokeTest {
@Order(4)
fun `can send and receive clear text messages`() = testTextMessaging(isEncrypted = false)
// @Test
// @Order(5)
// fun `can send and receive encrypted text messages`() = testTextMessaging(isEncrypted = true)
@Test
@Order(5)
fun `can send and receive encrypted text messages`() = testTextMessaging(isEncrypted = true)
@Test
@Order(6)
@ -103,7 +103,7 @@ class SmokeTest {
bob.sendTextMessage(SharedState.sharedRoom, message2.content, isEncrypted)
alice.expectTextMessage(SharedState.sharedRoom, message2)
val aliceSecondDevice = testMatrix(SharedState.alice, isTemp = true).also { it.newlogin() }
val aliceSecondDevice = testMatrix(SharedState.alice, isTemp = true, withLogging = true).also { it.newlogin() }
aliceSecondDevice.client.syncService().startSyncing().collectAsync {
val message3 = "from alice to bob and alice's second device".from(SharedState.alice.roomMember)
alice.sendTextMessage(SharedState.sharedRoom, message3.content, isEncrypted)

View File

@ -151,9 +151,11 @@ class MatrixTestScope(private val testScope: TestScope) {
}
}
fun testMatrix(user: TestUser, isTemp: Boolean) = TestMatrix(user, temporaryDatabase = isTemp).also {
inProgressInstances.add(it)
}
fun testMatrix(user: TestUser, isTemp: Boolean, withLogging: Boolean = false) = TestMatrix(
user,
temporaryDatabase = isTemp,
includeLogging = withLogging
).also { inProgressInstances.add(it) }
fun testMatrix(testMatrix: TestMatrix) = inProgressInstances.add(testMatrix)