add 6 args flow combine
This commit is contained in:
parent
0d0c3d55fe
commit
093ca6df3b
|
@ -20,3 +20,26 @@ suspend fun <T> Flow<T>.firstOrNull(count: Int, predicate: suspend (T) -> Boolea
|
|||
|
||||
return result
|
||||
}
|
||||
|
||||
inline fun <T1, T2, T3, T4, T5, T6, R> combine(
|
||||
flow: Flow<T1>,
|
||||
flow2: Flow<T2>,
|
||||
flow3: Flow<T3>,
|
||||
flow4: Flow<T4>,
|
||||
flow5: Flow<T5>,
|
||||
flow6: Flow<T6>,
|
||||
crossinline transform: suspend (T1, T2, T3, T4, T5, T6) -> R
|
||||
): Flow<R> {
|
||||
return kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6) { args: Array<*> ->
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
transform(
|
||||
args[0] as T1,
|
||||
args[1] as T2,
|
||||
args[2] as T3,
|
||||
args[3] as T4,
|
||||
args[4] as T5,
|
||||
args[5] as T6,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,12 +1,16 @@
|
|||
package app.dapk.st.engine
|
||||
|
||||
import app.dapk.st.core.extensions.combine
|
||||
import app.dapk.st.matrix.common.*
|
||||
import app.dapk.st.matrix.message.MessageService
|
||||
import app.dapk.st.matrix.room.RoomService
|
||||
import app.dapk.st.matrix.sync.RoomStore
|
||||
import app.dapk.st.matrix.sync.SyncService
|
||||
import app.dapk.st.matrix.sync.SyncService.SyncEvent.Typing
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.flatMapConcat
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.map
|
||||
|
||||
internal class DirectoryUseCase(
|
||||
private val syncService: SyncService,
|
||||
|
@ -17,14 +21,15 @@ internal class DirectoryUseCase(
|
|||
) {
|
||||
|
||||
fun state(): Flow<DirectoryState> {
|
||||
return flow { emit(credentialsStore.credentials()!!.userId) }.flatMapMerge { userId ->
|
||||
return flow { emit(credentialsStore.credentials()!!.userId) }.flatMapConcat { userId ->
|
||||
combine(
|
||||
overviewDatasource(),
|
||||
syncService.startSyncing(),
|
||||
syncService.overview().map { it.map { it.engine() } },
|
||||
messageService.localEchos(),
|
||||
roomStore.observeUnreadCountById(),
|
||||
syncService.events(),
|
||||
roomStore.observeMuted(),
|
||||
) { overviewState, localEchos, unread, events, muted ->
|
||||
) { _, overviewState, localEchos, unread, events, muted ->
|
||||
overviewState.mergeWithLocalEchos(localEchos, userId).map { roomOverview ->
|
||||
DirectoryItem(
|
||||
overview = roomOverview,
|
||||
|
@ -37,11 +42,6 @@ internal class DirectoryUseCase(
|
|||
}
|
||||
}
|
||||
|
||||
private fun overviewDatasource() = combine(
|
||||
syncService.startSyncing(),
|
||||
syncService.overview().map { it.map { it.engine() } }
|
||||
) { _, overview -> overview }.filterNotNull()
|
||||
|
||||
private suspend fun OverviewState.mergeWithLocalEchos(localEchos: Map<RoomId, List<MessageService.LocalEcho>>, userId: UserId): OverviewState {
|
||||
return when {
|
||||
localEchos.isEmpty() -> this
|
||||
|
@ -80,6 +80,3 @@ internal class DirectoryUseCase(
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue