Make Android app thread aware. Handling also extreme cases like really old messages that the root thread message is not fetched in the device and initial sync
This commit is contained in:
parent
d1f3e3f958
commit
45a63b73bd
|
@ -19,6 +19,7 @@ package org.matrix.android.sdk.internal.database.model
|
|||
import io.realm.RealmObject
|
||||
import io.realm.annotations.Index
|
||||
import org.matrix.android.sdk.api.session.room.send.SendState
|
||||
import org.matrix.android.sdk.api.util.JsonDict
|
||||
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
|
||||
import org.matrix.android.sdk.internal.crypto.algorithms.olm.OlmDecryptionResult
|
||||
import org.matrix.android.sdk.internal.di.MoshiProvider
|
||||
|
@ -56,10 +57,10 @@ internal open class EventEntity(@Index var eventId: String = "",
|
|||
|
||||
companion object
|
||||
|
||||
fun setDecryptionResult(result: MXEventDecryptionResult) {
|
||||
fun setDecryptionResult(result: MXEventDecryptionResult, clearEvent: JsonDict? = null) {
|
||||
assertIsManaged()
|
||||
val decryptionResult = OlmDecryptionResult(
|
||||
payload = result.clearEvent,
|
||||
payload = clearEvent ?: result.clearEvent,
|
||||
senderKey = result.senderCurve25519Key,
|
||||
keysClaimed = result.claimedEd25519Key?.let { mapOf("ed25519" to it) },
|
||||
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
|
||||
|
|
|
@ -23,6 +23,7 @@ import io.realm.RealmConfiguration
|
|||
import io.realm.RealmQuery
|
||||
import io.realm.RealmResults
|
||||
import io.realm.Sort
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.matrix.android.sdk.api.MatrixCallback
|
||||
import org.matrix.android.sdk.api.extensions.orFalse
|
||||
import org.matrix.android.sdk.api.extensions.tryOrNull
|
||||
|
@ -33,6 +34,7 @@ import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
|
|||
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
|
||||
import org.matrix.android.sdk.api.util.CancelableBag
|
||||
import org.matrix.android.sdk.internal.database.RealmSessionProvider
|
||||
import org.matrix.android.sdk.internal.database.mapper.EventMapper
|
||||
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntity
|
||||
import org.matrix.android.sdk.internal.database.model.RoomEntity
|
||||
|
@ -43,6 +45,7 @@ import org.matrix.android.sdk.internal.database.query.where
|
|||
import org.matrix.android.sdk.internal.database.query.whereRoomId
|
||||
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ReadReceiptHandler
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
import org.matrix.android.sdk.internal.task.configureWith
|
||||
import org.matrix.android.sdk.internal.util.Debouncer
|
||||
|
@ -72,6 +75,7 @@ internal class DefaultTimeline(
|
|||
private val eventDecryptor: TimelineEventDecryptor,
|
||||
private val realmSessionProvider: RealmSessionProvider,
|
||||
private val loadRoomMembersTask: LoadRoomMembersTask,
|
||||
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
|
||||
private val readReceiptHandler: ReadReceiptHandler
|
||||
) : Timeline,
|
||||
TimelineInput.Listener,
|
||||
|
@ -577,6 +581,10 @@ internal class DefaultTimeline(
|
|||
} else {
|
||||
nextDisplayIndex = offsetIndex + 1
|
||||
}
|
||||
|
||||
// Prerequisite to in order for the ThreadsAwarenessHandler to work properly
|
||||
fetchRootThreadEventsIfNeeded(offsetResults)
|
||||
|
||||
offsetResults.forEach { eventEntity ->
|
||||
|
||||
val timelineEvent = buildTimelineEvent(eventEntity)
|
||||
|
@ -601,6 +609,20 @@ internal class DefaultTimeline(
|
|||
return offsetResults.size
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is responsible to fetch and store the root event of a thread event
|
||||
* in order to be able to display the event to the user appropriately
|
||||
*/
|
||||
private fun fetchRootThreadEventsIfNeeded(offsetResults: RealmResults<TimelineEventEntity>) = runBlocking {
|
||||
val eventEntityList = offsetResults
|
||||
.mapNotNull {
|
||||
it?.root
|
||||
}.map {
|
||||
EventMapper.map(it)
|
||||
}.toList()
|
||||
threadsAwarenessHandler.fetchRootThreadEventsIfNeeded(eventEntityList)
|
||||
}
|
||||
|
||||
private fun buildTimelineEvent(eventEntity: TimelineEventEntity): TimelineEvent {
|
||||
return timelineEventMapper.map(
|
||||
timelineEventEntity = eventEntity,
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.matrix.android.sdk.internal.database.query.where
|
|||
import org.matrix.android.sdk.internal.di.SessionDatabase
|
||||
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ReadReceiptHandler
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
|
||||
internal class DefaultTimelineService @AssistedInject constructor(
|
||||
|
@ -52,6 +53,7 @@ internal class DefaultTimelineService @AssistedInject constructor(
|
|||
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
private val timelineEventMapper: TimelineEventMapper,
|
||||
private val loadRoomMembersTask: LoadRoomMembersTask,
|
||||
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
|
||||
private val readReceiptHandler: ReadReceiptHandler
|
||||
) : TimelineService {
|
||||
|
||||
|
@ -75,6 +77,7 @@ internal class DefaultTimelineService @AssistedInject constructor(
|
|||
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
|
||||
realmSessionProvider = realmSessionProvider,
|
||||
loadRoomMembersTask = loadRoomMembersTask,
|
||||
threadsAwarenessHandler = threadsAwarenessHandler,
|
||||
readReceiptHandler = readReceiptHandler
|
||||
)
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.matrix.android.sdk.internal.crypto.model.event.EncryptedEventContent
|
|||
import org.matrix.android.sdk.internal.database.model.EventEntity
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
import org.matrix.android.sdk.internal.di.SessionDatabase
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
|
@ -34,7 +35,8 @@ import javax.inject.Inject
|
|||
internal class TimelineEventDecryptor @Inject constructor(
|
||||
@SessionDatabase
|
||||
private val realmConfiguration: RealmConfiguration,
|
||||
private val cryptoService: CryptoService
|
||||
private val cryptoService: CryptoService,
|
||||
private val threadsAwarenessHandler: ThreadsAwarenessHandler
|
||||
) {
|
||||
|
||||
private val newSessionListener = object : NewSessionListener {
|
||||
|
@ -106,10 +108,19 @@ internal class TimelineEventDecryptor @Inject constructor(
|
|||
val result = cryptoService.decryptEvent(request.event, timelineId)
|
||||
Timber.v("Successfully decrypted event ${event.eventId}")
|
||||
realm.executeTransaction {
|
||||
val eventId = event.eventId ?: ""
|
||||
EventEntity.where(it, eventId = eventId)
|
||||
val eventId = event.eventId ?: return@executeTransaction
|
||||
val eventEntity = EventEntity
|
||||
.where(it, eventId = eventId)
|
||||
.findFirst()
|
||||
?.setDecryptionResult(result)
|
||||
|
||||
eventEntity?.apply {
|
||||
val decryptedPayload = threadsAwarenessHandler.handleIfNeededDuringDecryption(
|
||||
it,
|
||||
roomId = event.roomId,
|
||||
event,
|
||||
result)
|
||||
setDecryptionResult(result, decryptedPayload)
|
||||
}
|
||||
}
|
||||
} catch (e: MXCryptoError) {
|
||||
Timber.v("Failed to decrypt event ${event.eventId} : ${e.localizedMessage}")
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.matrix.android.sdk.internal.session.sync.handler.PresenceSyncHandler
|
|||
import org.matrix.android.sdk.internal.session.sync.handler.SyncResponsePostTreatmentAggregatorHandler
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.UserAccountDataSyncHandler
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.RoomSyncHandler
|
||||
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
|
||||
import org.matrix.android.sdk.internal.util.awaitTransaction
|
||||
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
|
||||
import timber.log.Timber
|
||||
|
@ -62,6 +63,7 @@ internal class SyncResponseHandler @Inject constructor(
|
|||
private val tokenStore: SyncTokenStore,
|
||||
private val processEventForPushTask: ProcessEventForPushTask,
|
||||
private val pushRuleService: PushRuleService,
|
||||
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
|
||||
private val presenceSyncHandler: PresenceSyncHandler
|
||||
) {
|
||||
|
||||
|
@ -94,6 +96,10 @@ internal class SyncResponseHandler @Inject constructor(
|
|||
Timber.v("Finish handling toDevice in $it ms")
|
||||
}
|
||||
val aggregator = SyncResponsePostTreatmentAggregator()
|
||||
|
||||
// Prerequisite for thread events handling in RoomSyncHandler
|
||||
threadsAwarenessHandler.fetchRootThreadEventsIfNeeded(syncResponse)
|
||||
|
||||
// Start one big transaction
|
||||
monarchy.awaitTransaction { realm ->
|
||||
measureTimeMillis {
|
||||
|
|
|
@ -372,9 +372,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
threadsAwarenessHandler.handleIfNeeded(
|
||||
realm = realm,
|
||||
roomId = roomId,
|
||||
event = event,
|
||||
isInitialSync = isInitialSync,
|
||||
::decryptIfNeeded)
|
||||
event = event)
|
||||
|
||||
val ageLocalTs = event.unsignedData?.age?.let { syncLocalTimestampMillis - it }
|
||||
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, insertType)
|
||||
|
|
|
@ -16,25 +16,32 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.sync.handler.room
|
||||
|
||||
import com.zhuinden.monarchy.Monarchy
|
||||
import io.realm.Realm
|
||||
import org.matrix.android.sdk.api.session.events.model.Content
|
||||
import org.matrix.android.sdk.api.session.Session
|
||||
import org.matrix.android.sdk.api.session.crypto.CryptoService
|
||||
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
|
||||
import org.matrix.android.sdk.api.session.events.model.Event
|
||||
import org.matrix.android.sdk.api.session.events.model.toContent
|
||||
import org.matrix.android.sdk.api.session.events.model.toModel
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageFormat
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageRelationContent
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageTextContent
|
||||
import org.matrix.android.sdk.api.session.room.model.tag.RoomTagContent
|
||||
import org.matrix.android.sdk.api.session.room.send.SendState
|
||||
import org.matrix.android.sdk.api.session.sync.model.SyncResponse
|
||||
import org.matrix.android.sdk.api.util.JsonDict
|
||||
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
|
||||
import org.matrix.android.sdk.internal.crypto.algorithms.olm.OlmDecryptionResult
|
||||
import org.matrix.android.sdk.internal.database.mapper.EventMapper
|
||||
import org.matrix.android.sdk.internal.database.mapper.toEntity
|
||||
import org.matrix.android.sdk.internal.database.model.EventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.EventInsertType
|
||||
import org.matrix.android.sdk.internal.database.model.RoomSummaryEntity
|
||||
import org.matrix.android.sdk.internal.database.model.RoomTagEntity
|
||||
import org.matrix.android.sdk.internal.database.query.getOrCreate
|
||||
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
import org.matrix.android.sdk.internal.di.SessionDatabase
|
||||
import org.matrix.android.sdk.internal.session.permalinks.PermalinkFactory
|
||||
import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
|
||||
import org.matrix.android.sdk.internal.util.awaitTransaction
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
|
@ -44,31 +51,134 @@ import javax.inject.Inject
|
|||
* threads response as replies to the original message
|
||||
*/
|
||||
internal class ThreadsAwarenessHandler @Inject constructor(
|
||||
private val permalinkFactory: PermalinkFactory
|
||||
private val permalinkFactory: PermalinkFactory,
|
||||
private val cryptoService: CryptoService,
|
||||
@SessionDatabase private val monarchy: Monarchy,
|
||||
private val session: Session
|
||||
) {
|
||||
|
||||
/**
|
||||
* Fetch root thread events if they are missing from the local storage
|
||||
* @param syncResponse the sync response
|
||||
*/
|
||||
suspend fun fetchRootThreadEventsIfNeeded(syncResponse: SyncResponse) {
|
||||
val handlingStrategy = syncResponse.rooms?.join?.let {
|
||||
RoomSyncHandler.HandlingStrategy.JOINED(it)
|
||||
}
|
||||
if (handlingStrategy !is RoomSyncHandler.HandlingStrategy.JOINED) return
|
||||
val eventList = handlingStrategy.data
|
||||
.mapNotNull { (roomId, roomSync) ->
|
||||
roomSync.timeline?.events?.map {
|
||||
it.copy(roomId = roomId)
|
||||
}
|
||||
}.flatten()
|
||||
|
||||
fetchRootThreadEventsIfNeeded(eventList)
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch root thread events if they are missing from the local storage
|
||||
* @param eventList a list with the events to examine
|
||||
*/
|
||||
suspend fun fetchRootThreadEventsIfNeeded(eventList: List<Event>) {
|
||||
if (eventList.isNullOrEmpty()) return
|
||||
|
||||
val threadsToFetch = emptyMap<String, String>().toMutableMap()
|
||||
monarchy.awaitTransaction { realm ->
|
||||
eventList.asSequence()
|
||||
.filter {
|
||||
isThreadEvent(it) && it.roomId != null
|
||||
}.mapNotNull { event ->
|
||||
getRootThreadEventId(event)?.let {
|
||||
Pair(it, event.roomId!!)
|
||||
}
|
||||
}.forEach { (rootThreadEventId, roomId) ->
|
||||
EventEntity.where(realm, rootThreadEventId).findFirst() ?: run { threadsToFetch[rootThreadEventId] = roomId }
|
||||
}
|
||||
}
|
||||
fetchThreadsEvents(threadsToFetch)
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch multiple unique events using the fetchEvent function
|
||||
*/
|
||||
private suspend fun fetchThreadsEvents(threadsToFetch: Map<String, String>) {
|
||||
val eventEntityList = threadsToFetch.mapNotNull { (eventId, roomId) ->
|
||||
fetchEvent(eventId, roomId)?.let {
|
||||
it.toEntity(roomId, SendState.SYNCED, it.ageLocalTs)
|
||||
}
|
||||
}
|
||||
|
||||
if (eventEntityList.isNullOrEmpty()) return
|
||||
|
||||
// Transaction should be done on its own thread, like below
|
||||
monarchy.awaitTransaction { realm ->
|
||||
eventEntityList.forEach {
|
||||
it.copyToRealmOrIgnore(realm, EventInsertType.INCREMENTAL_SYNC)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function will fetch the event from the homeserver, this is mandatory when the
|
||||
* initial thread message is too old and is not saved in the device, so in order to
|
||||
* construct the "reply to" format we need to know the event thread.
|
||||
* @return the Event or null otherwise
|
||||
*/
|
||||
private suspend fun fetchEvent(eventId: String, roomId: String): Event? {
|
||||
return runCatching {
|
||||
Timber.i("------> Fetching event[$eventId]....")
|
||||
session.getEvent(roomId = roomId, eventId = eventId)
|
||||
}.fold(
|
||||
onSuccess = {
|
||||
it
|
||||
},
|
||||
onFailure = {
|
||||
null
|
||||
})
|
||||
}
|
||||
|
||||
fun handleIfNeeded(realm: Realm,
|
||||
roomId: String,
|
||||
event: Event,
|
||||
isInitialSync: Boolean,
|
||||
decryptIfNeeded: (event: Event, roomId: String) -> Unit) {
|
||||
event: Event) {
|
||||
val payload = transformThreadToReplyIfNeeded(
|
||||
realm = realm,
|
||||
roomId = roomId,
|
||||
event = event,
|
||||
decryptedResult = event.mxDecryptionResult?.payload) ?: return
|
||||
|
||||
if (!isThreadEvent(event)) return
|
||||
val rootThreadEventId = getRootThreadEventId(event) ?: return
|
||||
val payload = event.mxDecryptionResult?.payload?.toMutableMap() ?: return
|
||||
val body = getValueFromPayload(payload, "body") ?: return
|
||||
val msgType = getValueFromPayload(payload, "msgtype") ?: return
|
||||
val rootThreadEventEntity = EventEntity.where(realm, eventId = rootThreadEventId).findFirst() ?: return
|
||||
val rootThreadEvent = EventMapper.map(rootThreadEventEntity)
|
||||
val rootThreadEventSenderId = rootThreadEvent.senderId ?: return
|
||||
|
||||
Timber.i("------> Thread event detected! - isInitialSync: $isInitialSync")
|
||||
|
||||
if (rootThreadEvent.isEncrypted()) {
|
||||
decryptIfNeeded(rootThreadEvent, roomId)
|
||||
event.mxDecryptionResult = event.mxDecryptionResult?.copy(payload = payload)
|
||||
}
|
||||
|
||||
val rootThreadEventBody = getValueFromPayload(rootThreadEvent.mxDecryptionResult?.payload?.toMutableMap(),"body")
|
||||
fun handleIfNeededDuringDecryption(realm: Realm,
|
||||
roomId: String?,
|
||||
event: Event,
|
||||
result: MXEventDecryptionResult): JsonDict? {
|
||||
return transformThreadToReplyIfNeeded(
|
||||
realm = realm,
|
||||
roomId = roomId,
|
||||
event = event,
|
||||
decryptedResult = result.clearEvent)
|
||||
}
|
||||
|
||||
/**
|
||||
* If the event is a thread event then transform/enhance it to a visual Reply Event,
|
||||
* If the event is not a thread event, null value will be returned
|
||||
* If there is an error (ex. the root/origin thread event is not found), null willl be returend
|
||||
*/
|
||||
private fun transformThreadToReplyIfNeeded(realm: Realm, roomId: String?, event: Event, decryptedResult: JsonDict?): JsonDict? {
|
||||
roomId ?: return null
|
||||
if (!isThreadEvent(event)) return null
|
||||
val rootThreadEventId = getRootThreadEventId(event) ?: return null
|
||||
val payload = decryptedResult?.toMutableMap() ?: return null
|
||||
val body = getValueFromPayload(payload, "body") ?: return null
|
||||
val msgType = getValueFromPayload(payload, "msgtype") ?: return null
|
||||
val rootThreadEvent = getEventFromDB(realm, rootThreadEventId) ?: return null
|
||||
val rootThreadEventSenderId = rootThreadEvent.senderId ?: return null
|
||||
|
||||
decryptIfNeeded(rootThreadEvent, roomId)
|
||||
|
||||
val rootThreadEventBody = getValueFromPayload(rootThreadEvent.mxDecryptionResult?.payload?.toMutableMap(), "body")
|
||||
|
||||
val permalink = permalinkFactory.createPermalink(roomId, rootThreadEventId, false)
|
||||
val userLink = permalinkFactory.createPermalink(rootThreadEventSenderId, false) ?: ""
|
||||
|
@ -90,8 +200,40 @@ internal class ThreadsAwarenessHandler @Inject constructor(
|
|||
|
||||
payload["content"] = messageTextContent
|
||||
|
||||
event.mxDecryptionResult = event.mxDecryptionResult?.copy(payload = payload )
|
||||
return payload
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrypt the event
|
||||
*/
|
||||
|
||||
private fun decryptIfNeeded(event: Event, roomId: String) {
|
||||
try {
|
||||
if (!event.isEncrypted() || event.mxDecryptionResult != null) return
|
||||
|
||||
// Event from sync does not have roomId, so add it to the event first
|
||||
val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "")
|
||||
event.mxDecryptionResult = OlmDecryptionResult(
|
||||
payload = result.clearEvent,
|
||||
senderKey = result.senderCurve25519Key,
|
||||
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
|
||||
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
|
||||
)
|
||||
} catch (e: MXCryptoError) {
|
||||
if (e is MXCryptoError.Base) {
|
||||
event.mCryptoError = e.errorType
|
||||
event.mCryptoErrorReason = e.technicalMessage.takeIf { it.isNotEmpty() } ?: e.detailedErrorDescription
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to get the event form the local DB, if the event does not exist null
|
||||
* will be returned
|
||||
*/
|
||||
private fun getEventFromDB(realm: Realm, eventId: String): Event? {
|
||||
val eventEntity = EventEntity.where(realm, eventId = eventId).findFirst() ?: return null
|
||||
return EventMapper.map(eventEntity)
|
||||
}
|
||||
|
||||
private fun isThreadEvent(event: Event): Boolean =
|
||||
|
|
Loading…
Reference in New Issue