diff --git a/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt b/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt index e712769c48..081a4f6192 100644 --- a/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt +++ b/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt @@ -16,16 +16,18 @@ package im.vector.app.core.utils +import im.vector.app.core.platform.VectorViewEvents import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.transform import java.util.concurrent.CopyOnWriteArraySet -interface SharedEvents { +interface SharedEvents { fun stream(consumerId: String): Flow } -class EventQueue(capacity: Int) : SharedEvents { +class EventQueue(capacity: Int) : SharedEvents { private val innerQueue = MutableSharedFlow>(replay = capacity) @@ -33,7 +35,12 @@ class EventQueue(capacity: Int) : SharedEvents { innerQueue.tryEmit(OneTimeEvent(event)) } - override fun stream(consumerId: String): Flow = innerQueue.filterNotHandledBy(consumerId) + override fun stream(consumerId: String): Flow = innerQueue + .onEach { + // Ensure that buffered Events will not be sent again to new subscribers. + innerQueue.resetReplayCache() + } + .filterNotHandledBy(consumerId) } /** @@ -42,7 +49,7 @@ class EventQueue(capacity: Int) : SharedEvents { * * Keeps track of who has already handled its content. */ -private class OneTimeEvent(private val content: T) { +private class OneTimeEvent(private val content: T) { private val handlers = CopyOnWriteArraySet() @@ -53,6 +60,6 @@ private class OneTimeEvent(private val content: T) { fun getIfNotHandled(asker: String): T? = if (handlers.add(asker)) content else null } -private fun Flow>.filterNotHandledBy(consumerId: String): Flow = transform { event -> +private fun Flow>.filterNotHandledBy(consumerId: String): Flow = transform { event -> event.getIfNotHandled(consumerId)?.let { emit(it) } }