diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/RealmQueryLatch.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/RealmQueryLatch.kt index 2d386eac15..98544d46f7 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/RealmQueryLatch.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/RealmQueryLatch.kt @@ -16,43 +16,36 @@ package im.vector.matrix.android.internal.database -import im.vector.matrix.android.internal.util.createBackgroundHandler import io.realm.* -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference +import kotlinx.coroutines.* -class RealmQueryLatch(private val realmConfiguration: RealmConfiguration, - private val realmQueryBuilder: (Realm) -> RealmQuery) { +internal suspend fun awaitNotEmptyResult(realmConfiguration: RealmConfiguration, + timeoutMillis: Long, + builder: (Realm) -> RealmQuery) { + withTimeout(timeoutMillis) { + // Confine Realm interaction to a single thread with Looper. + withContext(Dispatchers.Main) { + val latch = CompletableDeferred() - private companion object { - val QUERY_LATCH_HANDLER = createBackgroundHandler("REALM_QUERY_LATCH") - } + Realm.getInstance(realmConfiguration).use { realm -> + val result = builder(realm).findAllAsync() - @Throws(InterruptedException::class) - fun await(timeout: Long, timeUnit: TimeUnit) { - val realmRef = AtomicReference() - val latch = CountDownLatch(1) - QUERY_LATCH_HANDLER.post { - val realm = Realm.getInstance(realmConfiguration) - realmRef.set(realm) - val result = realmQueryBuilder(realm).findAllAsync() - result.addChangeListener(object : RealmChangeListener> { - override fun onChange(t: RealmResults) { - if (t.isNotEmpty()) { - result.removeChangeListener(this) - latch.countDown() + val listener = object : RealmChangeListener> { + override fun onChange(it: RealmResults) { + if (it.isNotEmpty()) { + result.removeChangeListener(this) + latch.complete(Unit) + } } } - }) - } - try { - latch.await(timeout, timeUnit) - } catch (exception: InterruptedException) { - throw exception - } finally { - QUERY_LATCH_HANDLER.post { - realmRef.getAndSet(null).close() + + result.addChangeListener(listener) + try { + latch.await() + } catch (e: CancellationException) { + result.removeChangeListener(listener) + throw e + } } } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/create/CreateRoomTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/create/CreateRoomTask.kt index 9af8434b7c..970a1fed7e 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/create/CreateRoomTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/create/CreateRoomTask.kt @@ -20,7 +20,7 @@ import com.zhuinden.monarchy.Monarchy import im.vector.matrix.android.api.session.room.failure.CreateRoomFailure import im.vector.matrix.android.api.session.room.model.create.CreateRoomParams import im.vector.matrix.android.api.session.room.model.create.CreateRoomResponse -import im.vector.matrix.android.internal.database.RealmQueryLatch +import im.vector.matrix.android.internal.database.awaitNotEmptyResult import im.vector.matrix.android.internal.database.model.RoomEntity import im.vector.matrix.android.internal.database.model.RoomEntityFields import im.vector.matrix.android.internal.database.model.RoomSummaryEntity @@ -34,6 +34,7 @@ import im.vector.matrix.android.internal.session.user.accountdata.UpdateUserAcco import im.vector.matrix.android.internal.task.Task import im.vector.matrix.android.internal.util.awaitTransaction import io.realm.RealmConfiguration +import kotlinx.coroutines.TimeoutCancellationException import java.util.concurrent.TimeUnit import javax.inject.Inject @@ -53,13 +54,12 @@ internal class DefaultCreateRoomTask @Inject constructor(private val roomAPI: Ro } val roomId = createRoomResponse.roomId!! // Wait for room to come back from the sync (but it can maybe be in the DB if the sync response is received before) - val rql = RealmQueryLatch(realmConfiguration) { realm -> - realm.where(RoomEntity::class.java) - .equalTo(RoomEntityFields.ROOM_ID, roomId) - } try { - rql.await(timeout = 1L, timeUnit = TimeUnit.MINUTES) - } catch (exception: Exception) { + awaitNotEmptyResult(realmConfiguration, TimeUnit.MINUTES.toMillis(1L)) { realm -> + realm.where(RoomEntity::class.java) + .equalTo(RoomEntityFields.ROOM_ID, roomId) + } + } catch (exception: TimeoutCancellationException) { throw CreateRoomFailure.CreatedWithTimeout } if (params.isDirect()) { diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/joining/JoinRoomTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/joining/JoinRoomTask.kt index 7304c09d57..fbede72520 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/joining/JoinRoomTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/joining/JoinRoomTask.kt @@ -17,7 +17,7 @@ package im.vector.matrix.android.internal.session.room.membership.joining import im.vector.matrix.android.api.session.room.failure.JoinRoomFailure -import im.vector.matrix.android.internal.database.RealmQueryLatch +import im.vector.matrix.android.internal.database.awaitNotEmptyResult import im.vector.matrix.android.internal.database.model.RoomEntity import im.vector.matrix.android.internal.database.model.RoomEntityFields import im.vector.matrix.android.internal.di.SessionDatabase @@ -26,6 +26,7 @@ import im.vector.matrix.android.internal.session.room.RoomAPI import im.vector.matrix.android.internal.session.room.read.SetReadMarkersTask import im.vector.matrix.android.internal.task.Task import io.realm.RealmConfiguration +import kotlinx.coroutines.TimeoutCancellationException import java.util.concurrent.TimeUnit import javax.inject.Inject @@ -46,18 +47,16 @@ internal class DefaultJoinRoomTask @Inject constructor(private val roomAPI: Room executeRequest { apiCall = roomAPI.join(params.roomId, params.viaServers, mapOf("reason" to params.reason)) } - val roomId = params.roomId // Wait for room to come back from the sync (but it can maybe be in the DB is the sync response is received before) - val rql = RealmQueryLatch(realmConfiguration) { realm -> - realm.where(RoomEntity::class.java) - .equalTo(RoomEntityFields.ROOM_ID, roomId) - } try { - rql.await(timeout = 1L, timeUnit = TimeUnit.MINUTES) - } catch (exception: Exception) { + awaitNotEmptyResult(realmConfiguration, TimeUnit.MINUTES.toMillis(1L)) { realm -> + realm.where(RoomEntity::class.java) + .equalTo(RoomEntityFields.ROOM_ID, params.roomId) + } + } catch (exception: TimeoutCancellationException) { throw JoinRoomFailure.JoinedWithTimeout } - setReadMarkers(roomId) + setReadMarkers(params.roomId) } private suspend fun setReadMarkers(roomId: String) {