diff --git a/CHANGES.md b/CHANGES.md index 8144e0a478..97e78e0a9c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,7 @@ Features ✨: - Improvements 🙌: + - Lazy storage of ReadReceipts - Do not load room members in e2e after init sync Bugfix 🐛: diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/read/SetReadMarkersTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/read/SetReadMarkersTask.kt index c7f962a699..54d2307dd4 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/read/SetReadMarkersTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/read/SetReadMarkersTask.kt @@ -117,7 +117,7 @@ internal class DefaultSetReadMarkersTask @Inject constructor( } if (readReceiptId != null) { val readReceiptContent = ReadReceiptHandler.createContent(userId, readReceiptId) - readReceiptHandler.handle(realm, roomId, readReceiptContent, false) + readReceiptHandler.handle(realm, roomId, readReceiptContent, false, null) } if (shouldUpdateRoomSummary) { val roomSummary = RoomSummaryEntity.where(realm, roomId).findFirst() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt index d0946abe28..61f770b956 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt @@ -44,6 +44,7 @@ import org.matrix.android.sdk.internal.database.query.findAllInRoomWithSendState import org.matrix.android.sdk.internal.database.query.where import org.matrix.android.sdk.internal.database.query.whereRoomId import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask +import org.matrix.android.sdk.internal.session.sync.ReadReceiptHandler import org.matrix.android.sdk.internal.task.TaskExecutor import org.matrix.android.sdk.internal.task.configureWith import org.matrix.android.sdk.internal.util.Debouncer @@ -73,7 +74,8 @@ internal class DefaultTimeline( private val timelineInput: TimelineInput, private val eventDecryptor: TimelineEventDecryptor, private val realmSessionProvider: RealmSessionProvider, - private val loadRoomMembersTask: LoadRoomMembersTask + private val loadRoomMembersTask: LoadRoomMembersTask, + private val readReceiptHandler: ReadReceiptHandler ) : Timeline, TimelineHiddenReadReceipts.Delegate, TimelineInput.Listener, @@ -182,11 +184,27 @@ internal class DefaultTimeline( } .executeBy(taskExecutor) + // Ensure ReadReceipt from init sync are loaded + ensureReadReceiptAreLoaded(realm) + isReady.set(true) } } } + private fun ensureReadReceiptAreLoaded(realm: Realm) { + readReceiptHandler.getContentFromInitSync(roomId) + ?.also { + Timber.w("INIT_SYNC Insert when opening timeline RR for room $roomId") + } + ?.let { readReceiptContent -> + realm.executeTransactionAsync { + readReceiptHandler.handle(it, roomId, readReceiptContent, false, null) + readReceiptHandler.onContentFromInitSyncHandled(roomId) + } + } + } + private fun TimelineSettings.shouldHandleHiddenReadReceipts(): Boolean { return buildReadReceipts && (filters.filterEdits || filters.filterTypes) } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimelineService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimelineService.kt index bb774022d4..c3714a1303 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimelineService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimelineService.kt @@ -17,10 +17,10 @@ package org.matrix.android.sdk.internal.session.room.timeline import androidx.lifecycle.LiveData -import dagger.assisted.Assisted -import dagger.assisted.AssistedInject -import dagger.assisted.AssistedFactory import com.zhuinden.monarchy.Monarchy +import dagger.assisted.Assisted +import dagger.assisted.AssistedFactory +import dagger.assisted.AssistedInject import io.realm.Sort import io.realm.kotlin.where import org.matrix.android.sdk.api.session.events.model.isImageMessage @@ -38,20 +38,23 @@ import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields import org.matrix.android.sdk.internal.database.query.where import org.matrix.android.sdk.internal.di.SessionDatabase import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask +import org.matrix.android.sdk.internal.session.sync.ReadReceiptHandler import org.matrix.android.sdk.internal.task.TaskExecutor -internal class DefaultTimelineService @AssistedInject constructor(@Assisted private val roomId: String, - @SessionDatabase private val monarchy: Monarchy, - private val realmSessionProvider: RealmSessionProvider, - private val timelineInput: TimelineInput, - private val taskExecutor: TaskExecutor, - private val contextOfEventTask: GetContextOfEventTask, - private val eventDecryptor: TimelineEventDecryptor, - private val paginationTask: PaginationTask, - private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask, - private val timelineEventMapper: TimelineEventMapper, - private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper, - private val loadRoomMembersTask: LoadRoomMembersTask +internal class DefaultTimelineService @AssistedInject constructor( + @Assisted private val roomId: String, + @SessionDatabase private val monarchy: Monarchy, + private val realmSessionProvider: RealmSessionProvider, + private val timelineInput: TimelineInput, + private val taskExecutor: TaskExecutor, + private val contextOfEventTask: GetContextOfEventTask, + private val eventDecryptor: TimelineEventDecryptor, + private val paginationTask: PaginationTask, + private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask, + private val timelineEventMapper: TimelineEventMapper, + private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper, + private val loadRoomMembersTask: LoadRoomMembersTask, + private val readReceiptHandler: ReadReceiptHandler ) : TimelineService { @AssistedFactory @@ -74,7 +77,8 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv eventDecryptor = eventDecryptor, fetchTokenAndPaginateTask = fetchTokenAndPaginateTask, realmSessionProvider = realmSessionProvider, - loadRoomMembersTask = loadRoomMembersTask + loadRoomMembersTask = loadRoomMembersTask, + readReceiptHandler = readReceiptHandler ) } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt index 297cc213ed..7d93e30191 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt @@ -42,9 +42,9 @@ sealed class InitialSyncStrategy { val minSizeToSplit: Long = 1024 * 1024, /** * Limit per room to reach to decide to store a join room ephemeral Events into a file - * Empiric value: 6 kilobytes + * Empiric value: 1 kilobytes */ - val minSizeToStoreInFile: Long = 6 * 1024, + val minSizeToStoreInFile: Long = 1024, /** * Max number of rooms to insert at a time in database (to avoid too much RAM usage) */ diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt index a3c5891f68..e5d9217db7 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt @@ -16,12 +16,13 @@ package org.matrix.android.sdk.internal.session.sync +import io.realm.Realm +import org.matrix.android.sdk.api.session.events.model.EventType import org.matrix.android.sdk.internal.database.model.ReadReceiptEntity import org.matrix.android.sdk.internal.database.model.ReadReceiptsSummaryEntity import org.matrix.android.sdk.internal.database.query.createUnmanaged import org.matrix.android.sdk.internal.database.query.getOrCreate import org.matrix.android.sdk.internal.database.query.where -import io.realm.Realm import timber.log.Timber import javax.inject.Inject @@ -35,7 +36,9 @@ typealias ReadReceiptContent = Map + roomSyncEphemeralAdapter.fromJson(JsonReader.of(pos.source().buffer())) + } + } + + override fun delete(roomId: String) { + getFile(roomId).delete() + } + + override fun reset() { + workingDir.deleteRecursively() + workingDir.mkdirs() + } + + private fun getFile(roomId: String): File { + return File(workingDir, "${roomId.md5()}.json") + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt index b2db6320f1..a96d55d028 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt @@ -60,6 +60,7 @@ import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync +import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral import org.matrix.android.sdk.internal.session.sync.model.RoomSync import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse @@ -87,29 +88,21 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle fun handle(realm: Realm, roomsSyncResponse: RoomsSyncResponse, isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator, reporter: ProgressReporter? = null) { Timber.v("Execute transaction from $this") - handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter) - handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter) - handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter) - } - - fun handleInitSyncEphemeral(realm: Realm, - roomsSyncResponse: RoomsSyncResponse) { - roomsSyncResponse.join.forEach { roomSync -> - val ephemeralResult = roomSync.value.ephemeral - ?.roomSyncEphemeral - ?.events - ?.takeIf { it.isNotEmpty() } - ?.let { events -> handleEphemeral(realm, roomSync.key, events, true) } - - roomTypingUsersHandler.handle(realm, roomSync.key, ephemeralResult) - } + handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter) + handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter) + handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, aggregator, reporter) } // PRIVATE METHODS ***************************************************************************** - private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) { + private fun handleRoomSync(realm: Realm, + handlingStrategy: HandlingStrategy, + isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator, + reporter: ProgressReporter?) { val insertType = if (isInitialSync) { EventInsertType.INITIAL_SYNC } else { @@ -119,12 +112,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle val rooms = when (handlingStrategy) { is HandlingStrategy.JOINED -> { if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) { - insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, reporter) + insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, aggregator, reporter) // Rooms are already inserted, return an empty list emptyList() } else { handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) { - handleJoinedRoom(realm, it.key, it.value, true, insertType, syncLocalTimeStampMillis) + handleJoinedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis, aggregator) } } } @@ -145,6 +138,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle private fun insertJoinRoomsFromInitSync(realm: Realm, handlingStrategy: HandlingStrategy.JOINED, syncLocalTimeStampMillis: Long, + aggregator: SyncResponsePostTreatmentAggregator, reporter: ProgressReporter?) { val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE val listSize = handlingStrategy.data.keys.size @@ -165,9 +159,9 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle realm = realm, roomId = it, roomSync = handlingStrategy.data[it] ?: error("Should not happen"), - handleEphemeralEvents = false, insertType = EventInsertType.INITIAL_SYNC, - syncLocalTimestampMillis = syncLocalTimeStampMillis + syncLocalTimestampMillis = syncLocalTimeStampMillis, + aggregator ) } realm.insertOrUpdate(roomEntities) @@ -177,7 +171,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } else { // No need to split val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) { - handleJoinedRoom(realm, it.key, it.value, false, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis) + handleJoinedRoom(realm, it.key, it.value, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis, aggregator) } realm.insertOrUpdate(rooms) } @@ -186,17 +180,16 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle private fun handleJoinedRoom(realm: Realm, roomId: String, roomSync: RoomSync, - handleEphemeralEvents: Boolean, insertType: EventInsertType, - syncLocalTimestampMillis: Long): RoomEntity { + syncLocalTimestampMillis: Long, + aggregator: SyncResponsePostTreatmentAggregator): RoomEntity { Timber.v("Handle join sync for room $roomId") - var ephemeralResult: EphemeralResult? = null - if (handleEphemeralEvents) { - ephemeralResult = roomSync.ephemeral?.roomSyncEphemeral?.events - ?.takeIf { it.isNotEmpty() } - ?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) } - } + val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed) + ?._roomSyncEphemeral + ?.events + ?.takeIf { it.isNotEmpty() } + ?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC, aggregator) } if (roomSync.accountData?.events?.isNotEmpty() == true) { handleRoomAccountDataEvents(realm, roomId, roomSync.accountData) @@ -436,14 +429,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle private fun handleEphemeral(realm: Realm, roomId: String, ephemeralEvents: List, - isInitialSync: Boolean): EphemeralResult { + isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator): EphemeralResult { var result = EphemeralResult() for (event in ephemeralEvents) { when (event.type) { EventType.RECEIPT -> { @Suppress("UNCHECKED_CAST") (event.content as? ReadReceiptContent)?.let { readReceiptContent -> - readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync) + readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync, aggregator) } } EventType.TYPING -> { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomTypingUsersHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomTypingUsersHandler.kt index f4f3e6ce43..b7851031ad 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomTypingUsersHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomTypingUsersHandler.kt @@ -26,6 +26,7 @@ import javax.inject.Inject internal class RoomTypingUsersHandler @Inject constructor(@UserId private val userId: String, private val typingUsersTracker: DefaultTypingUsersTracker) { + // TODO This could be handled outside of the Realm transaction. Use the new aggregator? fun handle(realm: Realm, roomId: String, ephemeralResult: RoomSyncHandler.EphemeralResult?) { val roomMemberHelper = RoomMemberHelper(realm, roomId) val typingIds = ephemeralResult?.typingUserIds?.filter { it != userId }.orEmpty() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt index 010c029c97..4b31dc4d9b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt @@ -37,4 +37,7 @@ internal abstract class SyncModule { @Binds abstract fun bindSyncTask(task: DefaultSyncTask): SyncTask + + @Binds + abstract fun bindRoomSyncEphemeralTemporaryStore(store: RoomSyncEphemeralTemporaryStoreFile): RoomSyncEphemeralTemporaryStore } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt index d17a672485..8e243c3443 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt @@ -41,17 +41,19 @@ import kotlin.system.measureTimeMillis private const val GET_GROUP_DATA_WORKER = "GET_GROUP_DATA_WORKER" -internal class SyncResponseHandler @Inject constructor(@SessionDatabase private val monarchy: Monarchy, - @SessionId private val sessionId: String, - private val workManagerProvider: WorkManagerProvider, - private val roomSyncHandler: RoomSyncHandler, - private val userAccountDataSyncHandler: UserAccountDataSyncHandler, - private val groupSyncHandler: GroupSyncHandler, - private val cryptoSyncHandler: CryptoSyncHandler, - private val cryptoService: DefaultCryptoService, - private val tokenStore: SyncTokenStore, - private val processEventForPushTask: ProcessEventForPushTask, - private val pushRuleService: PushRuleService) { +internal class SyncResponseHandler @Inject constructor( + @SessionDatabase private val monarchy: Monarchy, + @SessionId private val sessionId: String, + private val workManagerProvider: WorkManagerProvider, + private val roomSyncHandler: RoomSyncHandler, + private val userAccountDataSyncHandler: UserAccountDataSyncHandler, + private val groupSyncHandler: GroupSyncHandler, + private val cryptoSyncHandler: CryptoSyncHandler, + private val aggregatorHandler: SyncResponsePostTreatmentAggregatorHandler, + private val cryptoService: DefaultCryptoService, + private val tokenStore: SyncTokenStore, + private val processEventForPushTask: ProcessEventForPushTask, + private val pushRuleService: PushRuleService) { suspend fun handleResponse(syncResponse: SyncResponse, fromToken: String?, @@ -81,13 +83,14 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private }.also { Timber.v("Finish handling toDevice in $it ms") } + val aggregator = SyncResponsePostTreatmentAggregator() // Start one big transaction monarchy.awaitTransaction { realm -> measureTimeMillis { Timber.v("Handle rooms") reportSubtask(reporter, InitSyncStep.ImportingAccountRoom, 1, 0.7f) { if (syncResponse.rooms != null) { - roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, reporter) + roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, aggregator, reporter) } } }.also { @@ -115,7 +118,10 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private } tokenStore.saveToken(realm, syncResponse.nextBatch) } + // Everything else we need to do outside the transaction + aggregatorHandler.handle(aggregator) + syncResponse.rooms?.let { checkPushRules(it, isInitialSync) userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite) @@ -128,15 +134,6 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private cryptoSyncHandler.onSyncCompleted(syncResponse) } - suspend fun handleInitSyncSecondTransaction(syncResponse: SyncResponse) { - // Start another transaction to handle the ephemeral events - monarchy.awaitTransaction { realm -> - if (syncResponse.rooms != null) { - roomSyncHandler.handleInitSyncEphemeral(realm, syncResponse.rooms) - } - } - } - /** * At the moment we don't get any group data through the sync, so we poll where every hour. * You can also force to refetch group data using [Group] API. diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregator.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregator.kt new file mode 100644 index 0000000000..ea10a32f3e --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregator.kt @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2021 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.sync + +internal class SyncResponsePostTreatmentAggregator { + // List of RoomId + val ephemeralFilesToDelete = mutableListOf() +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt new file mode 100644 index 0000000000..12b77c706b --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.sync + +import javax.inject.Inject + +internal class SyncResponsePostTreatmentAggregatorHandler @Inject constructor( + private val ephemeralTemporaryStore: RoomSyncEphemeralTemporaryStore +) { + fun handle(synResHaResponsePostTreatmentAggregator: SyncResponsePostTreatmentAggregator) { + cleanupEphemeralFiles(synResHaResponsePostTreatmentAggregator.ephemeralFilesToDelete) + } + + private fun cleanupEphemeralFiles(ephemeralFilesToDelete: List) { + ephemeralFilesToDelete.forEach { + ephemeralTemporaryStore.delete(it) + } + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt index 00060a33b1..d47ca8fa68 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt @@ -62,7 +62,8 @@ internal class DefaultSyncTask @Inject constructor( private val globalErrorReceiver: GlobalErrorReceiver, @SessionFilesDirectory private val fileDirectory: File, - private val syncResponseParser: InitialSyncResponseParser + private val syncResponseParser: InitialSyncResponseParser, + private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore ) : SyncTask { private val workingDir = File(fileDirectory, "is") @@ -102,13 +103,16 @@ internal class DefaultSyncTask @Inject constructor( if (isInitialSync) { Timber.v("INIT_SYNC with filter: ${requestParams["filter"]}") val initSyncStrategy = initialSyncStrategy - var syncResp: SyncResponse? = null logDuration("INIT_SYNC strategy: $initSyncStrategy") { if (initSyncStrategy is InitialSyncStrategy.Optimized) { + roomSyncEphemeralTemporaryStore.reset() + workingDir.mkdirs() val file = downloadInitSyncResponse(requestParams) - syncResp = reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) { + reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) { handleSyncFile(file, initSyncStrategy) } + // Delete all files + workingDir.deleteRecursively() } else { val syncResponse = logDuration("INIT_SYNC Request") { executeRequest(globalErrorReceiver) { @@ -125,15 +129,6 @@ internal class DefaultSyncTask @Inject constructor( } } initialSyncProgressService.endAll() - - if (initSyncStrategy is InitialSyncStrategy.Optimized) { - logDuration("INIT_SYNC Handle ephemeral") { - syncResponseHandler.handleInitSyncSecondTransaction(syncResp!!) - } - initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS) - // Delete all files - workingDir.deleteRecursively() - } } else { val syncResponse = executeRequest(globalErrorReceiver) { apiCall = syncAPI.sync( @@ -147,7 +142,6 @@ internal class DefaultSyncTask @Inject constructor( } private suspend fun downloadInitSyncResponse(requestParams: Map): File { - workingDir.mkdirs() val workingFile = File(workingDir, "initSync.json") val status = initialSyncStatusRepository.getStep() if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) { @@ -201,8 +195,8 @@ internal class DefaultSyncTask @Inject constructor( } } - private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized): SyncResponse { - return logDuration("INIT_SYNC handleSyncFile()") { + private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized) { + logDuration("INIT_SYNC handleSyncFile()") { val syncResponse = logDuration("INIT_SYNC Read file and parse") { syncResponseParser.parse(initSyncStrategy, workingFile) } @@ -215,7 +209,7 @@ internal class DefaultSyncTask @Inject constructor( logDuration("INIT_SYNC Database insertion") { syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService) } - syncResponse + initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS) } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt index 938168b5f4..83006c646b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt @@ -16,28 +16,10 @@ package org.matrix.android.sdk.internal.session.sync.model -import com.squareup.moshi.JsonAdapter import com.squareup.moshi.JsonClass -import com.squareup.moshi.JsonReader -import okio.buffer -import okio.source -import java.io.File @JsonClass(generateAdapter = false) internal sealed class LazyRoomSyncEphemeral { data class Parsed(val _roomSyncEphemeral: RoomSyncEphemeral) : LazyRoomSyncEphemeral() - data class Stored(val roomSyncEphemeralAdapter: JsonAdapter, val file: File) : LazyRoomSyncEphemeral() - - val roomSyncEphemeral: RoomSyncEphemeral - get() { - return when (this) { - is Parsed -> _roomSyncEphemeral - is Stored -> { - // Parse the file now - file.inputStream().use { pos -> - roomSyncEphemeralAdapter.fromJson(JsonReader.of(pos.source().buffer()))!! - } - } - } - } + object Stored : LazyRoomSyncEphemeral() } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt index ef56802a66..22ac4f911d 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt @@ -22,11 +22,10 @@ import com.squareup.moshi.JsonReader import com.squareup.moshi.JsonWriter import com.squareup.moshi.ToJson import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy +import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral import timber.log.Timber -import java.io.File -import java.util.concurrent.atomic.AtomicInteger internal class DefaultLazyRoomSyncEphemeralJsonAdapter { @@ -44,20 +43,15 @@ internal class DefaultLazyRoomSyncEphemeralJsonAdapter { } } -internal class SplitLazyRoomSyncJsonAdapter( - private val workingDirectory: File, +internal class SplitLazyRoomSyncEphemeralJsonAdapter( + private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore, private val syncStrategy: InitialSyncStrategy.Optimized ) { - private val atomicInteger = AtomicInteger(0) - - private fun createFile(): File { - val index = atomicInteger.getAndIncrement() - return File(workingDirectory, "room_$index.json") - } - @FromJson fun fromJson(reader: JsonReader, delegate: JsonAdapter): LazyRoomSyncEphemeral? { val path = reader.path + val roomId = path.substringAfter("\$.rooms.join.").substringBeforeLast(".ephemeral") + val json = reader.nextSource().inputStream().bufferedReader().use { it.readText() } @@ -65,9 +59,8 @@ internal class SplitLazyRoomSyncJsonAdapter( return if (json.length > limit) { Timber.v("INIT_SYNC $path content length: ${json.length} copy to a file") // Copy the source to a file - val file = createFile() - file.writeText(json) - LazyRoomSyncEphemeral.Stored(delegate, file) + roomSyncEphemeralTemporaryStore.write(roomId, json) + LazyRoomSyncEphemeral.Stored } else { Timber.v("INIT_SYNC $path content length: ${json.length} parse it now") val roomSync = delegate.fromJson(json) ?: return null diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt index ae7b2a4468..bfa9974b77 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt @@ -20,29 +20,33 @@ import com.squareup.moshi.Moshi import okio.buffer import okio.source import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy +import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore import org.matrix.android.sdk.internal.session.sync.model.SyncResponse import timber.log.Timber import java.io.File import javax.inject.Inject -internal class InitialSyncResponseParser @Inject constructor(private val moshi: Moshi) { +internal class InitialSyncResponseParser @Inject constructor( + private val moshi: Moshi, + private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore +) { fun parse(syncStrategy: InitialSyncStrategy.Optimized, workingFile: File): SyncResponse { val syncResponseLength = workingFile.length().toInt() Timber.v("INIT_SYNC Sync file size is $syncResponseLength bytes") val shouldSplit = syncResponseLength >= syncStrategy.minSizeToSplit Timber.v("INIT_SYNC should split in several files: $shouldSplit") - return getMoshi(syncStrategy, workingFile.parentFile!!, shouldSplit) + return getMoshi(syncStrategy, shouldSplit) .adapter(SyncResponse::class.java) .fromJson(workingFile.source().buffer())!! } - private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, workingDirectory: File, shouldSplit: Boolean): Moshi { + private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, shouldSplit: Boolean): Moshi { // If we don't have to split we'll rely on the already default moshi if (!shouldSplit) return moshi // Otherwise, we create a new adapter for handling Map of Lazy sync return moshi.newBuilder() - .add(SplitLazyRoomSyncJsonAdapter(workingDirectory, syncStrategy)) + .add(SplitLazyRoomSyncEphemeralJsonAdapter(roomSyncEphemeralTemporaryStore, syncStrategy)) .build() } }