Introduce state index trying to replicate RoomState logic. WIP
This commit is contained in:
parent
d46ce8245d
commit
6115b05d97
|
@ -1,10 +1,10 @@
|
|||
package im.vector.matrix.android.api.session.events.model
|
||||
|
||||
import com.google.gson.JsonObject
|
||||
import com.squareup.moshi.Json
|
||||
import com.squareup.moshi.JsonClass
|
||||
import com.squareup.moshi.Types
|
||||
import im.vector.matrix.android.internal.di.MoshiProvider
|
||||
import im.vector.matrix.android.internal.legacy.util.JsonUtils
|
||||
import java.lang.reflect.ParameterizedType
|
||||
|
||||
typealias Content = Map<String, Any>
|
||||
|
||||
|
@ -23,14 +23,8 @@ data class Event(
|
|||
|
||||
) {
|
||||
|
||||
val contentAsJsonObject: JsonObject? by lazy {
|
||||
val gson = JsonUtils.getGson(true)
|
||||
gson.toJsonTree(content).asJsonObject
|
||||
}
|
||||
|
||||
val prevContentAsJsonObject: JsonObject? by lazy {
|
||||
val gson = JsonUtils.getGson(true)
|
||||
gson.toJsonTree(prevContent).asJsonObject
|
||||
fun isStateEvent(): Boolean {
|
||||
return EventType.isStateEvent(type)
|
||||
}
|
||||
|
||||
inline fun <reified T> content(): T? {
|
||||
|
@ -47,4 +41,7 @@ data class Event(
|
|||
return moshiAdapter.fromJsonValue(data)
|
||||
}
|
||||
|
||||
companion object {
|
||||
internal val CONTENT_TYPE: ParameterizedType = Types.newParameterizedType(Map::class.java, String::class.java, Any::class.java)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package im.vector.matrix.android.internal.database.helper
|
||||
|
||||
import im.vector.matrix.android.api.session.events.model.Event
|
||||
import im.vector.matrix.android.internal.database.mapper.asEntity
|
||||
import im.vector.matrix.android.internal.database.model.ChunkEntity
|
||||
import im.vector.matrix.android.internal.database.query.fastContains
|
||||
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
|
||||
|
||||
internal fun ChunkEntity.add(event: Event, stateIndex: Int, paginationDirection: PaginationDirection) {
|
||||
if (!this.isManaged) {
|
||||
throw IllegalStateException("Chunk entity should be managed to use fast contains")
|
||||
}
|
||||
|
||||
val eventEntity = event.asEntity()
|
||||
eventEntity.stateIndex = stateIndex
|
||||
|
||||
if (!this.events.fastContains(eventEntity)) {
|
||||
val position = if (paginationDirection == PaginationDirection.FORWARDS) 0 else this.events.size
|
||||
this.events.add(position, eventEntity)
|
||||
}
|
||||
}
|
|
@ -1,18 +0,0 @@
|
|||
package im.vector.matrix.android.internal.database.helper
|
||||
|
||||
import im.vector.matrix.android.api.session.events.model.Event
|
||||
import im.vector.matrix.android.internal.database.mapper.asEntity
|
||||
import im.vector.matrix.android.internal.database.model.ChunkEntity
|
||||
import im.vector.matrix.android.internal.database.query.fastContains
|
||||
|
||||
internal fun List<Event>.addManagedToChunk(chunkEntity: ChunkEntity) {
|
||||
if (!chunkEntity.isManaged) {
|
||||
throw IllegalStateException("Chunk entity should be managed to use fast contains")
|
||||
}
|
||||
this.forEach { event ->
|
||||
val eventEntity = event.asEntity()
|
||||
if (!chunkEntity.events.fastContains(eventEntity)) {
|
||||
chunkEntity.events.add(eventEntity)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,5 @@
|
|||
package im.vector.matrix.android.internal.database.mapper
|
||||
|
||||
import com.squareup.moshi.Types
|
||||
import im.vector.matrix.android.api.session.events.model.Event
|
||||
import im.vector.matrix.android.api.session.events.model.UnsignedData
|
||||
import im.vector.matrix.android.internal.database.model.EventEntity
|
||||
|
@ -10,8 +9,7 @@ import im.vector.matrix.android.internal.di.MoshiProvider
|
|||
internal object EventMapper {
|
||||
|
||||
private val moshi = MoshiProvider.providesMoshi()
|
||||
private val type = Types.newParameterizedType(Map::class.java, String::class.java, Any::class.java)
|
||||
private val adapter = moshi.adapter<Map<String, Any>>(type)
|
||||
private val adapter = moshi.adapter<Map<String, Any>>(Event.CONTENT_TYPE)
|
||||
|
||||
fun map(event: Event): EventEntity {
|
||||
val eventEntity = EventEntity()
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package im.vector.matrix.android.internal.database.model
|
||||
|
||||
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
|
||||
import io.realm.RealmList
|
||||
import io.realm.RealmObject
|
||||
import io.realm.RealmResults
|
||||
|
@ -7,6 +8,8 @@ import io.realm.annotations.LinkingObjects
|
|||
|
||||
internal open class ChunkEntity(var prevToken: String? = null,
|
||||
var nextToken: String? = null,
|
||||
var prevStateIndex: Int = -1,
|
||||
var nextStateIndex: Int = 1,
|
||||
var isLast: Boolean = false,
|
||||
var events: RealmList<EventEntity> = RealmList()
|
||||
) : RealmObject() {
|
||||
|
@ -15,4 +18,20 @@ internal open class ChunkEntity(var prevToken: String? = null,
|
|||
val room: RealmResults<RoomEntity>? = null
|
||||
|
||||
companion object
|
||||
|
||||
fun stateIndex(direction: PaginationDirection): Int {
|
||||
return when (direction) {
|
||||
PaginationDirection.FORWARDS -> nextStateIndex
|
||||
PaginationDirection.BACKWARDS -> prevStateIndex
|
||||
}
|
||||
}
|
||||
|
||||
fun updateStateIndex(stateIndex: Int, direction: PaginationDirection){
|
||||
when (direction) {
|
||||
PaginationDirection.FORWARDS -> nextStateIndex = stateIndex
|
||||
PaginationDirection.BACKWARDS -> prevStateIndex = stateIndex
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -3,20 +3,22 @@ package im.vector.matrix.android.internal.database.model
|
|||
import io.realm.RealmObject
|
||||
import io.realm.RealmResults
|
||||
import io.realm.annotations.LinkingObjects
|
||||
import io.realm.annotations.PrimaryKey
|
||||
|
||||
internal open class EventEntity(@PrimaryKey var eventId: String = "",
|
||||
var type: String = "",
|
||||
var content: String = "",
|
||||
var prevContent: String? = null,
|
||||
var stateKey: String? = null,
|
||||
var originServerTs: Long? = null,
|
||||
var sender: String? = null,
|
||||
var age: Long? = 0,
|
||||
var redacts: String? = null
|
||||
internal open class EventEntity(var eventId: String = "",
|
||||
var type: String = "",
|
||||
var content: String = "",
|
||||
var prevContent: String? = null,
|
||||
var stateKey: String? = null,
|
||||
var originServerTs: Long? = null,
|
||||
var sender: String? = null,
|
||||
var age: Long? = 0,
|
||||
var redacts: String? = null,
|
||||
var stateIndex: Int = 0
|
||||
) : RealmObject() {
|
||||
|
||||
companion object
|
||||
companion object {
|
||||
const val DEFAULT_STATE_INDEX = Int.MIN_VALUE
|
||||
}
|
||||
|
||||
@LinkingObjects("events")
|
||||
val chunk: RealmResults<ChunkEntity>? = null
|
||||
|
|
|
@ -26,18 +26,13 @@ internal fun EventEntity.Companion.where(realm: Realm, roomId: String? = null, t
|
|||
return query
|
||||
}
|
||||
|
||||
internal fun EventEntity.Companion.stateEvents(realm: Realm, roomId: String): RealmQuery<EventEntity> {
|
||||
return realm.where<EventEntity>()
|
||||
.equalTo("${EventEntityFields.CHUNK}.${ChunkEntityFields.ROOM}.${RoomEntityFields.ROOM_ID}", roomId)
|
||||
.isNotNull(EventEntityFields.STATE_KEY)
|
||||
}
|
||||
|
||||
internal fun RealmQuery<EventEntity>.last(from: Long? = null): EventEntity? {
|
||||
internal fun RealmQuery<EventEntity>.last(from: Int? = null): EventEntity? {
|
||||
if (from != null) {
|
||||
this.lessThan(EventEntityFields.ORIGIN_SERVER_TS, from)
|
||||
this.lessThanOrEqualTo(EventEntityFields.STATE_INDEX, from)
|
||||
}
|
||||
return this
|
||||
.sort(EventEntityFields.ORIGIN_SERVER_TS, Sort.DESCENDING)
|
||||
.sort(EventEntityFields.STATE_INDEX, Sort.DESCENDING)
|
||||
.findFirst()
|
||||
}
|
||||
|
||||
|
|
|
@ -19,10 +19,18 @@ internal class MessageEventInterceptor(val monarchy: Monarchy) : EnrichedEventIn
|
|||
|
||||
override fun enrich(roomId: String, event: EnrichedEvent) {
|
||||
monarchy.doWithRealm { realm ->
|
||||
|
||||
if (event.root.eventId == null) {
|
||||
return@doWithRealm
|
||||
}
|
||||
|
||||
val rootEntity = EventEntity.where(realm, eventId = event.root.eventId).findFirst()
|
||||
?: return@doWithRealm
|
||||
|
||||
val roomMember = EventEntity
|
||||
.where(realm, roomId, EventType.STATE_ROOM_MEMBER)
|
||||
.equalTo(EventEntityFields.STATE_KEY, event.root.sender)
|
||||
.last(from = event.root.originServerTs)
|
||||
.last(from = rootEntity.stateIndex)
|
||||
?.asDomain()
|
||||
event.enrichWith(roomMember)
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ internal class RoomMembers(private val realm: Realm,
|
|||
fun getLoaded(): Map<String, RoomMember> {
|
||||
return EventEntity
|
||||
.where(realm, roomId, EventType.STATE_ROOM_MEMBER)
|
||||
.sort(EventEntityFields.ORIGIN_SERVER_TS)
|
||||
.sort(EventEntityFields.STATE_INDEX)
|
||||
.findAll()
|
||||
.map { it.asDomain() }
|
||||
.associateBy { it.stateKey!! }
|
||||
|
|
|
@ -8,12 +8,9 @@ import im.vector.matrix.android.api.session.events.interceptor.EnrichedEventInte
|
|||
import im.vector.matrix.android.api.session.events.model.EnrichedEvent
|
||||
import im.vector.matrix.android.api.session.room.TimelineHolder
|
||||
import im.vector.matrix.android.internal.database.mapper.asDomain
|
||||
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
|
||||
import im.vector.matrix.android.internal.database.model.EventEntity
|
||||
import im.vector.matrix.android.internal.database.model.EventEntityFields
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.database.model.ChunkEntity
|
||||
import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom
|
||||
import im.vector.matrix.android.internal.session.events.interceptor.MessageEventInterceptor
|
||||
import io.realm.Sort
|
||||
|
||||
private const val PAGE_SIZE = 30
|
||||
|
||||
|
@ -31,9 +28,9 @@ internal class DefaultTimelineHolder(private val roomId: String,
|
|||
|
||||
override fun liveTimeline(): LiveData<PagedList<EnrichedEvent>> {
|
||||
val realmDataSourceFactory = monarchy.createDataSourceFactory { realm ->
|
||||
EventEntity.where(realm, roomId = roomId)
|
||||
.equalTo("${EventEntityFields.CHUNK}.${ChunkEntityFields.IS_LAST}", true)
|
||||
.sort(EventEntityFields.ORIGIN_SERVER_TS, Sort.DESCENDING)
|
||||
ChunkEntity.findLastLiveChunkFromRoom(realm, roomId = roomId)
|
||||
?.events
|
||||
?.where()
|
||||
}
|
||||
val domainSourceFactory = realmDataSourceFactory
|
||||
.map { it.asDomain() }
|
||||
|
|
|
@ -11,5 +11,13 @@ internal enum class PaginationDirection(val value: String) {
|
|||
* Backwards when the event is added to the start of the timeline.
|
||||
* These events come from a back pagination.
|
||||
*/
|
||||
BACKWARDS("b")
|
||||
BACKWARDS("b");
|
||||
|
||||
val incrementStateIndex: Int by lazy {
|
||||
when (this) {
|
||||
FORWARDS -> 1
|
||||
BACKWARDS -> -1
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -4,8 +4,9 @@ import arrow.core.Try
|
|||
import arrow.core.failure
|
||||
import com.zhuinden.monarchy.Monarchy
|
||||
import im.vector.matrix.android.api.MatrixCallback
|
||||
import im.vector.matrix.android.api.session.events.model.EventType
|
||||
import im.vector.matrix.android.api.util.Cancelable
|
||||
import im.vector.matrix.android.internal.database.helper.addManagedToChunk
|
||||
import im.vector.matrix.android.internal.database.helper.add
|
||||
import im.vector.matrix.android.internal.database.model.ChunkEntity
|
||||
import im.vector.matrix.android.internal.database.model.RoomEntity
|
||||
import im.vector.matrix.android.internal.database.query.fastContains
|
||||
|
@ -26,9 +27,9 @@ import kotlinx.coroutines.launch
|
|||
import kotlinx.coroutines.withContext
|
||||
|
||||
internal class PaginationRequest(private val roomAPI: RoomAPI,
|
||||
private val monarchy: Monarchy,
|
||||
private val coroutineDispatchers: MatrixCoroutineDispatchers,
|
||||
private val stateEventsChunkHandler: StateEventsChunkHandler) {
|
||||
private val monarchy: Monarchy,
|
||||
private val coroutineDispatchers: MatrixCoroutineDispatchers,
|
||||
private val stateEventsChunkHandler: StateEventsChunkHandler) {
|
||||
|
||||
fun execute(roomId: String,
|
||||
from: String?,
|
||||
|
@ -56,11 +57,11 @@ internal class PaginationRequest(private val roomAPI: RoomAPI,
|
|||
executeRequest<TokenChunkEvent> {
|
||||
apiCall = roomAPI.getRoomMessagesFrom(roomId, from, direction.value, limit, filter)
|
||||
}.flatMap { chunk ->
|
||||
insertInDb(chunk, roomId)
|
||||
insertInDb(chunk, roomId, direction)
|
||||
}
|
||||
}
|
||||
|
||||
private fun insertInDb(receivedChunk: TokenChunkEvent, roomId: String): Try<TokenChunkEvent> {
|
||||
private fun insertInDb(receivedChunk: TokenChunkEvent, roomId: String, direction: PaginationDirection): Try<TokenChunkEvent> {
|
||||
return monarchy
|
||||
.tryTransactionSync { realm ->
|
||||
val roomEntity = RoomEntity.where(realm, roomId).findFirst()
|
||||
|
@ -74,15 +75,18 @@ internal class PaginationRequest(private val roomAPI: RoomAPI,
|
|||
val prevChunk = ChunkEntity.findWithNextToken(realm, roomId, receivedChunk.prevToken)
|
||||
|
||||
val eventIds = receivedChunk.events.filter { it.eventId != null }.map { it.eventId!! }
|
||||
val chunksOverlapped = ChunkEntity.findAllIncludingEvents(realm, eventIds)
|
||||
val chunksOverlapped = realm.copyFromRealm(ChunkEntity.findAllIncludingEvents(realm, eventIds))
|
||||
val hasOverlapped = chunksOverlapped.isNotEmpty()
|
||||
|
||||
val stateEventsChunk = stateEventsChunkHandler.handle(realm, roomId, receivedChunk.stateEvents)
|
||||
if (!roomEntity.chunks.contains(stateEventsChunk)) {
|
||||
roomEntity.chunks.add(stateEventsChunk)
|
||||
}
|
||||
var currentStateIndex = currentChunk.stateIndex(direction)
|
||||
val incrementStateIndex = direction.incrementStateIndex
|
||||
|
||||
receivedChunk.events.addManagedToChunk(currentChunk)
|
||||
receivedChunk.events.forEach { event ->
|
||||
currentChunk.add(event, currentStateIndex, direction)
|
||||
if (EventType.isStateEvent(event.type)) {
|
||||
currentStateIndex += incrementStateIndex
|
||||
}
|
||||
}
|
||||
|
||||
if (prevChunk != null) {
|
||||
currentChunk.events.addAll(prevChunk.events)
|
||||
|
@ -94,14 +98,26 @@ internal class PaginationRequest(private val roomAPI: RoomAPI,
|
|||
if (!currentChunk.events.fastContains(event)) {
|
||||
currentChunk.events.add(event)
|
||||
}
|
||||
if (EventType.isStateEvent(event.type)) {
|
||||
currentStateIndex += incrementStateIndex
|
||||
}
|
||||
}
|
||||
currentChunk.prevToken = overlapped.prevToken
|
||||
roomEntity.chunks.remove(overlapped)
|
||||
}
|
||||
}
|
||||
|
||||
if (!roomEntity.chunks.contains(currentChunk)) {
|
||||
roomEntity.chunks.add(currentChunk)
|
||||
}
|
||||
|
||||
currentChunk.updateStateIndex(currentStateIndex, direction)
|
||||
|
||||
|
||||
val stateEventsChunk = stateEventsChunkHandler.handle(realm, roomId, receivedChunk.stateEvents)
|
||||
if (!roomEntity.chunks.contains(stateEventsChunk)) {
|
||||
roomEntity.chunks.add(stateEventsChunk)
|
||||
}
|
||||
}
|
||||
.map { receivedChunk }
|
||||
}
|
||||
|
|
|
@ -4,13 +4,14 @@ import com.zhuinden.monarchy.Monarchy
|
|||
import im.vector.matrix.android.api.session.events.model.Event
|
||||
import im.vector.matrix.android.api.session.events.model.EventType
|
||||
import im.vector.matrix.android.api.session.room.model.MyMembership
|
||||
import im.vector.matrix.android.internal.database.helper.addManagedToChunk
|
||||
import im.vector.matrix.android.internal.database.helper.add
|
||||
import im.vector.matrix.android.internal.database.model.ChunkEntity
|
||||
import im.vector.matrix.android.internal.database.model.RoomEntity
|
||||
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
|
||||
import im.vector.matrix.android.internal.database.query.findAllIncludingEvents
|
||||
import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
|
||||
import im.vector.matrix.android.internal.session.sync.model.InvitedRoomSync
|
||||
import im.vector.matrix.android.internal.session.sync.model.RoomSync
|
||||
import im.vector.matrix.android.internal.session.sync.model.RoomSyncEphemeral
|
||||
|
@ -68,6 +69,7 @@ internal class RoomSyncHandler(private val monarchy: Monarchy,
|
|||
roomEntity.chunks.add(chunkEntity)
|
||||
}
|
||||
}
|
||||
|
||||
if (roomSync.timeline != null && roomSync.timeline.events.isNotEmpty()) {
|
||||
val chunkEntity = handleTimelineEvents(realm, roomId, roomSync.timeline.events, roomSync.timeline.prevToken, isLimited = roomSync.timeline.limited)
|
||||
if (!roomEntity.chunks.contains(chunkEntity)) {
|
||||
|
@ -148,7 +150,15 @@ internal class RoomSyncHandler(private val monarchy: Monarchy,
|
|||
lastChunk?.isLast = false
|
||||
chunkEntity.isLast = true
|
||||
chunkEntity.nextToken = nextToken
|
||||
eventList.addManagedToChunk(chunkEntity)
|
||||
|
||||
var currentStateIndex = chunkEntity.nextStateIndex
|
||||
eventList.forEach { event ->
|
||||
if (event.isStateEvent() && currentStateIndex != 0) {
|
||||
currentStateIndex += 1
|
||||
}
|
||||
chunkEntity.add(event, currentStateIndex, PaginationDirection.FORWARDS)
|
||||
}
|
||||
chunkEntity.nextStateIndex = currentStateIndex
|
||||
return chunkEntity
|
||||
}
|
||||
|
||||
|
|
|
@ -2,9 +2,11 @@ package im.vector.matrix.android.internal.session.sync
|
|||
|
||||
import im.vector.matrix.android.api.session.events.model.Event
|
||||
import im.vector.matrix.android.internal.database.DBConstants
|
||||
import im.vector.matrix.android.internal.database.helper.addManagedToChunk
|
||||
import im.vector.matrix.android.internal.database.helper.add
|
||||
import im.vector.matrix.android.internal.database.model.ChunkEntity
|
||||
import im.vector.matrix.android.internal.database.model.EventEntity
|
||||
import im.vector.matrix.android.internal.database.query.findWithNextToken
|
||||
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
|
||||
import io.realm.Realm
|
||||
import io.realm.kotlin.createObject
|
||||
|
||||
|
@ -18,7 +20,10 @@ internal class StateEventsChunkHandler {
|
|||
nextToken = DBConstants.STATE_EVENTS_CHUNK_TOKEN
|
||||
}
|
||||
|
||||
stateEvents.addManagedToChunk(chunkEntity)
|
||||
|
||||
stateEvents.forEach { event ->
|
||||
chunkEntity.add(event, EventEntity.DEFAULT_STATE_INDEX, PaginationDirection.FORWARDS)
|
||||
}
|
||||
return chunkEntity
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue