- Integrate /relations API to create a live thread timeline
This commit is contained in:
parent
fd89acc112
commit
49b7726ac8
@ -62,7 +62,11 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
val fakeEvent = createFakeMessageEvent().toEntity(ROOM_ID, SendState.SYNCED, System.currentTimeMillis()).let {
|
||||
realm.copyToRealm(it)
|
||||
}
|
||||
chunk.addTimelineEvent(ROOM_ID, fakeEvent, PaginationDirection.FORWARDS, emptyMap())
|
||||
chunk.addTimelineEvent(
|
||||
roomId = ROOM_ID,
|
||||
eventEntity = fakeEvent,
|
||||
direction = PaginationDirection.FORWARDS,
|
||||
roomMemberContentsByUser = emptyMap())
|
||||
chunk.timelineEvents.size shouldBeEqualTo 1
|
||||
}
|
||||
}
|
||||
@ -74,8 +78,16 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
val fakeEvent = createFakeMessageEvent().toEntity(ROOM_ID, SendState.SYNCED, System.currentTimeMillis()).let {
|
||||
realm.copyToRealm(it)
|
||||
}
|
||||
chunk.addTimelineEvent(ROOM_ID, fakeEvent, PaginationDirection.FORWARDS, emptyMap())
|
||||
chunk.addTimelineEvent(ROOM_ID, fakeEvent, PaginationDirection.FORWARDS, emptyMap())
|
||||
chunk.addTimelineEvent(
|
||||
roomId = ROOM_ID,
|
||||
eventEntity = fakeEvent,
|
||||
direction = PaginationDirection.FORWARDS,
|
||||
roomMemberContentsByUser = emptyMap())
|
||||
chunk.addTimelineEvent(
|
||||
roomId = ROOM_ID,
|
||||
eventEntity = fakeEvent,
|
||||
direction = PaginationDirection.FORWARDS,
|
||||
roomMemberContentsByUser = emptyMap())
|
||||
chunk.timelineEvents.size shouldBeEqualTo 1
|
||||
}
|
||||
}
|
||||
@ -144,7 +156,11 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
val fakeEvent = event.toEntity(roomId, SendState.SYNCED, System.currentTimeMillis()).let {
|
||||
realm.copyToRealm(it)
|
||||
}
|
||||
addTimelineEvent(roomId, fakeEvent, direction, emptyMap())
|
||||
addTimelineEvent(
|
||||
roomId = roomId,
|
||||
eventEntity = fakeEvent,
|
||||
direction = direction,
|
||||
roomMemberContentsByUser = emptyMap())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -201,7 +201,11 @@ data class Event(
|
||||
*/
|
||||
fun getDecryptedTextSummary(): String? {
|
||||
if (isRedacted()) return "Message Deleted"
|
||||
val text = getDecryptedValue() ?: return null
|
||||
val text = getDecryptedValue() ?: run {
|
||||
if (isPoll()) {return getPollQuestion() ?: "created a poll."}
|
||||
return null
|
||||
}
|
||||
|
||||
return when {
|
||||
isReplyRenderedInThread() || isQuote() -> ContentUtils.extractUsefulTextFromReply(text)
|
||||
isFileMessage() -> "sent a file."
|
||||
|
@ -54,6 +54,7 @@ data class TimelineEvent(
|
||||
* It's not unique on the timeline as it's reset on each chunk.
|
||||
*/
|
||||
val displayIndex: Int,
|
||||
var ownedByThreadChunk: Boolean = false,
|
||||
val senderInfo: SenderInfo,
|
||||
val annotations: EventAnnotationsSummary? = null,
|
||||
val readReceipts: List<ReadReceipt> = emptyList()
|
||||
|
@ -57,7 +57,7 @@ internal class RealmSessionStoreMigration @Inject constructor(
|
||||
) : RealmMigration {
|
||||
|
||||
companion object {
|
||||
const val SESSION_STORE_SCHEMA_VERSION = 24L
|
||||
const val SESSION_STORE_SCHEMA_VERSION = 26L
|
||||
}
|
||||
|
||||
/**
|
||||
@ -94,6 +94,7 @@ internal class RealmSessionStoreMigration @Inject constructor(
|
||||
if (oldVersion <= 21) migrateTo22(realm)
|
||||
if (oldVersion <= 22) migrateTo23(realm)
|
||||
if (oldVersion <= 23) migrateTo24(realm)
|
||||
if (oldVersion <= 24) migrateTo25(realm)
|
||||
}
|
||||
|
||||
private fun migrateTo1(realm: DynamicRealm) {
|
||||
@ -489,4 +490,15 @@ internal class RealmSessionStoreMigration @Inject constructor(
|
||||
?.addField(PreviewUrlCacheEntityFields.IMAGE_HEIGHT, Int::class.java)
|
||||
?.setNullable(PreviewUrlCacheEntityFields.IMAGE_HEIGHT, true)
|
||||
}
|
||||
|
||||
private fun migrateTo25(realm: DynamicRealm){
|
||||
Timber.d("Step 24 -> 25")
|
||||
realm.schema.get("ChunkEntity")
|
||||
?.addField(ChunkEntityFields.ROOT_THREAD_EVENT_ID, String::class.java, FieldAttribute.INDEXED)
|
||||
?.addField(ChunkEntityFields.IS_LAST_FORWARD_THREAD, Boolean::class.java, FieldAttribute.INDEXED)
|
||||
|
||||
realm.schema.get("TimelineEventEntity")
|
||||
?.addField(TimelineEventEntityFields.OWNED_BY_THREAD_CHUNK, Boolean::class.java)
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -82,17 +82,18 @@ internal fun ChunkEntity.addStateEvent(roomId: String, stateEvent: EventEntity,
|
||||
internal fun ChunkEntity.addTimelineEvent(roomId: String,
|
||||
eventEntity: EventEntity,
|
||||
direction: PaginationDirection,
|
||||
roomMemberContentsByUser: Map<String, RoomMemberContent?>? = null) {
|
||||
ownedByThreadChunk: Boolean = false,
|
||||
roomMemberContentsByUser: Map<String, RoomMemberContent?>? = null): TimelineEventEntity? {
|
||||
val eventId = eventEntity.eventId
|
||||
if (timelineEvents.find(eventId) != null) {
|
||||
return
|
||||
return null
|
||||
}
|
||||
val displayIndex = nextDisplayIndex(direction)
|
||||
val localId = TimelineEventEntity.nextId(realm)
|
||||
val senderId = eventEntity.sender ?: ""
|
||||
|
||||
// Update RR for the sender of a new message with a dummy one
|
||||
val readReceiptsSummaryEntity = handleReadReceipts(realm, roomId, eventEntity, senderId)
|
||||
val readReceiptsSummaryEntity = if (!ownedByThreadChunk) handleReadReceipts(realm, roomId, eventEntity, senderId) else null
|
||||
val timelineEventEntity = realm.createObject<TimelineEventEntity>().apply {
|
||||
this.localId = localId
|
||||
this.root = eventEntity
|
||||
@ -102,6 +103,7 @@ internal fun ChunkEntity.addTimelineEvent(roomId: String,
|
||||
?.also { it.cleanUp(eventEntity.sender) }
|
||||
this.readReceipts = readReceiptsSummaryEntity
|
||||
this.displayIndex = displayIndex
|
||||
this.ownedByThreadChunk = ownedByThreadChunk
|
||||
val roomMemberContent = roomMemberContentsByUser?.get(senderId)
|
||||
this.senderAvatar = roomMemberContent?.avatarUrl
|
||||
this.senderName = roomMemberContent?.displayName
|
||||
@ -113,6 +115,7 @@ internal fun ChunkEntity.addTimelineEvent(roomId: String,
|
||||
}
|
||||
// numberOfTimelineEvents++
|
||||
timelineEvents.add(timelineEventEntity)
|
||||
return timelineEventEntity
|
||||
}
|
||||
|
||||
private fun computeIsUnique(
|
||||
|
@ -98,6 +98,7 @@ internal fun EventEntity.threadSummaryInThread(realm: Realm, rootThreadEventId:
|
||||
val messages = TimelineEventEntity
|
||||
.whereRoomId(realm, roomId = roomId)
|
||||
.equalTo(TimelineEventEntityFields.ROOT.ROOT_THREAD_EVENT_ID, rootThreadEventId)
|
||||
.distinct(TimelineEventEntityFields.ROOT.EVENT_ID)
|
||||
.count()
|
||||
.toInt()
|
||||
|
||||
@ -156,6 +157,7 @@ internal fun TimelineEventEntity.Companion.findAllThreadsForRoomId(realm: Realm,
|
||||
TimelineEventEntity
|
||||
.whereRoomId(realm, roomId = roomId)
|
||||
.equalTo(TimelineEventEntityFields.ROOT.IS_ROOT_THREAD, true)
|
||||
.equalTo(TimelineEventEntityFields.OWNED_BY_THREAD_CHUNK, false)
|
||||
.sort("${TimelineEventEntityFields.ROOT.THREAD_SUMMARY_LATEST_MESSAGE}.${TimelineEventEntityFields.ROOT.ORIGIN_SERVER_TS}", Sort.DESCENDING)
|
||||
|
||||
/**
|
||||
|
@ -46,6 +46,7 @@ internal class TimelineEventMapper @Inject constructor(private val readReceiptsS
|
||||
isUniqueDisplayName = timelineEventEntity.isUniqueDisplayName,
|
||||
avatarUrl = timelineEventEntity.senderAvatar
|
||||
),
|
||||
ownedByThreadChunk = timelineEventEntity.ownedByThreadChunk,
|
||||
readReceipts = readReceipts
|
||||
?.distinctBy {
|
||||
it.user
|
||||
|
@ -23,6 +23,7 @@ import io.realm.annotations.Index
|
||||
import io.realm.annotations.LinkingObjects
|
||||
import org.matrix.android.sdk.internal.extensions.assertIsManaged
|
||||
import org.matrix.android.sdk.internal.extensions.clearWith
|
||||
import timber.log.Timber
|
||||
|
||||
internal open class ChunkEntity(@Index var prevToken: String? = null,
|
||||
// Because of gaps we can have several chunks with nextToken == null
|
||||
@ -33,7 +34,10 @@ internal open class ChunkEntity(@Index var prevToken: String? = null,
|
||||
var timelineEvents: RealmList<TimelineEventEntity> = RealmList(),
|
||||
// Only one chunk will have isLastForward == true
|
||||
@Index var isLastForward: Boolean = false,
|
||||
@Index var isLastBackward: Boolean = false
|
||||
@Index var isLastBackward: Boolean = false,
|
||||
// Threads
|
||||
@Index var rootThreadEventId: String? = null,
|
||||
@Index var isLastForwardThread: Boolean = false,
|
||||
) : RealmObject() {
|
||||
|
||||
fun identifier() = "${prevToken}_$nextToken"
|
||||
@ -58,3 +62,19 @@ internal fun ChunkEntity.deleteOnCascade(deleteStateEvents: Boolean, canDeleteRo
|
||||
}
|
||||
deleteFromRealm()
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the chunk along with the thread events that were temporarily created
|
||||
*/
|
||||
internal fun ChunkEntity.deleteAndClearThreadEvents() {
|
||||
assertIsManaged()
|
||||
timelineEvents
|
||||
.filter { it.ownedByThreadChunk }
|
||||
.forEach {
|
||||
it.deleteOnCascade(false)
|
||||
}
|
||||
deleteFromRealm()
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -32,6 +32,9 @@ internal open class TimelineEventEntity(var localId: Long = 0,
|
||||
var isUniqueDisplayName: Boolean = false,
|
||||
var senderAvatar: String? = null,
|
||||
var senderMembershipEventId: String? = null,
|
||||
// ownedByThreadChunk indicates that the current TimelineEventEntity belongs
|
||||
// to a thread chunk and is a temporarily event.
|
||||
var ownedByThreadChunk: Boolean = false,
|
||||
var readReceipts: ReadReceiptsSummaryEntity? = null
|
||||
) : RealmObject() {
|
||||
|
||||
|
@ -45,10 +45,22 @@ internal fun ChunkEntity.Companion.findLastForwardChunkOfRoom(realm: Realm, room
|
||||
.equalTo(ChunkEntityFields.IS_LAST_FORWARD, true)
|
||||
.findFirst()
|
||||
}
|
||||
|
||||
internal fun ChunkEntity.Companion.findLastForwardChunkOfThread(realm: Realm, roomId: String, rootThreadEventId: String): ChunkEntity? {
|
||||
return where(realm, roomId)
|
||||
.equalTo(ChunkEntityFields.ROOT_THREAD_EVENT_ID, rootThreadEventId)
|
||||
.equalTo(ChunkEntityFields.IS_LAST_FORWARD_THREAD, true)
|
||||
.findFirst()
|
||||
}
|
||||
internal fun ChunkEntity.Companion.findEventInThreadChunk(realm: Realm, roomId: String, event: String): ChunkEntity? {
|
||||
return where(realm, roomId)
|
||||
.`in`(ChunkEntityFields.TIMELINE_EVENTS.EVENT_ID, arrayListOf(event).toTypedArray())
|
||||
.equalTo(ChunkEntityFields.IS_LAST_FORWARD_THREAD, true)
|
||||
.findFirst()
|
||||
}
|
||||
internal fun ChunkEntity.Companion.findAllIncludingEvents(realm: Realm, eventIds: List<String>): RealmResults<ChunkEntity> {
|
||||
return realm.where<ChunkEntity>()
|
||||
.`in`(ChunkEntityFields.TIMELINE_EVENTS.EVENT_ID, eventIds.toTypedArray())
|
||||
.isNull(ChunkEntityFields.ROOT_THREAD_EVENT_ID)
|
||||
.findAll()
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ internal fun EventAnnotationsSummaryEntity.Companion.create(realm: Realm, roomId
|
||||
this.roomId = roomId
|
||||
}
|
||||
// Denormalization
|
||||
TimelineEventEntity.where(realm, roomId = roomId, eventId = eventId).findFirst()?.let {
|
||||
TimelineEventEntity.where(realm, roomId = roomId, eventId = eventId).findAll()?.forEach {
|
||||
it.annotations = obj
|
||||
}
|
||||
return obj
|
||||
|
@ -57,6 +57,7 @@ import org.matrix.android.sdk.internal.database.model.ReactionAggregatedSummaryE
|
||||
import org.matrix.android.sdk.internal.database.model.ReactionAggregatedSummaryEntityFields
|
||||
import org.matrix.android.sdk.internal.database.model.ReferencesAggregatedSummaryEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
|
||||
import org.matrix.android.sdk.internal.database.query.create
|
||||
import org.matrix.android.sdk.internal.database.query.getOrCreate
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
@ -117,8 +118,8 @@ internal class EventRelationsAggregationProcessor @Inject constructor(
|
||||
|
||||
EventAnnotationsSummaryEntity.where(realm, roomId, event.eventId ?: "").findFirst()
|
||||
?.let {
|
||||
TimelineEventEntity.where(realm, roomId = roomId, eventId = event.eventId ?: "").findFirst()
|
||||
?.let { tet -> tet.annotations = it }
|
||||
TimelineEventEntity.where(realm, roomId = roomId, eventId = event.eventId ?: "").findAll()
|
||||
?.forEach { tet -> tet.annotations = it }
|
||||
}
|
||||
}
|
||||
|
||||
@ -335,7 +336,10 @@ internal class EventRelationsAggregationProcessor @Inject constructor(
|
||||
}
|
||||
|
||||
if (!isLocalEcho) {
|
||||
val replaceEvent = TimelineEventEntity.where(realm, roomId, eventId).findFirst()
|
||||
val replaceEvent = TimelineEventEntity
|
||||
.where(realm, roomId, eventId)
|
||||
.equalTo(TimelineEventEntityFields.OWNED_BY_THREAD_CHUNK, false)
|
||||
.findFirst()
|
||||
handleThreadSummaryEdition(editedEvent, replaceEvent, existingSummary?.editions)
|
||||
}
|
||||
}
|
||||
|
@ -227,6 +227,8 @@ internal interface RoomAPI {
|
||||
@Path("eventId") eventId: String,
|
||||
@Path("relationType") relationType: String,
|
||||
@Path("eventType") eventType: String,
|
||||
@Query("from") from: String? = null,
|
||||
@Query("to") to: String? = null,
|
||||
@Query("limit") limit: Int? = null
|
||||
): RelationsResponse
|
||||
|
||||
|
@ -206,7 +206,13 @@ internal class DefaultRelationService @AssistedInject constructor(
|
||||
}
|
||||
|
||||
override suspend fun fetchThreadTimeline(rootThreadEventId: String): Boolean {
|
||||
return fetchThreadTimelineTask.execute(FetchThreadTimelineTask.Params(roomId, rootThreadEventId))
|
||||
fetchThreadTimelineTask.execute(FetchThreadTimelineTask.Params(
|
||||
roomId,
|
||||
rootThreadEventId,
|
||||
null,
|
||||
10
|
||||
))
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
* Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -27,7 +27,6 @@ import org.matrix.android.sdk.internal.crypto.CryptoSessionInfoProvider
|
||||
import org.matrix.android.sdk.internal.crypto.DefaultCryptoService
|
||||
import org.matrix.android.sdk.internal.crypto.algorithms.olm.OlmDecryptionResult
|
||||
import org.matrix.android.sdk.internal.database.helper.addTimelineEvent
|
||||
import org.matrix.android.sdk.internal.database.helper.updateThreadSummaryIfNeeded
|
||||
import org.matrix.android.sdk.internal.database.mapper.asDomain
|
||||
import org.matrix.android.sdk.internal.database.mapper.toEntity
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntity
|
||||
@ -36,8 +35,10 @@ import org.matrix.android.sdk.internal.database.model.EventAnnotationsSummaryEnt
|
||||
import org.matrix.android.sdk.internal.database.model.EventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.EventInsertType
|
||||
import org.matrix.android.sdk.internal.database.model.ReactionAggregatedSummaryEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
|
||||
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
|
||||
import org.matrix.android.sdk.internal.database.query.findLastForwardChunkOfRoom
|
||||
import org.matrix.android.sdk.internal.database.query.find
|
||||
import org.matrix.android.sdk.internal.database.query.findLastForwardChunkOfThread
|
||||
import org.matrix.android.sdk.internal.database.query.getOrCreate
|
||||
import org.matrix.android.sdk.internal.database.query.getOrNull
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
@ -47,16 +48,39 @@ import org.matrix.android.sdk.internal.network.GlobalErrorReceiver
|
||||
import org.matrix.android.sdk.internal.network.executeRequest
|
||||
import org.matrix.android.sdk.internal.session.events.getFixedRoomMemberContent
|
||||
import org.matrix.android.sdk.internal.session.room.RoomAPI
|
||||
import org.matrix.android.sdk.internal.session.room.relation.RelationsResponse
|
||||
import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
|
||||
import org.matrix.android.sdk.internal.task.Task
|
||||
import org.matrix.android.sdk.internal.util.awaitTransaction
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
internal interface FetchThreadTimelineTask : Task<FetchThreadTimelineTask.Params, Boolean> {
|
||||
/***
|
||||
* This class is responsible to Fetch paginated chunks of the thread timeline using the /relations API
|
||||
*
|
||||
*
|
||||
* How it works
|
||||
*
|
||||
* The problem?
|
||||
* - We cannot use the existing timeline architecture to paginate through the timeline
|
||||
* - We want our new events to be live, so any interactions with them like reactions will continue to work. We should
|
||||
* handle appropriately the existing events from /messages api with the new events from /relations.
|
||||
* - Handling edge cases like receiving an event from /messages while you have already created a new one from the /relations response
|
||||
*
|
||||
* The solution
|
||||
* We generate a temporarily thread chunk that will be used to store any new paginated results from the /relations api
|
||||
* We bind the timeline events from that chunk with the already existing ones. So we will have one common instance, and
|
||||
* all reactions, edits etc will continue to work. If the events do not exists we create them
|
||||
* and we will reuse the same EventEntity instance when (and if) the same event will be fetched from the main (/messages) timeline
|
||||
*
|
||||
*/
|
||||
internal interface FetchThreadTimelineTask : Task<FetchThreadTimelineTask.Params, DefaultFetchThreadTimelineTask.Result> {
|
||||
data class Params(
|
||||
val roomId: String,
|
||||
val rootThreadEventId: String
|
||||
val rootThreadEventId: String,
|
||||
val from: String?,
|
||||
val limit: Int
|
||||
|
||||
)
|
||||
}
|
||||
|
||||
@ -69,93 +93,133 @@ internal class DefaultFetchThreadTimelineTask @Inject constructor(
|
||||
private val cryptoService: DefaultCryptoService
|
||||
) : FetchThreadTimelineTask {
|
||||
|
||||
override suspend fun execute(params: FetchThreadTimelineTask.Params): Boolean {
|
||||
enum class Result {
|
||||
SHOULD_FETCH_MORE,
|
||||
REACHED_END,
|
||||
SUCCESS
|
||||
}
|
||||
|
||||
override suspend fun execute(params: FetchThreadTimelineTask.Params): Result {
|
||||
val isRoomEncrypted = cryptoSessionInfoProvider.isRoomEncrypted(params.roomId)
|
||||
val response = executeRequest(globalErrorReceiver) {
|
||||
roomAPI.getRelations(
|
||||
roomId = params.roomId,
|
||||
eventId = params.rootThreadEventId,
|
||||
relationType = RelationType.IO_THREAD,
|
||||
from = params.from,
|
||||
eventType = if (isRoomEncrypted) EventType.ENCRYPTED else EventType.MESSAGE,
|
||||
limit = 2000
|
||||
limit = params.limit
|
||||
)
|
||||
}
|
||||
|
||||
val threadList = response.chunks + listOfNotNull(response.originalEvent)
|
||||
Timber.i("###THREADS FetchThreadTimelineTask Fetched size:${response.chunks.size} nextBatch:${response.nextBatch} ")
|
||||
return handleRelationsResponse(response, params)
|
||||
}
|
||||
|
||||
return storeNewEventsIfNeeded(threadList, params.roomId)
|
||||
private suspend fun handleRelationsResponse(response: RelationsResponse,
|
||||
params: FetchThreadTimelineTask.Params): Result {
|
||||
|
||||
val threadList = response.chunks
|
||||
val threadRootEvent = response.originalEvent
|
||||
val hasReachEnd = response.nextBatch == null
|
||||
|
||||
monarchy.awaitTransaction { realm ->
|
||||
|
||||
val threadChunk = ChunkEntity.findLastForwardChunkOfThread(realm, params.roomId, params.rootThreadEventId)
|
||||
?: run {
|
||||
return@awaitTransaction
|
||||
}
|
||||
|
||||
threadChunk.prevToken = response.nextBatch
|
||||
val roomMemberContentsByUser = HashMap<String, RoomMemberContent?>()
|
||||
|
||||
for (event in threadList) {
|
||||
if (event.eventId == null || event.senderId == null || event.type == null) {
|
||||
continue
|
||||
}
|
||||
|
||||
if (threadChunk.timelineEvents.find(event.eventId) != null) {
|
||||
// Event already exists in thread chunk, skip it
|
||||
Timber.i("###THREADS FetchThreadTimelineTask event: ${event.eventId} already exists in thread chunk, skip it")
|
||||
continue
|
||||
}
|
||||
|
||||
val timelineEvent = TimelineEventEntity
|
||||
.where(realm, roomId = params.roomId, event.eventId)
|
||||
.findFirst()
|
||||
|
||||
if (timelineEvent != null) {
|
||||
// Event already exists but not in the thread chunk
|
||||
// Lets added there
|
||||
Timber.i("###THREADS FetchThreadTimelineTask event: ${event.eventId} exists but not in the thread chunk, add it at the end")
|
||||
threadChunk.timelineEvents.add(timelineEvent)
|
||||
} else {
|
||||
Timber.i("###THREADS FetchThreadTimelineTask event: ${event.eventId} is brand NEW create an entity and add it!")
|
||||
val eventEntity = createEventEntity(params.roomId, event, realm)
|
||||
roomMemberContentsByUser.addSenderState(realm, params.roomId, event.senderId)
|
||||
threadChunk.addTimelineEvent(
|
||||
roomId = params.roomId,
|
||||
eventEntity = eventEntity,
|
||||
direction = PaginationDirection.FORWARDS,
|
||||
ownedByThreadChunk = true,
|
||||
roomMemberContentsByUser = roomMemberContentsByUser)
|
||||
}
|
||||
}
|
||||
|
||||
if (hasReachEnd) {
|
||||
val rootThread = TimelineEventEntity
|
||||
.where(realm, roomId = params.roomId, params.rootThreadEventId)
|
||||
.findFirst()
|
||||
if (rootThread != null) {
|
||||
// If root thread event already exists add it to our chunk
|
||||
threadChunk.timelineEvents.add(rootThread)
|
||||
Timber.i("###THREADS FetchThreadTimelineTask root thread event: ${params.rootThreadEventId} found and added!")
|
||||
} else if (threadRootEvent?.senderId != null) {
|
||||
// Case when thread event is not in the device
|
||||
Timber.i("###THREADS FetchThreadTimelineTask root thread event: ${params.rootThreadEventId} NOT FOUND! Lets create a temp one")
|
||||
val eventEntity = createEventEntity(params.roomId, threadRootEvent, realm)
|
||||
roomMemberContentsByUser.addSenderState(realm, params.roomId, threadRootEvent.senderId)
|
||||
threadChunk.addTimelineEvent(
|
||||
roomId = params.roomId,
|
||||
eventEntity = eventEntity,
|
||||
direction = PaginationDirection.FORWARDS,
|
||||
ownedByThreadChunk = true,
|
||||
roomMemberContentsByUser = roomMemberContentsByUser)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return if (hasReachEnd) {
|
||||
Result.REACHED_END
|
||||
} else {
|
||||
Result.SHOULD_FETCH_MORE
|
||||
}
|
||||
}
|
||||
|
||||
// TODO Reuse this function to all the app
|
||||
/**
|
||||
* If we don't have any new state on this user, get it from db
|
||||
*/
|
||||
private fun HashMap<String, RoomMemberContent?>.addSenderState(realm: Realm, roomId: String, senderId: String) {
|
||||
getOrPut(senderId) {
|
||||
CurrentStateEventEntity
|
||||
.getOrNull(realm, roomId, senderId, EventType.STATE_ROOM_MEMBER)
|
||||
?.root?.asDomain()
|
||||
?.getFixedRoomMemberContent()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Store new events if they are not already received, and returns weather or not,
|
||||
* a timeline update should be made
|
||||
* @param threadList is the list containing the thread replies
|
||||
* @param roomId the roomId of the the thread
|
||||
* @return
|
||||
* Create an EventEntity to be added in the TimelineEventEntity
|
||||
*/
|
||||
private suspend fun storeNewEventsIfNeeded(threadList: List<Event>, roomId: String): Boolean {
|
||||
var eventsSkipped = 0
|
||||
monarchy
|
||||
.awaitTransaction { realm ->
|
||||
val chunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomId)
|
||||
|
||||
val optimizedThreadSummaryMap = hashMapOf<String, EventEntity>()
|
||||
val roomMemberContentsByUser = HashMap<String, RoomMemberContent?>()
|
||||
|
||||
for (event in threadList.reversed()) {
|
||||
if (event.eventId == null || event.senderId == null || event.type == null) {
|
||||
eventsSkipped++
|
||||
continue
|
||||
}
|
||||
|
||||
if (EventEntity.where(realm, event.eventId).findFirst() != null) {
|
||||
// Skip if event already exists
|
||||
eventsSkipped++
|
||||
continue
|
||||
}
|
||||
if (event.isEncrypted()) {
|
||||
// Decrypt events that will be stored
|
||||
decryptIfNeeded(event, roomId)
|
||||
}
|
||||
|
||||
handleReaction(realm, event, roomId)
|
||||
|
||||
val ageLocalTs = event.unsignedData?.age?.let { System.currentTimeMillis() - it }
|
||||
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.INCREMENTAL_SYNC)
|
||||
|
||||
// Sender info
|
||||
roomMemberContentsByUser.getOrPut(event.senderId) {
|
||||
// If we don't have any new state on this user, get it from db
|
||||
val rootStateEvent = CurrentStateEventEntity.getOrNull(realm, roomId, event.senderId, EventType.STATE_ROOM_MEMBER)?.root
|
||||
rootStateEvent?.asDomain()?.getFixedRoomMemberContent()
|
||||
}
|
||||
|
||||
chunk?.addTimelineEvent(roomId, eventEntity, PaginationDirection.FORWARDS, roomMemberContentsByUser)
|
||||
eventEntity.rootThreadEventId?.let {
|
||||
// This is a thread event
|
||||
optimizedThreadSummaryMap[it] = eventEntity
|
||||
} ?: run {
|
||||
// This is a normal event or a root thread one
|
||||
optimizedThreadSummaryMap[eventEntity.eventId] = eventEntity
|
||||
}
|
||||
}
|
||||
|
||||
optimizedThreadSummaryMap.updateThreadSummaryIfNeeded(
|
||||
roomId = roomId,
|
||||
realm = realm,
|
||||
currentUserId = userId,
|
||||
shouldUpdateNotifications = false
|
||||
)
|
||||
}
|
||||
Timber.i("----> size: ${threadList.size} | skipped: $eventsSkipped | threads: ${threadList.map { it.eventId }}")
|
||||
|
||||
return eventsSkipped == threadList.size
|
||||
private fun createEventEntity(roomId: String, event: Event, realm: Realm): EventEntity {
|
||||
val ageLocalTs = event.unsignedData?.age?.let { System.currentTimeMillis() - it }
|
||||
return event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.INCREMENTAL_SYNC)
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke the event decryption mechanism for a specific event
|
||||
*/
|
||||
|
||||
private fun decryptIfNeeded(event: Event, roomId: String) {
|
||||
try {
|
||||
// Event from sync does not have roomId, so add it to the event first
|
||||
|
@ -38,6 +38,7 @@ import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
|
||||
import org.matrix.android.sdk.internal.database.lightweight.LightweightSettingsStorage
|
||||
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
|
||||
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
|
||||
import org.matrix.android.sdk.internal.session.room.relation.threads.FetchThreadTimelineTask
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ReadReceiptHandler
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
|
||||
import org.matrix.android.sdk.internal.task.SemaphoreCoroutineSequencer
|
||||
@ -58,6 +59,7 @@ internal class DefaultTimeline(private val roomId: String,
|
||||
paginationTask: PaginationTask,
|
||||
getEventTask: GetContextOfEventTask,
|
||||
fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
fetchThreadTimelineTask: FetchThreadTimelineTask,
|
||||
timelineEventMapper: TimelineEventMapper,
|
||||
timelineInput: TimelineInput,
|
||||
threadsAwarenessHandler: ThreadsAwarenessHandler,
|
||||
@ -89,7 +91,9 @@ internal class DefaultTimeline(private val roomId: String,
|
||||
realm = backgroundRealm,
|
||||
eventDecryptor = eventDecryptor,
|
||||
paginationTask = paginationTask,
|
||||
realmConfiguration = realmConfiguration,
|
||||
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
|
||||
fetchThreadTimelineTask = fetchThreadTimelineTask,
|
||||
getContextOfEventTask = getEventTask,
|
||||
timelineInput = timelineInput,
|
||||
timelineEventMapper = timelineEventMapper,
|
||||
|
@ -40,6 +40,7 @@ import org.matrix.android.sdk.internal.database.query.where
|
||||
import org.matrix.android.sdk.internal.di.SessionDatabase
|
||||
import org.matrix.android.sdk.internal.di.UserId
|
||||
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
|
||||
import org.matrix.android.sdk.internal.session.room.relation.threads.FetchThreadTimelineTask
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ReadReceiptHandler
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
@ -55,6 +56,7 @@ internal class DefaultTimelineService @AssistedInject constructor(
|
||||
private val eventDecryptor: TimelineEventDecryptor,
|
||||
private val paginationTask: PaginationTask,
|
||||
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
private val fetchThreadTimelineTask: FetchThreadTimelineTask,
|
||||
private val timelineEventMapper: TimelineEventMapper,
|
||||
private val loadRoomMembersTask: LoadRoomMembersTask,
|
||||
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
|
||||
@ -76,10 +78,11 @@ internal class DefaultTimelineService @AssistedInject constructor(
|
||||
realmConfiguration = monarchy.realmConfiguration,
|
||||
coroutineDispatchers = coroutineDispatchers,
|
||||
paginationTask = paginationTask,
|
||||
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
|
||||
timelineEventMapper = timelineEventMapper,
|
||||
timelineInput = timelineInput,
|
||||
eventDecryptor = eventDecryptor,
|
||||
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
|
||||
fetchThreadTimelineTask = fetchThreadTimelineTask,
|
||||
loadRoomMembersTask = loadRoomMembersTask,
|
||||
readReceiptHandler = readReceiptHandler,
|
||||
getEventTask = contextOfEventTask,
|
||||
|
@ -19,20 +19,28 @@ package org.matrix.android.sdk.internal.session.room.timeline
|
||||
import io.realm.OrderedCollectionChangeSet
|
||||
import io.realm.OrderedRealmCollectionChangeListener
|
||||
import io.realm.Realm
|
||||
import io.realm.RealmConfiguration
|
||||
import io.realm.RealmResults
|
||||
import io.realm.kotlin.createObject
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import org.matrix.android.sdk.api.extensions.orFalse
|
||||
import org.matrix.android.sdk.api.session.room.send.SendState
|
||||
import org.matrix.android.sdk.api.session.room.timeline.Timeline
|
||||
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
|
||||
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
|
||||
import org.matrix.android.sdk.internal.database.helper.addIfNecessary
|
||||
import org.matrix.android.sdk.internal.database.lightweight.LightweightSettingsStorage
|
||||
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntity
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntityFields
|
||||
import org.matrix.android.sdk.internal.database.model.RoomEntity
|
||||
import org.matrix.android.sdk.internal.database.model.deleteAndClearThreadEvents
|
||||
import org.matrix.android.sdk.internal.database.query.findAllIncludingEvents
|
||||
import org.matrix.android.sdk.internal.database.query.findLastForwardChunkOfThread
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
import org.matrix.android.sdk.internal.session.room.relation.threads.FetchThreadTimelineTask
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
/**
|
||||
@ -76,6 +84,8 @@ internal class LoadTimelineStrategy(
|
||||
val realm: AtomicReference<Realm>,
|
||||
val eventDecryptor: TimelineEventDecryptor,
|
||||
val paginationTask: PaginationTask,
|
||||
val realmConfiguration: RealmConfiguration,
|
||||
val fetchThreadTimelineTask: FetchThreadTimelineTask,
|
||||
val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
val getContextOfEventTask: GetContextOfEventTask,
|
||||
val timelineInput: TimelineInput,
|
||||
@ -90,7 +100,6 @@ internal class LoadTimelineStrategy(
|
||||
private var getContextLatch: CompletableDeferred<Unit>? = null
|
||||
private var chunkEntity: RealmResults<ChunkEntity>? = null
|
||||
private var timelineChunk: TimelineChunk? = null
|
||||
|
||||
private val chunkEntityListener = OrderedRealmCollectionChangeListener { _: RealmResults<ChunkEntity>, changeSet: OrderedCollectionChangeSet ->
|
||||
// Can be call either when you open a permalink on an unknown event
|
||||
// or when there is a gap in the timeline.
|
||||
@ -170,6 +179,9 @@ internal class LoadTimelineStrategy(
|
||||
getContextLatch?.cancel()
|
||||
chunkEntity = null
|
||||
timelineChunk = null
|
||||
if(mode is Mode.Thread) {
|
||||
clearThreadChunkEntity(dependencies.realm.get(), mode.rootThreadEventId)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun loadMore(count: Int, direction: Timeline.Direction, fetchOnServerIfNeeded: Boolean = true): LoadMoreResult {
|
||||
@ -185,6 +197,9 @@ internal class LoadTimelineStrategy(
|
||||
return LoadMoreResult.FAILURE
|
||||
}
|
||||
}
|
||||
if (mode is Mode.Thread) {
|
||||
return timelineChunk?.loadMoreThread(count, Timeline.Direction.BACKWARDS) ?: LoadMoreResult.FAILURE
|
||||
}
|
||||
return timelineChunk?.loadMore(count, direction, fetchOnServerIfNeeded) ?: LoadMoreResult.FAILURE
|
||||
}
|
||||
|
||||
@ -201,7 +216,7 @@ internal class LoadTimelineStrategy(
|
||||
}
|
||||
|
||||
private fun buildSendingEvents(): List<TimelineEvent> {
|
||||
return if (hasReachedLastForward()) {
|
||||
return if (hasReachedLastForward() || mode is Mode.Thread) {
|
||||
sendingEventsDataSource.buildSendingEvents()
|
||||
} else {
|
||||
emptyList()
|
||||
@ -219,13 +234,48 @@ internal class LoadTimelineStrategy(
|
||||
ChunkEntity.findAllIncludingEvents(realm, listOf(mode.originEventId))
|
||||
}
|
||||
is Mode.Thread -> {
|
||||
recreateThreadChunkEntity(realm, mode.rootThreadEventId)
|
||||
ChunkEntity.where(realm, roomId)
|
||||
.equalTo(ChunkEntityFields.IS_LAST_FORWARD, true)
|
||||
.equalTo(ChunkEntityFields.ROOT_THREAD_EVENT_ID, mode.rootThreadEventId)
|
||||
.equalTo(ChunkEntityFields.IS_LAST_FORWARD_THREAD, true)
|
||||
.findAll()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear any existing thread chunk entity and create a new one, with the
|
||||
* rootThreadEventId included
|
||||
*/
|
||||
private fun recreateThreadChunkEntity(realm: Realm, rootThreadEventId: String) {
|
||||
realm.executeTransaction {
|
||||
// Lets delete the chunk and start a new one
|
||||
ChunkEntity.findLastForwardChunkOfThread(it, roomId, rootThreadEventId)?.deleteAndClearThreadEvents()?.let {
|
||||
Timber.i("###THREADS LoadTimelineStrategy [onStart] thread chunk cleared..")
|
||||
}
|
||||
val threadChunk = it.createObject<ChunkEntity>().apply {
|
||||
Timber.i("###THREADS LoadTimelineStrategy [onStart] Created new thread chunk with rootThreadEventId: $rootThreadEventId")
|
||||
this.rootThreadEventId = rootThreadEventId
|
||||
this.isLastForwardThread = true
|
||||
}
|
||||
if (threadChunk.isValid) {
|
||||
RoomEntity.where(it, roomId).findFirst()?.addIfNecessary(threadChunk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear any existing thread chunk
|
||||
*/
|
||||
private fun clearThreadChunkEntity(realm: Realm, rootThreadEventId: String) {
|
||||
realm.executeTransaction {
|
||||
ChunkEntity.findLastForwardChunkOfThread(it, roomId, rootThreadEventId)?.deleteAndClearThreadEvents()?.let {
|
||||
Timber.i("###THREADS LoadTimelineStrategy [onStop] thread chunk cleared..")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private fun hasReachedLastForward(): Boolean {
|
||||
return timelineChunk?.hasReachedLastForward().orFalse()
|
||||
}
|
||||
@ -237,8 +287,10 @@ internal class LoadTimelineStrategy(
|
||||
timelineSettings = dependencies.timelineSettings,
|
||||
roomId = roomId,
|
||||
timelineId = timelineId,
|
||||
fetchThreadTimelineTask = dependencies.fetchThreadTimelineTask,
|
||||
eventDecryptor = dependencies.eventDecryptor,
|
||||
paginationTask = dependencies.paginationTask,
|
||||
realmConfiguration = dependencies.realmConfiguration,
|
||||
fetchTokenAndPaginateTask = dependencies.fetchTokenAndPaginateTask,
|
||||
timelineEventMapper = dependencies.timelineEventMapper,
|
||||
uiEchoManager = uiEchoManager,
|
||||
|
@ -18,6 +18,7 @@ package org.matrix.android.sdk.internal.session.room.timeline
|
||||
|
||||
import io.realm.OrderedCollectionChangeSet
|
||||
import io.realm.OrderedRealmCollectionChangeListener
|
||||
import io.realm.RealmConfiguration
|
||||
import io.realm.RealmObjectChangeListener
|
||||
import io.realm.RealmQuery
|
||||
import io.realm.RealmResults
|
||||
@ -36,6 +37,8 @@ import org.matrix.android.sdk.internal.database.model.ChunkEntity
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntityFields
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
|
||||
import org.matrix.android.sdk.internal.session.room.relation.threads.DefaultFetchThreadTimelineTask
|
||||
import org.matrix.android.sdk.internal.session.room.relation.threads.FetchThreadTimelineTask
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
|
||||
import timber.log.Timber
|
||||
import java.util.Collections
|
||||
@ -50,8 +53,10 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
||||
private val timelineSettings: TimelineSettings,
|
||||
private val roomId: String,
|
||||
private val timelineId: String,
|
||||
private val fetchThreadTimelineTask: FetchThreadTimelineTask,
|
||||
private val eventDecryptor: TimelineEventDecryptor,
|
||||
private val paginationTask: PaginationTask,
|
||||
private val realmConfiguration: RealmConfiguration,
|
||||
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
private val timelineEventMapper: TimelineEventMapper,
|
||||
private val uiEchoManager: UIEchoManager? = null,
|
||||
@ -142,6 +147,9 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
||||
val loadFromStorage = loadFromStorage(count, direction).also {
|
||||
logLoadedFromStorage(it, direction)
|
||||
}
|
||||
if (loadFromStorage.numberOfEvents == 6) {
|
||||
Timber.i("here")
|
||||
}
|
||||
|
||||
val offsetCount = count - loadFromStorage.numberOfEvents
|
||||
|
||||
@ -158,6 +166,29 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function will fetch more live thread timeline events using the /relations api. It will
|
||||
* always fetch results, while we want our data to be up to dated.
|
||||
*/
|
||||
suspend fun loadMoreThread(count: Int, direction: Timeline.Direction): LoadMoreResult {
|
||||
|
||||
return if (direction == Timeline.Direction.BACKWARDS) {
|
||||
try {
|
||||
fetchThreadTimelineTask.execute(FetchThreadTimelineTask.Params(
|
||||
roomId,
|
||||
timelineSettings.rootThreadEventId!!,
|
||||
chunkEntity.prevToken,
|
||||
count
|
||||
)).toLoadMoreResult()
|
||||
} catch (failure: Throwable) {
|
||||
Timber.e(failure, "Failed to fetch thread timeline events from the server")
|
||||
LoadMoreResult.FAILURE
|
||||
}
|
||||
} else {
|
||||
LoadMoreResult.FAILURE
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun delegateLoadMore(fetchFromServerIfNeeded: Boolean, offsetCount: Int, direction: Timeline.Direction): LoadMoreResult {
|
||||
return if (direction == Timeline.Direction.FORWARDS) {
|
||||
val nextChunkEntity = chunkEntity.nextChunk
|
||||
@ -287,7 +318,7 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
||||
* @return the number of events loaded. If we are in a thread timeline it also returns
|
||||
* whether or not we reached the end/root message
|
||||
*/
|
||||
private suspend fun loadFromStorage(count: Int, direction: Timeline.Direction): LoadedFromStorage {
|
||||
private fun loadFromStorage(count: Int, direction: Timeline.Direction): LoadedFromStorage {
|
||||
val displayIndex = getNextDisplayIndex(direction) ?: return LoadedFromStorage()
|
||||
val baseQuery = timelineEventEntities.where()
|
||||
|
||||
@ -414,6 +445,14 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
||||
}
|
||||
}
|
||||
|
||||
private fun DefaultFetchThreadTimelineTask.Result.toLoadMoreResult(): LoadMoreResult {
|
||||
return when (this) {
|
||||
DefaultFetchThreadTimelineTask.Result.REACHED_END -> LoadMoreResult.REACHED_END
|
||||
DefaultFetchThreadTimelineTask.Result.SHOULD_FETCH_MORE,
|
||||
DefaultFetchThreadTimelineTask.Result.SUCCESS -> LoadMoreResult.SUCCESS
|
||||
}
|
||||
}
|
||||
|
||||
private fun getOffsetIndex(): Int {
|
||||
var offset = 0
|
||||
var currentNextChunk = nextChunk
|
||||
@ -455,6 +494,7 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (insertions.isNotEmpty() || modifications.isNotEmpty()) {
|
||||
onBuiltEvents(true)
|
||||
}
|
||||
@ -489,6 +529,8 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
||||
timelineId = timelineId,
|
||||
eventDecryptor = eventDecryptor,
|
||||
paginationTask = paginationTask,
|
||||
realmConfiguration = realmConfiguration,
|
||||
fetchThreadTimelineTask = fetchThreadTimelineTask,
|
||||
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
|
||||
timelineEventMapper = timelineEventMapper,
|
||||
uiEchoManager = uiEchoManager,
|
||||
@ -535,7 +577,6 @@ private fun ChunkEntity.sortedTimelineEvents(rootThreadEventId: String?): RealmR
|
||||
.or()
|
||||
.equalTo(TimelineEventEntityFields.ROOT.EVENT_ID, rootThreadEventId)
|
||||
.endGroup()
|
||||
.sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
|
||||
.findAll()
|
||||
}
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ import org.matrix.android.sdk.internal.database.model.EventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.EventInsertType
|
||||
import org.matrix.android.sdk.internal.database.model.RoomEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
|
||||
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
|
||||
import org.matrix.android.sdk.internal.database.query.create
|
||||
import org.matrix.android.sdk.internal.database.query.find
|
||||
@ -49,10 +50,10 @@ import javax.inject.Inject
|
||||
* Insert Chunk in DB, and eventually link next and previous chunk in db.
|
||||
*/
|
||||
internal class TokenChunkEventPersistor @Inject constructor(
|
||||
@SessionDatabase private val monarchy: Monarchy,
|
||||
@UserId private val userId: String,
|
||||
private val lightweightSettingsStorage: LightweightSettingsStorage,
|
||||
private val liveEventManager: Lazy<StreamEventsManager>) {
|
||||
@SessionDatabase private val monarchy: Monarchy,
|
||||
@UserId private val userId: String,
|
||||
private val lightweightSettingsStorage: LightweightSettingsStorage,
|
||||
private val liveEventManager: Lazy<StreamEventsManager>) {
|
||||
|
||||
enum class Result {
|
||||
SHOULD_FETCH_MORE,
|
||||
@ -145,9 +146,12 @@ internal class TokenChunkEventPersistor @Inject constructor(
|
||||
if (event.eventId == null || event.senderId == null) {
|
||||
return@forEach
|
||||
}
|
||||
// We check for the timeline event with this id
|
||||
// We check for the timeline event with this id, but not in the thread chunk
|
||||
val eventId = event.eventId
|
||||
val existingTimelineEvent = TimelineEventEntity.where(realm, roomId, eventId).findFirst()
|
||||
val existingTimelineEvent = TimelineEventEntity
|
||||
.where(realm, roomId, eventId)
|
||||
.equalTo(TimelineEventEntityFields.OWNED_BY_THREAD_CHUNK, false)
|
||||
.findFirst()
|
||||
// If it exists, we want to stop here, just link the prevChunk
|
||||
val existingChunk = existingTimelineEvent?.chunk?.firstOrNull()
|
||||
if (existingChunk != null) {
|
||||
@ -173,7 +177,7 @@ internal class TokenChunkEventPersistor @Inject constructor(
|
||||
return@processTimelineEvents
|
||||
}
|
||||
val ageLocalTs = event.unsignedData?.age?.let { now - it }
|
||||
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.PAGINATION)
|
||||
var eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.PAGINATION)
|
||||
if (event.type == EventType.STATE_ROOM_MEMBER && event.stateKey != null) {
|
||||
val contentToUse = if (direction == PaginationDirection.BACKWARDS) {
|
||||
event.prevContent
|
||||
@ -183,7 +187,11 @@ internal class TokenChunkEventPersistor @Inject constructor(
|
||||
roomMemberContentsByUser[event.stateKey] = contentToUse.toModel<RoomMemberContent>()
|
||||
}
|
||||
liveEventManager.get().dispatchPaginatedEventReceived(event, roomId)
|
||||
currentChunk.addTimelineEvent(roomId, eventEntity, direction, roomMemberContentsByUser)
|
||||
currentChunk.addTimelineEvent(
|
||||
roomId = roomId,
|
||||
eventEntity = eventEntity,
|
||||
direction = direction,
|
||||
roomMemberContentsByUser = roomMemberContentsByUser)
|
||||
if (lightweightSettingsStorage.areThreadMessagesEnabled()) {
|
||||
eventEntity.rootThreadEventId?.let {
|
||||
// This is a thread event
|
||||
|
@ -46,10 +46,12 @@ import org.matrix.android.sdk.internal.database.model.EventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.EventInsertType
|
||||
import org.matrix.android.sdk.internal.database.model.RoomEntity
|
||||
import org.matrix.android.sdk.internal.database.model.RoomMemberSummaryEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.deleteOnCascade
|
||||
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
|
||||
import org.matrix.android.sdk.internal.database.query.find
|
||||
import org.matrix.android.sdk.internal.database.query.findLastForwardChunkOfRoom
|
||||
import org.matrix.android.sdk.internal.database.query.findLastForwardChunkOfThread
|
||||
import org.matrix.android.sdk.internal.database.query.getOrCreate
|
||||
import org.matrix.android.sdk.internal.database.query.getOrNull
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
@ -343,6 +345,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||
return roomEntity
|
||||
}
|
||||
|
||||
val customList = arrayListOf<String>()
|
||||
private fun handleTimelineEvents(realm: Realm,
|
||||
roomId: String,
|
||||
roomEntity: RoomEntity,
|
||||
@ -406,11 +409,18 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||
rootStateEvent?.asDomain()?.getFixedRoomMemberContent()
|
||||
}
|
||||
|
||||
chunkEntity.addTimelineEvent(roomId, eventEntity, PaginationDirection.FORWARDS, roomMemberContentsByUser)
|
||||
val timelineEventAdded = chunkEntity.addTimelineEvent(
|
||||
roomId = roomId,
|
||||
eventEntity = eventEntity,
|
||||
direction = PaginationDirection.FORWARDS,
|
||||
roomMemberContentsByUser = roomMemberContentsByUser)
|
||||
if (lightweightSettingsStorage.areThreadMessagesEnabled()) {
|
||||
eventEntity.rootThreadEventId?.let {
|
||||
// This is a thread event
|
||||
optimizedThreadSummaryMap[it] = eventEntity
|
||||
// Add the same thread timeline event to Thread Chunk
|
||||
addToThreadChunkIfNeeded(realm, roomId, it, timelineEventAdded, roomEntity)
|
||||
|
||||
} ?: run {
|
||||
// This is a normal event or a root thread one
|
||||
optimizedThreadSummaryMap[eventEntity.eventId] = eventEntity
|
||||
@ -455,6 +465,29 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||
return chunkEntity
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds new event to the appropriate thread chunk. If the event is already in
|
||||
* the thread timeline and /relations api, we should not added it
|
||||
*/
|
||||
private fun addToThreadChunkIfNeeded(realm: Realm,
|
||||
roomId: String,
|
||||
threadId: String,
|
||||
timelineEventEntity: TimelineEventEntity?,
|
||||
roomEntity: RoomEntity) {
|
||||
|
||||
val eventId = timelineEventEntity?.eventId ?: return
|
||||
|
||||
ChunkEntity.findLastForwardChunkOfThread(realm, roomId, threadId)?.let { threadChunk ->
|
||||
val existingEvent = threadChunk.timelineEvents.find(eventId)
|
||||
if (existingEvent?.ownedByThreadChunk == true) {
|
||||
Timber.i("###THREADS RoomSyncHandler event:${timelineEventEntity.eventId} already exists, do not add")
|
||||
return@addToThreadChunkIfNeeded
|
||||
}
|
||||
threadChunk.timelineEvents.add(0, timelineEventEntity)
|
||||
roomEntity.addIfNecessary(threadChunk)
|
||||
}
|
||||
}
|
||||
|
||||
private fun decryptIfNeeded(event: Event, roomId: String) {
|
||||
try {
|
||||
// Event from sync does not have roomId, so add it to the event first
|
||||
|
@ -65,7 +65,7 @@ class ThreadListController @Inject constructor(
|
||||
id(timelineEvent.eventId)
|
||||
avatarRenderer(host.avatarRenderer)
|
||||
matrixItem(timelineEvent.senderInfo.toMatrixItem())
|
||||
title(timelineEvent.senderInfo.displayName)
|
||||
title(timelineEvent.senderInfo.displayName.orEmpty())
|
||||
date(date)
|
||||
rootMessageDeleted(timelineEvent.root.isRedacted())
|
||||
threadNotificationState(timelineEvent.root.threadDetails?.threadNotificationState ?: ThreadNotificationState.NO_NEW_MESSAGE)
|
||||
|
Loading…
x
Reference in New Issue
Block a user