Add runBlocking when decrypt events to avoid thread switching when accessing the realm instance (thread local)
This commit is contained in:
parent
ea9c9ae490
commit
f6b38d2b49
@ -20,6 +20,7 @@ import io.realm.Realm
|
||||
import io.realm.RealmQuery
|
||||
import io.realm.Sort
|
||||
import io.realm.kotlin.createObject
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.matrix.android.sdk.api.session.crypto.CryptoService
|
||||
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
|
||||
import org.matrix.android.sdk.api.session.events.model.Event
|
||||
@ -127,7 +128,7 @@ private fun EventEntity.toTimelineEventEntity(roomMemberContentsByUser: HashMap<
|
||||
return timelineEventEntity
|
||||
}
|
||||
|
||||
internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate(
|
||||
internal fun ThreadSummaryEntity.Companion.createOrUpdate(
|
||||
threadSummaryType: ThreadSummaryUpdateType,
|
||||
realm: Realm,
|
||||
roomId: String,
|
||||
@ -153,10 +154,18 @@ internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate(
|
||||
}
|
||||
|
||||
val rootThreadEventEntity = createEventEntity(roomId, rootThreadEvent, realm).also {
|
||||
decryptIfNeeded(cryptoService, it, roomId)
|
||||
try {
|
||||
decryptIfNeeded(cryptoService, it, roomId)
|
||||
} catch (e: InterruptedException) {
|
||||
Timber.i("Decryption got interrupted")
|
||||
}
|
||||
}
|
||||
val latestThreadEventEntity = createLatestEventEntity(roomId, rootThreadEvent, roomMemberContentsByUser, realm)?.also {
|
||||
decryptIfNeeded(cryptoService, it, roomId)
|
||||
try {
|
||||
decryptIfNeeded(cryptoService, it, roomId)
|
||||
} catch (e: InterruptedException) {
|
||||
Timber.i("Decryption got interrupted")
|
||||
}
|
||||
}
|
||||
val isUserParticipating = rootThreadEvent.unsignedData.relations.latestThread.isUserParticipating == true || rootThreadEvent.senderId == userId
|
||||
roomMemberContentsByUser.addSenderState(realm, roomId, rootThreadEvent.senderId)
|
||||
@ -204,8 +213,8 @@ internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate(
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) {
|
||||
cryptoService ?: return
|
||||
private fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) = runBlocking {
|
||||
cryptoService ?: return@runBlocking
|
||||
val event = eventEntity.asDomain()
|
||||
if (event.isEncrypted() && event.mxDecryptionResult == null && event.eventId != null) {
|
||||
try {
|
||||
|
@ -102,11 +102,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
|
||||
}
|
||||
|
||||
suspend fun handle(realm: Realm,
|
||||
roomsSyncResponse: RoomsSyncResponse,
|
||||
isInitialSync: Boolean,
|
||||
aggregator: SyncResponsePostTreatmentAggregator,
|
||||
reporter: ProgressReporter? = null) {
|
||||
fun handle(realm: Realm,
|
||||
roomsSyncResponse: RoomsSyncResponse,
|
||||
isInitialSync: Boolean,
|
||||
aggregator: SyncResponsePostTreatmentAggregator,
|
||||
reporter: ProgressReporter? = null) {
|
||||
Timber.v("Execute transaction from $this")
|
||||
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter)
|
||||
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter)
|
||||
@ -121,11 +121,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||
}
|
||||
// PRIVATE METHODS *****************************************************************************
|
||||
|
||||
private suspend fun handleRoomSync(realm: Realm,
|
||||
handlingStrategy: HandlingStrategy,
|
||||
isInitialSync: Boolean,
|
||||
aggregator: SyncResponsePostTreatmentAggregator,
|
||||
reporter: ProgressReporter?) {
|
||||
private fun handleRoomSync(realm: Realm,
|
||||
handlingStrategy: HandlingStrategy,
|
||||
isInitialSync: Boolean,
|
||||
aggregator: SyncResponsePostTreatmentAggregator,
|
||||
reporter: ProgressReporter?) {
|
||||
val insertType = if (isInitialSync) {
|
||||
EventInsertType.INITIAL_SYNC
|
||||
} else {
|
||||
@ -158,11 +158,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||
realm.insertOrUpdate(rooms)
|
||||
}
|
||||
|
||||
private suspend fun insertJoinRoomsFromInitSync(realm: Realm,
|
||||
handlingStrategy: HandlingStrategy.JOINED,
|
||||
syncLocalTimeStampMillis: Long,
|
||||
aggregator: SyncResponsePostTreatmentAggregator,
|
||||
reporter: ProgressReporter?) {
|
||||
private fun insertJoinRoomsFromInitSync(realm: Realm,
|
||||
handlingStrategy: HandlingStrategy.JOINED,
|
||||
syncLocalTimeStampMillis: Long,
|
||||
aggregator: SyncResponsePostTreatmentAggregator,
|
||||
reporter: ProgressReporter?) {
|
||||
val bestChunkSize = computeBestChunkSize(
|
||||
listSize = handlingStrategy.data.keys.size,
|
||||
limit = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
|
||||
@ -200,12 +200,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun handleJoinedRoom(realm: Realm,
|
||||
roomId: String,
|
||||
roomSync: RoomSync,
|
||||
insertType: EventInsertType,
|
||||
syncLocalTimestampMillis: Long,
|
||||
aggregator: SyncResponsePostTreatmentAggregator): RoomEntity {
|
||||
private fun handleJoinedRoom(realm: Realm,
|
||||
roomId: String,
|
||||
roomSync: RoomSync,
|
||||
insertType: EventInsertType,
|
||||
syncLocalTimestampMillis: Long,
|
||||
aggregator: SyncResponsePostTreatmentAggregator): RoomEntity {
|
||||
Timber.v("Handle join sync for room $roomId")
|
||||
|
||||
val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed)
|
||||
@ -351,15 +351,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||
return roomEntity
|
||||
}
|
||||
|
||||
private suspend fun handleTimelineEvents(realm: Realm,
|
||||
roomId: String,
|
||||
roomEntity: RoomEntity,
|
||||
eventList: List<Event>,
|
||||
prevToken: String? = null,
|
||||
isLimited: Boolean = true,
|
||||
insertType: EventInsertType,
|
||||
syncLocalTimestampMillis: Long,
|
||||
aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity {
|
||||
private fun handleTimelineEvents(realm: Realm,
|
||||
roomId: String,
|
||||
roomEntity: RoomEntity,
|
||||
eventList: List<Event>,
|
||||
prevToken: String? = null,
|
||||
isLimited: Boolean = true,
|
||||
insertType: EventInsertType,
|
||||
syncLocalTimestampMillis: Long,
|
||||
aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity {
|
||||
val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId)
|
||||
if (isLimited && lastChunk != null) {
|
||||
lastChunk.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = true)
|
||||
@ -387,8 +387,10 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC
|
||||
|
||||
if (event.isEncrypted() && !isInitialSync) {
|
||||
runBlocking {
|
||||
try {
|
||||
decryptIfNeeded(event, roomId)
|
||||
} catch (e: InterruptedException) {
|
||||
Timber.i("Decryption got interrupted")
|
||||
}
|
||||
}
|
||||
var contentToInject: String? = null
|
||||
@ -504,7 +506,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun decryptIfNeeded(event: Event, roomId: String) {
|
||||
private fun decryptIfNeeded(event: Event, roomId: String) = runBlocking {
|
||||
try {
|
||||
// Event from sync does not have roomId, so add it to the event first
|
||||
val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "")
|
||||
|
Loading…
x
Reference in New Issue
Block a user