From 7ca532a5f6ccaedd8e916ab7820e51d3c9ee58b5 Mon Sep 17 00:00:00 2001 From: Maxime NATUREL <46314705+mnaturel@users.noreply.github.com> Date: Wed, 18 Jan 2023 15:24:09 +0100 Subject: [PATCH] Filter and store poll events --- .../sdk/internal/session/room/RoomModule.kt | 5 ++ .../room/event/FilterAndStoreEventsTask.kt | 80 +++++++++++++++++++ .../session/room/poll/LoadMorePollsTask.kt | 24 ++++-- .../poll/FetchPollResponseEventsTask.kt | 56 +++---------- 4 files changed, 114 insertions(+), 51 deletions(-) create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/event/FilterAndStoreEventsTask.kt diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/RoomModule.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/RoomModule.kt index 17e57aa2ee..56925d55d5 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/RoomModule.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/RoomModule.kt @@ -59,6 +59,8 @@ import org.matrix.android.sdk.internal.session.room.directory.DefaultSetRoomDire import org.matrix.android.sdk.internal.session.room.directory.GetPublicRoomTask import org.matrix.android.sdk.internal.session.room.directory.GetRoomDirectoryVisibilityTask import org.matrix.android.sdk.internal.session.room.directory.SetRoomDirectoryVisibilityTask +import org.matrix.android.sdk.internal.session.room.event.DefaultFilterAndStoreEventsTask +import org.matrix.android.sdk.internal.session.room.event.FilterAndStoreEventsTask import org.matrix.android.sdk.internal.session.room.location.CheckIfExistingActiveLiveTask import org.matrix.android.sdk.internal.session.room.location.DefaultCheckIfExistingActiveLiveTask import org.matrix.android.sdk.internal.session.room.location.DefaultGetActiveBeaconInfoForUserTask @@ -369,4 +371,7 @@ internal abstract class RoomModule { @Binds abstract fun bindGetLoadedPollsStatusTask(task: DefaultGetLoadedPollsStatusTask): GetLoadedPollsStatusTask + + @Binds + abstract fun bindFilterAndStoreEventsTask(task: DefaultFilterAndStoreEventsTask): FilterAndStoreEventsTask } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/event/FilterAndStoreEventsTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/event/FilterAndStoreEventsTask.kt new file mode 100644 index 0000000000..aa836c8491 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/event/FilterAndStoreEventsTask.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2023 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.event + +import com.zhuinden.monarchy.Monarchy +import org.matrix.android.sdk.api.session.events.model.Event +import org.matrix.android.sdk.api.session.room.send.SendState +import org.matrix.android.sdk.internal.crypto.EventDecryptor +import org.matrix.android.sdk.internal.database.mapper.toEntity +import org.matrix.android.sdk.internal.database.model.EventEntity +import org.matrix.android.sdk.internal.database.model.EventInsertType +import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore +import org.matrix.android.sdk.internal.database.query.where +import org.matrix.android.sdk.internal.di.SessionDatabase +import org.matrix.android.sdk.internal.task.Task +import org.matrix.android.sdk.internal.util.awaitTransaction +import org.matrix.android.sdk.internal.util.time.Clock +import javax.inject.Inject + +internal interface FilterAndStoreEventsTask : Task { + data class Params( + val roomId: String, + val events: List, + val filterPredicate: (Event) -> Boolean, + ) +} + +internal class DefaultFilterAndStoreEventsTask @Inject constructor( + @SessionDatabase private val monarchy: Monarchy, + private val clock: Clock, + private val eventDecryptor: EventDecryptor, +) : FilterAndStoreEventsTask { + + override suspend fun execute(params: FilterAndStoreEventsTask.Params) { + val filteredEvents = params.events + .map { decryptEventIfNeeded(it) } + .filter { params.filterPredicate(it) } + + addMissingEventsInDB(params.roomId, filteredEvents) + } + + private suspend fun addMissingEventsInDB(roomId: String, events: List) { + monarchy.awaitTransaction { realm -> + val eventIdsToCheck = events.mapNotNull { it.eventId }.filter { it.isNotEmpty() } + if (eventIdsToCheck.isNotEmpty()) { + val existingIds = EventEntity.where(realm, eventIdsToCheck).findAll().toList().map { it.eventId } + + events.filterNot { it.eventId in existingIds } + .map { it.toEntity(roomId = roomId, sendState = SendState.SYNCED, ageLocalTs = computeLocalTs(it)) } + .forEach { it.copyToRealmOrIgnore(realm, EventInsertType.PAGINATION) } + } + } + } + + private suspend fun decryptEventIfNeeded(event: Event): Event { + if (event.isEncrypted()) { + eventDecryptor.decryptEventAndSaveResult(event, timeline = "") + } + + event.ageLocalTs = computeLocalTs(event) + + return event + } + + private fun computeLocalTs(event: Event) = clock.epochMillis() - (event.unsignedData?.age ?: 0) +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/poll/LoadMorePollsTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/poll/LoadMorePollsTask.kt index 03b6c31fec..7870897ace 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/poll/LoadMorePollsTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/poll/LoadMorePollsTask.kt @@ -17,6 +17,8 @@ package org.matrix.android.sdk.internal.session.room.poll import com.zhuinden.monarchy.Monarchy +import org.matrix.android.sdk.api.session.events.model.isPoll +import org.matrix.android.sdk.api.session.events.model.isPollResponse import org.matrix.android.sdk.api.session.room.poll.LoadedPollsStatus import org.matrix.android.sdk.internal.database.model.PollHistoryStatusEntity import org.matrix.android.sdk.internal.database.query.getOrCreate @@ -24,6 +26,7 @@ import org.matrix.android.sdk.internal.di.SessionDatabase import org.matrix.android.sdk.internal.network.GlobalErrorReceiver import org.matrix.android.sdk.internal.network.executeRequest import org.matrix.android.sdk.internal.session.room.RoomAPI +import org.matrix.android.sdk.internal.session.room.event.FilterAndStoreEventsTask import org.matrix.android.sdk.internal.session.room.poll.PollConstants.MILLISECONDS_PER_DAY import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection import org.matrix.android.sdk.internal.session.room.timeline.PaginationResponse @@ -44,6 +47,7 @@ internal class DefaultLoadMorePollsTask @Inject constructor( @SessionDatabase private val monarchy: Monarchy, private val roomAPI: RoomAPI, private val globalErrorReceiver: GlobalErrorReceiver, + private val filterAndStoreEventsTask: FilterAndStoreEventsTask, ) : LoadMorePollsTask { override suspend fun execute(params: LoadMorePollsTask.Params): LoadedPollsStatus { @@ -53,9 +57,10 @@ internal class DefaultLoadMorePollsTask @Inject constructor( currentPollHistoryStatus = fetchMorePollEventsBackward(params, currentPollHistoryStatus) } // TODO - // unmock and check how it behaves when cancelling the process: it should resume where it was stopped + // check how it behaves when cancelling the process: it should resume where it was stopped // check the network calls done using Flipper // check forward of error in case of call api failure + // test on large room return LoadedPollsStatus( canLoadMore = currentPollHistoryStatus.isEndOfPollsBackward.not(), @@ -89,7 +94,7 @@ internal class DefaultLoadMorePollsTask @Inject constructor( params: LoadMorePollsTask.Params, status: PollHistoryStatusEntity ): PollHistoryStatusEntity { - val chunk = executeRequest(globalErrorReceiver) { + val response = executeRequest(globalErrorReceiver) { roomAPI.getRoomMessagesFrom( roomId = params.roomId, from = status.tokenEndBackward, @@ -99,9 +104,18 @@ internal class DefaultLoadMorePollsTask @Inject constructor( ) } - // TODO decrypt events and filter in only polls to store them in local: see to mutualize with FetchPollResponseEventsTask + filterAndStorePollEvents(roomId = params.roomId, paginationResponse = response) - return updatePollHistoryStatus(roomId = params.roomId, paginationResponse = chunk) + return updatePollHistoryStatus(roomId = params.roomId, paginationResponse = response) + } + + private suspend fun filterAndStorePollEvents(roomId: String, paginationResponse: PaginationResponse) { + val filterTaskParams = FilterAndStoreEventsTask.Params( + roomId = roomId, + events = paginationResponse.events, + filterPredicate = { it.isPoll() || it.isPollResponse() } + ) + filterAndStoreEventsTask.execute(filterTaskParams) } private suspend fun updatePollHistoryStatus(roomId: String, paginationResponse: PaginationResponse): PollHistoryStatusEntity { @@ -124,7 +138,7 @@ internal class DefaultLoadMorePollsTask @Inject constructor( // start of the timeline is reached, there are no more events status.isEndOfPollsBackward = true status.oldestTimestampReachedMs = oldestEventTimestamp - } else if(oldestEventTimestamp != null && currentTargetTimestamp != null && oldestEventTimestamp <= currentTargetTimestamp) { + } else if (oldestEventTimestamp != null && currentTargetTimestamp != null && oldestEventTimestamp <= currentTargetTimestamp) { // target has been reached status.oldestTimestampReachedMs = oldestEventTimestamp status.tokenEndBackward = paginationResponse.end diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/poll/FetchPollResponseEventsTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/poll/FetchPollResponseEventsTask.kt index e7dd8c57eb..347c9fbf12 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/poll/FetchPollResponseEventsTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/poll/FetchPollResponseEventsTask.kt @@ -17,25 +17,14 @@ package org.matrix.android.sdk.internal.session.room.relation.poll import androidx.annotation.VisibleForTesting -import com.zhuinden.monarchy.Monarchy -import org.matrix.android.sdk.api.session.events.model.Event import org.matrix.android.sdk.api.session.events.model.RelationType import org.matrix.android.sdk.api.session.events.model.isPollResponse -import org.matrix.android.sdk.api.session.room.send.SendState -import org.matrix.android.sdk.internal.crypto.EventDecryptor -import org.matrix.android.sdk.internal.database.mapper.toEntity -import org.matrix.android.sdk.internal.database.model.EventEntity -import org.matrix.android.sdk.internal.database.model.EventInsertType -import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore -import org.matrix.android.sdk.internal.database.query.where -import org.matrix.android.sdk.internal.di.SessionDatabase import org.matrix.android.sdk.internal.network.GlobalErrorReceiver import org.matrix.android.sdk.internal.network.executeRequest import org.matrix.android.sdk.internal.session.room.RoomAPI +import org.matrix.android.sdk.internal.session.room.event.FilterAndStoreEventsTask import org.matrix.android.sdk.internal.session.room.relation.RelationsResponse import org.matrix.android.sdk.internal.task.Task -import org.matrix.android.sdk.internal.util.awaitTransaction -import org.matrix.android.sdk.internal.util.time.Clock import javax.inject.Inject @VisibleForTesting @@ -54,10 +43,9 @@ internal interface FetchPollResponseEventsTask : Task = runCatching { var nextBatch: String? = fetchAndProcessRelatedEventsFrom(params) @@ -70,11 +58,12 @@ internal class DefaultFetchPollResponseEventsTask @Inject constructor( private suspend fun fetchAndProcessRelatedEventsFrom(params: FetchPollResponseEventsTask.Params, from: String? = null): String? { val response = getRelatedEvents(params, from) - val filteredEvents = response.chunks - .map { decryptEventIfNeeded(it) } - .filter { it.isPollResponse() } - - addMissingEventsInDB(params.roomId, filteredEvents) + val filterTaskParams = FilterAndStoreEventsTask.Params( + roomId = params.roomId, + events = response.chunks, + filterPredicate = { it.isPollResponse() } + ) + filterAndStoreEventsTask.execute(filterTaskParams) return response.nextBatch } @@ -90,29 +79,4 @@ internal class DefaultFetchPollResponseEventsTask @Inject constructor( ) } } - - private suspend fun addMissingEventsInDB(roomId: String, events: List) { - monarchy.awaitTransaction { realm -> - val eventIdsToCheck = events.mapNotNull { it.eventId }.filter { it.isNotEmpty() } - if (eventIdsToCheck.isNotEmpty()) { - val existingIds = EventEntity.where(realm, eventIdsToCheck).findAll().toList().map { it.eventId } - - events.filterNot { it.eventId in existingIds } - .map { it.toEntity(roomId = roomId, sendState = SendState.SYNCED, ageLocalTs = computeLocalTs(it)) } - .forEach { it.copyToRealmOrIgnore(realm, EventInsertType.PAGINATION) } - } - } - } - - private suspend fun decryptEventIfNeeded(event: Event): Event { - if (event.isEncrypted()) { - eventDecryptor.decryptEventAndSaveResult(event, timeline = "") - } - - event.ageLocalTs = computeLocalTs(event) - - return event - } - - private fun computeLocalTs(event: Event) = clock.epochMillis() - (event.unsignedData?.age ?: 0) }