Merge pull request #5592 from vector-im/feature/aris/prevent_decryption_fom_suspend_functions
Avoid accessing realm instance from suspend functions
This commit is contained in:
commit
09e8c104bb
|
@ -0,0 +1 @@
|
||||||
|
Improve/fix crashes on messages decryption
|
|
@ -20,6 +20,7 @@ import io.realm.Realm
|
||||||
import io.realm.RealmQuery
|
import io.realm.RealmQuery
|
||||||
import io.realm.Sort
|
import io.realm.Sort
|
||||||
import io.realm.kotlin.createObject
|
import io.realm.kotlin.createObject
|
||||||
|
import kotlinx.coroutines.runBlocking
|
||||||
import org.matrix.android.sdk.api.session.crypto.CryptoService
|
import org.matrix.android.sdk.api.session.crypto.CryptoService
|
||||||
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
|
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
|
||||||
import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult
|
import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult
|
||||||
|
@ -127,7 +128,7 @@ private fun EventEntity.toTimelineEventEntity(roomMemberContentsByUser: HashMap<
|
||||||
return timelineEventEntity
|
return timelineEventEntity
|
||||||
}
|
}
|
||||||
|
|
||||||
internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate(
|
internal fun ThreadSummaryEntity.Companion.createOrUpdate(
|
||||||
threadSummaryType: ThreadSummaryUpdateType,
|
threadSummaryType: ThreadSummaryUpdateType,
|
||||||
realm: Realm,
|
realm: Realm,
|
||||||
roomId: String,
|
roomId: String,
|
||||||
|
@ -153,10 +154,18 @@ internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate(
|
||||||
}
|
}
|
||||||
|
|
||||||
val rootThreadEventEntity = createEventEntity(roomId, rootThreadEvent, realm).also {
|
val rootThreadEventEntity = createEventEntity(roomId, rootThreadEvent, realm).also {
|
||||||
decryptIfNeeded(cryptoService, it, roomId)
|
try {
|
||||||
|
decryptIfNeeded(cryptoService, it, roomId)
|
||||||
|
} catch (e: InterruptedException) {
|
||||||
|
Timber.i("Decryption got interrupted")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
val latestThreadEventEntity = createLatestEventEntity(roomId, rootThreadEvent, roomMemberContentsByUser, realm)?.also {
|
val latestThreadEventEntity = createLatestEventEntity(roomId, rootThreadEvent, roomMemberContentsByUser, realm)?.also {
|
||||||
decryptIfNeeded(cryptoService, it, roomId)
|
try {
|
||||||
|
decryptIfNeeded(cryptoService, it, roomId)
|
||||||
|
} catch (e: InterruptedException) {
|
||||||
|
Timber.i("Decryption got interrupted")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
val isUserParticipating = rootThreadEvent.unsignedData.relations.latestThread.isUserParticipating == true || rootThreadEvent.senderId == userId
|
val isUserParticipating = rootThreadEvent.unsignedData.relations.latestThread.isUserParticipating == true || rootThreadEvent.senderId == userId
|
||||||
roomMemberContentsByUser.addSenderState(realm, roomId, rootThreadEvent.senderId)
|
roomMemberContentsByUser.addSenderState(realm, roomId, rootThreadEvent.senderId)
|
||||||
|
@ -204,14 +213,15 @@ internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) {
|
private fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) {
|
||||||
cryptoService ?: return
|
cryptoService ?: return
|
||||||
val event = eventEntity.asDomain()
|
val event = eventEntity.asDomain()
|
||||||
if (event.isEncrypted() && event.mxDecryptionResult == null && event.eventId != null) {
|
if (event.isEncrypted() && event.mxDecryptionResult == null && event.eventId != null) {
|
||||||
try {
|
try {
|
||||||
Timber.i("###THREADS ThreadSummaryHelper request decryption for eventId:${event.eventId}")
|
Timber.i("###THREADS ThreadSummaryHelper request decryption for eventId:${event.eventId}")
|
||||||
// Event from sync does not have roomId, so add it to the event first
|
// Event from sync does not have roomId, so add it to the event first
|
||||||
val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "")
|
// note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching
|
||||||
|
val result = runBlocking { cryptoService.decryptEvent(event.copy(roomId = roomId), "") }
|
||||||
event.mxDecryptionResult = OlmDecryptionResult(
|
event.mxDecryptionResult = OlmDecryptionResult(
|
||||||
payload = result.clearEvent,
|
payload = result.clearEvent,
|
||||||
senderKey = result.senderCurve25519Key,
|
senderKey = result.senderCurve25519Key,
|
||||||
|
|
|
@ -99,9 +99,7 @@ internal class TimelineEventDecryptor @Inject constructor(
|
||||||
executor?.execute {
|
executor?.execute {
|
||||||
Realm.getInstance(realmConfiguration).use { realm ->
|
Realm.getInstance(realmConfiguration).use { realm ->
|
||||||
try {
|
try {
|
||||||
runBlocking {
|
processDecryptRequest(request, realm)
|
||||||
processDecryptRequest(request, realm)
|
|
||||||
}
|
|
||||||
} catch (e: InterruptedException) {
|
} catch (e: InterruptedException) {
|
||||||
Timber.i("Decryption got interrupted")
|
Timber.i("Decryption got interrupted")
|
||||||
}
|
}
|
||||||
|
@ -121,7 +119,7 @@ internal class TimelineEventDecryptor @Inject constructor(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun processDecryptRequest(request: DecryptionRequest, realm: Realm) {
|
private fun processDecryptRequest(request: DecryptionRequest, realm: Realm) {
|
||||||
val event = request.event
|
val event = request.event
|
||||||
val timelineId = request.timelineId
|
val timelineId = request.timelineId
|
||||||
|
|
||||||
|
@ -132,7 +130,8 @@ internal class TimelineEventDecryptor @Inject constructor(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
val result = cryptoService.decryptEvent(request.event, timelineId)
|
// note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching
|
||||||
|
val result = runBlocking { cryptoService.decryptEvent(request.event, timelineId) }
|
||||||
Timber.v("Successfully decrypted event ${event.eventId}")
|
Timber.v("Successfully decrypted event ${event.eventId}")
|
||||||
realm.executeTransaction {
|
realm.executeTransaction {
|
||||||
val eventId = event.eventId ?: return@executeTransaction
|
val eventId = event.eventId ?: return@executeTransaction
|
||||||
|
|
|
@ -102,11 +102,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
|
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun handle(realm: Realm,
|
fun handle(realm: Realm,
|
||||||
roomsSyncResponse: RoomsSyncResponse,
|
roomsSyncResponse: RoomsSyncResponse,
|
||||||
isInitialSync: Boolean,
|
isInitialSync: Boolean,
|
||||||
aggregator: SyncResponsePostTreatmentAggregator,
|
aggregator: SyncResponsePostTreatmentAggregator,
|
||||||
reporter: ProgressReporter? = null) {
|
reporter: ProgressReporter? = null) {
|
||||||
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter)
|
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter)
|
||||||
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter)
|
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter)
|
||||||
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, aggregator, reporter)
|
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, aggregator, reporter)
|
||||||
|
@ -120,11 +120,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
}
|
}
|
||||||
// PRIVATE METHODS *****************************************************************************
|
// PRIVATE METHODS *****************************************************************************
|
||||||
|
|
||||||
private suspend fun handleRoomSync(realm: Realm,
|
private fun handleRoomSync(realm: Realm,
|
||||||
handlingStrategy: HandlingStrategy,
|
handlingStrategy: HandlingStrategy,
|
||||||
isInitialSync: Boolean,
|
isInitialSync: Boolean,
|
||||||
aggregator: SyncResponsePostTreatmentAggregator,
|
aggregator: SyncResponsePostTreatmentAggregator,
|
||||||
reporter: ProgressReporter?) {
|
reporter: ProgressReporter?) {
|
||||||
val insertType = if (isInitialSync) {
|
val insertType = if (isInitialSync) {
|
||||||
EventInsertType.INITIAL_SYNC
|
EventInsertType.INITIAL_SYNC
|
||||||
} else {
|
} else {
|
||||||
|
@ -157,11 +157,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
realm.insertOrUpdate(rooms)
|
realm.insertOrUpdate(rooms)
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun insertJoinRoomsFromInitSync(realm: Realm,
|
private fun insertJoinRoomsFromInitSync(realm: Realm,
|
||||||
handlingStrategy: HandlingStrategy.JOINED,
|
handlingStrategy: HandlingStrategy.JOINED,
|
||||||
syncLocalTimeStampMillis: Long,
|
syncLocalTimeStampMillis: Long,
|
||||||
aggregator: SyncResponsePostTreatmentAggregator,
|
aggregator: SyncResponsePostTreatmentAggregator,
|
||||||
reporter: ProgressReporter?) {
|
reporter: ProgressReporter?) {
|
||||||
val bestChunkSize = computeBestChunkSize(
|
val bestChunkSize = computeBestChunkSize(
|
||||||
listSize = handlingStrategy.data.keys.size,
|
listSize = handlingStrategy.data.keys.size,
|
||||||
limit = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
|
limit = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
|
||||||
|
@ -199,12 +199,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun handleJoinedRoom(realm: Realm,
|
private fun handleJoinedRoom(realm: Realm,
|
||||||
roomId: String,
|
roomId: String,
|
||||||
roomSync: RoomSync,
|
roomSync: RoomSync,
|
||||||
insertType: EventInsertType,
|
insertType: EventInsertType,
|
||||||
syncLocalTimestampMillis: Long,
|
syncLocalTimestampMillis: Long,
|
||||||
aggregator: SyncResponsePostTreatmentAggregator): RoomEntity {
|
aggregator: SyncResponsePostTreatmentAggregator): RoomEntity {
|
||||||
Timber.v("Handle join sync for room $roomId")
|
Timber.v("Handle join sync for room $roomId")
|
||||||
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC
|
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC
|
||||||
|
|
||||||
|
@ -353,15 +353,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
return roomEntity
|
return roomEntity
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun handleTimelineEvents(realm: Realm,
|
private fun handleTimelineEvents(realm: Realm,
|
||||||
roomId: String,
|
roomId: String,
|
||||||
roomEntity: RoomEntity,
|
roomEntity: RoomEntity,
|
||||||
eventList: List<Event>,
|
eventList: List<Event>,
|
||||||
prevToken: String? = null,
|
prevToken: String? = null,
|
||||||
isLimited: Boolean = true,
|
isLimited: Boolean = true,
|
||||||
insertType: EventInsertType,
|
insertType: EventInsertType,
|
||||||
syncLocalTimestampMillis: Long,
|
syncLocalTimestampMillis: Long,
|
||||||
aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity {
|
aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity {
|
||||||
val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId)
|
val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId)
|
||||||
if (isLimited && lastChunk != null) {
|
if (isLimited && lastChunk != null) {
|
||||||
lastChunk.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = true)
|
lastChunk.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = true)
|
||||||
|
@ -389,8 +389,10 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
liveEventService.get().dispatchLiveEventReceived(event, roomId, isInitialSync)
|
liveEventService.get().dispatchLiveEventReceived(event, roomId, isInitialSync)
|
||||||
|
|
||||||
if (event.isEncrypted() && !isInitialSync) {
|
if (event.isEncrypted() && !isInitialSync) {
|
||||||
runBlocking {
|
try {
|
||||||
decryptIfNeeded(event, roomId)
|
decryptIfNeeded(event, roomId)
|
||||||
|
} catch (e: InterruptedException) {
|
||||||
|
Timber.i("Decryption got interrupted")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var contentToInject: String? = null
|
var contentToInject: String? = null
|
||||||
|
@ -421,7 +423,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
roomId = roomId,
|
roomId = roomId,
|
||||||
eventEntity = eventEntity,
|
eventEntity = eventEntity,
|
||||||
direction = PaginationDirection.FORWARDS,
|
direction = PaginationDirection.FORWARDS,
|
||||||
roomMemberContentsByUser = roomMemberContentsByUser)
|
roomMemberContentsByUser = roomMemberContentsByUser
|
||||||
|
)
|
||||||
if (lightweightSettingsStorage.areThreadMessagesEnabled()) {
|
if (lightweightSettingsStorage.areThreadMessagesEnabled()) {
|
||||||
eventEntity.rootThreadEventId?.let {
|
eventEntity.rootThreadEventId?.let {
|
||||||
// This is a thread event
|
// This is a thread event
|
||||||
|
@ -437,7 +440,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
threadEventEntity = eventEntity,
|
threadEventEntity = eventEntity,
|
||||||
roomMemberContentsByUser = roomMemberContentsByUser,
|
roomMemberContentsByUser = roomMemberContentsByUser,
|
||||||
userId = userId,
|
userId = userId,
|
||||||
roomEntity = roomEntity)
|
roomEntity = roomEntity
|
||||||
|
)
|
||||||
}
|
}
|
||||||
} ?: run {
|
} ?: run {
|
||||||
// This is a normal event or a root thread one
|
// This is a normal event or a root thread one
|
||||||
|
@ -475,7 +479,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
roomId = roomId,
|
roomId = roomId,
|
||||||
realm = realm,
|
realm = realm,
|
||||||
chunkEntity = chunkEntity,
|
chunkEntity = chunkEntity,
|
||||||
currentUserId = userId)
|
currentUserId = userId
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// posting new events to timeline if any is registered
|
// posting new events to timeline if any is registered
|
||||||
|
@ -505,10 +510,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun decryptIfNeeded(event: Event, roomId: String) {
|
private fun decryptIfNeeded(event: Event, roomId: String) {
|
||||||
try {
|
try {
|
||||||
// Event from sync does not have roomId, so add it to the event first
|
// Event from sync does not have roomId, so add it to the event first
|
||||||
val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "")
|
// note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching
|
||||||
|
val result = runBlocking { cryptoService.decryptEvent(event.copy(roomId = roomId), "") }
|
||||||
event.mxDecryptionResult = OlmDecryptionResult(
|
event.mxDecryptionResult = OlmDecryptionResult(
|
||||||
payload = result.clearEvent,
|
payload = result.clearEvent,
|
||||||
senderKey = result.senderCurve25519Key,
|
senderKey = result.senderCurve25519Key,
|
||||||
|
|
Loading…
Reference in New Issue