Timeline: fix potential issues when starting/disposing the timeline
This commit is contained in:
parent
1931a1a4a4
commit
77de059dc9
|
@ -50,6 +50,8 @@ import im.vector.matrix.android.internal.session.room.relation.FindReactionEvent
|
||||||
import im.vector.matrix.android.internal.session.room.relation.UpdateQuickReactionTask
|
import im.vector.matrix.android.internal.session.room.relation.UpdateQuickReactionTask
|
||||||
import im.vector.matrix.android.internal.session.room.state.DefaultSendStateTask
|
import im.vector.matrix.android.internal.session.room.state.DefaultSendStateTask
|
||||||
import im.vector.matrix.android.internal.session.room.state.SendStateTask
|
import im.vector.matrix.android.internal.session.room.state.SendStateTask
|
||||||
|
import im.vector.matrix.android.internal.session.room.timeline.*
|
||||||
|
import im.vector.matrix.android.internal.session.room.timeline.ClearUnlinkedEventsTask
|
||||||
import im.vector.matrix.android.internal.session.room.timeline.DefaultGetContextOfEventTask
|
import im.vector.matrix.android.internal.session.room.timeline.DefaultGetContextOfEventTask
|
||||||
import im.vector.matrix.android.internal.session.room.timeline.DefaultPaginationTask
|
import im.vector.matrix.android.internal.session.room.timeline.DefaultPaginationTask
|
||||||
import im.vector.matrix.android.internal.session.room.timeline.GetContextOfEventTask
|
import im.vector.matrix.android.internal.session.room.timeline.GetContextOfEventTask
|
||||||
|
@ -120,6 +122,9 @@ internal abstract class RoomModule {
|
||||||
@Binds
|
@Binds
|
||||||
abstract fun bindGetContextOfEventTask(getContextOfEventTask: DefaultGetContextOfEventTask): GetContextOfEventTask
|
abstract fun bindGetContextOfEventTask(getContextOfEventTask: DefaultGetContextOfEventTask): GetContextOfEventTask
|
||||||
|
|
||||||
|
@Binds
|
||||||
|
abstract fun bindClearUnlinkedEventsTask(clearUnlinkedEventsTask: DefaultClearUnlinkedEventsTask): ClearUnlinkedEventsTask
|
||||||
|
|
||||||
@Binds
|
@Binds
|
||||||
abstract fun bindPaginationTask(paginationTask: DefaultPaginationTask): PaginationTask
|
abstract fun bindPaginationTask(paginationTask: DefaultPaginationTask): PaginationTask
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
/*
|
||||||
|
|
||||||
|
* Copyright 2019 New Vector Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
|
||||||
|
*/
|
||||||
|
package im.vector.matrix.android.internal.session.room.timeline
|
||||||
|
|
||||||
|
import com.zhuinden.monarchy.Monarchy
|
||||||
|
import im.vector.matrix.android.internal.database.helper.deleteOnCascade
|
||||||
|
import im.vector.matrix.android.internal.database.model.ChunkEntity
|
||||||
|
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
|
||||||
|
import im.vector.matrix.android.internal.database.model.EventEntityFields
|
||||||
|
import im.vector.matrix.android.internal.database.query.where
|
||||||
|
import im.vector.matrix.android.internal.task.Task
|
||||||
|
import im.vector.matrix.android.internal.util.awaitTransaction
|
||||||
|
import javax.inject.Inject
|
||||||
|
|
||||||
|
internal interface ClearUnlinkedEventsTask : Task<ClearUnlinkedEventsTask.Params, Unit> {
|
||||||
|
|
||||||
|
data class Params(val roomId: String)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
internal class DefaultClearUnlinkedEventsTask @Inject constructor(private val monarchy: Monarchy) : ClearUnlinkedEventsTask {
|
||||||
|
|
||||||
|
override suspend fun execute(params: ClearUnlinkedEventsTask.Params) {
|
||||||
|
monarchy.awaitTransaction { localRealm ->
|
||||||
|
val unlinkedChunks = ChunkEntity
|
||||||
|
.where(localRealm, roomId = params.roomId)
|
||||||
|
.equalTo("${ChunkEntityFields.TIMELINE_EVENTS.ROOT}.${EventEntityFields.IS_UNLINKED}", true)
|
||||||
|
.findAll()
|
||||||
|
unlinkedChunks.forEach {
|
||||||
|
it.deleteOnCascade()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -53,6 +53,8 @@ import io.realm.RealmConfiguration
|
||||||
import io.realm.RealmQuery
|
import io.realm.RealmQuery
|
||||||
import io.realm.RealmResults
|
import io.realm.RealmResults
|
||||||
import io.realm.Sort
|
import io.realm.Sort
|
||||||
|
import kotlinx.coroutines.GlobalScope
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
@ -71,6 +73,7 @@ internal class DefaultTimeline(
|
||||||
private val realmConfiguration: RealmConfiguration,
|
private val realmConfiguration: RealmConfiguration,
|
||||||
private val taskExecutor: TaskExecutor,
|
private val taskExecutor: TaskExecutor,
|
||||||
private val contextOfEventTask: GetContextOfEventTask,
|
private val contextOfEventTask: GetContextOfEventTask,
|
||||||
|
private val clearUnlinkedEventsTask: ClearUnlinkedEventsTask,
|
||||||
private val paginationTask: PaginationTask,
|
private val paginationTask: PaginationTask,
|
||||||
private val cryptoService: CryptoService,
|
private val cryptoService: CryptoService,
|
||||||
private val timelineEventMapper: TimelineEventMapper,
|
private val timelineEventMapper: TimelineEventMapper,
|
||||||
|
@ -180,7 +183,6 @@ internal class DefaultTimeline(
|
||||||
eventDecryptor.start()
|
eventDecryptor.start()
|
||||||
val realm = Realm.getInstance(realmConfiguration)
|
val realm = Realm.getInstance(realmConfiguration)
|
||||||
backgroundRealm.set(realm)
|
backgroundRealm.set(realm)
|
||||||
clearUnlinkedEvents(realm)
|
|
||||||
|
|
||||||
roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst()?.also {
|
roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst()?.also {
|
||||||
it.sendingTimelineEvents.addChangeListener { _ ->
|
it.sendingTimelineEvents.addChangeListener { _ ->
|
||||||
|
@ -212,20 +214,28 @@ internal class DefaultTimeline(
|
||||||
isReady.set(false)
|
isReady.set(false)
|
||||||
Timber.v("Dispose timeline for roomId: $roomId and eventId: $initialEventId")
|
Timber.v("Dispose timeline for roomId: $roomId and eventId: $initialEventId")
|
||||||
cancelableBag.cancel()
|
cancelableBag.cancel()
|
||||||
|
BACKGROUND_HANDLER.removeCallbacksAndMessages(null)
|
||||||
BACKGROUND_HANDLER.post {
|
BACKGROUND_HANDLER.post {
|
||||||
roomEntity?.sendingTimelineEvents?.removeAllChangeListeners()
|
roomEntity?.sendingTimelineEvents?.removeAllChangeListeners()
|
||||||
eventRelations.removeAllChangeListeners()
|
if (this::eventRelations.isInitialized) {
|
||||||
filteredEvents.removeAllChangeListeners()
|
eventRelations.removeAllChangeListeners()
|
||||||
|
}
|
||||||
|
if (this::filteredEvents.isInitialized) {
|
||||||
|
filteredEvents.removeAllChangeListeners()
|
||||||
|
}
|
||||||
hiddenReadMarker.dispose()
|
hiddenReadMarker.dispose()
|
||||||
if (settings.buildReadReceipts) {
|
if (settings.buildReadReceipts) {
|
||||||
hiddenReadReceipts.dispose()
|
hiddenReadReceipts.dispose()
|
||||||
}
|
}
|
||||||
clearAllValues()
|
clearAllValues()
|
||||||
backgroundRealm.getAndSet(null).also {
|
backgroundRealm.getAndSet(null).also {
|
||||||
it.close()
|
it?.close()
|
||||||
}
|
}
|
||||||
eventDecryptor.destroy()
|
eventDecryptor.destroy()
|
||||||
}
|
}
|
||||||
|
clearUnlinkedEventsTask
|
||||||
|
.configureWith(ClearUnlinkedEventsTask.Params(roomId))
|
||||||
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -499,9 +509,9 @@ internal class DefaultTimeline(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
val params = PaginationTask.Params(roomId = roomId,
|
val params = PaginationTask.Params(roomId = roomId,
|
||||||
from = token,
|
from = token,
|
||||||
direction = direction.toPaginationDirection(),
|
direction = direction.toPaginationDirection(),
|
||||||
limit = limit)
|
limit = limit)
|
||||||
|
|
||||||
Timber.v("Should fetch $limit items $direction")
|
Timber.v("Should fetch $limit items $direction")
|
||||||
cancelableBag += paginationTask
|
cancelableBag += paginationTask
|
||||||
|
@ -577,7 +587,7 @@ internal class DefaultTimeline(
|
||||||
val timelineEvent = buildTimelineEvent(eventEntity)
|
val timelineEvent = buildTimelineEvent(eventEntity)
|
||||||
|
|
||||||
if (timelineEvent.isEncrypted()
|
if (timelineEvent.isEncrypted()
|
||||||
&& timelineEvent.root.mxDecryptionResult == null) {
|
&& timelineEvent.root.mxDecryptionResult == null) {
|
||||||
timelineEvent.root.eventId?.let { eventDecryptor.requestDecryption(it) }
|
timelineEvent.root.eventId?.let { eventDecryptor.requestDecryption(it) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -639,18 +649,6 @@ internal class DefaultTimeline(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun clearUnlinkedEvents(realm: Realm) {
|
|
||||||
realm.executeTransaction { localRealm ->
|
|
||||||
val unlinkedChunks = ChunkEntity
|
|
||||||
.where(localRealm, roomId = roomId)
|
|
||||||
.equalTo("${ChunkEntityFields.TIMELINE_EVENTS.ROOT}.${EventEntityFields.IS_UNLINKED}", true)
|
|
||||||
.findAll()
|
|
||||||
unlinkedChunks.forEach {
|
|
||||||
it.deleteOnCascade()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun fetchEvent(eventId: String) {
|
private fun fetchEvent(eventId: String) {
|
||||||
val params = GetContextOfEventTask.Params(roomId, eventId)
|
val params = GetContextOfEventTask.Params(roomId, eventId)
|
||||||
cancelableBag += contextOfEventTask.configureWith(params).executeBy(taskExecutor)
|
cancelableBag += contextOfEventTask.configureWith(params).executeBy(taskExecutor)
|
||||||
|
|
|
@ -41,7 +41,8 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
|
||||||
private val cryptoService: CryptoService,
|
private val cryptoService: CryptoService,
|
||||||
private val paginationTask: PaginationTask,
|
private val paginationTask: PaginationTask,
|
||||||
private val timelineEventMapper: TimelineEventMapper,
|
private val timelineEventMapper: TimelineEventMapper,
|
||||||
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper
|
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper,
|
||||||
|
private val clearUnlinkedEventsTask: ClearUnlinkedEventsTask
|
||||||
) : TimelineService {
|
) : TimelineService {
|
||||||
|
|
||||||
@AssistedInject.Factory
|
@AssistedInject.Factory
|
||||||
|
@ -55,6 +56,7 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
|
||||||
monarchy.realmConfiguration,
|
monarchy.realmConfiguration,
|
||||||
taskExecutor,
|
taskExecutor,
|
||||||
contextOfEventTask,
|
contextOfEventTask,
|
||||||
|
clearUnlinkedEventsTask,
|
||||||
paginationTask,
|
paginationTask,
|
||||||
cryptoService,
|
cryptoService,
|
||||||
timelineEventMapper,
|
timelineEventMapper,
|
||||||
|
|
|
@ -26,6 +26,7 @@ import im.vector.matrix.android.internal.database.query.where
|
||||||
import io.realm.Realm
|
import io.realm.Realm
|
||||||
import io.realm.RealmConfiguration
|
import io.realm.RealmConfiguration
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
|
import java.util.concurrent.ExecutorService
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
|
|
||||||
|
|
||||||
|
@ -53,18 +54,20 @@ internal class TimelineEventDecryptor(
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private val executor = Executors.newSingleThreadExecutor()
|
private var executor: ExecutorService? = null
|
||||||
|
|
||||||
private val existingRequests = HashSet<String>()
|
private val existingRequests = HashSet<String>()
|
||||||
private val unknownSessionsFailure = HashMap<String, MutableList<String>>()
|
private val unknownSessionsFailure = HashMap<String, MutableList<String>>()
|
||||||
|
|
||||||
fun start() {
|
fun start() {
|
||||||
|
executor = Executors.newSingleThreadExecutor()
|
||||||
cryptoService.addNewSessionListener(newSessionListener)
|
cryptoService.addNewSessionListener(newSessionListener)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun destroy() {
|
fun destroy() {
|
||||||
cryptoService.removeSessionListener(newSessionListener)
|
cryptoService.removeSessionListener(newSessionListener)
|
||||||
executor.shutdownNow()
|
executor?.shutdownNow()
|
||||||
|
executor = null
|
||||||
unknownSessionsFailure.clear()
|
unknownSessionsFailure.clear()
|
||||||
existingRequests.clear()
|
existingRequests.clear()
|
||||||
}
|
}
|
||||||
|
@ -85,11 +88,9 @@ internal class TimelineEventDecryptor(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
executor.execute {
|
executor?.execute {
|
||||||
Realm.getInstance(realmConfiguration).use { realm ->
|
Realm.getInstance(realmConfiguration).use { realm ->
|
||||||
realm.executeTransaction {
|
processDecryptRequest(eventId, realm)
|
||||||
processDecryptRequest(eventId, it)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -104,12 +105,16 @@ internal class TimelineEventDecryptor(
|
||||||
try {
|
try {
|
||||||
val result = cryptoService.decryptEvent(event, timelineId)
|
val result = cryptoService.decryptEvent(event, timelineId)
|
||||||
Timber.v("Successfully decrypted event ${eventId}")
|
Timber.v("Successfully decrypted event ${eventId}")
|
||||||
eventEntity.setDecryptionResult(result)
|
realm.executeTransaction {
|
||||||
|
eventEntity.setDecryptionResult(result)
|
||||||
|
}
|
||||||
} catch (e: MXCryptoError) {
|
} catch (e: MXCryptoError) {
|
||||||
Timber.v("Failed to decrypt event ${eventId} ${e}")
|
Timber.v("Failed to decrypt event ${eventId} ${e}")
|
||||||
if (e is MXCryptoError.Base && e.errorType == MXCryptoError.ErrorType.UNKNOWN_INBOUND_SESSION_ID) {
|
if (e is MXCryptoError.Base && e.errorType == MXCryptoError.ErrorType.UNKNOWN_INBOUND_SESSION_ID) {
|
||||||
//Keep track of unknown sessions to automatically try to decrypt on new session
|
//Keep track of unknown sessions to automatically try to decrypt on new session
|
||||||
eventEntity.decryptionErrorCode = e.errorType.name
|
realm.executeTransaction {
|
||||||
|
eventEntity.decryptionErrorCode = e.errorType.name
|
||||||
|
}
|
||||||
event.content?.toModel<EncryptedEventContent>()?.let { content ->
|
event.content?.toModel<EncryptedEventContent>()?.let { content ->
|
||||||
content.sessionId?.let { sessionId ->
|
content.sessionId?.let { sessionId ->
|
||||||
synchronized(unknownSessionsFailure) {
|
synchronized(unknownSessionsFailure) {
|
||||||
|
|
|
@ -132,7 +132,9 @@ internal class TimelineHiddenReadReceipts constructor(private val readReceiptsSu
|
||||||
* Dispose the realm query subscription. Has to be called on an HandlerThread
|
* Dispose the realm query subscription. Has to be called on an HandlerThread
|
||||||
*/
|
*/
|
||||||
fun dispose() {
|
fun dispose() {
|
||||||
this.hiddenReadReceipts.removeAllChangeListeners()
|
if (this::hiddenReadReceipts.isInitialized) {
|
||||||
|
this.hiddenReadReceipts.removeAllChangeListeners()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue