diff --git a/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/LiveDataObservable.kt b/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/LiveDataObservable.kt index 7958d3efa1..302e775056 100644 --- a/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/LiveDataObservable.kt +++ b/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/LiveDataObservable.kt @@ -20,6 +20,7 @@ import androidx.lifecycle.LiveData import androidx.lifecycle.Observer import io.reactivex.Observable import io.reactivex.android.MainThreadDisposable +import io.reactivex.android.schedulers.AndroidSchedulers import io.reactivex.schedulers.Schedulers private class LiveDataObservable( @@ -57,6 +58,14 @@ private class LiveDataObservable( } } -fun LiveData.asObservable(): Observable { +internal fun LiveData.asObservable(): Observable { return LiveDataObservable(this).observeOn(Schedulers.computation()) } + +internal fun Observable.startWithCallable(supplier: () -> T): Observable { + val startObservable = Observable + .fromCallable(supplier) + .subscribeOn(Schedulers.io()) + .observeOn(AndroidSchedulers.mainThread()) + return startWith(startObservable) +} diff --git a/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/RxRoom.kt b/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/RxRoom.kt index 9491a69ef1..ef55b090b7 100644 --- a/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/RxRoom.kt +++ b/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/RxRoom.kt @@ -35,27 +35,37 @@ class RxRoom(private val room: Room) { fun liveRoomSummary(): Observable> { return room.getRoomSummaryLive().asObservable() - .startWith(room.roomSummary().toOptional()) + .startWithCallable { + room.roomSummary().toOptional() + } } fun liveRoomMembers(queryParams: RoomMemberQueryParams): Observable> { return room.getRoomMembersLive(queryParams).asObservable() - .startWith(room.getRoomMembers(queryParams)) + .startWithCallable { + room.getRoomMembers(queryParams) + } } fun liveAnnotationSummary(eventId: String): Observable> { return room.getEventAnnotationsSummaryLive(eventId).asObservable() - .startWith(room.getEventAnnotationsSummary(eventId).toOptional()) + .startWithCallable { + room.getEventAnnotationsSummary(eventId).toOptional() + } } fun liveTimelineEvent(eventId: String): Observable> { return room.getTimeLineEventLive(eventId).asObservable() - .startWith(room.getTimeLineEvent(eventId).toOptional()) + .startWithCallable { + room.getTimeLineEvent(eventId).toOptional() + } } fun liveStateEvent(eventType: String): Observable> { return room.getStateEventLive(eventType).asObservable() - .startWith(room.getStateEvent(eventType).toOptional()) + .startWithCallable { + room.getStateEvent(eventType).toOptional() + } } fun liveReadMarker(): Observable> { diff --git a/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/RxSession.kt b/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/RxSession.kt index 406e274258..edba4ca9bf 100644 --- a/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/RxSession.kt +++ b/matrix-sdk-android-rx/src/main/java/im/vector/matrix/rx/RxSession.kt @@ -36,17 +36,23 @@ class RxSession(private val session: Session) { fun liveRoomSummaries(queryParams: RoomSummaryQueryParams): Observable> { return session.getRoomSummariesLive(queryParams).asObservable() - .startWith(session.getRoomSummaries(queryParams)) + .startWithCallable { + session.getRoomSummaries(queryParams) + } } fun liveGroupSummaries(queryParams: GroupSummaryQueryParams): Observable> { return session.getGroupSummariesLive(queryParams).asObservable() - .startWith(session.getGroupSummaries(queryParams)) + .startWithCallable { + session.getGroupSummaries(queryParams) + } } fun liveBreadcrumbs(): Observable> { return session.getBreadcrumbsLive().asObservable() - .startWith(session.getBreadcrumbs()) + .startWithCallable { + session.getBreadcrumbs() + } } fun liveSyncState(): Observable { @@ -59,7 +65,9 @@ class RxSession(private val session: Session) { fun liveUser(userId: String): Observable> { return session.getUserLive(userId).asObservable() - .startWith(session.getUser(userId).toOptional()) + .startWithCallable { + session.getUser(userId).toOptional() + } } fun liveUsers(): Observable> {