From c938795576b098a4288a1f4025e59a689ed64701 Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Fri, 22 Jan 2021 17:17:42 +0100 Subject: [PATCH] Read init sync to a file and split into smaller files to handle it --- CHANGES.md | 1 + ...ventInsertType.java => EventInsertType.kt} | 4 +- .../android/sdk/internal/di/MoshiProvider.kt | 2 + .../notification/ProcessEventForPushTask.kt | 4 +- .../sync/InitialSyncStatusRepository.kt | 111 +++++++++++++ .../session/sync/InitialSyncStrategy.kt | 53 ++++++ .../internal/session/sync/RoomSyncHandler.kt | 70 ++++++-- .../sdk/internal/session/sync/SyncAPI.kt | 13 ++ .../sdk/internal/session/sync/SyncTask.kt | 152 +++++++++++++++++- .../session/sync/model/LazyRoomSync.kt | 44 +++++ .../sync/model/LazyRoomSyncJsonAdapter.kt | 90 +++++++++++ .../session/sync/model/RoomsSyncResponse.kt | 2 +- .../android/sdk/internal/util/LogUtil.kt | 29 ++++ .../src/main/res/values/strings.xml | 2 + .../vector/app/features/home/HomeActivity.kt | 18 +++ vector/src/main/res/menu/home.xml | 13 +- 16 files changed, 583 insertions(+), 25 deletions(-) rename matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/model/{EventInsertType.java => EventInsertType.kt} (88%) create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStatusRepository.kt create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSync.kt create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncJsonAdapter.kt create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/util/LogUtil.kt diff --git a/CHANGES.md b/CHANGES.md index 46057a37ec..930068dd6c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -90,6 +90,7 @@ Improvements 🙌: - SSO support for cross signing (#1062) - Deactivate account when logged in with SSO (#1264) - SSO UIA doesn't work (#2754) + - Improve initial sync performance (#983) Bugfix 🐛: - Fix clear cache issue: sometimes, after a clear cache, there is still a token, so the init sync service is not started. diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/model/EventInsertType.java b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/model/EventInsertType.kt similarity index 88% rename from matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/model/EventInsertType.java rename to matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/model/EventInsertType.kt index 05153c5734..463ccb2f46 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/model/EventInsertType.java +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/model/EventInsertType.kt @@ -14,9 +14,9 @@ * limitations under the License. */ -package org.matrix.android.sdk.internal.database.model; +package org.matrix.android.sdk.internal.database.model -public enum EventInsertType { +internal enum class EventInsertType { INITIAL_SYNC, INCREMENTAL_SYNC, PAGINATION, diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/di/MoshiProvider.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/di/MoshiProvider.kt index 48fa41b350..c0595a9273 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/di/MoshiProvider.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/di/MoshiProvider.kt @@ -36,6 +36,7 @@ import org.matrix.android.sdk.internal.network.parsing.ForceToBooleanJsonAdapter import org.matrix.android.sdk.internal.network.parsing.RuntimeJsonAdapterFactory import org.matrix.android.sdk.internal.network.parsing.TlsVersionMoshiAdapter import org.matrix.android.sdk.internal.network.parsing.UriMoshiAdapter +import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncJsonAdapter object MoshiProvider { @@ -44,6 +45,7 @@ object MoshiProvider { .add(ForceToBooleanJsonAdapter()) .add(CipherSuiteMoshiAdapter()) .add(TlsVersionMoshiAdapter()) + .add(LazyRoomSyncJsonAdapter()) .add(RuntimeJsonAdapterFactory.of(MessageContent::class.java, "msgtype", MessageDefaultContent::class.java) .registerSubtype(MessageTextContent::class.java, MessageType.MSGTYPE_TEXT) .registerSubtype(MessageNoticeContent::class.java, MessageType.MSGTYPE_NOTICE) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/notification/ProcessEventForPushTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/notification/ProcessEventForPushTask.kt index 7763251a01..4754265c49 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/notification/ProcessEventForPushTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/notification/ProcessEventForPushTask.kt @@ -50,7 +50,7 @@ internal class DefaultProcessEventForPushTask @Inject constructor( } val newJoinEvents = params.syncResponse.join .mapNotNull { (key, value) -> - value.timeline?.events?.map { it.copy(roomId = key) } + value.roomSync.timeline?.events?.map { it.copy(roomId = key) } } .flatten() val inviteEvents = params.syncResponse.invite @@ -80,7 +80,7 @@ internal class DefaultProcessEventForPushTask @Inject constructor( val allRedactedEvents = params.syncResponse.join .asSequence() - .mapNotNull { (_, value) -> value.timeline?.events } + .mapNotNull { (_, value) -> value.roomSync.timeline?.events } .flatten() .filter { it.type == EventType.REDACTION } .mapNotNull { it.redacts } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStatusRepository.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStatusRepository.kt new file mode 100644 index 0000000000..4b82ecc3e5 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStatusRepository.kt @@ -0,0 +1,111 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.sync + +import com.squareup.moshi.JsonClass +import okio.buffer +import okio.source +import org.matrix.android.sdk.internal.di.MoshiProvider +import timber.log.Timber +import java.io.File + +@JsonClass(generateAdapter = true) +internal data class InitialSyncStatus( + val step: Int = STEP_INIT, + val downloadedDate: Long = 0 +) { + companion object { + const val STEP_INIT = 0 + const val STEP_DOWNLOADING = 1 + const val STEP_DOWNLOADED = 2 + const val STEP_PARSED = 3 + const val STEP_SUCCESS = 4 + } +} + +internal interface InitialSyncStatusRepository { + fun getStep(): Int + + fun setStep(step: Int) +} + +/** + * This class handle the current status of an initial sync and persist it on the disk, to be robust against crash + */ +internal class FileInitialSyncStatusRepository(directory: File) : InitialSyncStatusRepository { + + companion object { + // After 2 hours, we consider that the downloaded file is outdated: + // - if a problem occurs, it's for big accounts, and big accounts have lots of new events in 2 hours + // - For small accounts, there should be no problem, so 2 hours delay will never be used. + private const val INIT_SYNC_FILE_LIFETIME = 2 * 60 * 60 * 1_000L + } + + private val file = File(directory, "status.json") + private val jsonAdapter = MoshiProvider.providesMoshi().adapter(InitialSyncStatus::class.java) + + private var cache: InitialSyncStatus? = null + + override fun getStep(): Int { + ensureCache() + val state = cache?.step ?: InitialSyncStatus.STEP_INIT + return if (state >= InitialSyncStatus.STEP_DOWNLOADED + && System.currentTimeMillis() > (cache?.downloadedDate ?: 0) + INIT_SYNC_FILE_LIFETIME) { + Timber.v("INIT_SYNC downloaded file is outdated, download it again") + // The downloaded file is outdated + setStep(InitialSyncStatus.STEP_INIT) + InitialSyncStatus.STEP_INIT + } else { + state + } + } + + override fun setStep(step: Int) { + var newStatus = cache?.copy(step = step) ?: InitialSyncStatus(step = step) + if (step == InitialSyncStatus.STEP_DOWNLOADED) { + // Also store the downloaded date + newStatus = newStatus.copy( + downloadedDate = System.currentTimeMillis() + ) + } + cache = newStatus + writeFile() + } + + private fun ensureCache() { + if (cache == null) readFile() + } + + /** + * File -> Cache + */ + private fun readFile() { + cache = file + .takeIf { it.exists() } + ?.let { jsonAdapter.fromJson(it.source().buffer()) } + } + + /** + * Cache -> File + */ + private fun writeFile() { + file.delete() + cache + ?.let { jsonAdapter.toJson(it) } + ?.let { file.writeText(it) } + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt new file mode 100644 index 0000000000..fca92870ca --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt @@ -0,0 +1,53 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.sync + +var initialSyncStrategy: InitialSyncStrategy = InitialSyncStrategy.Optimized() + +sealed class InitialSyncStrategy { + /** + * Parse the result in its entirety + * Pros: + * - Faster to handle parsed data + * Cons: + * - Slower to download and parse data + * - big RAM usage + * - not robust to crash + */ + object Legacy : InitialSyncStrategy() + + /** + * Optimized. + * First store the request result in a file, to avoid doing it again in case of crash + */ + data class Optimized( + /** + * Limit to reach to decide to split the init sync response into smaller files + * Empiric value: 1 megabytes + */ + val minSizeToSplit: Long = 1024 * 1024, + /** + * Limit per room to reach to decide to store a join room into a file + * Empiric value: 10 kilobytes + */ + val minSizeToStoreInFile: Long = 10 * 1024, + /** + * Max number of rooms to insert at a time in database (to avoid too much RAM usage) + */ + val maxRoomsToInsert: Int = 100 + ) : InitialSyncStrategy() +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt index 6d1b3ae034..979c2888d3 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt @@ -51,6 +51,7 @@ import org.matrix.android.sdk.internal.di.UserId import org.matrix.android.sdk.internal.extensions.clearWith import org.matrix.android.sdk.internal.session.DefaultInitialSyncProgressService import org.matrix.android.sdk.internal.session.mapWithProgress +import org.matrix.android.sdk.internal.session.reportSubtask import org.matrix.android.sdk.internal.session.room.membership.RoomChangeMembershipStateDataSource import org.matrix.android.sdk.internal.session.room.membership.RoomMemberEventHandler import org.matrix.android.sdk.internal.session.room.read.FullyReadContent @@ -59,12 +60,14 @@ import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync +import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSync import org.matrix.android.sdk.internal.session.sync.model.RoomSync import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse import timber.log.Timber import javax.inject.Inject +import kotlin.math.ceil internal class RoomSyncHandler @Inject constructor(private val readReceiptHandler: ReadReceiptHandler, private val roomSummaryUpdater: RoomSummaryUpdater, @@ -78,7 +81,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle private val timelineInput: TimelineInput) { sealed class HandlingStrategy { - data class JOINED(val data: Map) : HandlingStrategy() + data class JOINED(val data: Map) : HandlingStrategy() data class INVITED(val data: Map) : HandlingStrategy() data class LEFT(val data: Map) : HandlingStrategy() } @@ -105,10 +108,17 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } val syncLocalTimeStampMillis = System.currentTimeMillis() val rooms = when (handlingStrategy) { - is HandlingStrategy.JOINED -> - handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) { - handleJoinedRoom(realm, it.key, it.value, isInitialSync, insertType, syncLocalTimeStampMillis) + is HandlingStrategy.JOINED -> { + if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) { + insertJoinRooms(realm, handlingStrategy, insertType, syncLocalTimeStampMillis, reporter) + // Rooms are already inserted, return an empty list + emptyList() + } else { + handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) { + handleJoinedRoom(realm, it.key, it.value.roomSync, insertType, syncLocalTimeStampMillis) + } } + } is HandlingStrategy.INVITED -> handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_invited_rooms, 0.1f) { handleInvitedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis) @@ -123,17 +133,57 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle realm.insertOrUpdate(rooms) } + private fun insertJoinRooms(realm: Realm, + handlingStrategy: HandlingStrategy.JOINED, + insertType: EventInsertType, + syncLocalTimeStampMillis: Long, + reporter: DefaultInitialSyncProgressService?) { + val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE + val listSize = handlingStrategy.data.keys.size + val numberOfChunks = ceil(listSize / maxSize.toDouble()).toInt() + + if (numberOfChunks > 1) { + reportSubtask(reporter, R.string.initial_sync_start_importing_account_joined_rooms, numberOfChunks, 0.6f) { + val chunkSize = listSize / numberOfChunks + Timber.v("INIT_SYNC $listSize rooms to insert, split into $numberOfChunks sublists of $chunkSize items") + // I cannot find a better way to chunk a map, so chunk the keys and then create new maps + handlingStrategy.data.keys + .chunked(chunkSize) + .forEachIndexed { index, roomIds -> + val roomEntities = roomIds + .also { Timber.v("INIT_SYNC insert ${roomIds.size} rooms") } + .map { + handleJoinedRoom( + realm, + it, + (handlingStrategy.data[it] ?: error("Should not happen")).roomSync, + insertType, + syncLocalTimeStampMillis + ) + } + realm.insertOrUpdate(roomEntities) + reporter?.reportProgress(index + 1) + } + } + } else { + // No need to split + val rooms = handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) { + handleJoinedRoom(realm, it.key, it.value.roomSync, insertType, syncLocalTimeStampMillis) + } + realm.insertOrUpdate(rooms) + } + } + private fun handleJoinedRoom(realm: Realm, roomId: String, roomSync: RoomSync, - isInitialSync: Boolean, insertType: EventInsertType, syncLocalTimestampMillis: Long): RoomEntity { Timber.v("Handle join sync for room $roomId") var ephemeralResult: EphemeralResult? = null if (roomSync.ephemeral?.events?.isNotEmpty() == true) { - ephemeralResult = handleEphemeral(realm, roomId, roomSync.ephemeral, isInitialSync) + ephemeralResult = handleEphemeral(realm, roomId, roomSync.ephemeral, insertType == EventInsertType.INITIAL_SYNC) } if (roomSync.accountData?.events?.isNotEmpty() == true) { @@ -173,8 +223,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle roomSync.timeline.prevToken, roomSync.timeline.limited, insertType, - syncLocalTimestampMillis, - isInitialSync + syncLocalTimestampMillis ) roomEntity.addIfNecessary(chunkEntity) } @@ -278,8 +327,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle prevToken: String? = null, isLimited: Boolean = true, insertType: EventInsertType, - syncLocalTimestampMillis: Long, - isInitialSync: Boolean): ChunkEntity { + syncLocalTimestampMillis: Long): ChunkEntity { val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId) val chunkEntity = if (!isLimited && lastChunk != null) { lastChunk @@ -299,7 +347,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } eventIds.add(event.eventId) - if (event.isEncrypted() && !isInitialSync) { + if (event.isEncrypted() && insertType != EventInsertType.INITIAL_SYNC) { decryptIfNeeded(event, roomId) } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncAPI.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncAPI.kt index 77289f04b4..8e3523bc57 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncAPI.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncAPI.kt @@ -16,6 +16,7 @@ package org.matrix.android.sdk.internal.session.sync +import okhttp3.ResponseBody import org.matrix.android.sdk.internal.network.NetworkConstants import org.matrix.android.sdk.internal.network.TimeOutInterceptor import org.matrix.android.sdk.internal.session.sync.model.SyncResponse @@ -23,6 +24,7 @@ import retrofit2.Call import retrofit2.http.GET import retrofit2.http.Header import retrofit2.http.QueryMap +import retrofit2.http.Streaming internal interface SyncAPI { /** @@ -34,4 +36,15 @@ internal interface SyncAPI { @Header(TimeOutInterceptor.READ_TIMEOUT) readTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT, @Header(TimeOutInterceptor.WRITE_TIMEOUT) writeTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT ): Call + + /** + * Set all the timeouts to 1 minute by default + */ + @Streaming + @GET(NetworkConstants.URI_API_PREFIX_PATH_R0 + "sync") + fun syncStream(@QueryMap params: Map, + @Header(TimeOutInterceptor.CONNECT_TIMEOUT) connectTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT, + @Header(TimeOutInterceptor.READ_TIMEOUT) readTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT, + @Header(TimeOutInterceptor.WRITE_TIMEOUT) writeTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT + ): Call } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt index bfe3799771..ef766f6d42 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt @@ -16,18 +16,32 @@ package org.matrix.android.sdk.internal.session.sync +import okhttp3.ResponseBody +import okio.buffer +import okio.source import org.matrix.android.sdk.R +import org.matrix.android.sdk.internal.di.MoshiProvider +import org.matrix.android.sdk.internal.di.SessionFilesDirectory import org.matrix.android.sdk.internal.di.UserId import org.matrix.android.sdk.internal.network.GlobalErrorReceiver import org.matrix.android.sdk.internal.network.TimeOutInterceptor import org.matrix.android.sdk.internal.network.executeRequest +import org.matrix.android.sdk.internal.network.toFailure import org.matrix.android.sdk.internal.session.DefaultInitialSyncProgressService import org.matrix.android.sdk.internal.session.filter.FilterRepository import org.matrix.android.sdk.internal.session.homeserver.GetHomeServerCapabilitiesTask +import org.matrix.android.sdk.internal.session.reportSubtask +import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSync +import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncJsonAdapter import org.matrix.android.sdk.internal.session.sync.model.SyncResponse import org.matrix.android.sdk.internal.session.user.UserStore import org.matrix.android.sdk.internal.task.Task +import org.matrix.android.sdk.internal.util.logDuration +import retrofit2.Response +import retrofit2.awaitResponse import timber.log.Timber +import java.io.File +import java.net.SocketTimeoutException import javax.inject.Inject internal interface SyncTask : Task { @@ -48,9 +62,14 @@ internal class DefaultSyncTask @Inject constructor( private val getHomeServerCapabilitiesTask: GetHomeServerCapabilitiesTask, private val userStore: UserStore, private val syncTaskSequencer: SyncTaskSequencer, - private val globalErrorReceiver: GlobalErrorReceiver + private val globalErrorReceiver: GlobalErrorReceiver, + @SessionFilesDirectory + private val fileDirectory: File ) : SyncTask { + private val workingDir = File(fileDirectory, "is") + private val initialSyncStatusRepository: InitialSyncStatusRepository = FileInitialSyncStatusRepository(workingDir) + override suspend fun execute(params: SyncTask.Params) = syncTaskSequencer.post { doSync(params) } @@ -81,20 +100,137 @@ internal class DefaultSyncTask @Inject constructor( val readTimeOut = (params.timeout + TIMEOUT_MARGIN).coerceAtLeast(TimeOutInterceptor.DEFAULT_LONG_TIMEOUT) - val syncResponse = executeRequest(globalErrorReceiver) { - apiCall = syncAPI.sync( - params = requestParams, - readTimeOut = readTimeOut - ) - } - syncResponseHandler.handleResponse(syncResponse, token) if (isInitialSync) { + logDuration("INIT_SYNC strategy: $initialSyncStrategy") { + if (initialSyncStrategy is InitialSyncStrategy.Optimized) { + safeInitialSync(requestParams) + } else { + val syncResponse = logDuration("INIT_SYNC Request") { + executeRequest(globalErrorReceiver) { + apiCall = syncAPI.sync( + params = requestParams, + readTimeOut = readTimeOut + ) + } + } + + logDuration("INIT_SYNC Database insertion") { + syncResponseHandler.handleResponse(syncResponse, token) + } + } + } initialSyncProgressService.endAll() + } else { + val syncResponse = executeRequest(globalErrorReceiver) { + apiCall = syncAPI.sync( + params = requestParams, + readTimeOut = readTimeOut + ) + } + syncResponseHandler.handleResponse(syncResponse, token) } Timber.v("Sync task finished on Thread: ${Thread.currentThread().name}") } + private suspend fun safeInitialSync(requestParams: Map) { + workingDir.mkdirs() + val workingFile = File(workingDir, "initSync.json") + val status = initialSyncStatusRepository.getStep() + if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) { + // Go directly to the parse step + Timber.v("INIT_SYNC file is already here") + } else { + initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_DOWNLOADING) + val syncResponse = logDuration("INIT_SYNC Perform server request") { + reportSubtask(initialSyncProgressService, R.string.initial_sync_start_server_computing, 0, 0.5f) { + getSyncResponse(requestParams, MAX_NUMBER_OF_RETRY_AFTER_TIMEOUT) + } + } + + if (syncResponse.isSuccessful) { + logDuration("INIT_SYNC Download and save to file") { + reportSubtask(initialSyncProgressService, R.string.initial_sync_start_downloading, 0, 0.5f) { + syncResponse.body()?.byteStream()?.use { inputStream -> + workingFile.outputStream().use { outputStream -> + inputStream.copyTo(outputStream) + } + } + } + } + } else { + throw syncResponse.toFailure(globalErrorReceiver) + .also { Timber.w("INIT_SYNC request failure: $this") } + } + initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_DOWNLOADED) + } + handleSyncFile(workingFile) + + // Delete all files + workingDir.deleteRecursively() + } + + private suspend fun getSyncResponse(requestParams: Map, maxNumberOfRetries: Int): Response { + var retry = maxNumberOfRetries + while (true) { + retry-- + try { + return syncAPI.syncStream( + params = requestParams + ).awaitResponse() + } catch (throwable: Throwable) { + if (throwable is SocketTimeoutException && retry > 0) { + Timber.w("INIT_SYNC timeout retry left: $retry") + } else { + Timber.e(throwable, "INIT_SYNC timeout, no retry left, or other error") + throw throwable + } + } + } + } + + private suspend fun handleSyncFile(workingFile: File) { + val syncResponseLength = workingFile.length().toInt() + + logDuration("INIT_SYNC handleSyncFile() file size $syncResponseLength bytes") { + if (syncResponseLength < (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.minSizeToSplit ?: Long.MAX_VALUE) { + // OK, no need to split just handle as a regular sync response + Timber.v("INIT_SYNC no need to split") + handleInitialSyncFile(workingFile) + } else { + Timber.v("INIT_SYNC Split into several smaller files") + // Set file mode + // TODO This is really ugly, I should improve that + LazyRoomSyncJsonAdapter.initWith(workingFile) + + handleInitialSyncFile(workingFile) + + // Reset file mode + LazyRoomSyncJsonAdapter.reset() + } + } + } + + private suspend fun handleInitialSyncFile(workingFile: File) { + val syncResponse = logDuration("INIT_SYNC Read file and parse") { + MoshiProvider.providesMoshi().adapter(SyncResponse::class.java) + .fromJson(workingFile.source().buffer())!! + } + initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_PARSED) + + // Log some stats + val nbOfJoinedRooms = syncResponse.rooms?.join?.size ?: 0 + val nbOfJoinedRoomsInFile = syncResponse.rooms?.join?.values?.count { it is LazyRoomSync.Stored } + Timber.v("INIT_SYNC $nbOfJoinedRooms rooms, $nbOfJoinedRoomsInFile stored into files") + + logDuration("INIT_SYNC Database insertion") { + syncResponseHandler.handleResponse(syncResponse, null) + } + initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS) + } + companion object { + private const val MAX_NUMBER_OF_RETRY_AFTER_TIMEOUT = 50 + private const val TIMEOUT_MARGIN: Long = 10_000 } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSync.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSync.kt new file mode 100644 index 0000000000..e7e0dcb8b9 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSync.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.sync.model + +import com.squareup.moshi.JsonClass +import com.squareup.moshi.JsonReader +import okio.buffer +import okio.source +import org.matrix.android.sdk.internal.di.MoshiProvider +import java.io.File + +@JsonClass(generateAdapter = false) +internal sealed class LazyRoomSync { + data class Parsed(val _roomSync: RoomSync) : LazyRoomSync() + data class Stored(val file: File) : LazyRoomSync() + + val roomSync: RoomSync + get() { + return when (this) { + is Parsed -> _roomSync + is Stored -> { + // Parse the file now + file.inputStream().use { pos -> + MoshiProvider.providesMoshi().adapter(RoomSync::class.java) + .fromJson(JsonReader.of(pos.source().buffer()))!! + } + } + } + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncJsonAdapter.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncJsonAdapter.kt new file mode 100644 index 0000000000..edf76d2e77 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncJsonAdapter.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2021 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.sync.model + +import com.squareup.moshi.FromJson +import com.squareup.moshi.JsonAdapter +import com.squareup.moshi.JsonReader +import com.squareup.moshi.JsonWriter +import com.squareup.moshi.ToJson +import org.matrix.android.sdk.internal.di.MoshiProvider +import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy +import org.matrix.android.sdk.internal.session.sync.initialSyncStrategy +import timber.log.Timber +import java.io.File +import java.util.concurrent.atomic.AtomicInteger + +internal class LazyRoomSyncJsonAdapter : JsonAdapter() { + + @FromJson + override fun fromJson(reader: JsonReader): LazyRoomSync { + return if (workingDirectory != null) { + val path = reader.path + // val roomId = reader.path.substringAfter("\$.rooms.join.") + + // inputStream.available() return 0... So read it to a String then decide to store in a file or to parse it now + val json = reader.nextSource().inputStream().bufferedReader().use { + it.readText() + } + + val limit = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.minSizeToStoreInFile ?: Long.MAX_VALUE + if (json.length > limit) { + Timber.v("INIT_SYNC $path content length: ${json.length} copy to a file") + // Copy the source to a file + val file = createFile() + file.writeText(json) + LazyRoomSync.Stored(file) + } else { + Timber.v("INIT_SYNC $path content length: ${json.length} parse it now") + // Parse it now + val roomSync = MoshiProvider.providesMoshi().adapter(RoomSync::class.java).fromJson(json)!! + LazyRoomSync.Parsed(roomSync) + } + } else { + // Parse it now + val roomSync = MoshiProvider.providesMoshi().adapter(RoomSync::class.java).fromJson(reader)!! + LazyRoomSync.Parsed(roomSync) + } + } + + @ToJson + override fun toJson(writer: JsonWriter, value: LazyRoomSync?) { + // This Adapter is not supposed to serialize object + throw UnsupportedOperationException() + } + + companion object { + fun initWith(file: File) { + workingDirectory = file.parentFile + atomicInteger.set(0) + } + + fun reset() { + workingDirectory = null + } + + private fun createFile(): File { + val parent = workingDirectory ?: error("workingDirectory is not initialized") + val index = atomicInteger.getAndIncrement() + + return File(parent, "room_$index.json") + } + + private var workingDirectory: File? = null + private val atomicInteger = AtomicInteger(0) + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/RoomsSyncResponse.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/RoomsSyncResponse.kt index dd2f96c988..a62d80a088 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/RoomsSyncResponse.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/RoomsSyncResponse.kt @@ -24,7 +24,7 @@ internal data class RoomsSyncResponse( /** * Joined rooms: keys are rooms ids. */ - @Json(name = "join") val join: Map = emptyMap(), + @Json(name = "join") val join: Map = emptyMap(), /** * Invitations. The rooms that the user has been invited to: keys are rooms ids. diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/util/LogUtil.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/util/LogUtil.kt new file mode 100644 index 0000000000..1a4f42a533 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/util/LogUtil.kt @@ -0,0 +1,29 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.util + +import timber.log.Timber + +internal suspend fun logDuration(message: String, + block: suspend () -> T): T { + Timber.v("$message -- BEGIN") + val start = System.currentTimeMillis() + val result = block() + val duration = System.currentTimeMillis() - start + Timber.v("$message -- END duration: $duration ms") + return result +} diff --git a/matrix-sdk-android/src/main/res/values/strings.xml b/matrix-sdk-android/src/main/res/values/strings.xml index 26b9bc19d9..b93d9b680e 100644 --- a/matrix-sdk-android/src/main/res/values/strings.xml +++ b/matrix-sdk-android/src/main/res/values/strings.xml @@ -199,6 +199,8 @@ Empty room Empty room (was %s) + Initial Sync:\nWaiting for server response… + Initial Sync:\nDownloading data… Initial Sync:\nImporting account… Initial Sync:\nImporting crypto Initial Sync:\nImporting Rooms diff --git a/vector/src/main/java/im/vector/app/features/home/HomeActivity.kt b/vector/src/main/java/im/vector/app/features/home/HomeActivity.kt index 6a381ec049..2d884f1ba7 100644 --- a/vector/src/main/java/im/vector/app/features/home/HomeActivity.kt +++ b/vector/src/main/java/im/vector/app/features/home/HomeActivity.kt @@ -40,6 +40,8 @@ import im.vector.app.core.platform.ToolbarConfigurable import im.vector.app.core.platform.VectorBaseActivity import im.vector.app.core.pushers.PushersManager import im.vector.app.databinding.ActivityHomeBinding +import im.vector.app.features.MainActivity +import im.vector.app.features.MainActivityArgs import im.vector.app.features.disclaimer.showDisclaimerDialog import im.vector.app.features.matrixto.MatrixToBottomSheet import im.vector.app.features.notifications.NotificationDrawerManager @@ -60,6 +62,8 @@ import kotlinx.parcelize.Parcelize import org.matrix.android.sdk.api.session.InitialSyncProgressService import org.matrix.android.sdk.api.session.permalinks.PermalinkService import org.matrix.android.sdk.api.util.MatrixItem +import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy +import org.matrix.android.sdk.internal.session.sync.initialSyncStrategy import timber.log.Timber import javax.inject.Inject @@ -368,6 +372,20 @@ class HomeActivity : bugReporter.openBugReportScreen(this, false) return true } + R.id.menu_home_init_sync_legacy -> { + // Configure the SDK + initialSyncStrategy = InitialSyncStrategy.Legacy + // And clear cache + MainActivity.restartApp(this, MainActivityArgs(clearCache = true)) + return true + } + R.id.menu_home_init_sync_optimized -> { + // Configure the SDK + initialSyncStrategy = InitialSyncStrategy.Optimized() + // And clear cache + MainActivity.restartApp(this, MainActivityArgs(clearCache = true)) + return true + } R.id.menu_home_filter -> { navigator.openRoomsFiltering(this) return true diff --git a/vector/src/main/res/menu/home.xml b/vector/src/main/res/menu/home.xml index 7a77c45240..66cbf53bfd 100644 --- a/vector/src/main/res/menu/home.xml +++ b/vector/src/main/res/menu/home.xml @@ -1,6 +1,7 @@ + xmlns:app="http://schemas.android.com/apk/res-auto" + xmlns:tools="http://schemas.android.com/tools"> + + + +