extracting flow extension for starting and ignoring future emissions
This commit is contained in:
parent
8e336fd730
commit
511273517e
|
@ -1,11 +1,7 @@
|
||||||
package app.dapk.st.core.extensions
|
package app.dapk.st.core.extensions
|
||||||
|
|
||||||
import kotlinx.coroutines.InternalCoroutinesApi
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.coroutines.flow.Flow
|
|
||||||
import kotlinx.coroutines.flow.filter
|
|
||||||
import kotlinx.coroutines.flow.takeWhile
|
|
||||||
|
|
||||||
@OptIn(InternalCoroutinesApi::class)
|
|
||||||
suspend fun <T> Flow<T>.firstOrNull(count: Int, predicate: suspend (T) -> Boolean): T? {
|
suspend fun <T> Flow<T>.firstOrNull(count: Int, predicate: suspend (T) -> Boolean): T? {
|
||||||
var counter = 0
|
var counter = 0
|
||||||
|
|
||||||
|
@ -22,3 +18,5 @@ suspend fun <T> Flow<T>.firstOrNull(count: Int, predicate: suspend (T) -> Boolea
|
||||||
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun <T> Flow<T>.startAndIgnoreEmissions(): Flow<Boolean> = this.map { false }.onStart { emit(true) }.filter { it }
|
|
@ -1,5 +1,6 @@
|
||||||
package app.dapk.st.messenger
|
package app.dapk.st.messenger
|
||||||
|
|
||||||
|
import app.dapk.st.core.extensions.startAndIgnoreEmissions
|
||||||
import app.dapk.st.matrix.common.RoomId
|
import app.dapk.st.matrix.common.RoomId
|
||||||
import app.dapk.st.matrix.common.RoomMember
|
import app.dapk.st.matrix.common.RoomMember
|
||||||
import app.dapk.st.matrix.common.UserId
|
import app.dapk.st.matrix.common.UserId
|
||||||
|
@ -7,7 +8,8 @@ import app.dapk.st.matrix.message.MessageService
|
||||||
import app.dapk.st.matrix.room.RoomService
|
import app.dapk.st.matrix.room.RoomService
|
||||||
import app.dapk.st.matrix.sync.RoomState
|
import app.dapk.st.matrix.sync.RoomState
|
||||||
import app.dapk.st.matrix.sync.SyncService
|
import app.dapk.st.matrix.sync.SyncService
|
||||||
import kotlinx.coroutines.flow.*
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.combine
|
||||||
|
|
||||||
internal typealias ObserveTimelineUseCase = (RoomId, UserId) -> Flow<MessengerState>
|
internal typealias ObserveTimelineUseCase = (RoomId, UserId) -> Flow<MessengerState>
|
||||||
|
|
||||||
|
@ -42,7 +44,7 @@ internal class TimelineUseCaseImpl(
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun roomDatasource(roomId: RoomId) = combine(
|
private fun roomDatasource(roomId: RoomId) = combine(
|
||||||
syncService.startSyncing().map { false }.onStart { emit(true) }.filter { it },
|
syncService.startSyncing().startAndIgnoreEmissions(),
|
||||||
syncService.room(roomId)
|
syncService.room(roomId)
|
||||||
) { _, room -> room }
|
) { _, room -> room }
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue