Add startWith and logs
This commit is contained in:
parent
f098d6cf5b
commit
6ec2dd8c00
@ -34,6 +34,7 @@ import im.vector.matrix.android.internal.crypto.model.CryptoDeviceInfo
|
|||||||
import io.reactivex.Observable
|
import io.reactivex.Observable
|
||||||
import io.reactivex.Single
|
import io.reactivex.Single
|
||||||
import io.reactivex.functions.BiFunction
|
import io.reactivex.functions.BiFunction
|
||||||
|
import timber.log.Timber
|
||||||
|
|
||||||
class RxRoom(private val room: Room, private val session: Session) {
|
class RxRoom(private val room: Room, private val session: Session) {
|
||||||
|
|
||||||
@ -41,6 +42,7 @@ class RxRoom(private val room: Room, private val session: Session) {
|
|||||||
val summaryObservable = room.getRoomSummaryLive()
|
val summaryObservable = room.getRoomSummaryLive()
|
||||||
.asObservable()
|
.asObservable()
|
||||||
.startWith(room.roomSummary().toOptional())
|
.startWith(room.roomSummary().toOptional())
|
||||||
|
.doOnNext { Timber.d("RX: summary emitted for: ${it.getOrNull()?.roomId}") }
|
||||||
|
|
||||||
val memberIdsChangeObservable = summaryObservable
|
val memberIdsChangeObservable = summaryObservable
|
||||||
.map {
|
.map {
|
||||||
@ -54,6 +56,7 @@ class RxRoom(private val room: Room, private val session: Session) {
|
|||||||
}
|
}
|
||||||
}.orEmpty()
|
}.orEmpty()
|
||||||
}.distinctUntilChanged()
|
}.distinctUntilChanged()
|
||||||
|
.doOnNext { Timber.d("RX: memberIds emitted. Size: ${it.size}") }
|
||||||
|
|
||||||
// Observe the device info of the users in the room
|
// Observe the device info of the users in the room
|
||||||
val cryptoDeviceInfoObservable = memberIdsChangeObservable
|
val cryptoDeviceInfoObservable = memberIdsChangeObservable
|
||||||
@ -64,7 +67,10 @@ class RxRoom(private val room: Room, private val session: Session) {
|
|||||||
// If any key change, emit the userIds list
|
// If any key change, emit the userIds list
|
||||||
membersIds
|
membersIds
|
||||||
}
|
}
|
||||||
|
.startWith(membersIds)
|
||||||
|
.doOnNext { Timber.d("RX: CryptoDeviceInfo emitted. Size: ${it.size}") }
|
||||||
}
|
}
|
||||||
|
.doOnNext { Timber.d("RX: cryptoDeviceInfo emitted 2. Size: ${it.size}") }
|
||||||
|
|
||||||
val roomEncryptionTrustLevelObservable = cryptoDeviceInfoObservable
|
val roomEncryptionTrustLevelObservable = cryptoDeviceInfoObservable
|
||||||
.map { userIds ->
|
.map { userIds ->
|
||||||
@ -74,6 +80,7 @@ class RxRoom(private val room: Room, private val session: Session) {
|
|||||||
session.getCrossSigningService().getTrustLevelForUsers(userIds).toOptional()
|
session.getCrossSigningService().getTrustLevelForUsers(userIds).toOptional()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
.doOnNext { Timber.d("RX: roomEncryptionTrustLevel emitted: ${it.getOrNull()?.name}") }
|
||||||
|
|
||||||
return Observable
|
return Observable
|
||||||
.combineLatest<Optional<RoomSummary>, Optional<RoomEncryptionTrustLevel>, Optional<RoomSummary>>(
|
.combineLatest<Optional<RoomSummary>, Optional<RoomEncryptionTrustLevel>, Optional<RoomSummary>>(
|
||||||
@ -85,15 +92,18 @@ class RxRoom(private val room: Room, private val session: Session) {
|
|||||||
).toOptional()
|
).toOptional()
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
.doOnNext { Timber.d("RX: final room summary emitted for ${it.getOrNull()?.roomId}") }
|
||||||
}
|
}
|
||||||
|
|
||||||
fun liveRoomMembers(queryParams: RoomMemberQueryParams): Observable<List<RoomMemberSummary>> {
|
fun liveRoomMembers(queryParams: RoomMemberQueryParams): Observable<List<RoomMemberSummary>> {
|
||||||
val roomMembersObservable = room.getRoomMembersLive(queryParams).asObservable()
|
val roomMembersObservable = room.getRoomMembersLive(queryParams).asObservable()
|
||||||
.startWith(room.getRoomMembers(queryParams))
|
.startWith(room.getRoomMembers(queryParams))
|
||||||
|
.doOnNext { Timber.d("RX: room members emitted. Size: ${it.size}") }
|
||||||
|
|
||||||
// TODO Do it only for room members of the room (switchMap)
|
// TODO Do it only for room members of the room (switchMap)
|
||||||
val cryptoDeviceInfoObservable = session.getLiveCryptoDeviceInfo().asObservable()
|
val cryptoDeviceInfoObservable = session.getLiveCryptoDeviceInfo().asObservable()
|
||||||
.startWith(emptyList<CryptoDeviceInfo>())
|
.startWith(emptyList<CryptoDeviceInfo>())
|
||||||
|
.doOnNext { Timber.d("RX: cryptoDeviceInfo emitted. Size: ${it.size}") }
|
||||||
|
|
||||||
return Observable
|
return Observable
|
||||||
.combineLatest<List<RoomMemberSummary>, List<CryptoDeviceInfo>, List<RoomMemberSummary>>(
|
.combineLatest<List<RoomMemberSummary>, List<CryptoDeviceInfo>, List<RoomMemberSummary>>(
|
||||||
@ -112,6 +122,7 @@ class RxRoom(private val room: Room, private val session: Session) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
.doOnNext { Timber.d("RX: final room members emitted. Size: ${it.size}") }
|
||||||
}
|
}
|
||||||
|
|
||||||
fun liveAnnotationSummary(eventId: String): Observable<Optional<EventAnnotationsSummary>> {
|
fun liveAnnotationSummary(eventId: String): Observable<Optional<EventAnnotationsSummary>> {
|
||||||
|
@ -34,15 +34,18 @@ import im.vector.matrix.android.internal.crypto.model.CryptoDeviceInfo
|
|||||||
import io.reactivex.Observable
|
import io.reactivex.Observable
|
||||||
import io.reactivex.Single
|
import io.reactivex.Single
|
||||||
import io.reactivex.functions.BiFunction
|
import io.reactivex.functions.BiFunction
|
||||||
|
import timber.log.Timber
|
||||||
|
|
||||||
class RxSession(private val session: Session) {
|
class RxSession(private val session: Session) {
|
||||||
|
|
||||||
fun liveRoomSummaries(queryParams: RoomSummaryQueryParams): Observable<List<RoomSummary>> {
|
fun liveRoomSummaries(queryParams: RoomSummaryQueryParams): Observable<List<RoomSummary>> {
|
||||||
val summariesObservable = session.getRoomSummariesLive(queryParams).asObservable()
|
val summariesObservable = session.getRoomSummariesLive(queryParams).asObservable()
|
||||||
.startWith(session.getRoomSummaries(queryParams))
|
.startWith(session.getRoomSummaries(queryParams))
|
||||||
|
.doOnNext { Timber.d("RX: summaries emitted: size: ${it.size}") }
|
||||||
|
|
||||||
val cryptoDeviceInfoObservable = session.getLiveCryptoDeviceInfo().asObservable()
|
val cryptoDeviceInfoObservable = session.getLiveCryptoDeviceInfo().asObservable()
|
||||||
.startWith(emptyList<CryptoDeviceInfo>())
|
.startWith(emptyList<CryptoDeviceInfo>())
|
||||||
|
.doOnNext { Timber.d("RX: crypto device info emitted: size: ${it.size}") }
|
||||||
|
|
||||||
return Observable
|
return Observable
|
||||||
.combineLatest<List<RoomSummary>, List<CryptoDeviceInfo>, List<RoomSummary>>(
|
.combineLatest<List<RoomSummary>, List<CryptoDeviceInfo>, List<RoomSummary>>(
|
||||||
@ -60,6 +63,7 @@ class RxSession(private val session: Session) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
.doOnNext { Timber.d("RX: final summaries emitted: size: ${it.size}") }
|
||||||
}
|
}
|
||||||
|
|
||||||
fun liveGroupSummaries(queryParams: GroupSummaryQueryParams): Observable<List<GroupSummary>> {
|
fun liveGroupSummaries(queryParams: GroupSummaryQueryParams): Observable<List<GroupSummary>> {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user