mirror of
https://github.com/ouchadam/small-talk.git
synced 2025-03-09 08:30:15 +01:00
avoiding duplicated room event emissions when watching a specific room id
This commit is contained in:
parent
55cdedd5af
commit
78c62ab7bb
@ -13,10 +13,7 @@ import app.dapk.st.matrix.sync.RoomStore
|
|||||||
import com.squareup.sqldelight.runtime.coroutines.asFlow
|
import com.squareup.sqldelight.runtime.coroutines.asFlow
|
||||||
import com.squareup.sqldelight.runtime.coroutines.mapToList
|
import com.squareup.sqldelight.runtime.coroutines.mapToList
|
||||||
import com.squareup.sqldelight.runtime.coroutines.mapToOneNotNull
|
import com.squareup.sqldelight.runtime.coroutines.mapToOneNotNull
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.coroutines.flow.combine
|
|
||||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
|
|
||||||
private val json = Json
|
private val json = Json
|
||||||
@ -54,12 +51,13 @@ internal class RoomPersistence(
|
|||||||
override fun latest(roomId: RoomId): Flow<RoomState> {
|
override fun latest(roomId: RoomId): Flow<RoomState> {
|
||||||
val overviewFlow = database.overviewStateQueries.selectRoom(roomId.value).asFlow().mapToOneNotNull().map {
|
val overviewFlow = database.overviewStateQueries.selectRoom(roomId.value).asFlow().mapToOneNotNull().map {
|
||||||
json.decodeFromString(RoomOverview.serializer(), it)
|
json.decodeFromString(RoomOverview.serializer(), it)
|
||||||
}
|
}.distinctUntilChanged()
|
||||||
|
|
||||||
return database.roomEventQueries.selectRoom(roomId.value)
|
return database.roomEventQueries.selectRoom(roomId.value)
|
||||||
.asFlow()
|
.asFlow()
|
||||||
.mapToList()
|
.mapToList()
|
||||||
.map { it.map { json.decodeFromString(RoomEvent.serializer(), it) } }
|
.map { it.map { json.decodeFromString(RoomEvent.serializer(), it) } }
|
||||||
|
.distinctUntilChanged()
|
||||||
.combine(overviewFlow) { events, overview ->
|
.combine(overviewFlow) { events, overview ->
|
||||||
RoomState(overview, events)
|
RoomState(overview, events)
|
||||||
}
|
}
|
||||||
|
@ -7,10 +7,7 @@ import app.dapk.st.matrix.message.MessageService
|
|||||||
import app.dapk.st.matrix.room.RoomService
|
import app.dapk.st.matrix.room.RoomService
|
||||||
import app.dapk.st.matrix.sync.RoomState
|
import app.dapk.st.matrix.sync.RoomState
|
||||||
import app.dapk.st.matrix.sync.SyncService
|
import app.dapk.st.matrix.sync.SyncService
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.coroutines.flow.combine
|
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import kotlinx.coroutines.flow.onStart
|
|
||||||
|
|
||||||
internal typealias ObserveTimelineUseCase = (RoomId, UserId) -> Flow<MessengerState>
|
internal typealias ObserveTimelineUseCase = (RoomId, UserId) -> Flow<MessengerState>
|
||||||
|
|
||||||
@ -25,7 +22,7 @@ internal class TimelineUseCaseImpl(
|
|||||||
return combine(
|
return combine(
|
||||||
roomDatasource(roomId),
|
roomDatasource(roomId),
|
||||||
messageService.localEchos(roomId),
|
messageService.localEchos(roomId),
|
||||||
syncService.events()
|
syncService.events(roomId)
|
||||||
) { roomState, localEchos, events ->
|
) { roomState, localEchos, events ->
|
||||||
MessengerState(
|
MessengerState(
|
||||||
roomState = when {
|
roomState = when {
|
||||||
@ -45,7 +42,7 @@ internal class TimelineUseCaseImpl(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun roomDatasource(roomId: RoomId) = combine(
|
private fun roomDatasource(roomId: RoomId) = combine(
|
||||||
syncService.startSyncing().map { false }.onStart { emit(true) },
|
syncService.startSyncing().map { false }.onStart { emit(true) }.filter { it },
|
||||||
syncService.room(roomId)
|
syncService.room(roomId)
|
||||||
) { _, room -> room }
|
) { _, room -> room }
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user