commit
c80564e1b1
|
@ -5,6 +5,7 @@ Features ✨:
|
|||
-
|
||||
|
||||
Improvements 🙌:
|
||||
- Lazy storage of ReadReceipts
|
||||
- Do not load room members in e2e after init sync
|
||||
|
||||
Bugfix 🐛:
|
||||
|
|
|
@ -117,7 +117,7 @@ internal class DefaultSetReadMarkersTask @Inject constructor(
|
|||
}
|
||||
if (readReceiptId != null) {
|
||||
val readReceiptContent = ReadReceiptHandler.createContent(userId, readReceiptId)
|
||||
readReceiptHandler.handle(realm, roomId, readReceiptContent, false)
|
||||
readReceiptHandler.handle(realm, roomId, readReceiptContent, false, null)
|
||||
}
|
||||
if (shouldUpdateRoomSummary) {
|
||||
val roomSummary = RoomSummaryEntity.where(realm, roomId).findFirst()
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.matrix.android.sdk.internal.database.query.findAllInRoomWithSendState
|
|||
import org.matrix.android.sdk.internal.database.query.where
|
||||
import org.matrix.android.sdk.internal.database.query.whereRoomId
|
||||
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
|
||||
import org.matrix.android.sdk.internal.session.sync.ReadReceiptHandler
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
import org.matrix.android.sdk.internal.task.configureWith
|
||||
import org.matrix.android.sdk.internal.util.Debouncer
|
||||
|
@ -73,7 +74,8 @@ internal class DefaultTimeline(
|
|||
private val timelineInput: TimelineInput,
|
||||
private val eventDecryptor: TimelineEventDecryptor,
|
||||
private val realmSessionProvider: RealmSessionProvider,
|
||||
private val loadRoomMembersTask: LoadRoomMembersTask
|
||||
private val loadRoomMembersTask: LoadRoomMembersTask,
|
||||
private val readReceiptHandler: ReadReceiptHandler
|
||||
) : Timeline,
|
||||
TimelineHiddenReadReceipts.Delegate,
|
||||
TimelineInput.Listener,
|
||||
|
@ -182,11 +184,27 @@ internal class DefaultTimeline(
|
|||
}
|
||||
.executeBy(taskExecutor)
|
||||
|
||||
// Ensure ReadReceipt from init sync are loaded
|
||||
ensureReadReceiptAreLoaded(realm)
|
||||
|
||||
isReady.set(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun ensureReadReceiptAreLoaded(realm: Realm) {
|
||||
readReceiptHandler.getContentFromInitSync(roomId)
|
||||
?.also {
|
||||
Timber.w("INIT_SYNC Insert when opening timeline RR for room $roomId")
|
||||
}
|
||||
?.let { readReceiptContent ->
|
||||
realm.executeTransactionAsync {
|
||||
readReceiptHandler.handle(it, roomId, readReceiptContent, false, null)
|
||||
readReceiptHandler.onContentFromInitSyncHandled(roomId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun TimelineSettings.shouldHandleHiddenReadReceipts(): Boolean {
|
||||
return buildReadReceipts && (filters.filterEdits || filters.filterTypes)
|
||||
}
|
||||
|
|
|
@ -17,10 +17,10 @@
|
|||
package org.matrix.android.sdk.internal.session.room.timeline
|
||||
|
||||
import androidx.lifecycle.LiveData
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedInject
|
||||
import dagger.assisted.AssistedFactory
|
||||
import com.zhuinden.monarchy.Monarchy
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedFactory
|
||||
import dagger.assisted.AssistedInject
|
||||
import io.realm.Sort
|
||||
import io.realm.kotlin.where
|
||||
import org.matrix.android.sdk.api.session.events.model.isImageMessage
|
||||
|
@ -38,20 +38,23 @@ import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
|
|||
import org.matrix.android.sdk.internal.database.query.where
|
||||
import org.matrix.android.sdk.internal.di.SessionDatabase
|
||||
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
|
||||
import org.matrix.android.sdk.internal.session.sync.ReadReceiptHandler
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
|
||||
internal class DefaultTimelineService @AssistedInject constructor(@Assisted private val roomId: String,
|
||||
@SessionDatabase private val monarchy: Monarchy,
|
||||
private val realmSessionProvider: RealmSessionProvider,
|
||||
private val timelineInput: TimelineInput,
|
||||
private val taskExecutor: TaskExecutor,
|
||||
private val contextOfEventTask: GetContextOfEventTask,
|
||||
private val eventDecryptor: TimelineEventDecryptor,
|
||||
private val paginationTask: PaginationTask,
|
||||
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
private val timelineEventMapper: TimelineEventMapper,
|
||||
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper,
|
||||
private val loadRoomMembersTask: LoadRoomMembersTask
|
||||
internal class DefaultTimelineService @AssistedInject constructor(
|
||||
@Assisted private val roomId: String,
|
||||
@SessionDatabase private val monarchy: Monarchy,
|
||||
private val realmSessionProvider: RealmSessionProvider,
|
||||
private val timelineInput: TimelineInput,
|
||||
private val taskExecutor: TaskExecutor,
|
||||
private val contextOfEventTask: GetContextOfEventTask,
|
||||
private val eventDecryptor: TimelineEventDecryptor,
|
||||
private val paginationTask: PaginationTask,
|
||||
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
private val timelineEventMapper: TimelineEventMapper,
|
||||
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper,
|
||||
private val loadRoomMembersTask: LoadRoomMembersTask,
|
||||
private val readReceiptHandler: ReadReceiptHandler
|
||||
) : TimelineService {
|
||||
|
||||
@AssistedFactory
|
||||
|
@ -74,7 +77,8 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
|
|||
eventDecryptor = eventDecryptor,
|
||||
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
|
||||
realmSessionProvider = realmSessionProvider,
|
||||
loadRoomMembersTask = loadRoomMembersTask
|
||||
loadRoomMembersTask = loadRoomMembersTask,
|
||||
readReceiptHandler = readReceiptHandler
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -42,9 +42,9 @@ sealed class InitialSyncStrategy {
|
|||
val minSizeToSplit: Long = 1024 * 1024,
|
||||
/**
|
||||
* Limit per room to reach to decide to store a join room ephemeral Events into a file
|
||||
* Empiric value: 6 kilobytes
|
||||
* Empiric value: 1 kilobytes
|
||||
*/
|
||||
val minSizeToStoreInFile: Long = 6 * 1024,
|
||||
val minSizeToStoreInFile: Long = 1024,
|
||||
/**
|
||||
* Max number of rooms to insert at a time in database (to avoid too much RAM usage)
|
||||
*/
|
||||
|
|
|
@ -16,12 +16,13 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.sync
|
||||
|
||||
import io.realm.Realm
|
||||
import org.matrix.android.sdk.api.session.events.model.EventType
|
||||
import org.matrix.android.sdk.internal.database.model.ReadReceiptEntity
|
||||
import org.matrix.android.sdk.internal.database.model.ReadReceiptsSummaryEntity
|
||||
import org.matrix.android.sdk.internal.database.query.createUnmanaged
|
||||
import org.matrix.android.sdk.internal.database.query.getOrCreate
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
import io.realm.Realm
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
|
@ -35,7 +36,9 @@ typealias ReadReceiptContent = Map<String, Map<String, Map<String, Map<String, D
|
|||
private const val READ_KEY = "m.read"
|
||||
private const val TIMESTAMP_KEY = "ts"
|
||||
|
||||
internal class ReadReceiptHandler @Inject constructor() {
|
||||
internal class ReadReceiptHandler @Inject constructor(
|
||||
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
|
||||
) {
|
||||
|
||||
companion object {
|
||||
|
||||
|
@ -52,22 +55,29 @@ internal class ReadReceiptHandler @Inject constructor() {
|
|||
}
|
||||
}
|
||||
|
||||
fun handle(realm: Realm, roomId: String, content: ReadReceiptContent?, isInitialSync: Boolean) {
|
||||
if (content == null) {
|
||||
return
|
||||
}
|
||||
fun handle(realm: Realm,
|
||||
roomId: String,
|
||||
content: ReadReceiptContent?,
|
||||
isInitialSync: Boolean,
|
||||
aggregator: SyncResponsePostTreatmentAggregator?) {
|
||||
content ?: return
|
||||
|
||||
try {
|
||||
handleReadReceiptContent(realm, roomId, content, isInitialSync)
|
||||
handleReadReceiptContent(realm, roomId, content, isInitialSync, aggregator)
|
||||
} catch (exception: Exception) {
|
||||
Timber.e("Fail to handle read receipt for room $roomId")
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleReadReceiptContent(realm: Realm, roomId: String, content: ReadReceiptContent, isInitialSync: Boolean) {
|
||||
private fun handleReadReceiptContent(realm: Realm,
|
||||
roomId: String,
|
||||
content: ReadReceiptContent,
|
||||
isInitialSync: Boolean,
|
||||
aggregator: SyncResponsePostTreatmentAggregator?) {
|
||||
if (isInitialSync) {
|
||||
initialSyncStrategy(realm, roomId, content)
|
||||
} else {
|
||||
incrementalSyncStrategy(realm, roomId, content)
|
||||
incrementalSyncStrategy(realm, roomId, content, aggregator)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,7 +97,21 @@ internal class ReadReceiptHandler @Inject constructor() {
|
|||
realm.insertOrUpdate(readReceiptSummaries)
|
||||
}
|
||||
|
||||
private fun incrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) {
|
||||
private fun incrementalSyncStrategy(realm: Realm,
|
||||
roomId: String,
|
||||
content: ReadReceiptContent,
|
||||
aggregator: SyncResponsePostTreatmentAggregator?) {
|
||||
// First check if we have data from init sync to handle
|
||||
getContentFromInitSync(roomId)?.let {
|
||||
Timber.w("INIT_SYNC Insert during incremental sync RR for room $roomId")
|
||||
doIncrementalSyncStrategy(realm, roomId, it)
|
||||
aggregator?.ephemeralFilesToDelete?.add(roomId)
|
||||
}
|
||||
|
||||
doIncrementalSyncStrategy(realm, roomId, content)
|
||||
}
|
||||
|
||||
private fun doIncrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) {
|
||||
for ((eventId, receiptDict) in content) {
|
||||
val userIdsDict = receiptDict[READ_KEY] ?: continue
|
||||
val readReceiptsSummary = ReadReceiptsSummaryEntity.where(realm, eventId).findFirst()
|
||||
|
@ -110,4 +134,27 @@ internal class ReadReceiptHandler @Inject constructor() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun getContentFromInitSync(roomId: String): ReadReceiptContent? {
|
||||
val dataFromFile = roomSyncEphemeralTemporaryStore.read(roomId)
|
||||
|
||||
dataFromFile ?: return null
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val content = dataFromFile
|
||||
.events
|
||||
.firstOrNull { it.type == EventType.RECEIPT }
|
||||
?.content as? ReadReceiptContent
|
||||
|
||||
if (content == null) {
|
||||
// We can delete the file now
|
||||
roomSyncEphemeralTemporaryStore.delete(roomId)
|
||||
}
|
||||
|
||||
return content
|
||||
}
|
||||
|
||||
fun onContentFromInitSyncHandled(roomId: String) {
|
||||
roomSyncEphemeralTemporaryStore.delete(roomId)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.sync
|
||||
|
||||
import com.squareup.moshi.JsonReader
|
||||
import com.squareup.moshi.Moshi
|
||||
import okio.buffer
|
||||
import okio.source
|
||||
import org.matrix.android.sdk.internal.di.SessionFilesDirectory
|
||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral
|
||||
import org.matrix.android.sdk.internal.util.md5
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import javax.inject.Inject
|
||||
|
||||
internal interface RoomSyncEphemeralTemporaryStore {
|
||||
fun write(roomId: String, roomSyncEphemeralJson: String)
|
||||
fun read(roomId: String): RoomSyncEphemeral?
|
||||
fun reset()
|
||||
fun delete(roomId: String)
|
||||
}
|
||||
|
||||
internal class RoomSyncEphemeralTemporaryStoreFile @Inject constructor(
|
||||
@SessionFilesDirectory fileDirectory: File,
|
||||
moshi: Moshi
|
||||
) : RoomSyncEphemeralTemporaryStore {
|
||||
|
||||
private val workingDir = File(fileDirectory, "rr")
|
||||
.also { it.mkdirs() }
|
||||
|
||||
private val roomSyncEphemeralAdapter = moshi.adapter(RoomSyncEphemeral::class.java)
|
||||
|
||||
/**
|
||||
* Write RoomSyncEphemeral to a file
|
||||
*/
|
||||
override fun write(roomId: String, roomSyncEphemeralJson: String) {
|
||||
Timber.w("INIT_SYNC Store ephemeral events for room $roomId")
|
||||
getFile(roomId).writeText(roomSyncEphemeralJson)
|
||||
}
|
||||
|
||||
/**
|
||||
* Read RoomSyncEphemeral from a file, or null if there is no file to read
|
||||
*/
|
||||
override fun read(roomId: String): RoomSyncEphemeral? {
|
||||
return getFile(roomId)
|
||||
.takeIf { it.exists() }
|
||||
?.inputStream()
|
||||
?.use { pos ->
|
||||
roomSyncEphemeralAdapter.fromJson(JsonReader.of(pos.source().buffer()))
|
||||
}
|
||||
}
|
||||
|
||||
override fun delete(roomId: String) {
|
||||
getFile(roomId).delete()
|
||||
}
|
||||
|
||||
override fun reset() {
|
||||
workingDir.deleteRecursively()
|
||||
workingDir.mkdirs()
|
||||
}
|
||||
|
||||
private fun getFile(roomId: String): File {
|
||||
return File(workingDir, "${roomId.md5()}.json")
|
||||
}
|
||||
}
|
|
@ -60,6 +60,7 @@ import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
|
|||
import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput
|
||||
import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent
|
||||
import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync
|
||||
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral
|
||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
|
||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData
|
||||
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
|
||||
|
@ -87,29 +88,21 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
fun handle(realm: Realm,
|
||||
roomsSyncResponse: RoomsSyncResponse,
|
||||
isInitialSync: Boolean,
|
||||
aggregator: SyncResponsePostTreatmentAggregator,
|
||||
reporter: ProgressReporter? = null) {
|
||||
Timber.v("Execute transaction from $this")
|
||||
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter)
|
||||
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter)
|
||||
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter)
|
||||
}
|
||||
|
||||
fun handleInitSyncEphemeral(realm: Realm,
|
||||
roomsSyncResponse: RoomsSyncResponse) {
|
||||
roomsSyncResponse.join.forEach { roomSync ->
|
||||
val ephemeralResult = roomSync.value.ephemeral
|
||||
?.roomSyncEphemeral
|
||||
?.events
|
||||
?.takeIf { it.isNotEmpty() }
|
||||
?.let { events -> handleEphemeral(realm, roomSync.key, events, true) }
|
||||
|
||||
roomTypingUsersHandler.handle(realm, roomSync.key, ephemeralResult)
|
||||
}
|
||||
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter)
|
||||
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter)
|
||||
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, aggregator, reporter)
|
||||
}
|
||||
|
||||
// PRIVATE METHODS *****************************************************************************
|
||||
|
||||
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) {
|
||||
private fun handleRoomSync(realm: Realm,
|
||||
handlingStrategy: HandlingStrategy,
|
||||
isInitialSync: Boolean,
|
||||
aggregator: SyncResponsePostTreatmentAggregator,
|
||||
reporter: ProgressReporter?) {
|
||||
val insertType = if (isInitialSync) {
|
||||
EventInsertType.INITIAL_SYNC
|
||||
} else {
|
||||
|
@ -119,12 +112,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
val rooms = when (handlingStrategy) {
|
||||
is HandlingStrategy.JOINED -> {
|
||||
if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) {
|
||||
insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, reporter)
|
||||
insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, aggregator, reporter)
|
||||
// Rooms are already inserted, return an empty list
|
||||
emptyList()
|
||||
} else {
|
||||
handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
|
||||
handleJoinedRoom(realm, it.key, it.value, true, insertType, syncLocalTimeStampMillis)
|
||||
handleJoinedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis, aggregator)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -145,6 +138,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
private fun insertJoinRoomsFromInitSync(realm: Realm,
|
||||
handlingStrategy: HandlingStrategy.JOINED,
|
||||
syncLocalTimeStampMillis: Long,
|
||||
aggregator: SyncResponsePostTreatmentAggregator,
|
||||
reporter: ProgressReporter?) {
|
||||
val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
|
||||
val listSize = handlingStrategy.data.keys.size
|
||||
|
@ -165,9 +159,9 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
realm = realm,
|
||||
roomId = it,
|
||||
roomSync = handlingStrategy.data[it] ?: error("Should not happen"),
|
||||
handleEphemeralEvents = false,
|
||||
insertType = EventInsertType.INITIAL_SYNC,
|
||||
syncLocalTimestampMillis = syncLocalTimeStampMillis
|
||||
syncLocalTimestampMillis = syncLocalTimeStampMillis,
|
||||
aggregator
|
||||
)
|
||||
}
|
||||
realm.insertOrUpdate(roomEntities)
|
||||
|
@ -177,7 +171,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
} else {
|
||||
// No need to split
|
||||
val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
|
||||
handleJoinedRoom(realm, it.key, it.value, false, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis)
|
||||
handleJoinedRoom(realm, it.key, it.value, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis, aggregator)
|
||||
}
|
||||
realm.insertOrUpdate(rooms)
|
||||
}
|
||||
|
@ -186,17 +180,16 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
private fun handleJoinedRoom(realm: Realm,
|
||||
roomId: String,
|
||||
roomSync: RoomSync,
|
||||
handleEphemeralEvents: Boolean,
|
||||
insertType: EventInsertType,
|
||||
syncLocalTimestampMillis: Long): RoomEntity {
|
||||
syncLocalTimestampMillis: Long,
|
||||
aggregator: SyncResponsePostTreatmentAggregator): RoomEntity {
|
||||
Timber.v("Handle join sync for room $roomId")
|
||||
|
||||
var ephemeralResult: EphemeralResult? = null
|
||||
if (handleEphemeralEvents) {
|
||||
ephemeralResult = roomSync.ephemeral?.roomSyncEphemeral?.events
|
||||
?.takeIf { it.isNotEmpty() }
|
||||
?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) }
|
||||
}
|
||||
val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed)
|
||||
?._roomSyncEphemeral
|
||||
?.events
|
||||
?.takeIf { it.isNotEmpty() }
|
||||
?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC, aggregator) }
|
||||
|
||||
if (roomSync.accountData?.events?.isNotEmpty() == true) {
|
||||
handleRoomAccountDataEvents(realm, roomId, roomSync.accountData)
|
||||
|
@ -436,14 +429,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
private fun handleEphemeral(realm: Realm,
|
||||
roomId: String,
|
||||
ephemeralEvents: List<Event>,
|
||||
isInitialSync: Boolean): EphemeralResult {
|
||||
isInitialSync: Boolean,
|
||||
aggregator: SyncResponsePostTreatmentAggregator): EphemeralResult {
|
||||
var result = EphemeralResult()
|
||||
for (event in ephemeralEvents) {
|
||||
when (event.type) {
|
||||
EventType.RECEIPT -> {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
(event.content as? ReadReceiptContent)?.let { readReceiptContent ->
|
||||
readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync)
|
||||
readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync, aggregator)
|
||||
}
|
||||
}
|
||||
EventType.TYPING -> {
|
||||
|
|
|
@ -26,6 +26,7 @@ import javax.inject.Inject
|
|||
internal class RoomTypingUsersHandler @Inject constructor(@UserId private val userId: String,
|
||||
private val typingUsersTracker: DefaultTypingUsersTracker) {
|
||||
|
||||
// TODO This could be handled outside of the Realm transaction. Use the new aggregator?
|
||||
fun handle(realm: Realm, roomId: String, ephemeralResult: RoomSyncHandler.EphemeralResult?) {
|
||||
val roomMemberHelper = RoomMemberHelper(realm, roomId)
|
||||
val typingIds = ephemeralResult?.typingUserIds?.filter { it != userId }.orEmpty()
|
||||
|
|
|
@ -37,4 +37,7 @@ internal abstract class SyncModule {
|
|||
|
||||
@Binds
|
||||
abstract fun bindSyncTask(task: DefaultSyncTask): SyncTask
|
||||
|
||||
@Binds
|
||||
abstract fun bindRoomSyncEphemeralTemporaryStore(store: RoomSyncEphemeralTemporaryStoreFile): RoomSyncEphemeralTemporaryStore
|
||||
}
|
||||
|
|
|
@ -41,17 +41,19 @@ import kotlin.system.measureTimeMillis
|
|||
|
||||
private const val GET_GROUP_DATA_WORKER = "GET_GROUP_DATA_WORKER"
|
||||
|
||||
internal class SyncResponseHandler @Inject constructor(@SessionDatabase private val monarchy: Monarchy,
|
||||
@SessionId private val sessionId: String,
|
||||
private val workManagerProvider: WorkManagerProvider,
|
||||
private val roomSyncHandler: RoomSyncHandler,
|
||||
private val userAccountDataSyncHandler: UserAccountDataSyncHandler,
|
||||
private val groupSyncHandler: GroupSyncHandler,
|
||||
private val cryptoSyncHandler: CryptoSyncHandler,
|
||||
private val cryptoService: DefaultCryptoService,
|
||||
private val tokenStore: SyncTokenStore,
|
||||
private val processEventForPushTask: ProcessEventForPushTask,
|
||||
private val pushRuleService: PushRuleService) {
|
||||
internal class SyncResponseHandler @Inject constructor(
|
||||
@SessionDatabase private val monarchy: Monarchy,
|
||||
@SessionId private val sessionId: String,
|
||||
private val workManagerProvider: WorkManagerProvider,
|
||||
private val roomSyncHandler: RoomSyncHandler,
|
||||
private val userAccountDataSyncHandler: UserAccountDataSyncHandler,
|
||||
private val groupSyncHandler: GroupSyncHandler,
|
||||
private val cryptoSyncHandler: CryptoSyncHandler,
|
||||
private val aggregatorHandler: SyncResponsePostTreatmentAggregatorHandler,
|
||||
private val cryptoService: DefaultCryptoService,
|
||||
private val tokenStore: SyncTokenStore,
|
||||
private val processEventForPushTask: ProcessEventForPushTask,
|
||||
private val pushRuleService: PushRuleService) {
|
||||
|
||||
suspend fun handleResponse(syncResponse: SyncResponse,
|
||||
fromToken: String?,
|
||||
|
@ -81,13 +83,14 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
|
|||
}.also {
|
||||
Timber.v("Finish handling toDevice in $it ms")
|
||||
}
|
||||
val aggregator = SyncResponsePostTreatmentAggregator()
|
||||
// Start one big transaction
|
||||
monarchy.awaitTransaction { realm ->
|
||||
measureTimeMillis {
|
||||
Timber.v("Handle rooms")
|
||||
reportSubtask(reporter, InitSyncStep.ImportingAccountRoom, 1, 0.7f) {
|
||||
if (syncResponse.rooms != null) {
|
||||
roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, reporter)
|
||||
roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, aggregator, reporter)
|
||||
}
|
||||
}
|
||||
}.also {
|
||||
|
@ -115,7 +118,10 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
|
|||
}
|
||||
tokenStore.saveToken(realm, syncResponse.nextBatch)
|
||||
}
|
||||
|
||||
// Everything else we need to do outside the transaction
|
||||
aggregatorHandler.handle(aggregator)
|
||||
|
||||
syncResponse.rooms?.let {
|
||||
checkPushRules(it, isInitialSync)
|
||||
userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite)
|
||||
|
@ -128,15 +134,6 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
|
|||
cryptoSyncHandler.onSyncCompleted(syncResponse)
|
||||
}
|
||||
|
||||
suspend fun handleInitSyncSecondTransaction(syncResponse: SyncResponse) {
|
||||
// Start another transaction to handle the ephemeral events
|
||||
monarchy.awaitTransaction { realm ->
|
||||
if (syncResponse.rooms != null) {
|
||||
roomSyncHandler.handleInitSyncEphemeral(realm, syncResponse.rooms)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* At the moment we don't get any group data through the sync, so we poll where every hour.
|
||||
* You can also force to refetch group data using [Group] API.
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.sync
|
||||
|
||||
internal class SyncResponsePostTreatmentAggregator {
|
||||
// List of RoomId
|
||||
val ephemeralFilesToDelete = mutableListOf<String>()
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.sync
|
||||
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class SyncResponsePostTreatmentAggregatorHandler @Inject constructor(
|
||||
private val ephemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
|
||||
) {
|
||||
fun handle(synResHaResponsePostTreatmentAggregator: SyncResponsePostTreatmentAggregator) {
|
||||
cleanupEphemeralFiles(synResHaResponsePostTreatmentAggregator.ephemeralFilesToDelete)
|
||||
}
|
||||
|
||||
private fun cleanupEphemeralFiles(ephemeralFilesToDelete: List<String>) {
|
||||
ephemeralFilesToDelete.forEach {
|
||||
ephemeralTemporaryStore.delete(it)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -62,7 +62,8 @@ internal class DefaultSyncTask @Inject constructor(
|
|||
private val globalErrorReceiver: GlobalErrorReceiver,
|
||||
@SessionFilesDirectory
|
||||
private val fileDirectory: File,
|
||||
private val syncResponseParser: InitialSyncResponseParser
|
||||
private val syncResponseParser: InitialSyncResponseParser,
|
||||
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
|
||||
) : SyncTask {
|
||||
|
||||
private val workingDir = File(fileDirectory, "is")
|
||||
|
@ -102,13 +103,16 @@ internal class DefaultSyncTask @Inject constructor(
|
|||
if (isInitialSync) {
|
||||
Timber.v("INIT_SYNC with filter: ${requestParams["filter"]}")
|
||||
val initSyncStrategy = initialSyncStrategy
|
||||
var syncResp: SyncResponse? = null
|
||||
logDuration("INIT_SYNC strategy: $initSyncStrategy") {
|
||||
if (initSyncStrategy is InitialSyncStrategy.Optimized) {
|
||||
roomSyncEphemeralTemporaryStore.reset()
|
||||
workingDir.mkdirs()
|
||||
val file = downloadInitSyncResponse(requestParams)
|
||||
syncResp = reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) {
|
||||
reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) {
|
||||
handleSyncFile(file, initSyncStrategy)
|
||||
}
|
||||
// Delete all files
|
||||
workingDir.deleteRecursively()
|
||||
} else {
|
||||
val syncResponse = logDuration("INIT_SYNC Request") {
|
||||
executeRequest<SyncResponse>(globalErrorReceiver) {
|
||||
|
@ -125,15 +129,6 @@ internal class DefaultSyncTask @Inject constructor(
|
|||
}
|
||||
}
|
||||
initialSyncProgressService.endAll()
|
||||
|
||||
if (initSyncStrategy is InitialSyncStrategy.Optimized) {
|
||||
logDuration("INIT_SYNC Handle ephemeral") {
|
||||
syncResponseHandler.handleInitSyncSecondTransaction(syncResp!!)
|
||||
}
|
||||
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS)
|
||||
// Delete all files
|
||||
workingDir.deleteRecursively()
|
||||
}
|
||||
} else {
|
||||
val syncResponse = executeRequest<SyncResponse>(globalErrorReceiver) {
|
||||
apiCall = syncAPI.sync(
|
||||
|
@ -147,7 +142,6 @@ internal class DefaultSyncTask @Inject constructor(
|
|||
}
|
||||
|
||||
private suspend fun downloadInitSyncResponse(requestParams: Map<String, String>): File {
|
||||
workingDir.mkdirs()
|
||||
val workingFile = File(workingDir, "initSync.json")
|
||||
val status = initialSyncStatusRepository.getStep()
|
||||
if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) {
|
||||
|
@ -201,8 +195,8 @@ internal class DefaultSyncTask @Inject constructor(
|
|||
}
|
||||
}
|
||||
|
||||
private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized): SyncResponse {
|
||||
return logDuration("INIT_SYNC handleSyncFile()") {
|
||||
private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized) {
|
||||
logDuration("INIT_SYNC handleSyncFile()") {
|
||||
val syncResponse = logDuration("INIT_SYNC Read file and parse") {
|
||||
syncResponseParser.parse(initSyncStrategy, workingFile)
|
||||
}
|
||||
|
@ -215,7 +209,7 @@ internal class DefaultSyncTask @Inject constructor(
|
|||
logDuration("INIT_SYNC Database insertion") {
|
||||
syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService)
|
||||
}
|
||||
syncResponse
|
||||
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,28 +16,10 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.sync.model
|
||||
|
||||
import com.squareup.moshi.JsonAdapter
|
||||
import com.squareup.moshi.JsonClass
|
||||
import com.squareup.moshi.JsonReader
|
||||
import okio.buffer
|
||||
import okio.source
|
||||
import java.io.File
|
||||
|
||||
@JsonClass(generateAdapter = false)
|
||||
internal sealed class LazyRoomSyncEphemeral {
|
||||
data class Parsed(val _roomSyncEphemeral: RoomSyncEphemeral) : LazyRoomSyncEphemeral()
|
||||
data class Stored(val roomSyncEphemeralAdapter: JsonAdapter<RoomSyncEphemeral>, val file: File) : LazyRoomSyncEphemeral()
|
||||
|
||||
val roomSyncEphemeral: RoomSyncEphemeral
|
||||
get() {
|
||||
return when (this) {
|
||||
is Parsed -> _roomSyncEphemeral
|
||||
is Stored -> {
|
||||
// Parse the file now
|
||||
file.inputStream().use { pos ->
|
||||
roomSyncEphemeralAdapter.fromJson(JsonReader.of(pos.source().buffer()))!!
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
object Stored : LazyRoomSyncEphemeral()
|
||||
}
|
||||
|
|
|
@ -22,11 +22,10 @@ import com.squareup.moshi.JsonReader
|
|||
import com.squareup.moshi.JsonWriter
|
||||
import com.squareup.moshi.ToJson
|
||||
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
|
||||
import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore
|
||||
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral
|
||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
internal class DefaultLazyRoomSyncEphemeralJsonAdapter {
|
||||
|
||||
|
@ -44,20 +43,15 @@ internal class DefaultLazyRoomSyncEphemeralJsonAdapter {
|
|||
}
|
||||
}
|
||||
|
||||
internal class SplitLazyRoomSyncJsonAdapter(
|
||||
private val workingDirectory: File,
|
||||
internal class SplitLazyRoomSyncEphemeralJsonAdapter(
|
||||
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore,
|
||||
private val syncStrategy: InitialSyncStrategy.Optimized
|
||||
) {
|
||||
private val atomicInteger = AtomicInteger(0)
|
||||
|
||||
private fun createFile(): File {
|
||||
val index = atomicInteger.getAndIncrement()
|
||||
return File(workingDirectory, "room_$index.json")
|
||||
}
|
||||
|
||||
@FromJson
|
||||
fun fromJson(reader: JsonReader, delegate: JsonAdapter<RoomSyncEphemeral>): LazyRoomSyncEphemeral? {
|
||||
val path = reader.path
|
||||
val roomId = path.substringAfter("\$.rooms.join.").substringBeforeLast(".ephemeral")
|
||||
|
||||
val json = reader.nextSource().inputStream().bufferedReader().use {
|
||||
it.readText()
|
||||
}
|
||||
|
@ -65,9 +59,8 @@ internal class SplitLazyRoomSyncJsonAdapter(
|
|||
return if (json.length > limit) {
|
||||
Timber.v("INIT_SYNC $path content length: ${json.length} copy to a file")
|
||||
// Copy the source to a file
|
||||
val file = createFile()
|
||||
file.writeText(json)
|
||||
LazyRoomSyncEphemeral.Stored(delegate, file)
|
||||
roomSyncEphemeralTemporaryStore.write(roomId, json)
|
||||
LazyRoomSyncEphemeral.Stored
|
||||
} else {
|
||||
Timber.v("INIT_SYNC $path content length: ${json.length} parse it now")
|
||||
val roomSync = delegate.fromJson(json) ?: return null
|
||||
|
|
|
@ -20,29 +20,33 @@ import com.squareup.moshi.Moshi
|
|||
import okio.buffer
|
||||
import okio.source
|
||||
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
|
||||
import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore
|
||||
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class InitialSyncResponseParser @Inject constructor(private val moshi: Moshi) {
|
||||
internal class InitialSyncResponseParser @Inject constructor(
|
||||
private val moshi: Moshi,
|
||||
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
|
||||
) {
|
||||
|
||||
fun parse(syncStrategy: InitialSyncStrategy.Optimized, workingFile: File): SyncResponse {
|
||||
val syncResponseLength = workingFile.length().toInt()
|
||||
Timber.v("INIT_SYNC Sync file size is $syncResponseLength bytes")
|
||||
val shouldSplit = syncResponseLength >= syncStrategy.minSizeToSplit
|
||||
Timber.v("INIT_SYNC should split in several files: $shouldSplit")
|
||||
return getMoshi(syncStrategy, workingFile.parentFile!!, shouldSplit)
|
||||
return getMoshi(syncStrategy, shouldSplit)
|
||||
.adapter(SyncResponse::class.java)
|
||||
.fromJson(workingFile.source().buffer())!!
|
||||
}
|
||||
|
||||
private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, workingDirectory: File, shouldSplit: Boolean): Moshi {
|
||||
private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, shouldSplit: Boolean): Moshi {
|
||||
// If we don't have to split we'll rely on the already default moshi
|
||||
if (!shouldSplit) return moshi
|
||||
// Otherwise, we create a new adapter for handling Map of Lazy sync
|
||||
return moshi.newBuilder()
|
||||
.add(SplitLazyRoomSyncJsonAdapter(workingDirectory, syncStrategy))
|
||||
.add(SplitLazyRoomSyncEphemeralJsonAdapter(roomSyncEphemeralTemporaryStore, syncStrategy))
|
||||
.build()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue