Timeline: make 3 integration tests passing (also add some suspend method on the timeline)

This commit is contained in:
ganfra 2021-12-07 21:13:41 +01:00
parent 49eee0dc38
commit 29a4fd1e41
11 changed files with 331 additions and 426 deletions

View File

@ -145,36 +145,9 @@ class CommonTestHelper(context: Context) {
* @param nbOfMessages the number of time the message will be sent * @param nbOfMessages the number of time the message will be sent
*/ */
fun sendTextMessage(room: Room, message: String, nbOfMessages: Int, timeout: Long = TestConstants.timeOutMillis): List<TimelineEvent> { fun sendTextMessage(room: Room, message: String, nbOfMessages: Int, timeout: Long = TestConstants.timeOutMillis): List<TimelineEvent> {
val sentEvents = ArrayList<TimelineEvent>(nbOfMessages)
val timeline = room.createTimeline(null, TimelineSettings(10)) val timeline = room.createTimeline(null, TimelineSettings(10))
timeline.start() timeline.start()
waitWithLatch(timeout + 1_000L * nbOfMessages) { latch -> val sentEvents = sendTextMessagesBatched(timeline, room, message, nbOfMessages, timeout)
val timelineListener = object : Timeline.Listener {
override fun onTimelineFailure(throwable: Throwable) {
}
override fun onNewTimelineEvents(eventIds: List<String>) {
// noop
}
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
val newMessages = snapshot
.filter { it.root.sendState == SendState.SYNCED }
.filter { it.root.getClearType() == EventType.MESSAGE }
.filter { it.root.getClearContent().toModel<MessageContent>()?.body?.startsWith(message) == true }
Timber.v("New synced message size: ${newMessages.size}")
if (newMessages.size == nbOfMessages) {
sentEvents.addAll(newMessages)
// Remove listener now, if not at the next update sendEvents could change
timeline.removeListener(this)
latch.countDown()
}
}
}
timeline.addListener(timelineListener)
sendTextMessagesBatched(room, message, nbOfMessages)
}
timeline.dispose() timeline.dispose()
// Check that all events has been created // Check that all events has been created
assertEquals("Message number do not match $sentEvents", nbOfMessages.toLong(), sentEvents.size.toLong()) assertEquals("Message number do not match $sentEvents", nbOfMessages.toLong(), sentEvents.size.toLong())
@ -182,9 +155,10 @@ class CommonTestHelper(context: Context) {
} }
/** /**
* Will send nb of messages provided by count parameter but waits a bit every 10 messages to avoid gap in sync * Will send nb of messages provided by count parameter but waits every 10 messages to avoid gap in sync
*/ */
private fun sendTextMessagesBatched(room: Room, message: String, count: Int) { private fun sendTextMessagesBatched(timeline: Timeline, room: Room, message: String, count: Int, timeout: Long): List<TimelineEvent> {
val sentEvents = ArrayList<TimelineEvent>(count)
(1 until count + 1) (1 until count + 1)
.map { "$message #$it" } .map { "$message #$it" }
.chunked(10) .chunked(10)
@ -192,8 +166,34 @@ class CommonTestHelper(context: Context) {
batchedMessages.forEach { formattedMessage -> batchedMessages.forEach { formattedMessage ->
room.sendTextMessage(formattedMessage) room.sendTextMessage(formattedMessage)
} }
Thread.sleep(1_000L) waitWithLatch(timeout) { latch ->
val timelineListener = object : Timeline.Listener {
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
val allSentMessages = snapshot
.filter { it.root.sendState == SendState.SYNCED }
.filter { it.root.getClearType() == EventType.MESSAGE }
.filter { it.root.getClearContent().toModel<MessageContent>()?.body?.startsWith(message) == true }
val hasSyncedAllBatchedMessages = allSentMessages
.map {
it.root.getClearContent().toModel<MessageContent>()?.body
}
.containsAll(batchedMessages)
if (allSentMessages.size == count) {
sentEvents.addAll(allSentMessages)
}
if (hasSyncedAllBatchedMessages) {
timeline.removeListener(this)
latch.countDown()
}
}
}
timeline.addListener(timelineListener)
}
} }
return sentEvents
} }
// PRIVATE METHODS ***************************************************************************** // PRIVATE METHODS *****************************************************************************
@ -332,13 +332,6 @@ class CommonTestHelper(context: Context) {
fun createEventListener(latch: CountDownLatch, predicate: (List<TimelineEvent>) -> Boolean): Timeline.Listener { fun createEventListener(latch: CountDownLatch, predicate: (List<TimelineEvent>) -> Boolean): Timeline.Listener {
return object : Timeline.Listener { return object : Timeline.Listener {
override fun onTimelineFailure(throwable: Throwable) {
// noop
}
override fun onNewTimelineEvents(eventIds: List<String>) {
// noop
}
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) { override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
if (predicate(snapshot)) { if (predicate(snapshot)) {

View File

@ -1,183 +0,0 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.session.room.timeline
import org.amshove.kluent.shouldBeFalse
import org.amshove.kluent.shouldBeTrue
import org.junit.Assert.assertTrue
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.junit.runners.MethodSorters
import org.matrix.android.sdk.InstrumentedTest
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.message.MessageContent
import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import org.matrix.android.sdk.common.CommonTestHelper
import org.matrix.android.sdk.common.CryptoTestHelper
import org.matrix.android.sdk.common.checkSendOrder
import timber.log.Timber
import java.util.concurrent.CountDownLatch
@RunWith(JUnit4::class)
@FixMethodOrder(MethodSorters.JVM)
class TimelineBackToPreviousLastForwardTest : InstrumentedTest {
private val commonTestHelper = CommonTestHelper(context())
private val cryptoTestHelper = CryptoTestHelper(commonTestHelper)
/**
* This test ensure that if we have a chunk in the timeline which is due to a sync, and we click to permalink of an
* even contained in a previous lastForward chunk, we will be able to go back to the live
*/
@Test
fun backToPreviousLastForwardTest() {
val cryptoTestData = cryptoTestHelper.doE2ETestWithAliceAndBobInARoom(false)
val aliceSession = cryptoTestData.firstSession
val bobSession = cryptoTestData.secondSession!!
val aliceRoomId = cryptoTestData.roomId
aliceSession.cryptoService().setWarnOnUnknownDevices(false)
bobSession.cryptoService().setWarnOnUnknownDevices(false)
val roomFromAlicePOV = aliceSession.getRoom(aliceRoomId)!!
val roomFromBobPOV = bobSession.getRoom(aliceRoomId)!!
val bobTimeline = roomFromBobPOV.createTimeline(null, TimelineSettings(30))
bobTimeline.start()
var roomCreationEventId: String? = null
run {
val lock = CountDownLatch(1)
val eventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
Timber.e("Bob timeline updated: with ${snapshot.size} events:")
snapshot.forEach {
Timber.w(" event ${it.root}")
}
roomCreationEventId = snapshot.lastOrNull()?.root?.eventId
// Ok, we have the 8 first messages of the initial sync (room creation and bob join event)
snapshot.size == 8
}
bobTimeline.addListener(eventsListener)
commonTestHelper.await(lock)
bobTimeline.removeAllListeners()
bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeFalse()
bobTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeFalse()
}
// Bob stop to sync
bobSession.stopSync()
val messageRoot = "First messages from Alice"
// Alice sends 30 messages
commonTestHelper.sendTextMessage(
roomFromAlicePOV,
messageRoot,
30)
// Bob start to sync
bobSession.startSync(true)
run {
val lock = CountDownLatch(1)
val eventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
Timber.e("Bob timeline updated: with ${snapshot.size} events:")
snapshot.forEach {
Timber.w(" event ${it.root}")
}
// Ok, we have the 10 last messages from Alice.
snapshot.size == 10 &&
snapshot.all { it.root.content.toModel<MessageContent>()?.body?.startsWith(messageRoot).orFalse() }
}
bobTimeline.addListener(eventsListener)
commonTestHelper.await(lock)
bobTimeline.removeAllListeners()
bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeTrue()
bobTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeFalse()
}
// Bob navigate to the first event (room creation event), so inside the previous last forward chunk
run {
val lock = CountDownLatch(1)
val eventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
Timber.e("Bob timeline updated: with ${snapshot.size} events:")
snapshot.forEach {
Timber.w(" event ${it.root}")
}
// The event is in db, so it is fetch and auto pagination occurs, half of the number of events we have for this chunk (?)
snapshot.size == 4
}
bobTimeline.addListener(eventsListener)
// Restart the timeline to the first sent event, which is already in the database, so pagination should start automatically
assertTrue(roomFromBobPOV.getTimeLineEvent(roomCreationEventId!!) != null)
bobTimeline.restartWithEventId(roomCreationEventId)
commonTestHelper.await(lock)
bobTimeline.removeAllListeners()
bobTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeTrue()
bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeFalse()
}
// Bob scroll to the future
run {
val lock = CountDownLatch(1)
val eventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
Timber.e("Bob timeline updated: with ${snapshot.size} events:")
snapshot.forEach {
Timber.w(" event ${it.root}")
}
// Bob can see the first event of the room (so Back pagination has worked)
snapshot.lastOrNull()?.root?.getClearType() == EventType.STATE_ROOM_CREATE &&
// 8 for room creation item, and 30 for the forward pagination
snapshot.size == 38 &&
snapshot.checkSendOrder(messageRoot, 30, 0)
}
bobTimeline.addListener(eventsListener)
bobTimeline.paginate(Timeline.Direction.FORWARDS, 50)
commonTestHelper.await(lock)
bobTimeline.removeAllListeners()
bobTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeFalse()
bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeFalse()
}
bobTimeline.dispose()
cryptoTestData.cleanUp(commonTestHelper)
}
}

View File

@ -16,6 +16,8 @@
package org.matrix.android.sdk.session.room.timeline package org.matrix.android.sdk.session.room.timeline
import kotlinx.coroutines.runBlocking
import org.amshove.kluent.internal.assertEquals
import org.amshove.kluent.shouldBeFalse import org.amshove.kluent.shouldBeFalse
import org.amshove.kluent.shouldBeTrue import org.amshove.kluent.shouldBeTrue
import org.junit.FixMethodOrder import org.junit.FixMethodOrder
@ -123,54 +125,29 @@ class TimelineForwardPaginationTest : InstrumentedTest {
// Alice paginates BACKWARD and FORWARD of 50 events each // Alice paginates BACKWARD and FORWARD of 50 events each
// Then she can only navigate FORWARD // Then she can only navigate FORWARD
run { run {
val lock = CountDownLatch(1) val snapshot = runBlocking {
val aliceEventsListener = commonTestHelper.createEventListener(lock) { snapshot -> aliceTimeline.awaitPaginate(Timeline.Direction.BACKWARDS, 50)
Timber.e("Alice timeline updated: with ${snapshot.size} events:") aliceTimeline.awaitPaginate(Timeline.Direction.FORWARDS, 50)
snapshot.forEach {
Timber.w(" event ${it.root.content}")
}
// Alice can see the first event of the room (so Back pagination has worked)
snapshot.lastOrNull()?.root?.getClearType() == EventType.STATE_ROOM_CREATE &&
// 6 for room creation item (backward pagination), 1 for the context, and 50 for the forward pagination
snapshot.size == 57 // 6 + 1 + 50
} }
aliceTimeline.addListener(aliceEventsListener)
// Restart the timeline to the first sent event
// We ask to load event backward and forward
aliceTimeline.paginate(Timeline.Direction.BACKWARDS, 50)
aliceTimeline.paginate(Timeline.Direction.FORWARDS, 50)
commonTestHelper.await(lock)
aliceTimeline.removeAllListeners()
aliceTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeTrue() aliceTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeTrue()
aliceTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeFalse() aliceTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeFalse()
assertEquals(EventType.STATE_ROOM_CREATE, snapshot.lastOrNull()?.root?.getClearType())
// 6 for room creation item (backward pagination), 1 for the context, and 50 for the forward pagination
// 6 + 1 + 50
assertEquals(57, snapshot.size)
} }
// Alice paginates once again FORWARD for 50 events // Alice paginates once again FORWARD for 50 events
// All the timeline is retrieved, she cannot paginate anymore in both direction // All the timeline is retrieved, she cannot paginate anymore in both direction
run { run {
val lock = CountDownLatch(1)
val aliceEventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
Timber.e("Alice timeline updated: with ${snapshot.size} events:")
snapshot.forEach {
Timber.w(" event ${it.root.content}")
}
// 6 for room creation item (backward pagination),and numberOfMessagesToSend (all the message of the room)
snapshot.size == 6 + numberOfMessagesToSend &&
snapshot.checkSendOrder(message, numberOfMessagesToSend, 0)
}
aliceTimeline.addListener(aliceEventsListener)
// Ask for a forward pagination // Ask for a forward pagination
aliceTimeline.paginate(Timeline.Direction.FORWARDS, 50) val snapshot = runBlocking {
aliceTimeline.awaitPaginate(Timeline.Direction.FORWARDS, 50)
commonTestHelper.await(lock) }
aliceTimeline.removeAllListeners() // 6 for room creation item (backward pagination),and numberOfMessagesToSend (all the message of the room)
snapshot.size == 6 + numberOfMessagesToSend &&
snapshot.checkSendOrder(message, numberOfMessagesToSend, 0)
// The timeline is fully loaded // The timeline is fully loaded
aliceTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeFalse() aliceTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeFalse()

View File

@ -168,10 +168,8 @@ class TimelinePreviousLastForwardTest : InstrumentedTest {
bobTimeline.addListener(eventsListener) bobTimeline.addListener(eventsListener)
// Restart the timeline to the first sent event, and paginate in both direction // Restart the timeline to the first sent event
bobTimeline.restartWithEventId(firstMessageFromAliceId) bobTimeline.restartWithEventId(firstMessageFromAliceId)
bobTimeline.paginate(Timeline.Direction.BACKWARDS, 50)
bobTimeline.paginate(Timeline.Direction.FORWARDS, 50)
commonTestHelper.await(lock) commonTestHelper.await(lock)
bobTimeline.removeAllListeners() bobTimeline.removeAllListeners()

View File

@ -0,0 +1,105 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.session.room.timeline
import kotlinx.coroutines.runBlocking
import org.amshove.kluent.internal.assertEquals
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.junit.runners.MethodSorters
import org.matrix.android.sdk.InstrumentedTest
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.events.model.isTextMessage
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.message.MessageTextContent
import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import org.matrix.android.sdk.common.CommonTestHelper
import org.matrix.android.sdk.common.CryptoTestHelper
import org.matrix.android.sdk.common.TestConstants
@RunWith(JUnit4::class)
@FixMethodOrder(MethodSorters.JVM)
class TimelineSimpleBackPaginationTest : InstrumentedTest {
private val commonTestHelper = CommonTestHelper(context())
private val cryptoTestHelper = CryptoTestHelper(commonTestHelper)
@Test
fun timeline_backPaginate_shouldReachEndOfTimeline() {
val numberOfMessagesToSent = 200
val cryptoTestData = cryptoTestHelper.doE2ETestWithAliceAndBobInARoom(false)
val aliceSession = cryptoTestData.firstSession
val bobSession = cryptoTestData.secondSession!!
val roomId = cryptoTestData.roomId
aliceSession.cryptoService().setWarnOnUnknownDevices(false)
bobSession.cryptoService().setWarnOnUnknownDevices(false)
val roomFromAlicePOV = aliceSession.getRoom(roomId)!!
val roomFromBobPOV = bobSession.getRoom(roomId)!!
// Alice sends X messages
val message = "Message from Alice"
commonTestHelper.sendTextMessage(
roomFromAlicePOV,
message,
numberOfMessagesToSent)
val bobTimeline = roomFromBobPOV.createTimeline(null, TimelineSettings(30))
bobTimeline.start()
commonTestHelper.waitWithLatch(timeout = TestConstants.timeOutMillis * 10) {
val listener = object : Timeline.Listener {
override fun onStateUpdated(direction: Timeline.Direction, state: Timeline.PaginationState) {
if (direction == Timeline.Direction.FORWARDS)
return
if (state.hasMoreToLoad && !state.loading) {
bobTimeline.paginate(Timeline.Direction.BACKWARDS, 30)
} else if (!state.hasMoreToLoad) {
bobTimeline.removeListener(this)
it.countDown()
}
}
}
bobTimeline.addListener(listener)
bobTimeline.paginate(Timeline.Direction.BACKWARDS, 30)
}
assertEquals(false, bobTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS))
assertEquals(false, bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS))
val onlySentEvents = runBlocking {
bobTimeline.awaitSnapshot()
}
.filter {
it.root.isTextMessage()
}.filter {
(it.root.content.toModel<MessageTextContent>())?.body?.startsWith(message).orFalse()
}
assertEquals(numberOfMessagesToSent, onlySentEvents.size)
bobTimeline.dispose()
cryptoTestData.cleanUp(commonTestHelper)
}
}

View File

@ -1,84 +0,0 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.session.room.timeline
import com.zhuinden.monarchy.Monarchy
import org.matrix.android.sdk.InstrumentedTest
internal class TimelineTest : InstrumentedTest {
companion object {
private const val ROOM_ID = "roomId"
}
private lateinit var monarchy: Monarchy
// @Before
// fun setup() {
// Timber.plant(Timber.DebugTree())
// Realm.init(context())
// val testConfiguration = RealmConfiguration.Builder().name("test-realm")
// .modules(SessionRealmModule()).build()
//
// Realm.deleteRealm(testConfiguration)
// monarchy = Monarchy.Builder().setRealmConfiguration(testConfiguration).build()
// RoomDataHelper.fakeInitialSync(monarchy, ROOM_ID)
// }
//
// private fun createTimeline(initialEventId: String? = null): Timeline {
// val taskExecutor = TaskExecutor(testCoroutineDispatchers)
// val tokenChunkEventPersistor = TokenChunkEventPersistor(monarchy)
// val paginationTask = FakePaginationTask @Inject constructor(tokenChunkEventPersistor)
// val getContextOfEventTask = FakeGetContextOfEventTask @Inject constructor(tokenChunkEventPersistor)
// val roomMemberExtractor = SenderRoomMemberExtractor(ROOM_ID)
// val timelineEventFactory = TimelineEventFactory(roomMemberExtractor, EventRelationExtractor())
// return DefaultTimeline(
// ROOM_ID,
// initialEventId,
// monarchy.realmConfiguration,
// taskExecutor,
// getContextOfEventTask,
// timelineEventFactory,
// paginationTask,
// null)
// }
//
// @Test
// fun backPaginate_shouldLoadMoreEvents_whenPaginateIsCalled() {
// val timeline = createTimeline()
// timeline.start()
// val paginationCount = 30
// var initialLoad = 0
// val latch = CountDownLatch(2)
// var timelineEvents: List<TimelineEvent> = emptyList()
// timeline.listener = object : Timeline.Listener {
// override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
// if (snapshot.isNotEmpty()) {
// if (initialLoad == 0) {
// initialLoad = snapshot.size
// }
// timelineEvents = snapshot
// latch.countDown()
// timeline.paginate(Timeline.Direction.BACKWARDS, paginationCount)
// }
// }
// }
// latch.await()
// timelineEvents.size shouldBeEqualTo initialLoad + paginationCount
// timeline.dispose()
// }
}

View File

@ -70,13 +70,28 @@ interface Timeline {
*/ */
fun paginate(direction: Direction, count: Int) fun paginate(direction: Direction, count: Int)
/**
* This is the same than the regular paginate method but waits for the results instead
* of relying on the timeline listener.
* Note that it will still trigger onTimelineUpdated internally.
*/
suspend fun awaitPaginate(direction: Direction, count: Int): List<TimelineEvent>
/** /**
* Returns the index of a built event or null. * Returns the index of a built event or null.
*/ */
fun getIndexOfEvent(eventId: String?): Int? fun getIndexOfEvent(eventId: String?): Int?
/**
* Returns the current pagination state for the direction.
*/
fun getPaginationState(direction: Direction): PaginationState fun getPaginationState(direction: Direction): PaginationState
/**
* Returns a snapshot of the timeline in his current state.
*/
suspend fun awaitSnapshot(): List<TimelineEvent>
interface Listener { interface Listener {
/** /**
* Call when the timeline has been updated through pagination or sync. * Call when the timeline has been updated through pagination or sync.

View File

@ -29,6 +29,7 @@ import kotlinx.coroutines.withContext
import okhttp3.internal.closeQuietly import okhttp3.internal.closeQuietly
import org.matrix.android.sdk.api.extensions.tryOrNull import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.session.room.timeline.Timeline import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
@ -47,7 +48,7 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
private val realmConfiguration: RealmConfiguration, private val realmConfiguration: RealmConfiguration,
private val loadRoomMembersTask: LoadRoomMembersTask, private val loadRoomMembersTask: LoadRoomMembersTask,
private val readReceiptHandler: ReadReceiptHandler, private val readReceiptHandler: ReadReceiptHandler,
settings: TimelineSettings, private val settings: TimelineSettings,
paginationTask: PaginationTask, paginationTask: PaginationTask,
getEventTask: GetContextOfEventTask, getEventTask: GetContextOfEventTask,
fetchTokenAndPaginateTask: FetchTokenAndPaginateTask, fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
@ -83,6 +84,7 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
realm = backgroundRealm, realm = backgroundRealm,
getContextOfEventTask = getEventTask, getContextOfEventTask = getEventTask,
threadsAwarenessHandler = threadsAwarenessHandler, threadsAwarenessHandler = threadsAwarenessHandler,
onLimitedTimeline = this::onLimitedTimeline,
onEventsUpdated = this::postSnapshot, onEventsUpdated = this::postSnapshot,
onNewTimelineEvents = this::onNewTimelineEvents onNewTimelineEvents = this::onNewTimelineEvents
) )
@ -108,7 +110,7 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
override fun start() { override fun start() {
timelineScope.launch { timelineScope.launch {
loadRoomMemberIfNeeded() loadRoomMembersIfNeeded()
} }
timelineScope.launch { timelineScope.launch {
sequencer.post { sequencer.post {
@ -147,10 +149,21 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
override fun paginate(direction: Timeline.Direction, count: Int) { override fun paginate(direction: Timeline.Direction, count: Int) {
timelineScope.launch { timelineScope.launch {
loadMore(count.toLong(), direction) loadMore(count, direction, fetchOnServerIfNeeded = true)
} }
} }
override suspend fun awaitPaginate(direction: Timeline.Direction, count: Int): List<TimelineEvent> {
withContext(timelineDispatcher) {
loadMore(count, direction, fetchOnServerIfNeeded = true)
}
return awaitSnapshot()
}
override suspend fun awaitSnapshot(): List<TimelineEvent> = withContext(timelineDispatcher) {
strategy.buildSnapshot()
}
override fun getIndexOfEvent(eventId: String?): Int? { override fun getIndexOfEvent(eventId: String?): Int? {
if (eventId == null) return null if (eventId == null) return null
return strategy.getBuiltEventIndex(eventId) return strategy.getBuiltEventIndex(eventId)
@ -164,8 +177,8 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
}.get() }.get()
} }
private suspend fun loadMore(count: Long, direction: Timeline.Direction) { private suspend fun loadMore(count: Int, direction: Timeline.Direction, fetchOnServerIfNeeded: Boolean) {
val baseLogMessage = "loadMore(count: $count, direction: $direction, roomId: $roomId)" val baseLogMessage = "loadMore(count: $count, direction: $direction, roomId: $roomId, fetchOnServer: $fetchOnServerIfNeeded)"
Timber.v("$baseLogMessage started") Timber.v("$baseLogMessage started")
if (!isStarted.get()) { if (!isStarted.get()) {
throw IllegalStateException("You should call start before using timeline") throw IllegalStateException("You should call start before using timeline")
@ -182,7 +195,7 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
updateState(direction) { updateState(direction) {
it.copy(loading = true) it.copy(loading = true)
} }
val loadMoreResult = strategy.loadMore(count, direction) val loadMoreResult = strategy.loadMore(count, direction, fetchOnServerIfNeeded)
Timber.v("$baseLogMessage: result $loadMoreResult") Timber.v("$baseLogMessage: result $loadMoreResult")
val hasMoreToLoad = loadMoreResult != LoadMoreResult.REACHED_END val hasMoreToLoad = loadMoreResult != LoadMoreResult.REACHED_END
updateState(direction) { updateState(direction) {
@ -202,13 +215,29 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
} else { } else {
buildStrategy(LoadTimelineStrategy.Mode.Permalink(eventId)) buildStrategy(LoadTimelineStrategy.Mode.Permalink(eventId))
} }
initPaginationStates(eventId)
strategy.onStart()
loadMore(
count = strategyDependencies.timelineSettings.initialSize,
direction = Timeline.Direction.BACKWARDS,
fetchOnServerIfNeeded = false
)
}
private suspend fun initPaginationStates(eventId: String?) {
updateState(Timeline.Direction.FORWARDS) { updateState(Timeline.Direction.FORWARDS) {
it.copy(loading = false, hasMoreToLoad = eventId != null) it.copy(loading = false, hasMoreToLoad = eventId != null)
} }
updateState(Timeline.Direction.BACKWARDS) { updateState(Timeline.Direction.BACKWARDS) {
it.copy(loading = false, hasMoreToLoad = true) it.copy(loading = false, hasMoreToLoad = true)
} }
strategy.onStart() }
private fun onLimitedTimeline() {
timelineScope.launch {
initPaginationStates(null)
loadMore(settings.initialSize, Timeline.Direction.BACKWARDS, false)
}
} }
private fun postSnapshot() { private fun postSnapshot() {
@ -239,10 +268,15 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
val currentValue = stateReference.get() val currentValue = stateReference.get()
val newValue = update(currentValue) val newValue = update(currentValue)
stateReference.set(newValue) stateReference.set(newValue)
withContext(Dispatchers.Main) { if (newValue != currentValue) {
listeners.forEach { postPaginationState(direction, newValue)
tryOrNull { it.onStateUpdated(direction, newValue) } }
} }
private suspend fun postPaginationState(direction: Timeline.Direction, state: Timeline.PaginationState) = withContext(Dispatchers.Main) {
Timber.v("Post $direction pagination state: $state ")
listeners.forEach {
tryOrNull { it.onStateUpdated(direction, state) }
} }
} }
@ -255,14 +289,14 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
) )
} }
private suspend fun loadRoomMemberIfNeeded() { private suspend fun loadRoomMembersIfNeeded() {
val loadRoomMembersParam = LoadRoomMembersTask.Params(roomId) val loadRoomMembersParam = LoadRoomMembersTask.Params(roomId)
try { try {
loadRoomMembersTask.execute(loadRoomMembersParam) loadRoomMembersTask.execute(loadRoomMembersParam)
} catch (failure: Throwable) { } catch (failure: Throwable) {
Timber.v("Failed to load room members. Retry in 10s.") Timber.v("Failed to load room members. Retry in 10s.")
delay(10_000L) delay(10_000L)
loadRoomMemberIfNeeded() loadRoomMembersIfNeeded()
} }
} }

View File

@ -20,7 +20,9 @@ import io.realm.OrderedCollectionChangeSet
import io.realm.OrderedRealmCollectionChangeListener import io.realm.OrderedRealmCollectionChangeListener
import io.realm.Realm import io.realm.Realm
import io.realm.RealmResults import io.realm.RealmResults
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.extensions.orFalse import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.room.send.SendState import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.api.session.room.timeline.Timeline import org.matrix.android.sdk.api.session.room.timeline.Timeline
@ -73,18 +75,27 @@ internal class LoadTimelineStrategy(
val timelineEventMapper: TimelineEventMapper, val timelineEventMapper: TimelineEventMapper,
val threadsAwarenessHandler: ThreadsAwarenessHandler, val threadsAwarenessHandler: ThreadsAwarenessHandler,
val onEventsUpdated: () -> Unit, val onEventsUpdated: () -> Unit,
val onLimitedTimeline: () -> Unit,
val onNewTimelineEvents: (List<String>) -> Unit val onNewTimelineEvents: (List<String>) -> Unit
) )
private var getContextLatch: CompletableDeferred<Unit>? = null
private var chunkEntity: RealmResults<ChunkEntity>? = null private var chunkEntity: RealmResults<ChunkEntity>? = null
private var timelineChunk: TimelineChunk? = null private var timelineChunk: TimelineChunk? = null
private val chunkEntityListener = OrderedRealmCollectionChangeListener { _: RealmResults<ChunkEntity>, changeSet: OrderedCollectionChangeSet -> private val chunkEntityListener = OrderedRealmCollectionChangeListener { _: RealmResults<ChunkEntity>, changeSet: OrderedCollectionChangeSet ->
// Can be call either when you open a permalink on an unknown event
// or when there is a gap in the timeline.
val shouldRebuildChunk = changeSet.insertions.isNotEmpty() val shouldRebuildChunk = changeSet.insertions.isNotEmpty()
if (shouldRebuildChunk) { if (shouldRebuildChunk) {
timelineChunk?.close(closeNext = true, closePrev = true) timelineChunk?.close(closeNext = true, closePrev = true)
timelineChunk = chunkEntity?.createTimelineChunk() timelineChunk = chunkEntity?.createTimelineChunk()
dependencies.onEventsUpdated() // If we are waiting for a result of get context, post completion
getContextLatch?.complete(Unit)
// If we have a gap, just tell the timeline about it.
if (timelineChunk?.hasReachedLastForward().orFalse()) {
dependencies.onLimitedTimeline()
}
} }
} }
@ -95,6 +106,7 @@ internal class LoadTimelineStrategy(
} }
private val timelineInputListener = object : TimelineInput.Listener { private val timelineInputListener = object : TimelineInput.Listener {
override fun onLocalEchoCreated(roomId: String, timelineEvent: TimelineEvent) { override fun onLocalEchoCreated(roomId: String, timelineEvent: TimelineEvent) {
if (roomId != this@LoadTimelineStrategy.roomId) { if (roomId != this@LoadTimelineStrategy.roomId) {
return return
@ -130,7 +142,7 @@ internal class LoadTimelineStrategy(
onEventsUpdated = dependencies.onEventsUpdated onEventsUpdated = dependencies.onEventsUpdated
) )
suspend fun onStart() { fun onStart() {
dependencies.eventDecryptor.start() dependencies.eventDecryptor.start()
dependencies.timelineInput.listeners.add(timelineInputListener) dependencies.timelineInput.listeners.add(timelineInputListener)
val realm = dependencies.realm.get() val realm = dependencies.realm.get()
@ -139,9 +151,6 @@ internal class LoadTimelineStrategy(
it.addChangeListener(chunkEntityListener) it.addChangeListener(chunkEntityListener)
timelineChunk = it.createTimelineChunk() timelineChunk = it.createTimelineChunk()
} }
if (mode is Mode.Live) {
loadMore(dependencies.timelineSettings.initialSize.toLong(), Timeline.Direction.BACKWARDS)
}
} }
fun onStop() { fun onStop() {
@ -150,22 +159,25 @@ internal class LoadTimelineStrategy(
chunkEntity?.removeChangeListener(chunkEntityListener) chunkEntity?.removeChangeListener(chunkEntityListener)
sendingEventsDataSource.stop() sendingEventsDataSource.stop()
timelineChunk?.close(closeNext = true, closePrev = true) timelineChunk?.close(closeNext = true, closePrev = true)
getContextLatch?.cancel()
chunkEntity = null chunkEntity = null
timelineChunk = null timelineChunk = null
} }
suspend fun loadMore(count: Long, direction: Timeline.Direction): LoadMoreResult { suspend fun loadMore(count: Int, direction: Timeline.Direction, fetchOnServerIfNeeded: Boolean = true): LoadMoreResult {
return if (mode is Mode.Permalink && timelineChunk == null) { if (mode is Mode.Permalink && timelineChunk == null) {
val params = GetContextOfEventTask.Params(roomId, mode.originEventId) val params = GetContextOfEventTask.Params(roomId, mode.originEventId)
try { try {
getContextLatch = CompletableDeferred()
dependencies.getContextOfEventTask.execute(params) dependencies.getContextOfEventTask.execute(params)
LoadMoreResult.SUCCESS // waits for the query to be fulfilled
getContextLatch?.await()
getContextLatch = null
} catch (failure: Throwable) { } catch (failure: Throwable) {
LoadMoreResult.FAILURE return LoadMoreResult.FAILURE
} }
} else {
timelineChunk?.loadMore(count, direction) ?: LoadMoreResult.FAILURE
} }
return timelineChunk?.loadMore(count, direction, fetchOnServerIfNeeded) ?: LoadMoreResult.FAILURE
} }
fun getBuiltEventIndex(eventId: String): Int? { fun getBuiltEventIndex(eventId: String): Int? {
@ -198,7 +210,7 @@ internal class LoadTimelineStrategy(
} }
} }
private fun hasReachedLastForward(): Boolean{ private fun hasReachedLastForward(): Boolean {
return timelineChunk?.hasReachedLastForward().orFalse() return timelineChunk?.hasReachedLastForward().orFalse()
} }

View File

@ -22,10 +22,11 @@ import io.realm.RealmObjectChangeListener
import io.realm.RealmQuery import io.realm.RealmQuery
import io.realm.RealmResults import io.realm.RealmResults
import io.realm.Sort import io.realm.Sort
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.extensions.orFalse import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.extensions.tryOrNull import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.room.timeline.Timeline import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
@ -40,11 +41,6 @@ import timber.log.Timber
import java.util.Collections import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
/**
* This is the value used to fetch on server. It's better to make constant as otherwise we can have weird chunks with disparate and small chunk of data.
*/
private const val PAGINATION_COUNT = 50
/** /**
* This is a wrapper around a ChunkEntity in the database. * This is a wrapper around a ChunkEntity in the database.
* It does mainly listen to the db timeline events. * It does mainly listen to the db timeline events.
@ -65,6 +61,9 @@ internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
private val onBuiltEvents: () -> Unit) { private val onBuiltEvents: () -> Unit) {
private val isLastForward = AtomicBoolean(chunkEntity.isLastForward) private val isLastForward = AtomicBoolean(chunkEntity.isLastForward)
private val isLastBackward = AtomicBoolean(chunkEntity.isLastBackward)
private var prevChunkLatch: CompletableDeferred<Unit>? = null
private var nextChunkLatch: CompletableDeferred<Unit>? = null
private val chunkObjectListener = RealmObjectChangeListener<ChunkEntity> { _, changeSet -> private val chunkObjectListener = RealmObjectChangeListener<ChunkEntity> { _, changeSet ->
if (changeSet == null) return@RealmObjectChangeListener if (changeSet == null) return@RealmObjectChangeListener
@ -75,17 +74,16 @@ internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
if (changeSet.isFieldChanged(ChunkEntityFields.IS_LAST_FORWARD)) { if (changeSet.isFieldChanged(ChunkEntityFields.IS_LAST_FORWARD)) {
isLastForward.set(chunkEntity.isLastForward) isLastForward.set(chunkEntity.isLastForward)
} }
if (changeSet.isFieldChanged(ChunkEntityFields.IS_LAST_BACKWARD)) {
isLastBackward.set(chunkEntity.isLastBackward)
}
if (changeSet.isFieldChanged(ChunkEntityFields.NEXT_CHUNK.`$`)) { if (changeSet.isFieldChanged(ChunkEntityFields.NEXT_CHUNK.`$`)) {
nextChunk = createTimelineChunk(chunkEntity.nextChunk) nextChunk = createTimelineChunk(chunkEntity.nextChunk)
timelineScope.launch { nextChunkLatch?.complete(Unit)
nextChunk?.loadMore(PAGINATION_COUNT.toLong(), Timeline.Direction.FORWARDS)
}
} }
if (changeSet.isFieldChanged(ChunkEntityFields.PREV_CHUNK.`$`)) { if (changeSet.isFieldChanged(ChunkEntityFields.PREV_CHUNK.`$`)) {
prevChunk = createTimelineChunk(chunkEntity.prevChunk) prevChunk = createTimelineChunk(chunkEntity.prevChunk)
timelineScope.launch { prevChunkLatch?.complete(Unit)
prevChunk?.loadMore(PAGINATION_COUNT.toLong(), Timeline.Direction.BACKWARDS)
}
} }
} }
@ -128,42 +126,63 @@ internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
return deepBuiltItems return deepBuiltItems
} }
suspend fun loadMore(count: Long, direction: Timeline.Direction): LoadMoreResult { /**
val loadFromDbCount = loadFromDb(count, direction) * This will take care of loading and building events of this chunk for the given direction and count.
Timber.v("Has loaded $loadFromDbCount items from db") * If @param fetchFromServerIfNeeded is true, it will try to fetch more events on server to get the right amount of data.
* This method will also post a snapshot as soon the data is built from db to avoid waiting for server response.
*/
suspend fun loadMore(count: Int, direction: Timeline.Direction, fetchOnServerIfNeeded: Boolean = true): LoadMoreResult {
if (direction == Timeline.Direction.FORWARDS && nextChunk != null) {
return nextChunk?.loadMore(count, direction, fetchOnServerIfNeeded) ?: LoadMoreResult.FAILURE
} else if (direction == Timeline.Direction.BACKWARDS && prevChunk != null) {
return prevChunk?.loadMore(count, direction, fetchOnServerIfNeeded) ?: LoadMoreResult.FAILURE
}
val loadFromDbCount = loadFromStorage(count, direction)
Timber.v("Has loaded $loadFromDbCount items from storage")
val offsetCount = count - loadFromDbCount val offsetCount = count - loadFromDbCount
// We have built the right amount of data return if (direction == Timeline.Direction.FORWARDS && isLastForward.get()) {
return if (offsetCount == 0L) { LoadMoreResult.REACHED_END
} else if (direction == Timeline.Direction.BACKWARDS && isLastBackward.get()) {
LoadMoreResult.REACHED_END
} else if (offsetCount == 0) {
LoadMoreResult.SUCCESS LoadMoreResult.SUCCESS
} else { } else {
delegateLoadMore(offsetCount, direction) delegateLoadMore(fetchOnServerIfNeeded, offsetCount, direction)
} }
} }
private suspend fun delegateLoadMore(offsetCount: Long, direction: Timeline.Direction): LoadMoreResult { private suspend fun delegateLoadMore(fetchFromServerIfNeeded: Boolean, offsetCount: Int, direction: Timeline.Direction): LoadMoreResult {
return if (direction == Timeline.Direction.FORWARDS) { return if (direction == Timeline.Direction.FORWARDS) {
val nextChunkEntity = chunkEntity.nextChunk val nextChunkEntity = chunkEntity.nextChunk
if (nextChunkEntity == null) { when {
// Fetch next chunk from server if not in the db nextChunkEntity != null -> {
fetchFromServer(chunkEntity.nextToken, direction) if (nextChunk == null) {
} else { nextChunk = createTimelineChunk(nextChunkEntity)
// otherwise we delegate to the next chunk }
if (nextChunk == null) { nextChunk?.loadMore(offsetCount, direction, fetchFromServerIfNeeded) ?: LoadMoreResult.FAILURE
nextChunk = createTimelineChunk(nextChunkEntity) }
fetchFromServerIfNeeded -> {
fetchFromServer(offsetCount, chunkEntity.nextToken, direction)
}
else -> {
LoadMoreResult.SUCCESS
} }
nextChunk?.loadMore(offsetCount, direction) ?: LoadMoreResult.FAILURE
} }
} else { } else {
val prevChunkEntity = chunkEntity.prevChunk val prevChunkEntity = chunkEntity.prevChunk
if (prevChunkEntity == null) { when {
// Fetch prev chunk from server if not in the db prevChunkEntity != null -> {
fetchFromServer(chunkEntity.prevToken, direction) if (prevChunk == null) {
} else { prevChunk = createTimelineChunk(prevChunkEntity)
// otherwise we delegate to the prev chunk }
if (prevChunk == null) { prevChunk?.loadMore(offsetCount, direction, fetchFromServerIfNeeded) ?: LoadMoreResult.FAILURE
prevChunk = createTimelineChunk(prevChunkEntity) }
fetchFromServerIfNeeded -> {
fetchFromServer(offsetCount, chunkEntity.prevToken, direction)
}
else -> {
LoadMoreResult.SUCCESS
} }
prevChunk?.loadMore(offsetCount, direction) ?: LoadMoreResult.FAILURE
} }
} }
} }
@ -239,7 +258,9 @@ internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
prevChunk?.close(closeNext = false, closePrev = true) prevChunk?.close(closeNext = false, closePrev = true)
} }
nextChunk = null nextChunk = null
nextChunkLatch?.cancel()
prevChunk = null prevChunk = null
prevChunkLatch?.cancel()
chunkEntity.removeChangeListener(chunkObjectListener) chunkEntity.removeChangeListener(chunkObjectListener)
timelineEventEntities.removeChangeListener(timelineEventCollectionListener) timelineEventEntities.removeChangeListener(timelineEventCollectionListener)
} }
@ -247,7 +268,7 @@ internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
/** /**
* This method tries to read events from the current chunk. * This method tries to read events from the current chunk.
*/ */
private suspend fun loadFromDb(count: Long, direction: Timeline.Direction): Long { private suspend fun loadFromStorage(count: Int, direction: Timeline.Direction): Int {
val displayIndex = getNextDisplayIndex(direction) ?: return 0 val displayIndex = getNextDisplayIndex(direction) ?: return 0
val baseQuery = timelineEventEntities.where() val baseQuery = timelineEventEntities.where()
val timelineEvents = baseQuery.offsets(direction, count, displayIndex).findAll().orEmpty() val timelineEvents = baseQuery.offsets(direction, count, displayIndex).findAll().orEmpty()
@ -259,6 +280,9 @@ internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
timelineEvents timelineEvents
.mapIndexed { index, timelineEventEntity -> .mapIndexed { index, timelineEventEntity ->
val timelineEvent = timelineEventEntity.buildAndDecryptIfNeeded() val timelineEvent = timelineEventEntity.buildAndDecryptIfNeeded()
if (timelineEvent.root.type == EventType.STATE_ROOM_CREATE) {
isLastBackward.set(true)
}
if (direction == Timeline.Direction.FORWARDS) { if (direction == Timeline.Direction.FORWARDS) {
builtEventsIndexes[timelineEvent.eventId] = index builtEventsIndexes[timelineEvent.eventId] = index
builtEvents.add(index, timelineEvent) builtEvents.add(index, timelineEvent)
@ -268,7 +292,7 @@ internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
} }
} }
onBuiltEvents() onBuiltEvents()
return timelineEvents.size.toLong() return timelineEvents.size
} }
/** /**
@ -309,23 +333,35 @@ internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
* It will take care to update the database by inserting new events and linking new chunk * It will take care to update the database by inserting new events and linking new chunk
* with this one. * with this one.
*/ */
private suspend fun fetchFromServer(token: String?, direction: Timeline.Direction): LoadMoreResult { private suspend fun fetchFromServer(count: Int, token: String?, direction: Timeline.Direction): LoadMoreResult {
val paginationResult = try { val latch = if (direction == Timeline.Direction.FORWARDS) {
nextChunkLatch = CompletableDeferred()
nextChunkLatch
} else {
prevChunkLatch = CompletableDeferred()
prevChunkLatch
}
val loadMoreResult = try {
if (token == null) { if (token == null) {
if (direction == Timeline.Direction.BACKWARDS || !chunkEntity.hasBeenALastForwardChunk()) return LoadMoreResult.REACHED_END if (direction == Timeline.Direction.BACKWARDS || !chunkEntity.hasBeenALastForwardChunk()) return LoadMoreResult.REACHED_END
val lastKnownEventId = chunkEntity.sortedTimelineEvents().firstOrNull()?.eventId ?: return LoadMoreResult.FAILURE val lastKnownEventId = chunkEntity.sortedTimelineEvents().firstOrNull()?.eventId ?: return LoadMoreResult.FAILURE
val taskParams = FetchTokenAndPaginateTask.Params(roomId, lastKnownEventId, direction.toPaginationDirection(), PAGINATION_COUNT) val taskParams = FetchTokenAndPaginateTask.Params(roomId, lastKnownEventId, direction.toPaginationDirection(), count)
fetchTokenAndPaginateTask.execute(taskParams) fetchTokenAndPaginateTask.execute(taskParams).toLoadMoreResult()
} else { } else {
Timber.v("Fetch more events on server") Timber.v("Fetch $count more events on server")
val taskParams = PaginationTask.Params(roomId, token, direction.toPaginationDirection(), PAGINATION_COUNT) val taskParams = PaginationTask.Params(roomId, token, direction.toPaginationDirection(), count)
paginationTask.execute(taskParams) paginationTask.execute(taskParams).toLoadMoreResult()
} }
} catch (failure: Throwable) { } catch (failure: Throwable) {
Timber.e("Failed to fetch from server: $failure", failure) Timber.e("Failed to fetch from server: $failure", failure)
return LoadMoreResult.FAILURE LoadMoreResult.FAILURE
}
return if (loadMoreResult == LoadMoreResult.SUCCESS) {
latch?.await()
loadMore(count, direction, fetchOnServerIfNeeded = false)
} else {
loadMoreResult
} }
return paginationResult.toLoadMoreResult()
} }
private fun TokenChunkEventPersistor.Result.toLoadMoreResult(): LoadMoreResult { private fun TokenChunkEventPersistor.Result.toLoadMoreResult(): LoadMoreResult {
@ -358,6 +394,9 @@ internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
.map { it.buildAndDecryptIfNeeded() } .map { it.buildAndDecryptIfNeeded() }
builtEventsIndexes.entries.filter { it.value >= range.startIndex }.forEach { it.setValue(it.value + range.length) } builtEventsIndexes.entries.filter { it.value >= range.startIndex }.forEach { it.setValue(it.value + range.length) }
newItems.mapIndexed { index, timelineEvent -> newItems.mapIndexed { index, timelineEvent ->
if (timelineEvent.root.type == EventType.STATE_ROOM_CREATE) {
isLastBackward.set(true)
}
val correctedIndex = range.startIndex + index val correctedIndex = range.startIndex + index
builtEvents.add(correctedIndex, timelineEvent) builtEvents.add(correctedIndex, timelineEvent)
builtEventsIndexes[timelineEvent.eventId] = correctedIndex builtEventsIndexes[timelineEvent.eventId] = correctedIndex
@ -421,7 +460,7 @@ internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
private fun RealmQuery<TimelineEventEntity>.offsets( private fun RealmQuery<TimelineEventEntity>.offsets(
direction: Timeline.Direction, direction: Timeline.Direction,
count: Long, count: Int,
startDisplayIndex: Int startDisplayIndex: Int
): RealmQuery<TimelineEventEntity> { ): RealmQuery<TimelineEventEntity> {
sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING) sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
@ -430,7 +469,7 @@ private fun RealmQuery<TimelineEventEntity>.offsets(
} else { } else {
greaterThanOrEqualTo(TimelineEventEntityFields.DISPLAY_INDEX, startDisplayIndex) greaterThanOrEqualTo(TimelineEventEntityFields.DISPLAY_INDEX, startDisplayIndex)
} }
return limit(count) return limit(count.toLong())
} }
private fun Timeline.Direction.toPaginationDirection(): PaginationDirection { private fun Timeline.Direction.toPaginationDirection(): PaginationDirection {

View File

@ -174,7 +174,6 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
private var inSubmitList: Boolean = false private var inSubmitList: Boolean = false
private var hasReachedInvite: Boolean = false private var hasReachedInvite: Boolean = false
private var hasUTD: Boolean = false private var hasUTD: Boolean = false
private var hasReachedCreateEvent: Boolean = false
private var positionOfReadMarker: Int? = null private var positionOfReadMarker: Int? = null
private var partialState: PartialState = PartialState() private var partialState: PartialState = PartialState()
@ -287,7 +286,7 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
return return
} }
// Avoid displaying two loaders if there is no elements between them // Avoid displaying two loaders if there is no elements between them
val showBackwardsLoader = (!showingForwardLoader || timelineModels.isNotEmpty()) && !hasReachedCreateEvent val showBackwardsLoader = !showingForwardLoader || timelineModels.isNotEmpty()
// We can hide the loader but still add the item to controller so it can trigger backwards pagination // We can hide the loader but still add the item to controller so it can trigger backwards pagination
LoadingItem_() LoadingItem_()
.id("backward_loading_item_$timestamp") .id("backward_loading_item_$timestamp")
@ -302,6 +301,10 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
submitSnapshot(snapshot) submitSnapshot(snapshot)
} }
override fun onStateUpdated(direction: Timeline.Direction, state: Timeline.PaginationState) {
requestDelayedModelBuild(0)
}
private fun submitSnapshot(newSnapshot: List<TimelineEvent>) { private fun submitSnapshot(newSnapshot: List<TimelineEvent>) {
backgroundHandler.post { backgroundHandler.post {
inSubmitList = true inSubmitList = true
@ -456,14 +459,10 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
private fun preprocessReverseEvents() { private fun preprocessReverseEvents() {
receiptsByEvent.clear() receiptsByEvent.clear()
timelineEventsGroups.clear() timelineEventsGroups.clear()
hasReachedCreateEvent = false
val itr = currentSnapshot.listIterator(currentSnapshot.size) val itr = currentSnapshot.listIterator(currentSnapshot.size)
var lastShownEventId: String? = null var lastShownEventId: String? = null
while (itr.hasPrevious()) { while (itr.hasPrevious()) {
val event = itr.previous() val event = itr.previous()
if (!hasReachedCreateEvent && event.root.type == EventType.STATE_ROOM_CREATE) {
hasReachedCreateEvent = true
}
timelineEventsGroups.addOrIgnore(event) timelineEventsGroups.addOrIgnore(event)
val currentReadReceipts = ArrayList(event.readReceipts).filter { val currentReadReceipts = ArrayList(event.readReceipts).filter {
it.user.userId != session.myUserId it.user.userId != session.myUserId