Merge pull request #7917 from vector-im/feature/bma/viewEventReplay
View event replay
This commit is contained in:
commit
ac482b1389
|
@ -16,16 +16,18 @@
|
||||||
|
|
||||||
package im.vector.app.core.utils
|
package im.vector.app.core.utils
|
||||||
|
|
||||||
|
import im.vector.app.core.platform.VectorViewEvents
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||||
|
import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.coroutines.flow.transform
|
import kotlinx.coroutines.flow.transform
|
||||||
import java.util.concurrent.CopyOnWriteArraySet
|
import java.util.concurrent.CopyOnWriteArraySet
|
||||||
|
|
||||||
interface SharedEvents<out T> {
|
interface SharedEvents<out T : VectorViewEvents> {
|
||||||
fun stream(consumerId: String): Flow<T>
|
fun stream(consumerId: String): Flow<T>
|
||||||
}
|
}
|
||||||
|
|
||||||
class EventQueue<T>(capacity: Int) : SharedEvents<T> {
|
class EventQueue<T : VectorViewEvents>(capacity: Int) : SharedEvents<T> {
|
||||||
|
|
||||||
private val innerQueue = MutableSharedFlow<OneTimeEvent<T>>(replay = capacity)
|
private val innerQueue = MutableSharedFlow<OneTimeEvent<T>>(replay = capacity)
|
||||||
|
|
||||||
|
@ -33,7 +35,12 @@ class EventQueue<T>(capacity: Int) : SharedEvents<T> {
|
||||||
innerQueue.tryEmit(OneTimeEvent(event))
|
innerQueue.tryEmit(OneTimeEvent(event))
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun stream(consumerId: String): Flow<T> = innerQueue.filterNotHandledBy(consumerId)
|
override fun stream(consumerId: String): Flow<T> = innerQueue
|
||||||
|
.onEach {
|
||||||
|
// Ensure that buffered Events will not be sent again to new subscribers.
|
||||||
|
innerQueue.resetReplayCache()
|
||||||
|
}
|
||||||
|
.filterNotHandledBy(consumerId)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -42,7 +49,7 @@ class EventQueue<T>(capacity: Int) : SharedEvents<T> {
|
||||||
*
|
*
|
||||||
* Keeps track of who has already handled its content.
|
* Keeps track of who has already handled its content.
|
||||||
*/
|
*/
|
||||||
private class OneTimeEvent<out T>(private val content: T) {
|
private class OneTimeEvent<out T : VectorViewEvents>(private val content: T) {
|
||||||
|
|
||||||
private val handlers = CopyOnWriteArraySet<String>()
|
private val handlers = CopyOnWriteArraySet<String>()
|
||||||
|
|
||||||
|
@ -53,6 +60,6 @@ private class OneTimeEvent<out T>(private val content: T) {
|
||||||
fun getIfNotHandled(asker: String): T? = if (handlers.add(asker)) content else null
|
fun getIfNotHandled(asker: String): T? = if (handlers.add(asker)) content else null
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun <T> Flow<OneTimeEvent<T>>.filterNotHandledBy(consumerId: String): Flow<T> = transform { event ->
|
private fun <T : VectorViewEvents> Flow<OneTimeEvent<T>>.filterNotHandledBy(consumerId: String): Flow<T> = transform { event ->
|
||||||
event.getIfNotHandled(consumerId)?.let { emit(it) }
|
event.getIfNotHandled(consumerId)?.let { emit(it) }
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue