Merge pull request #1634 from vector-im/feature/db_clean_up

Feature/db clean up
This commit is contained in:
Benoit Marty 2020-07-07 11:59:28 +02:00 committed by GitHub
commit 9dc831d8e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 1103 additions and 1130 deletions

View File

@ -16,9 +16,20 @@
package im.vector.matrix.android.api.session.group
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.util.Cancelable
/**
* This interface defines methods to interact within a group.
*/
interface Group {
val groupId: String
/**
* This methods allows you to refresh data about this group. It will be reflected on the GroupSummary.
* The SDK also takes care of refreshing group data every hour.
* @param callback : the matrix callback to be notified of success or failure
* @return a Cancelable to be able to cancel requests.
*/
fun fetchGroupData(callback: MatrixCallback<Unit>): Cancelable
}

View File

@ -1,161 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.crypto.tasks
import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.crypto.MXCryptoError
import im.vector.matrix.android.api.session.crypto.verification.VerificationService
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.message.MessageContent
import im.vector.matrix.android.api.session.room.model.message.MessageRelationContent
import im.vector.matrix.android.api.session.room.model.message.MessageType
import im.vector.matrix.android.api.session.room.model.message.MessageVerificationReadyContent
import im.vector.matrix.android.api.session.room.model.message.MessageVerificationRequestContent
import im.vector.matrix.android.api.session.room.model.message.MessageVerificationStartContent
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
import im.vector.matrix.android.internal.crypto.verification.DefaultVerificationService
import im.vector.matrix.android.internal.di.DeviceId
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.task.Task
import timber.log.Timber
import java.util.ArrayList
import javax.inject.Inject
internal interface RoomVerificationUpdateTask : Task<RoomVerificationUpdateTask.Params, Unit> {
data class Params(
val events: List<Event>,
val verificationService: DefaultVerificationService,
val cryptoService: CryptoService
)
}
internal class DefaultRoomVerificationUpdateTask @Inject constructor(
@UserId private val userId: String,
@DeviceId private val deviceId: String?,
private val cryptoService: CryptoService) : RoomVerificationUpdateTask {
companion object {
// XXX what about multi-account?
private val transactionsHandledByOtherDevice = ArrayList<String>()
}
override suspend fun execute(params: RoomVerificationUpdateTask.Params) {
// TODO ignore initial sync or back pagination?
params.events.forEach { event ->
Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} msgtype: ${event.type} from ${event.senderId}")
// If the request is in the future by more than 5 minutes or more than 10 minutes in the past,
// the message should be ignored by the receiver.
if (!VerificationService.isValidRequest(event.ageLocalTs
?: event.originServerTs)) return@forEach Unit.also {
Timber.d("## SAS Verification live observer: msgId: ${event.eventId} is outdated")
}
// decrypt if needed?
if (event.isEncrypted() && event.mxDecryptionResult == null) {
// TODO use a global event decryptor? attache to session and that listen to new sessionId?
// for now decrypt sync
try {
val result = cryptoService.decryptEvent(event, "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { mapOf("ed25519" to it) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
Timber.e("## SAS Failed to decrypt event: ${event.eventId}")
params.verificationService.onPotentiallyInterestingEventRoomFailToDecrypt(event)
}
}
Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} type: ${event.getClearType()}")
// Relates to is not encrypted
val relatesToEventId = event.content.toModel<MessageRelationContent>()?.relatesTo?.eventId
if (event.senderId == userId) {
// If it's send from me, we need to keep track of Requests or Start
// done from another device of mine
if (EventType.MESSAGE == event.getClearType()) {
val msgType = event.getClearContent().toModel<MessageContent>()?.msgType
if (MessageType.MSGTYPE_VERIFICATION_REQUEST == msgType) {
event.getClearContent().toModel<MessageVerificationRequestContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is requested from another device
Timber.v("## SAS Verification live observer: Transaction requested from other device tid:${event.eventId} ")
event.eventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
}
}
}
} else if (EventType.KEY_VERIFICATION_START == event.getClearType()) {
event.getClearContent().toModel<MessageVerificationStartContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is started from another device
Timber.v("## SAS Verification live observer: Transaction started by other device tid:$relatesToEventId ")
relatesToEventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
params.verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
} else if (EventType.KEY_VERIFICATION_READY == event.getClearType()) {
event.getClearContent().toModel<MessageVerificationReadyContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is started from another device
Timber.v("## SAS Verification live observer: Transaction started by other device tid:$relatesToEventId ")
relatesToEventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
params.verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
} else if (EventType.KEY_VERIFICATION_CANCEL == event.getClearType() || EventType.KEY_VERIFICATION_DONE == event.getClearType()) {
relatesToEventId?.let {
transactionsHandledByOtherDevice.remove(it)
params.verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
Timber.v("## SAS Verification ignoring message sent by me: ${event.eventId} type: ${event.getClearType()}")
return@forEach
}
if (relatesToEventId != null && transactionsHandledByOtherDevice.contains(relatesToEventId)) {
// Ignore this event, it is directed to another of my devices
Timber.v("## SAS Verification live observer: Ignore Transaction handled by other device tid:$relatesToEventId ")
return@forEach
}
when (event.getClearType()) {
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_KEY,
EventType.KEY_VERIFICATION_MAC,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_DONE -> {
params.verificationService.onRoomEvent(event)
}
EventType.MESSAGE -> {
if (MessageType.MSGTYPE_VERIFICATION_REQUEST == event.getClearContent().toModel<MessageContent>()?.msgType) {
params.verificationService.onRoomRequestReceived(event)
}
}
}
}
}
}

View File

@ -1,73 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.crypto.verification
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.LocalEcho
import im.vector.matrix.android.internal.crypto.tasks.DefaultRoomVerificationUpdateTask
import im.vector.matrix.android.internal.crypto.tasks.RoomVerificationUpdateTask
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import javax.inject.Inject
internal class VerificationMessageLiveObserver @Inject constructor(
@SessionDatabase realmConfiguration: RealmConfiguration,
private val roomVerificationUpdateTask: DefaultRoomVerificationUpdateTask,
private val cryptoService: CryptoService,
private val verificationService: DefaultVerificationService,
private val taskExecutor: TaskExecutor
) : RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query {
EventEntity.whereTypes(it, listOf(
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_KEY,
EventType.KEY_VERIFICATION_MAC,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_READY,
EventType.MESSAGE,
EventType.ENCRYPTED)
)
}
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
// Should we ignore when it's an initial sync?
val events = changeSet.insertions
.asSequence()
.mapNotNull { results[it]?.asDomain() }
.filterNot {
// ignore local echos
LocalEcho.isLocalEchoId(it.eventId ?: "")
}
.toList()
roomVerificationUpdateTask.configureWith(
RoomVerificationUpdateTask.Params(events, verificationService, cryptoService)
).executeBy(taskExecutor)
}
}

View File

@ -0,0 +1,168 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.crypto.verification
import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.crypto.MXCryptoError
import im.vector.matrix.android.api.session.crypto.verification.VerificationService
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.LocalEcho
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.message.MessageContent
import im.vector.matrix.android.api.session.room.model.message.MessageRelationContent
import im.vector.matrix.android.api.session.room.model.message.MessageType
import im.vector.matrix.android.api.session.room.model.message.MessageVerificationReadyContent
import im.vector.matrix.android.api.session.room.model.message.MessageVerificationRequestContent
import im.vector.matrix.android.api.session.room.model.message.MessageVerificationStartContent
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
import im.vector.matrix.android.internal.database.model.EventInsertType
import im.vector.matrix.android.internal.di.DeviceId
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.Realm
import timber.log.Timber
import java.util.ArrayList
import javax.inject.Inject
internal class VerificationMessageProcessor @Inject constructor(
private val cryptoService: CryptoService,
private val verificationService: DefaultVerificationService,
@UserId private val userId: String,
@DeviceId private val deviceId: String?
) : EventInsertLiveProcessor {
private val transactionsHandledByOtherDevice = ArrayList<String>()
private val allowedTypes = listOf(
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_KEY,
EventType.KEY_VERIFICATION_MAC,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_READY,
EventType.MESSAGE,
EventType.ENCRYPTED
)
override fun shouldProcess(eventId: String, eventType: String, insertType: EventInsertType): Boolean {
if (insertType != EventInsertType.INCREMENTAL_SYNC) {
return false
}
return allowedTypes.contains(eventType) && !LocalEcho.isLocalEchoId(eventId)
}
override suspend fun process(realm: Realm, event: Event) {
Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} msgtype: ${event.type} from ${event.senderId}")
// If the request is in the future by more than 5 minutes or more than 10 minutes in the past,
// the message should be ignored by the receiver.
if (!VerificationService.isValidRequest(event.ageLocalTs
?: event.originServerTs)) return Unit.also {
Timber.d("## SAS Verification live observer: msgId: ${event.eventId} is outdated")
}
// decrypt if needed?
if (event.isEncrypted() && event.mxDecryptionResult == null) {
// TODO use a global event decryptor? attache to session and that listen to new sessionId?
// for now decrypt sync
try {
val result = cryptoService.decryptEvent(event, "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { mapOf("ed25519" to it) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
Timber.e("## SAS Failed to decrypt event: ${event.eventId}")
verificationService.onPotentiallyInterestingEventRoomFailToDecrypt(event)
}
}
Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} type: ${event.getClearType()}")
// Relates to is not encrypted
val relatesToEventId = event.content.toModel<MessageRelationContent>()?.relatesTo?.eventId
if (event.senderId == userId) {
// If it's send from me, we need to keep track of Requests or Start
// done from another device of mine
if (EventType.MESSAGE == event.getClearType()) {
val msgType = event.getClearContent().toModel<MessageContent>()?.msgType
if (MessageType.MSGTYPE_VERIFICATION_REQUEST == msgType) {
event.getClearContent().toModel<MessageVerificationRequestContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is requested from another device
Timber.v("## SAS Verification live observer: Transaction requested from other device tid:${event.eventId} ")
event.eventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
}
}
}
} else if (EventType.KEY_VERIFICATION_START == event.getClearType()) {
event.getClearContent().toModel<MessageVerificationStartContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is started from another device
Timber.v("## SAS Verification live observer: Transaction started by other device tid:$relatesToEventId ")
relatesToEventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
} else if (EventType.KEY_VERIFICATION_READY == event.getClearType()) {
event.getClearContent().toModel<MessageVerificationReadyContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is started from another device
Timber.v("## SAS Verification live observer: Transaction started by other device tid:$relatesToEventId ")
relatesToEventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
} else if (EventType.KEY_VERIFICATION_CANCEL == event.getClearType() || EventType.KEY_VERIFICATION_DONE == event.getClearType()) {
relatesToEventId?.let {
transactionsHandledByOtherDevice.remove(it)
verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
Timber.v("## SAS Verification ignoring message sent by me: ${event.eventId} type: ${event.getClearType()}")
return
}
if (relatesToEventId != null && transactionsHandledByOtherDevice.contains(relatesToEventId)) {
// Ignore this event, it is directed to another of my devices
Timber.v("## SAS Verification live observer: Ignore Transaction handled by other device tid:$relatesToEventId ")
return
}
when (event.getClearType()) {
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_KEY,
EventType.KEY_VERIFICATION_MAC,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_DONE -> {
verificationService.onRoomEvent(event)
}
EventType.MESSAGE -> {
if (MessageType.MSGTYPE_VERIFICATION_REQUEST == event.getClearContent().toModel<MessageContent>()?.msgType) {
verificationService.onRoomRequestReceived(event)
}
}
}
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.database
import im.vector.matrix.android.internal.database.helper.nextDisplayIndex
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntityFields
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.session.SessionLifecycleObserver
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
import im.vector.matrix.android.internal.task.TaskExecutor
import io.realm.Realm
import io.realm.RealmConfiguration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
private const val MAX_NUMBER_OF_EVENTS_IN_DB = 35_000L
private const val MIN_NUMBER_OF_EVENTS_BY_CHUNK = 300
/**
* This class makes sure to stay under a maximum number of events as it makes Realm to be unusable when listening to events
* when the database is getting too big. This will try incrementally to remove the biggest chunks until we get below the threshold.
* We make sure to still have a minimum number of events so it's not becoming unusable.
* So this won't work for users with a big number of very active rooms.
*/
internal class DatabaseCleaner @Inject constructor(@SessionDatabase private val realmConfiguration: RealmConfiguration,
private val taskExecutor: TaskExecutor) : SessionLifecycleObserver {
override fun onStart() {
taskExecutor.executorScope.launch(Dispatchers.Default) {
awaitTransaction(realmConfiguration) { realm ->
val allRooms = realm.where(RoomEntity::class.java).findAll()
Timber.v("There are ${allRooms.size} rooms in this session")
cleanUp(realm, MAX_NUMBER_OF_EVENTS_IN_DB / 2L)
}
}
}
private suspend fun cleanUp(realm: Realm, threshold: Long) {
val numberOfEvents = realm.where(EventEntity::class.java).findAll().size
val numberOfTimelineEvents = realm.where(TimelineEventEntity::class.java).findAll().size
Timber.v("Number of events in db: $numberOfEvents | Number of timeline events in db: $numberOfTimelineEvents")
if (threshold <= MIN_NUMBER_OF_EVENTS_BY_CHUNK || numberOfTimelineEvents < MAX_NUMBER_OF_EVENTS_IN_DB) {
Timber.v("Db is low enough")
} else {
val thresholdChunks = realm.where(ChunkEntity::class.java)
.greaterThan(ChunkEntityFields.NUMBER_OF_TIMELINE_EVENTS, threshold)
.findAll()
Timber.v("There are ${thresholdChunks.size} chunks to clean with more than $threshold events")
for (chunk in thresholdChunks) {
val maxDisplayIndex = chunk.nextDisplayIndex(PaginationDirection.FORWARDS)
val thresholdDisplayIndex = maxDisplayIndex - threshold
val eventsToRemove = chunk.timelineEvents.where().lessThan(TimelineEventEntityFields.DISPLAY_INDEX, thresholdDisplayIndex).findAll()
Timber.v("There are ${eventsToRemove.size} events to clean in chunk: ${chunk.identifier()} from room ${chunk.room?.first()?.roomId}")
chunk.numberOfTimelineEvents = chunk.numberOfTimelineEvents - eventsToRemove.size
eventsToRemove.forEach {
val canDeleteRoot = it.root?.stateKey == null
if (canDeleteRoot) {
it.root?.deleteFromRealm()
}
it.readReceipts?.readReceipts?.deleteAllFromRealm()
it.readReceipts?.deleteFromRealm()
it.annotations?.apply {
editSummary?.deleteFromRealm()
pollResponseSummary?.deleteFromRealm()
referencesSummaryEntity?.deleteFromRealm()
reactionsSummary.deleteAllFromRealm()
}
it.annotations?.deleteFromRealm()
it.readReceipts?.deleteFromRealm()
it.deleteFromRealm()
}
// We reset the prevToken so we will need to fetch again.
chunk.prevToken = null
}
cleanUp(realm, (threshold / 1.5).toLong())
}
}
}

View File

@ -0,0 +1,102 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.database
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.crypto.MXCryptoError
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventInsertEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
internal class EventInsertLiveObserver @Inject constructor(@SessionDatabase realmConfiguration: RealmConfiguration,
private val processors: Set<@JvmSuppressWildcards EventInsertLiveProcessor>,
private val cryptoService: CryptoService)
: RealmLiveEntityObserver<EventInsertEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventInsertEntity> {
it.where(EventInsertEntity::class.java)
}
override fun onChange(results: RealmResults<EventInsertEntity>) {
if (!results.isLoaded || results.isEmpty()) {
return
}
Timber.v("EventInsertEntity updated with ${results.size} results in db")
val filteredEvents = results.mapNotNull {
if (shouldProcess(it)) {
results.realm.copyFromRealm(it)
} else {
null
}
}
Timber.v("There are ${filteredEvents.size} events to process")
observerScope.launch {
awaitTransaction(realmConfiguration) { realm ->
filteredEvents.forEach { eventInsert ->
val eventId = eventInsert.eventId
val event = EventEntity.where(realm, eventId).findFirst()
if (event == null) {
Timber.v("Event $eventId not found")
return@forEach
}
val domainEvent = event.asDomain()
decryptIfNeeded(domainEvent)
processors.filter {
it.shouldProcess(eventId, domainEvent.getClearType(), eventInsert.insertType)
}.forEach {
it.process(realm, domainEvent)
}
}
realm.where(EventInsertEntity::class.java).findAll().deleteAllFromRealm()
}
}
}
private fun decryptIfNeeded(event: Event) {
if (event.isEncrypted() && event.mxDecryptionResult == null) {
try {
val result = cryptoService.decryptEvent(event, event.roomId ?: "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
Timber.v("Call service: Failed to decrypt event")
// TODO -> we should keep track of this and retry, or aggregation will be broken
}
}
}
private fun shouldProcess(eventInsertEntity: EventInsertEntity): Boolean {
return processors.any {
it.shouldProcess(eventInsertEntity.eventId, eventInsertEntity.eventType, eventInsertEntity.insertType)
}
}
}

View File

@ -19,8 +19,8 @@ package im.vector.matrix.android.internal.database
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.internal.session.SessionLifecycleObserver
import im.vector.matrix.android.internal.util.createBackgroundHandler
import io.realm.OrderedRealmCollectionChangeListener
import io.realm.Realm
import io.realm.RealmChangeListener
import io.realm.RealmConfiguration
import io.realm.RealmObject
import io.realm.RealmResults
@ -30,10 +30,10 @@ import kotlinx.coroutines.cancelChildren
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
internal interface LiveEntityObserver: SessionLifecycleObserver
internal interface LiveEntityObserver : SessionLifecycleObserver
internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val realmConfiguration: RealmConfiguration)
: LiveEntityObserver, OrderedRealmCollectionChangeListener<RealmResults<T>> {
: LiveEntityObserver, RealmChangeListener<RealmResults<T>> {
private companion object {
val BACKGROUND_HANDLER = createBackgroundHandler("LIVE_ENTITY_BACKGROUND")

View File

@ -115,6 +115,7 @@ internal fun ChunkEntity.addTimelineEvent(roomId: String,
true
}
}
numberOfTimelineEvents++
timelineEvents.add(timelineEventEntity)
}

View File

@ -45,7 +45,6 @@ internal object EventMapper {
eventEntity.redacts = event.redacts
eventEntity.age = event.unsignedData?.age ?: event.originServerTs
eventEntity.unsignedData = uds
eventEntity.decryptionResultJson = event.mxDecryptionResult?.let {
MoshiProvider.providesMoshi().adapter<OlmDecryptionResult>(OlmDecryptionResult::class.java).toJson(it)
}

View File

@ -1,34 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.database.mapper
import im.vector.matrix.android.api.session.group.Group
import im.vector.matrix.android.internal.database.model.GroupEntity
import im.vector.matrix.android.internal.session.group.DefaultGroup
internal object GroupMapper {
fun map(groupEntity: GroupEntity): Group {
return DefaultGroup(
groupEntity.groupId
)
}
}
internal fun GroupEntity.asDomain(): Group {
return GroupMapper.map(this)
}

View File

@ -27,6 +27,7 @@ internal open class ChunkEntity(@Index var prevToken: String? = null,
@Index var nextToken: String? = null,
var stateEvents: RealmList<EventEntity> = RealmList(),
var timelineEvents: RealmList<TimelineEventEntity> = RealmList(),
var numberOfTimelineEvents: Long = 0,
// Only one chunk will have isLastForward == true
@Index var isLastForward: Boolean = false,
@Index var isLastBackward: Boolean = false

View File

@ -0,0 +1,37 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.database.model
import io.realm.RealmObject
/**
* This class is used to get notification on new events being inserted. It's to avoid realm getting slow when listening to insert
* in EventEntity table.
*/
internal open class EventInsertEntity(var eventId: String = "",
var eventType: String = ""
) : RealmObject() {
private var insertTypeStr: String = EventInsertType.INCREMENTAL_SYNC.name
var insertType: EventInsertType
get() {
return EventInsertType.valueOf(insertTypeStr)
}
set(value) {
insertTypeStr = value.name
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.database.model;
public enum EventInsertType {
INITIAL_SYNC,
INCREMENTAL_SYNC,
PAGINATION,
LOCAL_ECHO
}

View File

@ -22,8 +22,7 @@ import io.realm.annotations.PrimaryKey
/**
* This class is used to store group info (groupId and membership) from the sync response.
* Then [im.vector.matrix.android.internal.session.group.GroupSummaryUpdater] observes change and
* makes requests to fetch group information from the homeserver
* Then GetGroupDataTask is called regularly to fetch group information from the homeserver.
*/
internal open class GroupEntity(@PrimaryKey var groupId: String = "")
: RealmObject() {

View File

@ -25,6 +25,7 @@ import io.realm.annotations.RealmModule
classes = [
ChunkEntity::class,
EventEntity::class,
EventInsertEntity::class,
TimelineEventEntity::class,
FilterEntity::class,
GroupEntity::class,

View File

@ -18,16 +18,28 @@ package im.vector.matrix.android.internal.database.query
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventEntityFields
import im.vector.matrix.android.internal.database.model.EventInsertEntity
import im.vector.matrix.android.internal.database.model.EventInsertType
import io.realm.Realm
import io.realm.RealmList
import io.realm.RealmQuery
import io.realm.kotlin.where
internal fun EventEntity.copyToRealmOrIgnore(realm: Realm): EventEntity {
return realm.where<EventEntity>()
.equalTo(EventEntityFields.EVENT_ID, eventId)
.equalTo(EventEntityFields.ROOM_ID, roomId)
.findFirst() ?: realm.copyToRealm(this)
internal fun EventEntity.copyToRealmOrIgnore(realm: Realm, insertType: EventInsertType): EventEntity {
val eventEntity = realm.where<EventEntity>()
.equalTo(EventEntityFields.EVENT_ID, eventId)
.equalTo(EventEntityFields.ROOM_ID, roomId)
.findFirst()
return if (eventEntity == null) {
val insertEntity = EventInsertEntity(eventId = eventId, eventType = type).apply {
this.insertType = insertType
}
realm.insert(insertEntity)
// copy this event entity and return it
realm.copyToRealm(this)
} else {
eventEntity
}
}
internal fun EventEntity.Companion.where(realm: Realm, eventId: String): RealmQuery<EventEntity> {

View File

@ -19,6 +19,7 @@ package im.vector.matrix.android.internal.database.query
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.database.model.GroupEntity
import im.vector.matrix.android.internal.database.model.GroupEntityFields
import im.vector.matrix.android.internal.query.process
import io.realm.Realm
import io.realm.RealmQuery
import io.realm.kotlin.where
@ -28,10 +29,6 @@ internal fun GroupEntity.Companion.where(realm: Realm, groupId: String): RealmQu
.equalTo(GroupEntityFields.GROUP_ID, groupId)
}
internal fun GroupEntity.Companion.where(realm: Realm, membership: Membership? = null): RealmQuery<GroupEntity> {
val query = realm.where<GroupEntity>()
if (membership != null) {
query.equalTo(GroupEntityFields.MEMBERSHIP_STR, membership.name)
}
return query
internal fun GroupEntity.Companion.where(realm: Realm, memberships: List<Membership>): RealmQuery<GroupEntity> {
return realm.where<GroupEntity>().process(GroupEntityFields.MEMBERSHIP_STR, memberships)
}

View File

@ -20,6 +20,7 @@ import im.vector.matrix.android.internal.database.model.GroupSummaryEntity
import im.vector.matrix.android.internal.database.model.GroupSummaryEntityFields
import io.realm.Realm
import io.realm.RealmQuery
import io.realm.kotlin.createObject
import io.realm.kotlin.where
internal fun GroupSummaryEntity.Companion.where(realm: Realm, groupId: String? = null): RealmQuery<GroupSummaryEntity> {
@ -34,3 +35,7 @@ internal fun GroupSummaryEntity.Companion.where(realm: Realm, groupIds: List<Str
return realm.where<GroupSummaryEntity>()
.`in`(GroupSummaryEntityFields.GROUP_ID, groupIds.toTypedArray())
}
internal fun GroupSummaryEntity.Companion.getOrCreate(realm: Realm, groupId: String): GroupSummaryEntity {
return where(realm, groupId).findFirst() ?: realm.createObject(groupId)
}

View File

@ -21,7 +21,9 @@ import androidx.work.Constraints
import androidx.work.ListenableWorker
import androidx.work.NetworkType
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.PeriodicWorkRequestBuilder
import androidx.work.WorkManager
import java.util.concurrent.TimeUnit
import javax.inject.Inject
internal class WorkManagerProvider @Inject constructor(
@ -39,6 +41,14 @@ internal class WorkManagerProvider @Inject constructor(
OneTimeWorkRequestBuilder<W>()
.addTag(tag)
/**
* Create a PeriodicWorkRequestBuilder, with the Matrix SDK tag
*/
inline fun <reified W : ListenableWorker> matrixPeriodicWorkRequestBuilder(repeatInterval: Long,
repeatIntervalTimeUnit: TimeUnit) =
PeriodicWorkRequestBuilder<W>(repeatInterval, repeatIntervalTimeUnit)
.addTag(tag)
/**
* Cancel all works instantiated by the Matrix SDK for the current session, and not those from the SDK client, or for other sessions
*/

View File

@ -50,6 +50,7 @@ import im.vector.matrix.android.api.session.user.UserService
import im.vector.matrix.android.api.session.widgets.WidgetService
import im.vector.matrix.android.internal.auth.SessionParamsStore
import im.vector.matrix.android.internal.crypto.DefaultCryptoService
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.di.SessionId
import im.vector.matrix.android.internal.di.WorkManagerProvider
import im.vector.matrix.android.internal.session.identity.DefaultIdentityService
@ -60,6 +61,7 @@ import im.vector.matrix.android.internal.session.sync.job.SyncWorker
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import im.vector.matrix.android.internal.util.createUIHandler
import io.realm.RealmConfiguration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.greenrobot.eventbus.EventBus
@ -76,6 +78,7 @@ internal class DefaultSession @Inject constructor(
private val eventBus: EventBus,
@SessionId
override val sessionId: String,
@SessionDatabase private val realmConfiguration: RealmConfiguration,
private val lifecycleObservers: Set<@JvmSuppressWildcards SessionLifecycleObserver>,
private val sessionListeners: SessionListeners,
private val roomService: Lazy<RoomService>,

View File

@ -0,0 +1,28 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.internal.database.model.EventInsertType
import io.realm.Realm
internal interface EventInsertLiveProcessor {
fun shouldProcess(eventId: String, eventType: String, insertType: EventInsertType): Boolean
suspend fun process(realm: Realm, event: Event)
}

View File

@ -39,7 +39,9 @@ import im.vector.matrix.android.api.session.securestorage.SharedSecretStorageSer
import im.vector.matrix.android.api.session.typing.TypingUsersTracker
import im.vector.matrix.android.internal.crypto.crosssigning.ShieldTrustUpdater
import im.vector.matrix.android.internal.crypto.secrets.DefaultSharedSecretStorageService
import im.vector.matrix.android.internal.crypto.verification.VerificationMessageLiveObserver
import im.vector.matrix.android.internal.crypto.verification.VerificationMessageProcessor
import im.vector.matrix.android.internal.database.DatabaseCleaner
import im.vector.matrix.android.internal.database.EventInsertLiveObserver
import im.vector.matrix.android.internal.database.SessionRealmConfigurationFactory
import im.vector.matrix.android.internal.di.Authenticated
import im.vector.matrix.android.internal.di.DeviceId
@ -64,16 +66,15 @@ import im.vector.matrix.android.internal.network.httpclient.addSocketFactory
import im.vector.matrix.android.internal.network.interceptors.CurlLoggingInterceptor
import im.vector.matrix.android.internal.network.token.AccessTokenProvider
import im.vector.matrix.android.internal.network.token.HomeserverAccessTokenProvider
import im.vector.matrix.android.internal.session.call.CallEventObserver
import im.vector.matrix.android.internal.session.call.CallEventProcessor
import im.vector.matrix.android.internal.session.download.DownloadProgressInterceptor
import im.vector.matrix.android.internal.session.group.GroupSummaryUpdater
import im.vector.matrix.android.internal.session.homeserver.DefaultHomeServerCapabilitiesService
import im.vector.matrix.android.internal.session.identity.DefaultIdentityService
import im.vector.matrix.android.internal.session.integrationmanager.IntegrationManager
import im.vector.matrix.android.internal.session.room.EventRelationsAggregationUpdater
import im.vector.matrix.android.internal.session.room.create.RoomCreateEventLiveObserver
import im.vector.matrix.android.internal.session.room.prune.EventsPruner
import im.vector.matrix.android.internal.session.room.tombstone.RoomTombstoneEventLiveObserver
import im.vector.matrix.android.internal.session.room.EventRelationsAggregationProcessor
import im.vector.matrix.android.internal.session.room.create.RoomCreateEventProcessor
import im.vector.matrix.android.internal.session.room.prune.RedactionEventProcessor
import im.vector.matrix.android.internal.session.room.tombstone.RoomTombstoneEventProcessor
import im.vector.matrix.android.internal.session.securestorage.DefaultSecureStorageService
import im.vector.matrix.android.internal.session.typing.DefaultTypingUsersTracker
import im.vector.matrix.android.internal.session.user.accountdata.DefaultAccountDataService
@ -293,31 +294,31 @@ internal abstract class SessionModule {
@Binds
@IntoSet
abstract fun bindGroupSummaryUpdater(updater: GroupSummaryUpdater): SessionLifecycleObserver
abstract fun bindEventRedactionProcessor(processor: RedactionEventProcessor): EventInsertLiveProcessor
@Binds
@IntoSet
abstract fun bindEventsPruner(pruner: EventsPruner): SessionLifecycleObserver
abstract fun bindEventRelationsAggregationProcessor(processor: EventRelationsAggregationProcessor): EventInsertLiveProcessor
@Binds
@IntoSet
abstract fun bindEventRelationsAggregationUpdater(updater: EventRelationsAggregationUpdater): SessionLifecycleObserver
abstract fun bindRoomTombstoneEventProcessor(processor: RoomTombstoneEventProcessor): EventInsertLiveProcessor
@Binds
@IntoSet
abstract fun bindRoomTombstoneEventLiveObserver(observer: RoomTombstoneEventLiveObserver): SessionLifecycleObserver
abstract fun bindRoomCreateEventProcessor(processor: RoomCreateEventProcessor): EventInsertLiveProcessor
@Binds
@IntoSet
abstract fun bindRoomCreateEventLiveObserver(observer: RoomCreateEventLiveObserver): SessionLifecycleObserver
abstract fun bindVerificationMessageProcessor(processor: VerificationMessageProcessor): EventInsertLiveProcessor
@Binds
@IntoSet
abstract fun bindVerificationMessageLiveObserver(observer: VerificationMessageLiveObserver): SessionLifecycleObserver
abstract fun bindCallEventProcessor(processor: CallEventProcessor): EventInsertLiveProcessor
@Binds
@IntoSet
abstract fun bindCallEventObserver(observer: CallEventObserver): SessionLifecycleObserver
abstract fun bindEventInsertObserver(observer: EventInsertLiveObserver): SessionLifecycleObserver
@Binds
@IntoSet
@ -335,6 +336,10 @@ internal abstract class SessionModule {
@IntoSet
abstract fun bindIdentityService(observer: DefaultIdentityService): SessionLifecycleObserver
@Binds
@IntoSet
abstract fun bindDatabaseCleaner(observer: DatabaseCleaner): SessionLifecycleObserver
@Binds
abstract fun bindInitialSyncProgressService(service: DefaultInitialSyncProgressService): InitialSyncProgressService

View File

@ -1,66 +0,0 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.call
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.di.UserId
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
internal class CallEventObserver @Inject constructor(
@SessionDatabase realmConfiguration: RealmConfiguration,
@UserId private val userId: String,
private val task: CallEventsObserverTask
) : RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> {
EventEntity.whereTypes(it, listOf(
EventType.CALL_ANSWER,
EventType.CALL_CANDIDATES,
EventType.CALL_INVITE,
EventType.CALL_HANGUP,
EventType.ENCRYPTED)
)
}
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
Timber.v("EventRelationsAggregationUpdater called with ${changeSet.insertions.size} insertions")
val insertedDomains = changeSet.insertions
.asSequence()
.mapNotNull { results[it]?.asDomain() }
.toList()
val params = CallEventsObserverTask.Params(
insertedDomains,
userId
)
observerScope.launch {
task.execute(params)
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.call
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.database.model.EventInsertType
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.Realm
import timber.log.Timber
import javax.inject.Inject
internal class CallEventProcessor @Inject constructor(
@UserId private val userId: String,
private val callService: DefaultCallSignalingService
) : EventInsertLiveProcessor {
private val allowedTypes = listOf(
EventType.CALL_ANSWER,
EventType.CALL_CANDIDATES,
EventType.CALL_INVITE,
EventType.CALL_HANGUP,
EventType.ENCRYPTED
)
override fun shouldProcess(eventId: String, eventType: String, insertType: EventInsertType): Boolean {
if (insertType != EventInsertType.INCREMENTAL_SYNC) {
return false
}
return allowedTypes.contains(eventType)
}
override suspend fun process(realm: Realm, event: Event) {
update(realm, event)
}
private fun update(realm: Realm, event: Event) {
val now = System.currentTimeMillis()
// TODO might check if an invite is not closed (hangup/answsered) in the same event batch?
event.roomId ?: return Unit.also {
Timber.w("Event with no room id ${event.eventId}")
}
val age = now - (event.ageLocalTs ?: now)
if (age > 40_000) {
// To old to ring?
return
}
event.ageLocalTs
if (EventType.isCallEvent(event.getClearType())) {
callService.onCallEvent(event)
}
Timber.v("$realm : $userId")
}
}

View File

@ -1,92 +0,0 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.call
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.crypto.MXCryptoError
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm
import timber.log.Timber
import javax.inject.Inject
internal interface CallEventsObserverTask : Task<CallEventsObserverTask.Params, Unit> {
data class Params(
val events: List<Event>,
val userId: String
)
}
internal class DefaultCallEventsObserverTask @Inject constructor(
@SessionDatabase private val monarchy: Monarchy,
private val cryptoService: CryptoService,
private val callService: DefaultCallSignalingService) : CallEventsObserverTask {
override suspend fun execute(params: CallEventsObserverTask.Params) {
val events = params.events
val userId = params.userId
monarchy.awaitTransaction { realm ->
Timber.v(">>> DefaultCallEventsObserverTask[${params.hashCode()}] called with ${events.size} events")
update(realm, events, userId)
Timber.v("<<< DefaultCallEventsObserverTask[${params.hashCode()}] finished")
}
}
private fun update(realm: Realm, events: List<Event>, userId: String) {
val now = System.currentTimeMillis()
// TODO might check if an invite is not closed (hangup/answsered) in the same event batch?
events.forEach { event ->
event.roomId ?: return@forEach Unit.also {
Timber.w("Event with no room id ${event.eventId}")
}
val age = now - (event.ageLocalTs ?: now)
if (age > 40_000) {
// To old to ring?
return@forEach
}
event.ageLocalTs
decryptIfNeeded(event)
if (EventType.isCallEvent(event.getClearType())) {
callService.onCallEvent(event)
}
}
Timber.v("$realm : $userId")
}
private fun decryptIfNeeded(event: Event) {
if (event.isEncrypted() && event.mxDecryptionResult == null) {
try {
val result = cryptoService.decryptEvent(event, event.roomId ?: "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
Timber.v("Call service: Failed to decrypt event")
// TODO -> we should keep track of this and retry, or aggregation will be broken
}
}
}
}

View File

@ -41,7 +41,4 @@ internal abstract class CallModule {
@Binds
abstract fun bindGetTurnServerTask(task: DefaultGetTurnServerTask): GetTurnServerTask
@Binds
abstract fun bindCallEventsObserverTask(task: DefaultCallEventsObserverTask): CallEventsObserverTask
}

View File

@ -18,7 +18,9 @@ package im.vector.matrix.android.internal.session.group
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.database.model.GroupEntity
import im.vector.matrix.android.internal.database.model.GroupSummaryEntity
import im.vector.matrix.android.internal.database.query.getOrCreate
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.network.executeRequest
@ -28,11 +30,14 @@ import im.vector.matrix.android.internal.session.group.model.GroupUsers
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.util.awaitTransaction
import org.greenrobot.eventbus.EventBus
import timber.log.Timber
import javax.inject.Inject
internal interface GetGroupDataTask : Task<GetGroupDataTask.Params, Unit> {
data class Params(val groupId: String)
sealed class Params {
object FetchAllActive : Params()
data class FetchWithIds(val groupIds: List<String>) : Params()
}
}
internal class DefaultGetGroupDataTask @Inject constructor(
@ -41,44 +46,64 @@ internal class DefaultGetGroupDataTask @Inject constructor(
private val eventBus: EventBus
) : GetGroupDataTask {
private data class GroupData(
val groupId: String,
val groupSummary: GroupSummaryResponse,
val groupRooms: GroupRooms,
val groupUsers: GroupUsers
)
override suspend fun execute(params: GetGroupDataTask.Params) {
val groupId = params.groupId
val groupSummary = executeRequest<GroupSummaryResponse>(eventBus) {
apiCall = groupAPI.getSummary(groupId)
val groupIds = when (params) {
is GetGroupDataTask.Params.FetchAllActive -> {
getActiveGroupIds()
}
is GetGroupDataTask.Params.FetchWithIds -> {
params.groupIds
}
}
val groupRooms = executeRequest<GroupRooms>(eventBus) {
apiCall = groupAPI.getRooms(groupId)
Timber.v("Fetch data for group with ids: ${groupIds.joinToString(";")}")
val data = groupIds.map { groupId ->
val groupSummary = executeRequest<GroupSummaryResponse>(eventBus) {
apiCall = groupAPI.getSummary(groupId)
}
val groupRooms = executeRequest<GroupRooms>(eventBus) {
apiCall = groupAPI.getRooms(groupId)
}
val groupUsers = executeRequest<GroupUsers>(eventBus) {
apiCall = groupAPI.getUsers(groupId)
}
GroupData(groupId, groupSummary, groupRooms, groupUsers)
}
val groupUsers = executeRequest<GroupUsers>(eventBus) {
apiCall = groupAPI.getUsers(groupId)
}
insertInDb(groupSummary, groupRooms, groupUsers, groupId)
insertInDb(data)
}
private suspend fun insertInDb(groupSummary: GroupSummaryResponse,
groupRooms: GroupRooms,
groupUsers: GroupUsers,
groupId: String) {
private fun getActiveGroupIds(): List<String> {
return monarchy.fetchAllMappedSync(
{ realm ->
GroupEntity.where(realm, Membership.activeMemberships())
},
{ it.groupId }
)
}
private suspend fun insertInDb(groupDataList: List<GroupData>) {
monarchy
.awaitTransaction { realm ->
val groupSummaryEntity = GroupSummaryEntity.where(realm, groupId).findFirst()
?: realm.createObject(GroupSummaryEntity::class.java, groupId)
groupDataList.forEach { groupData ->
groupSummaryEntity.avatarUrl = groupSummary.profile?.avatarUrl ?: ""
val name = groupSummary.profile?.name
groupSummaryEntity.displayName = if (name.isNullOrEmpty()) groupId else name
groupSummaryEntity.shortDescription = groupSummary.profile?.shortDescription ?: ""
val groupSummaryEntity = GroupSummaryEntity.getOrCreate(realm, groupData.groupId)
groupSummaryEntity.roomIds.clear()
groupRooms.rooms.mapTo(groupSummaryEntity.roomIds) { it.roomId }
groupSummaryEntity.avatarUrl = groupData.groupSummary.profile?.avatarUrl ?: ""
val name = groupData.groupSummary.profile?.name
groupSummaryEntity.displayName = if (name.isNullOrEmpty()) groupData.groupId else name
groupSummaryEntity.shortDescription = groupData.groupSummary.profile?.shortDescription ?: ""
groupSummaryEntity.userIds.clear()
groupUsers.users.mapTo(groupSummaryEntity.userIds) { it.userId }
groupSummaryEntity.roomIds.clear()
groupData.groupRooms.rooms.mapTo(groupSummaryEntity.roomIds) { it.roomId }
groupSummaryEntity.membership = when (groupSummary.user?.membership) {
Membership.JOIN.value -> Membership.JOIN
Membership.INVITE.value -> Membership.INVITE
else -> Membership.LEAVE
groupSummaryEntity.userIds.clear()
groupData.groupUsers.users.mapTo(groupSummaryEntity.userIds) { it.userId }
}
}
}

View File

@ -16,6 +16,20 @@
package im.vector.matrix.android.internal.session.group
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.session.group.Group
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
internal class DefaultGroup(override val groupId: String) : Group
internal class DefaultGroup(override val groupId: String,
private val taskExecutor: TaskExecutor,
private val getGroupDataTask: GetGroupDataTask) : Group {
override fun fetchGroupData(callback: MatrixCallback<Unit>): Cancelable {
val params = GetGroupDataTask.Params.FetchWithIds(listOf(groupId))
return getGroupDataTask.configureWith(params) {
this.callback = callback
}.executeBy(taskExecutor)
}
}

View File

@ -23,6 +23,7 @@ import im.vector.matrix.android.api.session.group.GroupService
import im.vector.matrix.android.api.session.group.GroupSummaryQueryParams
import im.vector.matrix.android.api.session.group.model.GroupSummary
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.GroupEntity
import im.vector.matrix.android.internal.database.model.GroupSummaryEntity
import im.vector.matrix.android.internal.database.model.GroupSummaryEntityFields
import im.vector.matrix.android.internal.database.query.where
@ -33,10 +34,15 @@ import io.realm.Realm
import io.realm.RealmQuery
import javax.inject.Inject
internal class DefaultGroupService @Inject constructor(@SessionDatabase private val monarchy: Monarchy) : GroupService {
internal class DefaultGroupService @Inject constructor(@SessionDatabase private val monarchy: Monarchy,
private val groupFactory: GroupFactory) : GroupService {
override fun getGroup(groupId: String): Group? {
return null
return Realm.getInstance(monarchy.realmConfiguration).use { realm ->
GroupEntity.where(realm, groupId).findFirst()?.let {
groupFactory.create(groupId)
}
}
}
override fun getGroupSummary(groupId: String): GroupSummary? {

View File

@ -35,7 +35,6 @@ internal class GetGroupDataWorker(context: Context, params: WorkerParameters) :
@JsonClass(generateAdapter = true)
internal data class Params(
override val sessionId: String,
val groupIds: List<String>,
override val lastFailureMessage: String? = null
) : SessionWorkerParams
@ -48,14 +47,11 @@ internal class GetGroupDataWorker(context: Context, params: WorkerParameters) :
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
val results = params.groupIds.map { groupId ->
runCatching { fetchGroupData(groupId) }
}
val isSuccessful = results.none { it.isFailure }
return if (isSuccessful) Result.success() else Result.retry()
}
private suspend fun fetchGroupData(groupId: String) {
getGroupDataTask.execute(GetGroupDataTask.Params(groupId))
return runCatching {
getGroupDataTask.execute(GetGroupDataTask.Params.FetchAllActive)
}.fold(
{ Result.success() },
{ Result.retry() }
)
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.group
import im.vector.matrix.android.api.session.group.Group
import im.vector.matrix.android.internal.session.SessionScope
import im.vector.matrix.android.internal.task.TaskExecutor
import javax.inject.Inject
internal interface GroupFactory {
fun create(groupId: String): Group
}
@SessionScope
internal class DefaultGroupFactory @Inject constructor(private val getGroupDataTask: GetGroupDataTask,
private val taskExecutor: TaskExecutor) :
GroupFactory {
override fun create(groupId: String): Group {
return DefaultGroup(
groupId = groupId,
taskExecutor = taskExecutor,
getGroupDataTask = getGroupDataTask
)
}
}

View File

@ -36,6 +36,9 @@ internal abstract class GroupModule {
}
}
@Binds
abstract fun bindGroupFactory(factory: DefaultGroupFactory): GroupFactory
@Binds
abstract fun bindGetGroupDataTask(task: DefaultGetGroupDataTask): GetGroupDataTask

View File

@ -1,91 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.group
import androidx.work.ExistingWorkPolicy
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.model.GroupEntity
import im.vector.matrix.android.internal.database.model.GroupSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.di.SessionId
import im.vector.matrix.android.internal.di.WorkManagerProvider
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
private const val GET_GROUP_DATA_WORKER = "GET_GROUP_DATA_WORKER"
internal class GroupSummaryUpdater @Inject constructor(
private val workManagerProvider: WorkManagerProvider,
@SessionId private val sessionId: String,
@SessionDatabase private val monarchy: Monarchy)
: RealmLiveEntityObserver<GroupEntity>(monarchy.realmConfiguration) {
override val query = Monarchy.Query { GroupEntity.where(it) }
override fun onChange(results: RealmResults<GroupEntity>, changeSet: OrderedCollectionChangeSet) {
// `insertions` for new groups and `changes` to handle left groups
val modifiedGroupEntity = (changeSet.insertions + changeSet.changes)
.asSequence()
.mapNotNull { results[it] }
fetchGroupsData(modifiedGroupEntity
.filter { it.membership == Membership.JOIN || it.membership == Membership.INVITE }
.map { it.groupId }
.toList())
modifiedGroupEntity
.filter { it.membership == Membership.LEAVE }
.map { it.groupId }
.toList()
.also {
observerScope.launch {
deleteGroups(it)
}
}
}
private fun fetchGroupsData(groupIds: List<String>) {
val getGroupDataWorkerParams = GetGroupDataWorker.Params(sessionId, groupIds)
val workData = WorkerParamsFactory.toData(getGroupDataWorkerParams)
val getGroupWork = workManagerProvider.matrixOneTimeWorkRequestBuilder<GetGroupDataWorker>()
.setInputData(workData)
.setConstraints(WorkManagerProvider.workConstraints)
.build()
workManagerProvider.workManager
.beginUniqueWork(GET_GROUP_DATA_WORKER, ExistingWorkPolicy.APPEND, getGroupWork)
.enqueue()
}
/**
* Delete the GroupSummaryEntity of left groups
*/
private suspend fun deleteGroups(groupIds: List<String>) = awaitTransaction(monarchy.realmConfiguration) { realm ->
GroupSummaryEntity.where(realm, groupIds)
.findAll()
.deleteAllFromRealm()
}
}

View File

@ -15,9 +15,7 @@
*/
package im.vector.matrix.android.internal.session.room
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.crypto.MXCryptoError
import im.vector.matrix.android.api.session.events.model.AggregatedAnnotation
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
@ -32,13 +30,13 @@ import im.vector.matrix.android.api.session.room.model.message.MessageContent
import im.vector.matrix.android.api.session.room.model.message.MessagePollResponseContent
import im.vector.matrix.android.api.session.room.model.message.MessageRelationContent
import im.vector.matrix.android.api.session.room.model.relation.ReactionContent
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
import im.vector.matrix.android.internal.crypto.model.event.EncryptedEventContent
import im.vector.matrix.android.internal.database.mapper.ContentMapper
import im.vector.matrix.android.internal.database.mapper.EventMapper
import im.vector.matrix.android.internal.database.model.EditAggregatedSummaryEntity
import im.vector.matrix.android.internal.database.model.EventAnnotationsSummaryEntity
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventInsertType
import im.vector.matrix.android.internal.database.model.PollResponseAggregatedSummaryEntity
import im.vector.matrix.android.internal.database.model.ReactionAggregatedSummaryEntity
import im.vector.matrix.android.internal.database.model.ReactionAggregatedSummaryEntityFields
@ -47,21 +45,12 @@ import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.query.create
import im.vector.matrix.android.internal.database.query.getOrCreate
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.util.awaitTransaction
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.Realm
import timber.log.Timber
import javax.inject.Inject
internal interface EventRelationsAggregationTask : Task<EventRelationsAggregationTask.Params, Unit> {
data class Params(
val events: List<Event>,
val userId: String
)
}
enum class VerificationState {
REQUEST,
WAITING,
@ -89,161 +78,146 @@ private fun VerificationState?.toState(newState: VerificationState): Verificatio
return newState
}
/**
* Called by EventRelationAggregationUpdater, when new events that can affect relations are inserted in base.
*/
internal class DefaultEventRelationsAggregationTask @Inject constructor(
@SessionDatabase private val monarchy: Monarchy,
private val cryptoService: CryptoService) : EventRelationsAggregationTask {
internal class EventRelationsAggregationProcessor @Inject constructor(@UserId private val userId: String,
private val cryptoService: CryptoService
) : EventInsertLiveProcessor {
// OPT OUT serer aggregation until API mature enough
private val SHOULD_HANDLE_SERVER_AGREGGATION = false
private val allowedTypes = listOf(
EventType.MESSAGE,
EventType.REDACTION,
EventType.REACTION,
EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_MAC,
// TODO Add ?
// EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_KEY,
EventType.ENCRYPTED
)
override suspend fun execute(params: EventRelationsAggregationTask.Params) {
val events = params.events
val userId = params.userId
monarchy.awaitTransaction { realm ->
Timber.v(">>> DefaultEventRelationsAggregationTask[${params.hashCode()}] called with ${events.size} events")
update(realm, events, userId)
Timber.v("<<< DefaultEventRelationsAggregationTask[${params.hashCode()}] finished")
}
override fun shouldProcess(eventId: String, eventType: String, insertType: EventInsertType): Boolean {
return allowedTypes.contains(eventType)
}
private fun update(realm: Realm, events: List<Event>, userId: String) {
events.forEach { event ->
try { // Temporary catch, should be removed
val roomId = event.roomId
if (roomId == null) {
Timber.w("Event has no room id ${event.eventId}")
return@forEach
override suspend fun process(realm: Realm, event: Event) {
try { // Temporary catch, should be removed
val roomId = event.roomId
if (roomId == null) {
Timber.w("Event has no room id ${event.eventId}")
return
}
val isLocalEcho = LocalEcho.isLocalEchoId(event.eventId ?: "")
when (event.getClearType()) {
EventType.REACTION -> {
// we got a reaction!!
Timber.v("###REACTION in room $roomId , reaction eventID ${event.eventId}")
handleReaction(event, roomId, realm, userId, isLocalEcho)
}
val isLocalEcho = LocalEcho.isLocalEchoId(event.eventId ?: "")
when (event.type) {
EventType.REACTION -> {
// we got a reaction!!
Timber.v("###REACTION in room $roomId , reaction eventID ${event.eventId}")
handleReaction(event, roomId, realm, userId, isLocalEcho)
}
EventType.MESSAGE -> {
if (event.unsignedData?.relations?.annotations != null) {
Timber.v("###REACTION Agreggation in room $roomId for event ${event.eventId}")
handleInitialAggregatedRelations(event, roomId, event.unsignedData.relations.annotations, realm)
EventType.MESSAGE -> {
if (event.unsignedData?.relations?.annotations != null) {
Timber.v("###REACTION Agreggation in room $roomId for event ${event.eventId}")
handleInitialAggregatedRelations(event, roomId, event.unsignedData.relations.annotations, realm)
EventAnnotationsSummaryEntity.where(realm, event.eventId
?: "").findFirst()?.let {
TimelineEventEntity.where(realm, roomId = roomId, eventId = event.eventId
?: "").findFirst()?.let { tet ->
tet.annotations = it
}
}
}
val content: MessageContent? = event.content.toModel()
if (content?.relatesTo?.type == RelationType.REPLACE) {
Timber.v("###REPLACE in room $roomId for event ${event.eventId}")
// A replace!
handleReplace(realm, event, content, roomId, isLocalEcho)
} else if (content?.relatesTo?.type == RelationType.RESPONSE) {
Timber.v("###RESPONSE in room $roomId for event ${event.eventId}")
handleResponse(realm, userId, event, content, roomId, isLocalEcho)
}
}
EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_MAC,
EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_KEY -> {
Timber.v("## SAS REF in room $roomId for event ${event.eventId}")
event.content.toModel<MessageRelationContent>()?.relatesTo?.let {
if (it.type == RelationType.REFERENCE && it.eventId != null) {
handleVerification(realm, event, roomId, isLocalEcho, it.eventId, userId)
EventAnnotationsSummaryEntity.where(realm, event.eventId
?: "").findFirst()?.let {
TimelineEventEntity.where(realm, roomId = roomId, eventId = event.eventId
?: "").findFirst()?.let { tet ->
tet.annotations = it
}
}
}
EventType.ENCRYPTED -> {
// Relation type is in clear
val encryptedEventContent = event.content.toModel<EncryptedEventContent>()
if (encryptedEventContent?.relatesTo?.type == RelationType.REPLACE
|| encryptedEventContent?.relatesTo?.type == RelationType.RESPONSE
) {
// we need to decrypt if needed
decryptIfNeeded(event)
event.getClearContent().toModel<MessageContent>()?.let {
if (encryptedEventContent.relatesTo.type == RelationType.REPLACE) {
Timber.v("###REPLACE in room $roomId for event ${event.eventId}")
// A replace!
handleReplace(realm, event, it, roomId, isLocalEcho, encryptedEventContent.relatesTo.eventId)
} else if (encryptedEventContent.relatesTo.type == RelationType.RESPONSE) {
Timber.v("###RESPONSE in room $roomId for event ${event.eventId}")
handleResponse(realm, userId, event, it, roomId, isLocalEcho, encryptedEventContent.relatesTo.eventId)
}
val content: MessageContent? = event.content.toModel()
if (content?.relatesTo?.type == RelationType.REPLACE) {
Timber.v("###REPLACE in room $roomId for event ${event.eventId}")
// A replace!
handleReplace(realm, event, content, roomId, isLocalEcho)
} else if (content?.relatesTo?.type == RelationType.RESPONSE) {
Timber.v("###RESPONSE in room $roomId for event ${event.eventId}")
handleResponse(realm, userId, event, content, roomId, isLocalEcho)
}
}
EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_MAC,
EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_KEY -> {
Timber.v("## SAS REF in room $roomId for event ${event.eventId}")
event.content.toModel<MessageRelationContent>()?.relatesTo?.let {
if (it.type == RelationType.REFERENCE && it.eventId != null) {
handleVerification(realm, event, roomId, isLocalEcho, it.eventId, userId)
}
}
}
EventType.ENCRYPTED -> {
// Relation type is in clear
val encryptedEventContent = event.content.toModel<EncryptedEventContent>()
if (encryptedEventContent?.relatesTo?.type == RelationType.REPLACE
|| encryptedEventContent?.relatesTo?.type == RelationType.RESPONSE
) {
// we need to decrypt if needed
event.getClearContent().toModel<MessageContent>()?.let {
if (encryptedEventContent.relatesTo.type == RelationType.REPLACE) {
Timber.v("###REPLACE in room $roomId for event ${event.eventId}")
// A replace!
handleReplace(realm, event, it, roomId, isLocalEcho, encryptedEventContent.relatesTo.eventId)
} else if (encryptedEventContent.relatesTo.type == RelationType.RESPONSE) {
Timber.v("###RESPONSE in room $roomId for event ${event.eventId}")
handleResponse(realm, userId, event, it, roomId, isLocalEcho, encryptedEventContent.relatesTo.eventId)
}
} else if (encryptedEventContent?.relatesTo?.type == RelationType.REFERENCE) {
decryptIfNeeded(event)
when (event.getClearType()) {
EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_MAC,
EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_KEY -> {
Timber.v("## SAS REF in room $roomId for event ${event.eventId}")
encryptedEventContent.relatesTo.eventId?.let {
handleVerification(realm, event, roomId, isLocalEcho, it, userId)
}
}
} else if (encryptedEventContent?.relatesTo?.type == RelationType.REFERENCE) {
when (event.getClearType()) {
EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_MAC,
EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_KEY -> {
Timber.v("## SAS REF in room $roomId for event ${event.eventId}")
encryptedEventContent.relatesTo.eventId?.let {
handleVerification(realm, event, roomId, isLocalEcho, it, userId)
}
}
}
}
EventType.REDACTION -> {
val eventToPrune = event.redacts?.let { EventEntity.where(realm, eventId = it).findFirst() }
?: return@forEach
when (eventToPrune.type) {
EventType.MESSAGE -> {
Timber.d("REDACTION for message ${eventToPrune.eventId}")
}
EventType.REDACTION -> {
val eventToPrune = event.redacts?.let { EventEntity.where(realm, eventId = it).findFirst() }
?: return
when (eventToPrune.type) {
EventType.MESSAGE -> {
Timber.d("REDACTION for message ${eventToPrune.eventId}")
// val unsignedData = EventMapper.map(eventToPrune).unsignedData
// ?: UnsignedData(null, null)
// was this event a m.replace
val contentModel = ContentMapper.map(eventToPrune.content)?.toModel<MessageContent>()
if (RelationType.REPLACE == contentModel?.relatesTo?.type && contentModel.relatesTo?.eventId != null) {
handleRedactionOfReplace(eventToPrune, contentModel.relatesTo!!.eventId!!, realm)
}
}
EventType.REACTION -> {
handleReactionRedact(eventToPrune, realm, userId)
// was this event a m.replace
val contentModel = ContentMapper.map(eventToPrune.content)?.toModel<MessageContent>()
if (RelationType.REPLACE == contentModel?.relatesTo?.type && contentModel.relatesTo?.eventId != null) {
handleRedactionOfReplace(eventToPrune, contentModel.relatesTo!!.eventId!!, realm)
}
}
EventType.REACTION -> {
handleReactionRedact(eventToPrune, realm, userId)
}
}
else -> Timber.v("UnHandled event ${event.eventId}")
}
} catch (t: Throwable) {
Timber.e(t, "## Should not happen ")
else -> Timber.v("UnHandled event ${event.eventId}")
}
} catch (t: Throwable) {
Timber.e(t, "## Should not happen ")
}
}
private fun decryptIfNeeded(event: Event) {
if (event.mxDecryptionResult == null) {
try {
val result = cryptoService.decryptEvent(event, "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
Timber.v("Failed to decrypt e2e replace")
// TODO -> we should keep track of this and retry, or aggregation will be broken
}
}
}
// OPT OUT serer aggregation until API mature enough
private val SHOULD_HANDLE_SERVER_AGREGGATION = false
private fun handleReplace(realm: Realm, event: Event, content: MessageContent, roomId: String, isLocalEcho: Boolean, relatedEventId: String? = null) {
val eventId = event.eventId ?: return

View File

@ -1,76 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.di.UserId
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
/**
* Acts as a listener of incoming messages in order to incrementally computes a summary of annotations.
* For reactions will build a EventAnnotationsSummaryEntity, ans for edits a EditAggregatedSummaryEntity.
* The summaries can then be extracted and added (as a decoration) to a TimelineEvent for final display.
*/
internal class EventRelationsAggregationUpdater @Inject constructor(
@SessionDatabase realmConfiguration: RealmConfiguration,
@UserId private val userId: String,
private val task: EventRelationsAggregationTask) :
RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> {
EventEntity.whereTypes(it, listOf(
EventType.MESSAGE,
EventType.REDACTION,
EventType.REACTION,
EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_MAC,
// TODO Add ?
// EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_KEY,
EventType.ENCRYPTED)
)
}
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
Timber.v("EventRelationsAggregationUpdater called with ${changeSet.insertions.size} insertions")
val insertedDomains = changeSet.insertions
.asSequence()
.mapNotNull { results[it]?.asDomain() }
.toList()
val params = EventRelationsAggregationTask.Params(
insertedDomains,
userId
)
observerScope.launch {
task.execute(params)
}
}
}

View File

@ -44,8 +44,6 @@ import im.vector.matrix.android.internal.session.room.membership.joining.InviteT
import im.vector.matrix.android.internal.session.room.membership.joining.JoinRoomTask
import im.vector.matrix.android.internal.session.room.membership.leaving.DefaultLeaveRoomTask
import im.vector.matrix.android.internal.session.room.membership.leaving.LeaveRoomTask
import im.vector.matrix.android.internal.session.room.prune.DefaultPruneEventTask
import im.vector.matrix.android.internal.session.room.prune.PruneEventTask
import im.vector.matrix.android.internal.session.room.read.DefaultMarkAllRoomsReadTask
import im.vector.matrix.android.internal.session.room.read.DefaultSetReadMarkersTask
import im.vector.matrix.android.internal.session.room.read.MarkAllRoomsReadTask
@ -64,10 +62,10 @@ import im.vector.matrix.android.internal.session.room.tags.AddTagToRoomTask
import im.vector.matrix.android.internal.session.room.tags.DefaultAddTagToRoomTask
import im.vector.matrix.android.internal.session.room.tags.DefaultDeleteTagFromRoomTask
import im.vector.matrix.android.internal.session.room.tags.DeleteTagFromRoomTask
import im.vector.matrix.android.internal.session.room.timeline.DefaultFetchNextTokenAndPaginateTask
import im.vector.matrix.android.internal.session.room.timeline.DefaultFetchTokenAndPaginateTask
import im.vector.matrix.android.internal.session.room.timeline.DefaultGetContextOfEventTask
import im.vector.matrix.android.internal.session.room.timeline.DefaultPaginationTask
import im.vector.matrix.android.internal.session.room.timeline.FetchNextTokenAndPaginateTask
import im.vector.matrix.android.internal.session.room.timeline.FetchTokenAndPaginateTask
import im.vector.matrix.android.internal.session.room.timeline.GetContextOfEventTask
import im.vector.matrix.android.internal.session.room.timeline.PaginationTask
import im.vector.matrix.android.internal.session.room.typing.DefaultSendTypingTask
@ -129,9 +127,6 @@ internal abstract class RoomModule {
@Binds
abstract fun bindFileService(service: DefaultFileService): FileService
@Binds
abstract fun bindEventRelationsAggregationTask(task: DefaultEventRelationsAggregationTask): EventRelationsAggregationTask
@Binds
abstract fun bindCreateRoomTask(task: DefaultCreateRoomTask): CreateRoomTask
@ -156,9 +151,6 @@ internal abstract class RoomModule {
@Binds
abstract fun bindLoadRoomMembersTask(task: DefaultLoadRoomMembersTask): LoadRoomMembersTask
@Binds
abstract fun bindPruneEventTask(task: DefaultPruneEventTask): PruneEventTask
@Binds
abstract fun bindSetReadMarkersTask(task: DefaultSetReadMarkersTask): SetReadMarkersTask
@ -184,7 +176,7 @@ internal abstract class RoomModule {
abstract fun bindPaginationTask(task: DefaultPaginationTask): PaginationTask
@Binds
abstract fun bindFetchNextTokenAndPaginateTask(task: DefaultFetchNextTokenAndPaginateTask): FetchNextTokenAndPaginateTask
abstract fun bindFetchNextTokenAndPaginateTask(task: DefaultFetchTokenAndPaginateTask): FetchTokenAndPaginateTask
@Binds
abstract fun bindFetchEditHistoryTask(task: DefaultFetchEditHistoryTask): FetchEditHistoryTask

View File

@ -1,72 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.create
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.create.RoomCreateContent
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
internal class RoomCreateEventLiveObserver @Inject constructor(@SessionDatabase
realmConfiguration: RealmConfiguration)
: RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> {
EventEntity.whereTypes(it, listOf(EventType.STATE_ROOM_CREATE))
}
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
changeSet.insertions
.asSequence()
.mapNotNull {
results[it]?.asDomain()
}
.toList()
.also {
observerScope.launch {
handleRoomCreateEvents(it)
}
}
}
private suspend fun handleRoomCreateEvents(createEvents: List<Event>) = awaitTransaction(realmConfiguration) { realm ->
for (event in createEvents) {
val createRoomContent = event.getClearContent().toModel<RoomCreateContent>()
val predecessorRoomId = createRoomContent?.predecessor?.roomId ?: continue
val predecessorRoomSummary = RoomSummaryEntity.where(realm, predecessorRoomId).findFirst()
?: RoomSummaryEntity(predecessorRoomId)
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_JOINED
realm.insertOrUpdate(predecessorRoomSummary)
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.create
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.create.RoomCreateContent
import im.vector.matrix.android.internal.database.model.EventInsertType
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.Realm
import javax.inject.Inject
internal class RoomCreateEventProcessor @Inject constructor() : EventInsertLiveProcessor {
override suspend fun process(realm: Realm, event: Event) {
val createRoomContent = event.getClearContent().toModel<RoomCreateContent>()
val predecessorRoomId = createRoomContent?.predecessor?.roomId ?: return
val predecessorRoomSummary = RoomSummaryEntity.where(realm, predecessorRoomId).findFirst()
?: RoomSummaryEntity(predecessorRoomId)
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_JOINED
realm.insertOrUpdate(predecessorRoomSummary)
}
override fun shouldProcess(eventId: String, eventType: String, insertType: EventInsertType): Boolean {
return eventType == EventType.STATE_ROOM_CREATE
}
}

View File

@ -21,6 +21,7 @@ import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.api.session.room.send.SendState
import im.vector.matrix.android.internal.database.mapper.toEntity
import im.vector.matrix.android.internal.database.model.CurrentStateEventEntity
import im.vector.matrix.android.internal.database.model.EventInsertType
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.query.copyToRealmOrIgnore
import im.vector.matrix.android.internal.database.query.getOrCreate
@ -76,7 +77,7 @@ internal class DefaultLoadRoomMembersTask @Inject constructor(
continue
}
val ageLocalTs = roomMemberEvent.unsignedData?.age?.let { now - it }
val eventEntity = roomMemberEvent.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm)
val eventEntity = roomMemberEvent.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.PAGINATION)
CurrentStateEventEntity.getOrCreate(realm, roomId, roomMemberEvent.stateKey, roomMemberEvent.type).apply {
eventId = roomMemberEvent.eventId
root = eventEntity

View File

@ -1,56 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.prune
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
/**
* Listens to the database for the insertion of any redaction event.
* As it will actually delete the content, it should be called last in the list of listener.
*/
internal class EventsPruner @Inject constructor(@SessionDatabase realmConfiguration: RealmConfiguration,
private val pruneEventTask: PruneEventTask) :
RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> { EventEntity.whereTypes(it, listOf(EventType.REDACTION)) }
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
Timber.v("Event pruner called with ${changeSet.insertions.size} insertions")
val insertedDomains = changeSet.insertions
.asSequence()
.mapNotNull { results[it]?.asDomain() }
.toList()
observerScope.launch {
val params = PruneEventTask.Params(insertedDomains)
pruneEventTask.execute(params)
}
}
}

View File

@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.prune
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.LocalEcho
@ -23,32 +23,28 @@ import im.vector.matrix.android.api.session.events.model.UnsignedData
import im.vector.matrix.android.internal.database.mapper.ContentMapper
import im.vector.matrix.android.internal.database.mapper.EventMapper
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventInsertType
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.query.findWithSenderMembershipEvent
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.MoshiProvider
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.util.awaitTransaction
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.Realm
import timber.log.Timber
import javax.inject.Inject
internal interface PruneEventTask : Task<PruneEventTask.Params, Unit> {
/**
* Listens to the database for the insertion of any redaction event.
* As it will actually delete the content, it should be called last in the list of listener.
*/
internal class RedactionEventProcessor @Inject constructor() : EventInsertLiveProcessor {
data class Params(
val redactionEvents: List<Event>
)
}
override fun shouldProcess(eventId: String, eventType: String, insertType: EventInsertType): Boolean {
return eventType == EventType.REDACTION
}
internal class DefaultPruneEventTask @Inject constructor(@SessionDatabase private val monarchy: Monarchy) : PruneEventTask {
override suspend fun execute(params: PruneEventTask.Params) {
monarchy.awaitTransaction { realm ->
params.redactionEvents.forEach { event ->
pruneEvent(realm, event)
}
}
override suspend fun process(realm: Realm, event: Event) {
pruneEvent(realm, event)
}
private fun pruneEvent(realm: Realm, redactionEvent: Event) {

View File

@ -62,7 +62,6 @@ import im.vector.matrix.android.internal.session.content.ThumbnailExtractor
import im.vector.matrix.android.internal.session.room.send.pills.TextPillsUtils
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.util.StringProvider
import kotlinx.coroutines.launch
import javax.inject.Inject
/**
@ -472,9 +471,7 @@ internal class LocalEchoEventFactory @Inject constructor(
fun createLocalEcho(event: Event) {
checkNotNull(event.roomId) { "Your event should have a roomId" }
taskExecutor.executorScope.launch {
localEchoRepository.createLocalEcho(event)
}
localEchoRepository.createLocalEcho(event)
}
companion object {

View File

@ -29,6 +29,8 @@ import im.vector.matrix.android.internal.database.mapper.TimelineEventMapper
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.mapper.toEntity
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventInsertEntity
import im.vector.matrix.android.internal.database.model.EventInsertType
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.query.findAllInRoomWithSendStates
@ -48,7 +50,7 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
private val eventBus: EventBus,
private val timelineEventMapper: TimelineEventMapper) {
suspend fun createLocalEcho(event: Event) {
fun createLocalEcho(event: Event) {
val roomId = event.roomId ?: throw IllegalStateException("You should have set a roomId for your event")
val senderId = event.senderId ?: throw IllegalStateException("You should have set a senderIf for your event")
if (event.eventId == null) {
@ -70,8 +72,12 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
}
val timelineEvent = timelineEventMapper.map(timelineEventEntity)
eventBus.post(DefaultTimeline.OnLocalEchoCreated(roomId = roomId, timelineEvent = timelineEvent))
monarchy.awaitTransaction { realm ->
val roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst() ?: return@awaitTransaction
monarchy.writeAsync { realm ->
val eventInsertEntity = EventInsertEntity(event.eventId, event.type).apply {
this.insertType = EventInsertType.LOCAL_ECHO
}
realm.insert(eventInsertEntity)
val roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst() ?: return@writeAsync
roomEntity.sendingTimelineEvents.add(0, timelineEventEntity)
roomSummaryUpdater.update(realm, roomId)
}

View File

@ -29,17 +29,14 @@ import im.vector.matrix.android.api.session.room.timeline.TimelineEvent
import im.vector.matrix.android.api.session.room.timeline.TimelineSettings
import im.vector.matrix.android.api.util.CancelableBag
import im.vector.matrix.android.internal.database.mapper.TimelineEventMapper
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventAnnotationsSummaryEntity
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntityFields
import im.vector.matrix.android.internal.database.query.TimelineEventFilter
import im.vector.matrix.android.internal.database.query.findAllInRoomWithSendStates
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.database.query.whereInRoom
import im.vector.matrix.android.internal.database.query.whereRoomId
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
@ -72,7 +69,7 @@ internal class DefaultTimeline(
private val realmConfiguration: RealmConfiguration,
private val taskExecutor: TaskExecutor,
private val contextOfEventTask: GetContextOfEventTask,
private val fetchNextTokenAndPaginateTask: FetchNextTokenAndPaginateTask,
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
private val paginationTask: PaginationTask,
private val timelineEventMapper: TimelineEventMapper,
private val settings: TimelineSettings,
@ -98,9 +95,7 @@ internal class DefaultTimeline(
private lateinit var nonFilteredEvents: RealmResults<TimelineEventEntity>
private lateinit var filteredEvents: RealmResults<TimelineEventEntity>
private lateinit var eventRelations: RealmResults<EventAnnotationsSummaryEntity>
private var roomEntity: RoomEntity? = null
private lateinit var sendingEvents: RealmResults<TimelineEventEntity>
private var prevDisplayIndex: Int? = null
private var nextDisplayIndex: Int? = null
@ -122,20 +117,6 @@ internal class DefaultTimeline(
handleUpdates(results, changeSet)
}
private val relationsListener = OrderedRealmCollectionChangeListener<RealmResults<EventAnnotationsSummaryEntity>> { collection, changeSet ->
var hasChange = false
(changeSet.insertions + changeSet.changes).forEach {
val eventRelations = collection[it]
if (eventRelations != null) {
hasChange = rebuildEvent(eventRelations.eventId) { te ->
te.copy(annotations = eventRelations.asDomain())
} || hasChange
}
}
if (hasChange) postSnapshot()
}
// Public methods ******************************************************************************
override fun paginate(direction: Timeline.Direction, count: Int) {
@ -173,26 +154,23 @@ internal class DefaultTimeline(
val realm = Realm.getInstance(realmConfiguration)
backgroundRealm.set(realm)
roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst()
roomEntity?.sendingTimelineEvents?.addChangeListener { events ->
val roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst()
?: throw IllegalStateException("Can't open a timeline without a room")
sendingEvents = roomEntity.sendingTimelineEvents.where().filterEventsWithSettings().findAll()
sendingEvents.addChangeListener { events ->
// Remove in memory as soon as they are known by database
events.forEach { te ->
inMemorySendingEvents.removeAll { te.eventId == it.eventId }
}
postSnapshot()
}
nonFilteredEvents = buildEventQuery(realm).sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING).findAll()
filteredEvents = nonFilteredEvents.where()
.filterEventsWithSettings()
.findAll()
filteredEvents.addChangeListener(eventsChangeListener)
handleInitialLoad()
nonFilteredEvents.addChangeListener(eventsChangeListener)
eventRelations = EventAnnotationsSummaryEntity.whereInRoom(realm, roomId)
.findAllAsync()
.also { it.addChangeListener(relationsListener) }
if (settings.shouldHandleHiddenReadReceipts()) {
hiddenReadReceipts.start(realm, filteredEvents, nonFilteredEvents, this)
}
@ -213,9 +191,8 @@ internal class DefaultTimeline(
cancelableBag.cancel()
BACKGROUND_HANDLER.removeCallbacksAndMessages(null)
BACKGROUND_HANDLER.post {
roomEntity?.sendingTimelineEvents?.removeAllChangeListeners()
if (this::eventRelations.isInitialized) {
eventRelations.removeAllChangeListeners()
if (this::sendingEvents.isInitialized) {
sendingEvents.removeAllChangeListeners()
}
if (this::nonFilteredEvents.isInitialized) {
nonFilteredEvents.removeAllChangeListeners()
@ -314,7 +291,7 @@ internal class DefaultTimeline(
listeners.clear()
}
// TimelineHiddenReadReceipts.Delegate
// TimelineHiddenReadReceipts.Delegate
override fun rebuildEvent(eventId: String, readReceipts: List<ReadReceipt>): Boolean {
return rebuildEvent(eventId) { te ->
@ -347,7 +324,7 @@ internal class DefaultTimeline(
}
}
// Private methods *****************************************************************************
// Private methods *****************************************************************************
private fun rebuildEvent(eventId: String, builder: (TimelineEvent) -> TimelineEvent): Boolean {
return builtEventsIdMap[eventId]?.let { builtIndex ->
@ -411,20 +388,16 @@ internal class DefaultTimeline(
}
private fun buildSendingEvents(): List<TimelineEvent> {
val sendingEvents = ArrayList<TimelineEvent>()
val builtSendingEvents = ArrayList<TimelineEvent>()
if (hasReachedEnd(Timeline.Direction.FORWARDS) && !hasMoreInCache(Timeline.Direction.FORWARDS)) {
sendingEvents.addAll(inMemorySendingEvents.filterEventsWithSettings())
roomEntity?.sendingTimelineEvents
?.where()
?.filterEventsWithSettings()
?.findAll()
?.forEach { timelineEventEntity ->
if (sendingEvents.find { it.eventId == timelineEventEntity.eventId } == null) {
sendingEvents.add(timelineEventMapper.map(timelineEventEntity))
}
}
builtSendingEvents.addAll(inMemorySendingEvents.filterEventsWithSettings())
sendingEvents.forEach { timelineEventEntity ->
if (builtSendingEvents.find { it.eventId == timelineEventEntity.eventId } == null) {
builtSendingEvents.add(timelineEventMapper.map(timelineEventEntity))
}
}
}
return sendingEvents
return builtSendingEvents
}
private fun canPaginate(direction: Timeline.Direction): Boolean {
@ -525,19 +498,25 @@ internal class DefaultTimeline(
val currentChunk = getLiveChunk()
val token = if (direction == Timeline.Direction.BACKWARDS) currentChunk?.prevToken else currentChunk?.nextToken
if (token == null) {
if (direction == Timeline.Direction.FORWARDS && currentChunk?.hasBeenALastForwardChunk().orFalse()) {
// We are in the case that next event exists, but we do not know the next token.
// Fetch (again) the last event to get a nextToken
val lastKnownEventId = nonFilteredEvents.firstOrNull()?.eventId
if (direction == Timeline.Direction.BACKWARDS
|| (direction == Timeline.Direction.FORWARDS && currentChunk?.hasBeenALastForwardChunk().orFalse())) {
// We are in the case where event exists, but we do not know the token.
// Fetch (again) the last event to get a token
val lastKnownEventId = if (direction == Timeline.Direction.FORWARDS) {
nonFilteredEvents.firstOrNull()?.eventId
} else {
nonFilteredEvents.lastOrNull()?.eventId
}
if (lastKnownEventId == null) {
updateState(direction) { it.copy(isPaginating = false, requestedPaginationCount = 0) }
} else {
val params = FetchNextTokenAndPaginateTask.Params(
val params = FetchTokenAndPaginateTask.Params(
roomId = roomId,
limit = limit,
direction = direction.toPaginationDirection(),
lastKnownEventId = lastKnownEventId
)
cancelableBag += fetchNextTokenAndPaginateTask
cancelableBag += fetchTokenAndPaginateTask
.configureWith(params) {
this.callback = createPaginationCallback(limit, direction)
}
@ -766,7 +745,7 @@ internal class DefaultTimeline(
}
}
// Extension methods ***************************************************************************
// Extension methods ***************************************************************************
private fun Timeline.Direction.toPaginationDirection(): PaginationDirection {
return if (this == Timeline.Direction.BACKWARDS) PaginationDirection.BACKWARDS else PaginationDirection.FORWARDS

View File

@ -43,7 +43,7 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
private val contextOfEventTask: GetContextOfEventTask,
private val eventDecryptor: TimelineEventDecryptor,
private val paginationTask: PaginationTask,
private val fetchNextTokenAndPaginateTask: FetchNextTokenAndPaginateTask,
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
private val timelineEventMapper: TimelineEventMapper,
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper
) : TimelineService {
@ -66,7 +66,7 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
hiddenReadReceipts = TimelineHiddenReadReceipts(readReceiptsSummaryMapper, roomId, settings),
eventBus = eventBus,
eventDecryptor = eventDecryptor,
fetchNextTokenAndPaginateTask = fetchNextTokenAndPaginateTask
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask
)
}

View File

@ -28,38 +28,48 @@ import im.vector.matrix.android.internal.util.awaitTransaction
import org.greenrobot.eventbus.EventBus
import javax.inject.Inject
internal interface FetchNextTokenAndPaginateTask : Task<FetchNextTokenAndPaginateTask.Params, TokenChunkEventPersistor.Result> {
internal interface FetchTokenAndPaginateTask : Task<FetchTokenAndPaginateTask.Params, TokenChunkEventPersistor.Result> {
data class Params(
val roomId: String,
val lastKnownEventId: String,
val direction: PaginationDirection,
val limit: Int
)
}
internal class DefaultFetchNextTokenAndPaginateTask @Inject constructor(
internal class DefaultFetchTokenAndPaginateTask @Inject constructor(
private val roomAPI: RoomAPI,
@SessionDatabase private val monarchy: Monarchy,
private val filterRepository: FilterRepository,
private val paginationTask: PaginationTask,
private val eventBus: EventBus
) : FetchNextTokenAndPaginateTask {
) : FetchTokenAndPaginateTask {
override suspend fun execute(params: FetchNextTokenAndPaginateTask.Params): TokenChunkEventPersistor.Result {
override suspend fun execute(params: FetchTokenAndPaginateTask.Params): TokenChunkEventPersistor.Result {
val filter = filterRepository.getRoomFilter()
val response = executeRequest<EventContextResponse>(eventBus) {
apiCall = roomAPI.getContextOfEvent(params.roomId, params.lastKnownEventId, 0, filter)
}
if (response.end == null) {
throw IllegalStateException("No next token found")
val fromToken = if (params.direction == PaginationDirection.FORWARDS) {
response.end
} else {
response.start
}
monarchy.awaitTransaction {
ChunkEntity.findIncludingEvent(it, params.lastKnownEventId)?.nextToken = response.end
?: throw IllegalStateException("No token found")
monarchy.awaitTransaction { realm ->
val chunkToUpdate = ChunkEntity.findIncludingEvent(realm, params.lastKnownEventId)
if (params.direction == PaginationDirection.FORWARDS) {
chunkToUpdate?.nextToken = fromToken
} else {
chunkToUpdate?.prevToken = fromToken
}
}
val paginationParams = PaginationTask.Params(
roomId = params.roomId,
from = response.end,
direction = PaginationDirection.FORWARDS,
from = fromToken,
direction = params.direction,
limit = params.limit
)
return paginationTask.execute(paginationParams)

View File

@ -28,6 +28,7 @@ import im.vector.matrix.android.internal.database.helper.deleteOnCascade
import im.vector.matrix.android.internal.database.helper.merge
import im.vector.matrix.android.internal.database.mapper.toEntity
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.EventInsertType
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
@ -204,7 +205,7 @@ internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase pri
for (stateEvent in stateEvents) {
val ageLocalTs = stateEvent.unsignedData?.age?.let { now - it }
val stateEventEntity = stateEvent.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm)
val stateEventEntity = stateEvent.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.PAGINATION)
currentChunk.addStateEvent(roomId, stateEventEntity, direction)
if (stateEvent.type == EventType.STATE_ROOM_MEMBER && stateEvent.stateKey != null) {
roomMemberContentsByUser[stateEvent.stateKey] = stateEvent.content.toModel<RoomMemberContent>()
@ -217,7 +218,7 @@ internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase pri
}
val ageLocalTs = event.unsignedData?.age?.let { now - it }
eventIds.add(event.eventId)
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm)
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.PAGINATION)
if (event.type == EventType.STATE_ROOM_MEMBER && event.stateKey != null) {
val contentToUse = if (direction == PaginationDirection.BACKWARDS) {
event.prevContent

View File

@ -1,75 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.tombstone
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.tombstone.RoomTombstoneContent
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
internal class RoomTombstoneEventLiveObserver @Inject constructor(@SessionDatabase
realmConfiguration: RealmConfiguration)
: RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> {
EventEntity.whereTypes(it, listOf(EventType.STATE_ROOM_TOMBSTONE))
}
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
changeSet.insertions
.asSequence()
.mapNotNull {
results[it]?.asDomain()
}
.toList()
.also {
observerScope.launch {
handleRoomTombstoneEvents(it)
}
}
}
private suspend fun handleRoomTombstoneEvents(tombstoneEvents: List<Event>) = awaitTransaction(realmConfiguration) { realm ->
for (event in tombstoneEvents) {
if (event.roomId == null) continue
val createRoomContent = event.getClearContent().toModel<RoomTombstoneContent>()
if (createRoomContent?.replacementRoomId == null) continue
val predecessorRoomSummary = RoomSummaryEntity.where(realm, event.roomId).findFirst()
?: RoomSummaryEntity(event.roomId)
if (predecessorRoomSummary.versioningState == VersioningState.NONE) {
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_NOT_JOINED
}
realm.insertOrUpdate(predecessorRoomSummary)
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.tombstone
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.tombstone.RoomTombstoneContent
import im.vector.matrix.android.internal.database.model.EventInsertType
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.Realm
import javax.inject.Inject
internal class RoomTombstoneEventProcessor @Inject constructor() : EventInsertLiveProcessor {
override suspend fun process(realm: Realm, event: Event) {
if (event.roomId == null) return
val createRoomContent = event.getClearContent().toModel<RoomTombstoneContent>()
if (createRoomContent?.replacementRoomId == null) return
val predecessorRoomSummary = RoomSummaryEntity.where(realm, event.roomId).findFirst()
?: RoomSummaryEntity(event.roomId)
if (predecessorRoomSummary.versioningState == VersioningState.NONE) {
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_NOT_JOINED
}
realm.insertOrUpdate(predecessorRoomSummary)
}
override fun shouldProcess(eventId: String, eventType: String, insertType: EventInsertType): Boolean {
return eventType == EventType.STATE_ROOM_TOMBSTONE
}
}

View File

@ -19,6 +19,8 @@ package im.vector.matrix.android.internal.session.sync
import im.vector.matrix.android.R
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.database.model.GroupEntity
import im.vector.matrix.android.internal.database.model.GroupSummaryEntity
import im.vector.matrix.android.internal.database.query.getOrCreate
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
import im.vector.matrix.android.internal.session.mapWithProgress
@ -64,29 +66,33 @@ internal class GroupSyncHandler @Inject constructor() {
handleLeftGroup(realm, it.key)
}
}
/** Note: [im.vector.matrix.android.internal.session.group.GroupSummaryUpdater] is observing changes */
realm.insertOrUpdate(groups)
}
private fun handleJoinedGroup(realm: Realm,
groupId: String): GroupEntity {
val groupEntity = GroupEntity.where(realm, groupId).findFirst() ?: GroupEntity(groupId)
val groupSummaryEntity = GroupSummaryEntity.getOrCreate(realm, groupId)
groupEntity.membership = Membership.JOIN
groupSummaryEntity.membership = Membership.JOIN
return groupEntity
}
private fun handleInvitedGroup(realm: Realm,
groupId: String): GroupEntity {
val groupEntity = GroupEntity.where(realm, groupId).findFirst() ?: GroupEntity(groupId)
val groupSummaryEntity = GroupSummaryEntity.getOrCreate(realm, groupId)
groupEntity.membership = Membership.INVITE
groupSummaryEntity.membership = Membership.INVITE
return groupEntity
}
private fun handleLeftGroup(realm: Realm,
groupId: String): GroupEntity {
val groupEntity = GroupEntity.where(realm, groupId).findFirst() ?: GroupEntity(groupId)
val groupSummaryEntity = GroupSummaryEntity.getOrCreate(realm, groupId)
groupEntity.membership = Membership.LEAVE
groupSummaryEntity.membership = Membership.LEAVE
return groupEntity
}
}

View File

@ -35,6 +35,7 @@ import im.vector.matrix.android.internal.database.mapper.ContentMapper
import im.vector.matrix.android.internal.database.mapper.toEntity
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.CurrentStateEventEntity
import im.vector.matrix.android.internal.database.model.EventInsertType
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.model.RoomMemberSummaryEntity
import im.vector.matrix.android.internal.database.query.copyToRealmOrIgnore
@ -47,9 +48,9 @@ import im.vector.matrix.android.internal.di.MoshiProvider
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
import im.vector.matrix.android.internal.session.mapWithProgress
import im.vector.matrix.android.internal.session.room.summary.RoomSummaryUpdater
import im.vector.matrix.android.internal.session.room.membership.RoomMemberEventHandler
import im.vector.matrix.android.internal.session.room.read.FullyReadContent
import im.vector.matrix.android.internal.session.room.summary.RoomSummaryUpdater
import im.vector.matrix.android.internal.session.room.timeline.DefaultTimeline
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
import im.vector.matrix.android.internal.session.room.timeline.TimelineEventDecryptor
@ -97,20 +98,25 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
// PRIVATE METHODS *****************************************************************************
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: DefaultInitialSyncProgressService?) {
val insertType = if (isInitialSync) {
EventInsertType.INITIAL_SYNC
} else {
EventInsertType.INCREMENTAL_SYNC
}
val syncLocalTimeStampMillis = System.currentTimeMillis()
val rooms = when (handlingStrategy) {
is HandlingStrategy.JOINED ->
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value, isInitialSync, syncLocalTimeStampMillis)
handleJoinedRoom(realm, it.key, it.value, isInitialSync, insertType, syncLocalTimeStampMillis)
}
is HandlingStrategy.INVITED ->
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_invited_rooms, 0.1f) {
handleInvitedRoom(realm, it.key, it.value, syncLocalTimeStampMillis)
handleInvitedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis)
}
is HandlingStrategy.LEFT -> {
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_left_rooms, 0.3f) {
handleLeftRoom(realm, it.key, it.value, syncLocalTimeStampMillis)
handleLeftRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis)
}
}
}
@ -121,6 +127,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
roomId: String,
roomSync: RoomSync,
isInitialSync: Boolean,
insertType: EventInsertType,
syncLocalTimestampMillis: Long): RoomEntity {
Timber.v("Handle join sync for room $roomId")
@ -147,7 +154,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
continue
}
val ageLocalTs = event.unsignedData?.age?.let { syncLocalTimestampMillis - it }
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm)
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, insertType)
CurrentStateEventEntity.getOrCreate(realm, roomId, event.stateKey, event.type).apply {
eventId = event.eventId
root = eventEntity
@ -165,6 +172,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
roomSync.timeline.events,
roomSync.timeline.prevToken,
roomSync.timeline.limited,
insertType,
syncLocalTimestampMillis,
isInitialSync
)
@ -191,6 +199,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private fun handleInvitedRoom(realm: Realm,
roomId: String,
roomSync: InvitedRoomSync,
insertType: EventInsertType,
syncLocalTimestampMillis: Long): RoomEntity {
Timber.v("Handle invited sync for room $roomId")
val roomEntity = RoomEntity.where(realm, roomId).findFirst() ?: realm.createObject(roomId)
@ -201,7 +210,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
return@forEach
}
val ageLocalTs = event.unsignedData?.age?.let { syncLocalTimestampMillis - it }
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm)
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, insertType)
CurrentStateEventEntity.getOrCreate(realm, roomId, event.stateKey, event.type).apply {
eventId = eventEntity.eventId
root = eventEntity
@ -219,6 +228,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private fun handleLeftRoom(realm: Realm,
roomId: String,
roomSync: RoomSync,
insertType: EventInsertType,
syncLocalTimestampMillis: Long): RoomEntity {
val roomEntity = RoomEntity.where(realm, roomId).findFirst() ?: realm.createObject(roomId)
for (event in roomSync.state?.events.orEmpty()) {
@ -226,7 +236,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
continue
}
val ageLocalTs = event.unsignedData?.age?.let { syncLocalTimestampMillis - it }
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm)
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, insertType)
CurrentStateEventEntity.getOrCreate(realm, roomId, event.stateKey, event.type).apply {
eventId = event.eventId
root = eventEntity
@ -238,7 +248,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
continue
}
val ageLocalTs = event.unsignedData?.age?.let { syncLocalTimestampMillis - it }
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm)
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, insertType)
if (event.stateKey != null) {
CurrentStateEventEntity.getOrCreate(realm, roomId, event.stateKey, event.type).apply {
eventId = event.eventId
@ -263,6 +273,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
eventList: List<Event>,
prevToken: String? = null,
isLimited: Boolean = true,
insertType: EventInsertType,
syncLocalTimestampMillis: Long,
isInitialSync: Boolean): ChunkEntity {
val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId)
@ -289,7 +300,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
}
val ageLocalTs = event.unsignedData?.age?.let { syncLocalTimestampMillis - it }
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm)
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, insertType)
if (event.stateKey != null) {
CurrentStateEventEntity.getOrCreate(realm, roomId, event.stateKey, event.type).apply {
eventId = event.eventId

View File

@ -16,23 +16,34 @@
package im.vector.matrix.android.internal.session.sync
import androidx.work.ExistingPeriodicWorkPolicy
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.R
import im.vector.matrix.android.api.pushrules.PushRuleService
import im.vector.matrix.android.api.pushrules.RuleScope
import im.vector.matrix.android.internal.crypto.DefaultCryptoService
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.di.SessionId
import im.vector.matrix.android.internal.di.WorkManagerProvider
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
import im.vector.matrix.android.internal.session.group.GetGroupDataWorker
import im.vector.matrix.android.internal.session.notification.ProcessEventForPushTask
import im.vector.matrix.android.internal.session.reportSubtask
import im.vector.matrix.android.internal.session.sync.model.GroupsSyncResponse
import im.vector.matrix.android.internal.session.sync.model.RoomsSyncResponse
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.util.awaitTransaction
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import timber.log.Timber
import java.util.concurrent.TimeUnit
import javax.inject.Inject
import kotlin.system.measureTimeMillis
private const val GET_GROUP_DATA_WORKER = "GET_GROUP_DATA_WORKER"
internal class SyncResponseHandler @Inject constructor(@SessionDatabase private val monarchy: Monarchy,
@SessionId private val sessionId: String,
private val workManagerProvider: WorkManagerProvider,
private val roomSyncHandler: RoomSyncHandler,
private val userAccountDataSyncHandler: UserAccountDataSyncHandler,
private val groupSyncHandler: GroupSyncHandler,
@ -109,10 +120,39 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
checkPushRules(it, isInitialSync)
userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite)
}
syncResponse.groups?.also {
scheduleGroupDataFetchingIfNeeded(it)
}
Timber.v("On sync completed")
cryptoSyncHandler.onSyncCompleted(syncResponse)
}
/**
* At the moment we don't get any group data through the sync, so we poll where every hour.
You can also force to refetch group data using [Group] API.
*/
private fun scheduleGroupDataFetchingIfNeeded(groupsSyncResponse: GroupsSyncResponse) {
val groupIds = ArrayList<String>()
groupIds.addAll(groupsSyncResponse.join.keys)
groupIds.addAll(groupsSyncResponse.invite.keys)
if (groupIds.isEmpty()) {
Timber.v("No new groups to fetch data for.")
return
}
Timber.v("There are ${groupIds.size} new groups to fetch data for.")
val getGroupDataWorkerParams = GetGroupDataWorker.Params(sessionId)
val workData = WorkerParamsFactory.toData(getGroupDataWorkerParams)
val getGroupWork = workManagerProvider.matrixPeriodicWorkRequestBuilder<GetGroupDataWorker>(1, TimeUnit.HOURS)
.setInputData(workData)
.setConstraints(WorkManagerProvider.workConstraints)
.build()
workManagerProvider.workManager
.enqueueUniquePeriodicWork(GET_GROUP_DATA_WORKER, ExistingPeriodicWorkPolicy.REPLACE, getGroupWork)
}
private suspend fun checkPushRules(roomsSyncResponse: RoomsSyncResponse, isInitialSync: Boolean) {
Timber.v("[PushRules] --> checkPushRules")
if (isInitialSync) {

View File

@ -23,6 +23,7 @@ import com.airbnb.mvrx.MvRxViewModelFactory
import com.airbnb.mvrx.ViewModelContext
import com.squareup.inject.assisted.Assisted
import com.squareup.inject.assisted.AssistedInject
import im.vector.matrix.android.api.NoOpMatrixCallback
import im.vector.matrix.android.api.query.QueryStringValue
import im.vector.matrix.android.api.session.Session
import im.vector.matrix.android.api.session.group.groupSummaryQueryParams
@ -74,6 +75,11 @@ class GroupListViewModel @AssistedInject constructor(@Assisted initialState: Gro
}
val optionGroup = Option.just(groupSummary)
selectedGroupStore.post(optionGroup)
} else {
// If selected group is null we force to default. It can happens when leaving the selected group.
setState {
copy(selectedGroup = this.asyncGroups()?.find { it.groupId == ALL_COMMUNITIES_GROUP_ID })
}
}
}
}
@ -88,6 +94,8 @@ class GroupListViewModel @AssistedInject constructor(@Assisted initialState: Gro
private fun handleSelectGroup(action: GroupListAction.SelectGroup) = withState { state ->
if (state.selectedGroup?.groupId != action.groupSummary.groupId) {
// We take care of refreshing group data when selecting to be sure we get all the rooms and users
session.getGroup(action.groupSummary.groupId)?.fetchGroupData(NoOpMatrixCallback())
setState { copy(selectedGroup = action.groupSummary) }
}
}