Fix live timeline event race condition

This commit is contained in:
ganfra 2021-03-11 15:21:20 +01:00
parent 4b8c59a23b
commit a407ed1903
4 changed files with 32 additions and 46 deletions

View File

@ -87,7 +87,7 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
}
override fun getTimeLineEventLive(eventId: String): LiveData<Optional<TimelineEvent>> {
return LiveTimelineEvent(timelineInput, monarchy, taskExecutor.executorScope, timelineEventMapper, roomId, eventId)
return LiveTimelineEvent(monarchy, taskExecutor.executorScope, timelineEventMapper, roomId, eventId)
}
override fun getAttachmentMessages(): List<TimelineEvent> {

View File

@ -18,8 +18,9 @@ package org.matrix.android.sdk.internal.session.room.timeline
import androidx.lifecycle.LiveData
import androidx.lifecycle.MediatorLiveData
import androidx.lifecycle.Transformations
import com.zhuinden.monarchy.Monarchy
import io.realm.Realm
import io.realm.RealmQuery
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
@ -29,66 +30,57 @@ import org.matrix.android.sdk.api.util.Optional
import org.matrix.android.sdk.api.util.toOptional
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
import org.matrix.android.sdk.internal.database.query.where
import java.util.concurrent.atomic.AtomicBoolean
/**
* This class takes care of handling case where local echo is replaced by the synced event in the db.
*/
internal class LiveTimelineEvent(private val timelineInput: TimelineInput,
private val monarchy: Monarchy,
internal class LiveTimelineEvent(private val monarchy: Monarchy,
private val coroutineScope: CoroutineScope,
private val timelineEventMapper: TimelineEventMapper,
private val roomId: String,
private val eventId: String)
: TimelineInput.Listener,
MediatorLiveData<Optional<TimelineEvent>>() {
private var queryLiveData: LiveData<Optional<TimelineEvent>>? = null
// If we are listening to local echo, we want to be aware when event is synced
private var shouldObserveSync = AtomicBoolean(LocalEcho.isLocalEchoId(eventId))
: MediatorLiveData<Optional<TimelineEvent>>() {
init {
buildAndObserveQuery(eventId)
buildAndObserveQuery()
}
private var initialLiveData: LiveData<List<TimelineEvent>>? = null
// Makes sure it's made on the main thread
private fun buildAndObserveQuery(eventIdToObserve: String) = coroutineScope.launch(Dispatchers.Main) {
queryLiveData?.also {
removeSource(it)
}
private fun buildAndObserveQuery() = coroutineScope.launch(Dispatchers.Main) {
val liveData = monarchy.findAllMappedWithChanges(
{ TimelineEventEntity.where(it, roomId = roomId, eventId = eventIdToObserve) },
{ TimelineEventEntity.where(it, roomId = roomId, eventId = eventId) },
{ timelineEventMapper.map(it) }
)
queryLiveData = Transformations.map(liveData) { events ->
events.firstOrNull().toOptional()
}.also {
addSource(it) { newValue -> value = newValue }
addSource(liveData) { newValue ->
value = newValue.firstOrNull().toOptional()
}
initialLiveData = liveData
if (LocalEcho.isLocalEchoId(eventId)) {
observeTimelineEventWithTxId()
}
}
override fun onLocalEchoSynced(roomId: String, localEchoEventId: String, syncedEventId: String) {
if (this.roomId == roomId && localEchoEventId == this.eventId) {
timelineInput.listeners.remove(this)
shouldObserveSync.set(false)
// rebuild the query with the new eventId
buildAndObserveQuery(syncedEventId)
private fun observeTimelineEventWithTxId() {
val liveData = monarchy.findAllMappedWithChanges(
{ it.queryTimelineEventWithTxId() },
{ timelineEventMapper.map(it) }
)
addSource(liveData) { newValue ->
val optionalValue = newValue.firstOrNull().toOptional()
if (optionalValue.hasValue()) {
initialLiveData?.also { removeSource(it) }
value = optionalValue
}
}
}
override fun onActive() {
super.onActive()
if (shouldObserveSync.get()) {
timelineInput.listeners.add(this)
}
}
override fun onInactive() {
super.onInactive()
if (shouldObserveSync.get()) {
timelineInput.listeners.remove(this)
}
private fun Realm.queryTimelineEventWithTxId(): RealmQuery<TimelineEventEntity> {
return where(TimelineEventEntity::class.java)
.equalTo(TimelineEventEntityFields.ROOM_ID, roomId)
.like(TimelineEventEntityFields.ROOT.UNSIGNED_DATA, """{*"transaction_id":*"$eventId"*}""")
}
}

View File

@ -35,16 +35,11 @@ internal class TimelineInput @Inject constructor() {
listeners.toSet().forEach { it.onNewTimelineEvents(roomId, eventIds) }
}
fun onLocalEchoSynced(roomId: String, localEchoEventId: String, syncEventId: String) {
listeners.toSet().forEach { it.onLocalEchoSynced(roomId, localEchoEventId, syncEventId) }
}
val listeners = mutableSetOf<Listener>()
internal interface Listener {
fun onLocalEchoCreated(roomId: String, timelineEvent: TimelineEvent) = Unit
fun onLocalEchoUpdated(roomId: String, eventId: String, sendState: SendState) = Unit
fun onNewTimelineEvents(roomId: String, eventIds: List<String>) = Unit
fun onLocalEchoSynced(roomId: String, localEchoEventId: String, syncedEventId: String) = Unit
}
}

View File

@ -400,7 +400,6 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
event.mxDecryptionResult = adapter.fromJson(json)
}
}
timelineInput.onLocalEchoSynced(roomId, it, event.eventId)
// Finally delete the local echo
sendingEventEntity.deleteOnCascade(true)
} else {