From cceb1cd66cdb2b0c8594f0f3c6c7e05aeaaa1734 Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Mon, 9 Jan 2023 16:39:12 +0100 Subject: [PATCH 1/2] Add constraint on T. It has to extend `VectorViewEvents` --- .../main/java/im/vector/app/core/utils/SharedEvent.kt | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt b/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt index e712769c48..779270c092 100644 --- a/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt +++ b/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt @@ -16,16 +16,17 @@ package im.vector.app.core.utils +import im.vector.app.core.platform.VectorViewEvents import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.transform import java.util.concurrent.CopyOnWriteArraySet -interface SharedEvents { +interface SharedEvents { fun stream(consumerId: String): Flow } -class EventQueue(capacity: Int) : SharedEvents { +class EventQueue(capacity: Int) : SharedEvents { private val innerQueue = MutableSharedFlow>(replay = capacity) @@ -42,7 +43,7 @@ class EventQueue(capacity: Int) : SharedEvents { * * Keeps track of who has already handled its content. */ -private class OneTimeEvent(private val content: T) { +private class OneTimeEvent(private val content: T) { private val handlers = CopyOnWriteArraySet() @@ -53,6 +54,6 @@ private class OneTimeEvent(private val content: T) { fun getIfNotHandled(asker: String): T? = if (handlers.add(asker)) content else null } -private fun Flow>.filterNotHandledBy(consumerId: String): Flow = transform { event -> +private fun Flow>.filterNotHandledBy(consumerId: String): Flow = transform { event -> event.getIfNotHandled(consumerId)?.let { emit(it) } } From 02c61d3fb5ce650b9162a957b06f71a59667d34c Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Tue, 10 Jan 2023 10:39:49 +0100 Subject: [PATCH 2/2] Fix view event replay --- .../src/main/java/im/vector/app/core/utils/SharedEvent.kt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt b/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt index 779270c092..081a4f6192 100644 --- a/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt +++ b/vector/src/main/java/im/vector/app/core/utils/SharedEvent.kt @@ -19,6 +19,7 @@ package im.vector.app.core.utils import im.vector.app.core.platform.VectorViewEvents import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.transform import java.util.concurrent.CopyOnWriteArraySet @@ -34,7 +35,12 @@ class EventQueue(capacity: Int) : SharedEvents { innerQueue.tryEmit(OneTimeEvent(event)) } - override fun stream(consumerId: String): Flow = innerQueue.filterNotHandledBy(consumerId) + override fun stream(consumerId: String): Flow = innerQueue + .onEach { + // Ensure that buffered Events will not be sent again to new subscribers. + innerQueue.resetReplayCache() + } + .filterNotHandledBy(consumerId) } /**