avoiding duplicated room event emissions when watching a specific room id
This commit is contained in:
parent
74dff9ccf2
commit
8e336fd730
|
@ -13,10 +13,7 @@ import app.dapk.st.matrix.sync.RoomStore
|
||||||
import com.squareup.sqldelight.runtime.coroutines.asFlow
|
import com.squareup.sqldelight.runtime.coroutines.asFlow
|
||||||
import com.squareup.sqldelight.runtime.coroutines.mapToList
|
import com.squareup.sqldelight.runtime.coroutines.mapToList
|
||||||
import com.squareup.sqldelight.runtime.coroutines.mapToOneNotNull
|
import com.squareup.sqldelight.runtime.coroutines.mapToOneNotNull
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.coroutines.flow.combine
|
|
||||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
|
|
||||||
private val json = Json
|
private val json = Json
|
||||||
|
@ -54,12 +51,13 @@ internal class RoomPersistence(
|
||||||
override fun latest(roomId: RoomId): Flow<RoomState> {
|
override fun latest(roomId: RoomId): Flow<RoomState> {
|
||||||
val overviewFlow = database.overviewStateQueries.selectRoom(roomId.value).asFlow().mapToOneNotNull().map {
|
val overviewFlow = database.overviewStateQueries.selectRoom(roomId.value).asFlow().mapToOneNotNull().map {
|
||||||
json.decodeFromString(RoomOverview.serializer(), it)
|
json.decodeFromString(RoomOverview.serializer(), it)
|
||||||
}
|
}.distinctUntilChanged()
|
||||||
|
|
||||||
return database.roomEventQueries.selectRoom(roomId.value)
|
return database.roomEventQueries.selectRoom(roomId.value)
|
||||||
.asFlow()
|
.asFlow()
|
||||||
.mapToList()
|
.mapToList()
|
||||||
.map { it.map { json.decodeFromString(RoomEvent.serializer(), it) } }
|
.map { it.map { json.decodeFromString(RoomEvent.serializer(), it) } }
|
||||||
|
.distinctUntilChanged()
|
||||||
.combine(overviewFlow) { events, overview ->
|
.combine(overviewFlow) { events, overview ->
|
||||||
RoomState(overview, events)
|
RoomState(overview, events)
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,10 +7,7 @@ import app.dapk.st.matrix.message.MessageService
|
||||||
import app.dapk.st.matrix.room.RoomService
|
import app.dapk.st.matrix.room.RoomService
|
||||||
import app.dapk.st.matrix.sync.RoomState
|
import app.dapk.st.matrix.sync.RoomState
|
||||||
import app.dapk.st.matrix.sync.SyncService
|
import app.dapk.st.matrix.sync.SyncService
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.coroutines.flow.combine
|
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import kotlinx.coroutines.flow.onStart
|
|
||||||
|
|
||||||
internal typealias ObserveTimelineUseCase = (RoomId, UserId) -> Flow<MessengerState>
|
internal typealias ObserveTimelineUseCase = (RoomId, UserId) -> Flow<MessengerState>
|
||||||
|
|
||||||
|
@ -25,7 +22,7 @@ internal class TimelineUseCaseImpl(
|
||||||
return combine(
|
return combine(
|
||||||
roomDatasource(roomId),
|
roomDatasource(roomId),
|
||||||
messageService.localEchos(roomId),
|
messageService.localEchos(roomId),
|
||||||
syncService.events()
|
syncService.events(roomId)
|
||||||
) { roomState, localEchos, events ->
|
) { roomState, localEchos, events ->
|
||||||
MessengerState(
|
MessengerState(
|
||||||
roomState = when {
|
roomState = when {
|
||||||
|
@ -45,7 +42,7 @@ internal class TimelineUseCaseImpl(
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun roomDatasource(roomId: RoomId) = combine(
|
private fun roomDatasource(roomId: RoomId) = combine(
|
||||||
syncService.startSyncing().map { false }.onStart { emit(true) },
|
syncService.startSyncing().map { false }.onStart { emit(true) }.filter { it },
|
||||||
syncService.room(roomId)
|
syncService.room(roomId)
|
||||||
) { _, room -> room }
|
) { _, room -> room }
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue