ensuring the sync emits instantly or after the initial sync

This commit is contained in:
Adam Brown 2022-10-10 20:03:53 +01:00
parent 2b55c7dffa
commit 6e076e7c9f
7 changed files with 30 additions and 27 deletions

View File

@ -1,6 +1,8 @@
package app.dapk.st.core.extensions
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.takeWhile
suspend fun <T> Flow<T>.firstOrNull(count: Int, predicate: suspend (T) -> Boolean): T? {
var counter = 0
@ -18,5 +20,3 @@ suspend fun <T> Flow<T>.firstOrNull(count: Int, predicate: suspend (T) -> Boolea
return result
}
fun <T> Flow<T>.startAndIgnoreEmissions(): Flow<Boolean> = this.map { false }.onStart { emit(true) }.filter { it }

View File

@ -39,14 +39,9 @@ internal class DirectoryUseCase(
}
private fun overviewDatasource() = combine(
syncService.startSyncing().map { false }.onStart { emit(true) },
syncService.startSyncing(),
syncService.overview().map { it.map { it.engine() } }
) { isFirstLoad, overview ->
when {
isFirstLoad && overview.isEmpty() -> null
else -> overview
}
}.filterNotNull()
) { _, overview -> overview }.filterNotNull()
private suspend fun OverviewState.mergeWithLocalEchos(localEchos: Map<RoomId, List<MessageService.LocalEcho>>, userId: UserId): OverviewState {
return when {

View File

@ -4,7 +4,6 @@ import app.dapk.st.matrix.sync.SyncService
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
class InviteUseCase(
private val syncService: SyncService
@ -13,13 +12,8 @@ class InviteUseCase(
fun invites() = invitesDatasource()
private fun invitesDatasource() = combine(
syncService.startSyncing().map { false }.onStart { emit(true) },
syncService.startSyncing(),
syncService.invites().map { it.map { it.engine() } }
) { isFirstLoad, invites ->
when {
isFirstLoad && invites.isEmpty() -> null
else -> invites
}
}.filterNotNull()
) { _, invites -> invites }.filterNotNull()
}

View File

@ -64,7 +64,7 @@ class MatrixPushHandler(
private suspend fun waitForEvent(timeout: Long, eventId: EventId): EventId? {
return withTimeoutOrNull(timeout) {
combine(syncService.startSyncing().startInstantly(), syncService.observeEvent(eventId)) { _, event -> event }
combine(syncService.startSyncing(), syncService.observeEvent(eventId)) { _, event -> event }
.firstOrNull {
it == eventId
}
@ -73,11 +73,9 @@ class MatrixPushHandler(
private suspend fun waitForUnreadChange(timeout: Long): String? {
return withTimeoutOrNull(timeout) {
combine(syncService.startSyncing().startInstantly(), roomStore.observeUnread()) { _, unread -> unread }
combine(syncService.startSyncing(), roomStore.observeUnread()) { _, unread -> unread }
.first()
"ignored"
}
}
private fun Flow<Unit>.startInstantly() = this.onStart { emit(Unit) }
}

View File

@ -1,6 +1,5 @@
package app.dapk.st.engine
import app.dapk.st.core.extensions.startAndIgnoreEmissions
import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.common.RoomMember
import app.dapk.st.matrix.common.UserId
@ -44,8 +43,8 @@ internal class TimelineUseCaseImpl(
}
private fun roomDatasource(roomId: RoomId) = combine(
syncService.startSyncing().startAndIgnoreEmissions(),
syncService.room(roomId).map { it.engine() }
syncService.startSyncing(),
syncService.room(roomId).map { it.engine() }
) { _, room -> room }
}

View File

@ -18,6 +18,11 @@ interface SyncService : MatrixService {
fun invites(): Flow<InviteState>
fun overview(): Flow<OverviewState>
fun room(roomId: RoomId): Flow<RoomState>
/**
* Subscribe to keep the background syncing alive
* Emits once, either when the initial sync completes or immediately if has already sync'd once
*/
fun startSyncing(): Flow<Unit>
fun events(roomId: RoomId? = null): Flow<List<SyncEvent>>
suspend fun observeEvent(eventId: EventId): Flow<EventId>
@ -31,6 +36,7 @@ interface SyncService : MatrixService {
data class Typing(override val roomId: RoomId, val members: List<RoomMember>) : SyncEvent
}
}
fun MatrixServiceInstaller.installSyncService(

View File

@ -24,7 +24,7 @@ private val syncSubscriptionCount = AtomicInteger()
internal class DefaultSyncService(
httpClient: MatrixHttpClient,
syncStore: SyncStore,
private val syncStore: SyncStore,
private val overviewStore: OverviewStore,
private val roomStore: RoomStore,
filterStore: FilterStore,
@ -104,7 +104,18 @@ internal class DefaultSyncService(
}
}
override fun startSyncing() = syncFlow
override fun startSyncing(): Flow<Unit> {
return flow { emit(syncStore.read(SyncStore.SyncKey.Overview) != null) }.flatMapMerge { hasSynced ->
when (hasSynced) {
true -> syncFlow.filter { false }.onStart { emit(Unit) }
false -> {
var counter = 0
syncFlow.filter { counter < 1 }.onEach { counter++ }
}
}
}
}
override fun invites() = overviewStore.latestInvites()
override fun overview() = overviewStore.latest()
override fun room(roomId: RoomId) = roomStore.latest(roomId)