From 8b7c8e33519f22e287be3c251edbffb9159b6f9c Mon Sep 17 00:00:00 2001 From: Maxime NATUREL Date: Wed, 14 Dec 2022 11:26:17 +0100 Subject: [PATCH] Task to ensure aggregation of all poll responses when receiving ending poll event --- .../sdk/api/session/events/model/Event.kt | 8 +- .../sdk/internal/session/room/RoomModule.kt | 5 + .../poll/DefaultPollAggregationProcessor.kt | 29 +++- .../poll/FetchPollResponseEventsTask.kt | 130 ++++++++++++++++++ 4 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/poll/FetchPollResponseEventsTask.kt diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/events/model/Event.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/events/model/Event.kt index 40ce6ecb5c..9b5f4ac19f 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/events/model/Event.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/events/model/Event.kt @@ -388,7 +388,13 @@ fun Event.isLocationMessage(): Boolean { } } -fun Event.isPoll(): Boolean = getClearType() in EventType.POLL_START.values || getClearType() in EventType.POLL_END.values +fun Event.isPoll(): Boolean = isPollStart() || isPollEnd() + +fun Event.isPollStart(): Boolean = getClearType() in EventType.POLL_START.values + +fun Event.isPollResponse(): Boolean = getClearType() in EventType.POLL_RESPONSE.values + +fun Event.isPollEnd(): Boolean = getClearType() in EventType.POLL_END.values fun Event.isSticker(): Boolean = getClearType() == EventType.STICKER diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/RoomModule.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/RoomModule.kt index 1475b67276..c28d24995f 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/RoomModule.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/RoomModule.kt @@ -99,6 +99,8 @@ import org.matrix.android.sdk.internal.session.room.relation.DefaultUpdateQuickR import org.matrix.android.sdk.internal.session.room.relation.FetchEditHistoryTask import org.matrix.android.sdk.internal.session.room.relation.FindReactionEventForUndoTask import org.matrix.android.sdk.internal.session.room.relation.UpdateQuickReactionTask +import org.matrix.android.sdk.internal.session.room.relation.poll.DefaultFetchPollResponseEventsTask +import org.matrix.android.sdk.internal.session.room.relation.poll.FetchPollResponseEventsTask import org.matrix.android.sdk.internal.session.room.relation.threads.DefaultFetchThreadSummariesTask import org.matrix.android.sdk.internal.session.room.relation.threads.DefaultFetchThreadTimelineTask import org.matrix.android.sdk.internal.session.room.relation.threads.FetchThreadSummariesTask @@ -354,4 +356,7 @@ internal abstract class RoomModule { @Binds abstract fun bindRedactLiveLocationShareTask(task: DefaultRedactLiveLocationShareTask): RedactLiveLocationShareTask + + @Binds + abstract fun bindFetchPollResponseEventsTask(task: DefaultFetchPollResponseEventsTask): FetchPollResponseEventsTask } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/aggregation/poll/DefaultPollAggregationProcessor.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/aggregation/poll/DefaultPollAggregationProcessor.kt index 455ccabbc6..163d8e5e81 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/aggregation/poll/DefaultPollAggregationProcessor.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/aggregation/poll/DefaultPollAggregationProcessor.kt @@ -17,6 +17,7 @@ package org.matrix.android.sdk.internal.session.room.aggregation.poll import io.realm.Realm +import kotlinx.coroutines.launch import org.matrix.android.sdk.api.extensions.orFalse import org.matrix.android.sdk.api.session.Session import org.matrix.android.sdk.api.session.events.model.Event @@ -40,9 +41,14 @@ import org.matrix.android.sdk.internal.database.model.PollResponseAggregatedSumm import org.matrix.android.sdk.internal.database.query.create import org.matrix.android.sdk.internal.database.query.getOrCreate import org.matrix.android.sdk.internal.database.query.where +import org.matrix.android.sdk.internal.session.room.relation.poll.FetchPollResponseEventsTask +import org.matrix.android.sdk.internal.task.TaskExecutor import javax.inject.Inject -class DefaultPollAggregationProcessor @Inject constructor() : PollAggregationProcessor { +internal class DefaultPollAggregationProcessor @Inject constructor( + private val taskExecutor: TaskExecutor, + private val fetchPollResponseEventsTask: FetchPollResponseEventsTask, +) : PollAggregationProcessor { override fun handlePollStartEvent(realm: Realm, event: Event): Boolean { val content = event.getClearContent()?.toModel() @@ -174,6 +180,10 @@ class DefaultPollAggregationProcessor @Inject constructor() : PollAggregationPro aggregatedPollSummaryEntity.sourceEvents.add(event.eventId) } + if (!isLocalEcho) { + ensurePollIsFullyAggregated(roomId, pollEventId) + } + return true } @@ -200,4 +210,21 @@ class DefaultPollAggregationProcessor @Inject constructor() : PollAggregationPro eventAnnotationsSummaryEntity.pollResponseSummary = it } } + + // TODO add unit tests + /** + * Check that all related votes to a given poll are all retrieved and aggregated. + */ + private fun ensurePollIsFullyAggregated( + roomId: String, + pollEventId: String + ) { + taskExecutor.executorScope.launch { + val params = FetchPollResponseEventsTask.Params( + roomId = roomId, + startPollEventId = pollEventId, + ) + fetchPollResponseEventsTask.execute(params) + } + } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/poll/FetchPollResponseEventsTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/poll/FetchPollResponseEventsTask.kt new file mode 100644 index 0000000000..781a0a4d55 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/poll/FetchPollResponseEventsTask.kt @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2022 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.relation.poll + +import com.zhuinden.monarchy.Monarchy +import org.matrix.android.sdk.api.extensions.tryOrNull +import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult +import org.matrix.android.sdk.api.session.events.model.Event +import org.matrix.android.sdk.api.session.events.model.RelationType +import org.matrix.android.sdk.api.session.events.model.isPollResponse +import org.matrix.android.sdk.api.session.room.send.SendState +import org.matrix.android.sdk.internal.crypto.EventDecryptor +import org.matrix.android.sdk.internal.database.mapper.toEntity +import org.matrix.android.sdk.internal.database.model.EventEntity +import org.matrix.android.sdk.internal.database.model.EventInsertType +import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore +import org.matrix.android.sdk.internal.database.query.where +import org.matrix.android.sdk.internal.di.SessionDatabase +import org.matrix.android.sdk.internal.network.GlobalErrorReceiver +import org.matrix.android.sdk.internal.network.executeRequest +import org.matrix.android.sdk.internal.session.room.RoomAPI +import org.matrix.android.sdk.internal.session.room.relation.RelationsResponse +import org.matrix.android.sdk.internal.task.Task +import org.matrix.android.sdk.internal.util.awaitTransaction +import org.matrix.android.sdk.internal.util.time.Clock +import javax.inject.Inject + +private const val FETCH_RELATED_EVENTS_LIMIT = 50 + +/** + * Task to fetch all the vote events to ensure full aggregation for a given poll. + */ +internal interface FetchPollResponseEventsTask : Task> { + data class Params( + val roomId: String, + val startPollEventId: String, + ) +} + +// TODO add unit tests +internal class DefaultFetchPollResponseEventsTask @Inject constructor( + private val roomAPI: RoomAPI, + private val globalErrorReceiver: GlobalErrorReceiver, + @SessionDatabase private val monarchy: Monarchy, + private val clock: Clock, + private val eventDecryptor: EventDecryptor, +) : FetchPollResponseEventsTask { + + override suspend fun execute(params: FetchPollResponseEventsTask.Params): Result = runCatching { + var nextBatch: String? = fetchAndProcessRelatedEventsFrom(params) + + while (nextBatch?.isNotEmpty() == true) { + nextBatch = fetchAndProcessRelatedEventsFrom(params, from = nextBatch) + } + } + + private suspend fun fetchAndProcessRelatedEventsFrom(params: FetchPollResponseEventsTask.Params, from: String? = null): String? { + val response = getRelatedEvents(params, from) + + val filteredEvents = response.chunks + .map { decryptEventIfNeeded(it) } + .filter { it.isPollResponse() } + + addMissingEventsInDB(params.roomId, filteredEvents) + + return response.nextBatch + } + + private suspend fun getRelatedEvents(params: FetchPollResponseEventsTask.Params, from: String? = null): RelationsResponse { + return executeRequest(globalErrorReceiver, canRetry = true) { + roomAPI.getRelations( + roomId = params.roomId, + eventId = params.startPollEventId, + relationType = RelationType.REFERENCE, + from = from, + limit = FETCH_RELATED_EVENTS_LIMIT, + ) + } + } + + private suspend fun addMissingEventsInDB(roomId: String, events: List) { + monarchy.awaitTransaction { realm -> + val eventIdsToCheck = events.mapNotNull { it.eventId }.filter { it.isNotEmpty() } + val existingIds = EventEntity.where(realm, eventIdsToCheck).findAll().toList().map { it.eventId } + + events.filterNot { it.eventId in existingIds } + .map { + val ageLocalTs = clock.epochMillis() - (it.unsignedData?.age ?: 0) + it.toEntity(roomId = roomId, sendState = SendState.SYNCED, ageLocalTs = ageLocalTs) + } + .forEach { it.copyToRealmOrIgnore(realm, EventInsertType.PAGINATION) } + } + } + + private suspend fun decryptEventIfNeeded(event: Event): Event { + // TODO move into a reusable task + if (event.isEncrypted()) { + tryOrNull(message = "Unable to decrypt the event") { + eventDecryptor.decryptEvent(event, "") + } + ?.let { result -> + event.mxDecryptionResult = OlmDecryptionResult( + payload = result.clearEvent, + senderKey = result.senderCurve25519Key, + keysClaimed = result.claimedEd25519Key?.let { mapOf("ed25519" to it) }, + forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain, + isSafe = result.isSafe + ) + } + } + + event.ageLocalTs = clock.epochMillis() - (event.unsignedData?.age ?: 0) + + return event + } +}