From 5781adb163e0164465c19d43f735e998220ce147 Mon Sep 17 00:00:00 2001 From: ganfra Date: Tue, 8 Jun 2021 20:08:28 +0200 Subject: [PATCH] Timeline merging : introduce TimelineProxy (WIP) --- .../detail/timeline/merged/MergedTimelines.kt | 194 ++++++++++++++++++ 1 file changed, 194 insertions(+) create mode 100644 vector/src/main/java/im/vector/app/features/home/room/detail/timeline/merged/MergedTimelines.kt diff --git a/vector/src/main/java/im/vector/app/features/home/room/detail/timeline/merged/MergedTimelines.kt b/vector/src/main/java/im/vector/app/features/home/room/detail/timeline/merged/MergedTimelines.kt new file mode 100644 index 0000000000..5bc5ea0803 --- /dev/null +++ b/vector/src/main/java/im/vector/app/features/home/room/detail/timeline/merged/MergedTimelines.kt @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2021 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package im.vector.app.features.home.room.detail.timeline.merged + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit +import kotlinx.coroutines.withContext +import org.matrix.android.sdk.api.extensions.tryOrNull +import org.matrix.android.sdk.api.session.room.timeline.Timeline +import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent + +class MergedTimelines( + private val coroutineScope: CoroutineScope, + private val mainTimeline: Timeline, + private val secondaryTimelineParams: SecondaryTimelineParams) : Timeline by mainTimeline { + + data class SecondaryTimelineParams( + val timeline: Timeline, + val shouldFilterTypes: Boolean = false, + val allowedTypes: List = emptyList() + ) + + private val secondaryTimeline = secondaryTimelineParams.timeline + + private val listenersMapping = HashMap>() + private val mainTimelineEvents = ArrayList() + private val secondaryTimelineEvents = ArrayList() + private val positionsMapping = HashMap() + private val mergedEvents = ArrayList() + + private val processingSemaphore = Semaphore(1) + + private class ListenerInterceptor( + var timeline: Timeline?, + private val wrappedListener: Timeline.Listener, + private val shouldFilterTypes: Boolean, + private val allowedTypes: List, + private val onTimelineUpdate: (List) -> Unit + ) : Timeline.Listener by wrappedListener { + + override fun onTimelineUpdated(snapshot: List) { + val filteredEvents = if (shouldFilterTypes) { + snapshot.filter { + allowedTypes.contains(it.root.getClearType()) + } + } else { + snapshot + } + onTimelineUpdate(filteredEvents) + } + } + + override fun addListener(listener: Timeline.Listener): Boolean { + val mainTimelineListener = ListenerInterceptor(mainTimeline, listener, false, emptyList()) { + processTimelineUpdates(mainTimelineEvents, it) + } + val secondaryTimelineListener = ListenerInterceptor(secondaryTimeline, listener, secondaryTimelineParams.shouldFilterTypes, secondaryTimelineParams.allowedTypes) { + processTimelineUpdates(secondaryTimelineEvents, it) + } + listenersMapping[listener] = listOf(mainTimelineListener, secondaryTimelineListener) + return mainTimeline.addListener(mainTimelineListener) && secondaryTimeline.addListener(secondaryTimelineListener) + } + + override fun removeListener(listener: Timeline.Listener): Boolean { + return listenersMapping.remove(listener)?.let { + it.forEach { listener -> + listener.timeline?.removeListener(listener) + listener.timeline = null + } + true + } ?: false + } + + override fun removeAllListeners() { + mainTimeline.removeAllListeners() + secondaryTimeline.removeAllListeners() + } + + override fun start() { + mainTimeline.start() + secondaryTimeline.start() + } + + override fun dispose() { + mainTimeline.dispose() + secondaryTimeline.dispose() + } + + override fun restartWithEventId(eventId: String?) { + mainTimeline.restartWithEventId(eventId) + } + + override fun hasMoreToLoad(direction: Timeline.Direction): Boolean { + return mainTimeline.hasMoreToLoad(direction) || secondaryTimeline.hasMoreToLoad(direction) + } + + override fun paginate(direction: Timeline.Direction, count: Int) { + mainTimeline.paginate(direction, count) + secondaryTimeline.paginate(direction, count) + } + + override fun pendingEventCount(): Int { + return mainTimeline.pendingEventCount() + secondaryTimeline.pendingEventCount() + } + + override fun failedToDeliverEventCount(): Int { + return mainTimeline.pendingEventCount() + secondaryTimeline.pendingEventCount() + } + + override fun getTimelineEventAtIndex(index: Int): TimelineEvent? { + return mergedEvents.getOrNull(index) + } + + override fun getIndexOfEvent(eventId: String?): Int? { + return positionsMapping[eventId] + } + + override fun getTimelineEventWithId(eventId: String?): TimelineEvent? { + return positionsMapping[eventId]?.let { + getTimelineEventAtIndex(it) + } + } + + private fun processTimelineUpdates(eventsRef: MutableList, newData: List) { + coroutineScope.launch(Dispatchers.Default) { + processingSemaphore.withPermit { + eventsRef.apply { + clear() + addAll(newData) + } + mergeTimeline() + } + } + } + + private suspend fun mergeTimeline() { + val merged = mutableListOf() + val mainItr = mainTimelineEvents.toList().listIterator() + val secondaryItr = secondaryTimelineEvents.toList().listIterator() + var index = 0 + + while (merged.size < mainTimelineEvents.size + secondaryTimelineEvents.size) { + if (mainItr.hasNext()) { + val nextMain = mainItr.next() + if (secondaryItr.hasNext()) { + val nextSecondary = secondaryItr.next() + if (nextSecondary.root.originServerTs ?: 0 > nextMain.root.originServerTs ?: 0) { + positionsMapping[nextSecondary.eventId] = index + merged.add(nextSecondary) + mainItr.previous() + } else { + positionsMapping[nextMain.eventId] = index + merged.add(nextMain) + secondaryItr.previous() + } + } else { + positionsMapping[nextMain.eventId] = index + merged.add(nextMain) + } + } else if (secondaryItr.hasNext()) { + val nextSecondary = secondaryItr.next() + positionsMapping[nextSecondary.eventId] = index + merged.add(nextSecondary) + } + index++ + } + mergedEvents.apply { + clear() + addAll(mergedEvents) + } + withContext(Dispatchers.Main) { + listenersMapping.keys.forEach { listener -> + tryOrNull { listener.onTimelineUpdated(merged) } + } + } + } +}