Improve Rx chain and cleanup
This commit is contained in:
parent
51e0f945a7
commit
c3c88c387b
|
@ -17,7 +17,6 @@
|
|||
package im.vector.matrix.rx
|
||||
|
||||
import im.vector.matrix.android.api.crypto.RoomEncryptionTrustLevel
|
||||
import im.vector.matrix.android.api.extensions.orFalse
|
||||
import im.vector.matrix.android.api.session.Session
|
||||
import im.vector.matrix.android.api.session.events.model.Event
|
||||
import im.vector.matrix.android.api.session.room.Room
|
||||
|
@ -31,7 +30,6 @@ import im.vector.matrix.android.api.session.room.send.UserDraft
|
|||
import im.vector.matrix.android.api.session.room.timeline.TimelineEvent
|
||||
import im.vector.matrix.android.api.util.Optional
|
||||
import im.vector.matrix.android.api.util.toOptional
|
||||
import im.vector.matrix.android.internal.crypto.model.CryptoDeviceInfo
|
||||
import io.reactivex.Observable
|
||||
import io.reactivex.Single
|
||||
import io.reactivex.functions.BiFunction
|
||||
|
@ -39,32 +37,50 @@ import io.reactivex.functions.BiFunction
|
|||
class RxRoom(private val room: Room, private val session: Session) {
|
||||
|
||||
fun liveRoomSummary(): Observable<Optional<RoomSummary>> {
|
||||
val summaryObservable = room.getRoomSummaryLive().asObservable()
|
||||
val summaryObservable = room.getRoomSummaryLive()
|
||||
.asObservable()
|
||||
.startWith(room.roomSummary().toOptional())
|
||||
|
||||
val memberChangeObserver = summaryObservable.map {
|
||||
it.getOrNull()?.otherMemberIds ?: emptyList()
|
||||
val memberIdsChangeObservable = summaryObservable
|
||||
.map {
|
||||
it.getOrNull()?.let { roomSummary ->
|
||||
if (roomSummary.isEncrypted) {
|
||||
// Return the list of other users
|
||||
roomSummary.otherMemberIds
|
||||
} else {
|
||||
// Return an empty list, the room is not encrypted
|
||||
emptyList()
|
||||
}
|
||||
}.orEmpty()
|
||||
}.distinctUntilChanged()
|
||||
|
||||
val keyObservable = session.getLiveCryptoDeviceInfo().asObservable()
|
||||
|
||||
val subObserver = Observable
|
||||
.combineLatest<List<String>, List<CryptoDeviceInfo>, RoomEncryptionTrustLevel>(
|
||||
memberChangeObserver,
|
||||
keyObservable,
|
||||
BiFunction { userList, _ ->
|
||||
session.getCrossSigningService().getTrustLevelForUsers(userList)
|
||||
// Observe the device info of the users in the room
|
||||
val cryptoDeviceInfoObservable = memberIdsChangeObservable
|
||||
.switchMap { otherUserIds ->
|
||||
session.getLiveCryptoDeviceInfo(otherUserIds)
|
||||
.asObservable()
|
||||
.map {
|
||||
// If any key change, emit the userIds list
|
||||
otherUserIds
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
val roomEncryptionTrustLevelObservable = cryptoDeviceInfoObservable
|
||||
.map { otherUserIds ->
|
||||
if (otherUserIds.isEmpty()) {
|
||||
Optional<RoomEncryptionTrustLevel>(null)
|
||||
} else {
|
||||
session.getCrossSigningService().getTrustLevelForUsers(otherUserIds).toOptional()
|
||||
}
|
||||
}
|
||||
|
||||
return Observable
|
||||
.combineLatest<Optional<RoomSummary>, RoomEncryptionTrustLevel, Optional<RoomSummary>>(
|
||||
.combineLatest<Optional<RoomSummary>, Optional<RoomEncryptionTrustLevel>, Optional<RoomSummary>>(
|
||||
summaryObservable,
|
||||
subObserver,
|
||||
roomEncryptionTrustLevelObservable,
|
||||
BiFunction { summary, level ->
|
||||
summary.getOrNull()?.copy(
|
||||
roomEncryptionTrustLevel = if (summary.getOrNull()?.isEncrypted.orFalse()) level else null
|
||||
roomEncryptionTrustLevel = level.getOrNull()
|
||||
).toOptional()
|
||||
}
|
||||
)
|
||||
|
|
|
@ -38,22 +38,24 @@ import io.reactivex.functions.BiFunction
|
|||
class RxSession(private val session: Session) {
|
||||
|
||||
fun liveRoomSummaries(queryParams: RoomSummaryQueryParams): Observable<List<RoomSummary>> {
|
||||
val summaryObservable = session.getRoomSummariesLive(queryParams).asObservable()
|
||||
val summariesObservable = session.getRoomSummariesLive(queryParams).asObservable()
|
||||
.startWith(session.getRoomSummaries(queryParams))
|
||||
|
||||
val keyObservable = session.getLiveCryptoDeviceInfo().asObservable()
|
||||
val cryptoDeviceInfoObservable = session.getLiveCryptoDeviceInfo().asObservable()
|
||||
|
||||
return Observable
|
||||
.combineLatest<List<RoomSummary>, List<CryptoDeviceInfo>, List<RoomSummary>>(
|
||||
summaryObservable,
|
||||
keyObservable,
|
||||
summariesObservable,
|
||||
cryptoDeviceInfoObservable,
|
||||
BiFunction { summaries, _ ->
|
||||
summaries.map {
|
||||
if (it.isEncrypted) it.copy(
|
||||
roomEncryptionTrustLevel = session.getCrossSigningService().getTrustLevelForUsers(
|
||||
it.otherMemberIds
|
||||
if (it.isEncrypted) {
|
||||
it.copy(
|
||||
roomEncryptionTrustLevel = session.getCrossSigningService().getTrustLevelForUsers(it.otherMemberIds)
|
||||
)
|
||||
) else it
|
||||
} else {
|
||||
it
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
|
|
@ -120,9 +120,12 @@ interface CryptoService {
|
|||
|
||||
fun getCryptoDeviceInfo(userId: String): List<CryptoDeviceInfo>
|
||||
|
||||
fun getLiveCryptoDeviceInfo(userId: String): LiveData<List<CryptoDeviceInfo>>
|
||||
fun getLiveCryptoDeviceInfo(): LiveData<List<CryptoDeviceInfo>>
|
||||
|
||||
fun getLiveCryptoDeviceInfo(userId: String): LiveData<List<CryptoDeviceInfo>>
|
||||
|
||||
fun getLiveCryptoDeviceInfo(userIds: List<String>): LiveData<List<CryptoDeviceInfo>>
|
||||
|
||||
fun addNewSessionListener(newSessionListener: NewSessionListener)
|
||||
|
||||
fun removeSessionListener(listener: NewSessionListener)
|
||||
|
|
|
@ -19,7 +19,6 @@ package im.vector.matrix.android.api.session.crypto.crosssigning
|
|||
import androidx.lifecycle.LiveData
|
||||
import im.vector.matrix.android.api.MatrixCallback
|
||||
import im.vector.matrix.android.api.crypto.RoomEncryptionTrustLevel
|
||||
import im.vector.matrix.android.api.session.room.model.RoomSummary
|
||||
import im.vector.matrix.android.api.util.Optional
|
||||
import im.vector.matrix.android.internal.crypto.crosssigning.DeviceTrustResult
|
||||
import im.vector.matrix.android.internal.crypto.crosssigning.UserTrustResult
|
||||
|
@ -65,5 +64,5 @@ interface CrossSigningService {
|
|||
otherDeviceId: String,
|
||||
locallyTrusted: Boolean?): DeviceTrustResult
|
||||
|
||||
fun getTrustLevelForUsers(userList: List<String>): RoomEncryptionTrustLevel
|
||||
fun getTrustLevelForUsers(userIds: List<String>): RoomEncryptionTrustLevel
|
||||
}
|
||||
|
|
|
@ -403,11 +403,16 @@ internal class DefaultCryptoService @Inject constructor(
|
|||
return cryptoStore.getUserDevices(userId)?.map { it.value } ?: emptyList()
|
||||
}
|
||||
|
||||
override fun getLiveCryptoDeviceInfo(): LiveData<List<CryptoDeviceInfo>> {
|
||||
return cryptoStore.getLiveDeviceList()
|
||||
}
|
||||
|
||||
override fun getLiveCryptoDeviceInfo(userId: String): LiveData<List<CryptoDeviceInfo>> {
|
||||
return cryptoStore.getLiveDeviceList(userId)
|
||||
}
|
||||
override fun getLiveCryptoDeviceInfo(): LiveData<List<CryptoDeviceInfo>> {
|
||||
return cryptoStore.getLiveDeviceList()
|
||||
|
||||
override fun getLiveCryptoDeviceInfo(userIds: List<String>): LiveData<List<CryptoDeviceInfo>> {
|
||||
return cryptoStore.getLiveDeviceList(userIds)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -658,25 +658,33 @@ internal class DefaultCrossSigningService @Inject constructor(
|
|||
}
|
||||
}
|
||||
|
||||
override fun getTrustLevelForUsers(userList: List<String>): RoomEncryptionTrustLevel {
|
||||
val atLeastOneTrusted = userList
|
||||
override fun getTrustLevelForUsers(userIds: List<String>): RoomEncryptionTrustLevel {
|
||||
val atLeastOneTrusted = userIds
|
||||
.filter { it != userId }
|
||||
.map { getUserCrossSigningKeys(it) }
|
||||
.any { it?.isTrusted() == true }
|
||||
|
||||
if (!atLeastOneTrusted) {
|
||||
return RoomEncryptionTrustLevel.Default
|
||||
return if (!atLeastOneTrusted) {
|
||||
RoomEncryptionTrustLevel.Default
|
||||
} else {
|
||||
// I have verified at least one other user
|
||||
val allDevices = userList.mapNotNull {
|
||||
val allDevices = userIds.mapNotNull {
|
||||
cryptoStore.getUserDeviceList(it)
|
||||
}.flatten()
|
||||
if (getMyCrossSigningKeys() != null) {
|
||||
val hasWarning = allDevices.any { !it.trustLevel?.crossSigningVerified.orFalse() }
|
||||
return if (hasWarning) RoomEncryptionTrustLevel.Warning else RoomEncryptionTrustLevel.Trusted
|
||||
if (hasWarning) {
|
||||
RoomEncryptionTrustLevel.Warning
|
||||
} else {
|
||||
val hasWarningLegacy = allDevices.any { !it.trustLevel?.crossSigningVerified.orFalse() }
|
||||
return if (hasWarningLegacy) RoomEncryptionTrustLevel.Warning else RoomEncryptionTrustLevel.Trusted
|
||||
RoomEncryptionTrustLevel.Trusted
|
||||
}
|
||||
} else {
|
||||
val hasWarningLegacy = allDevices.any { !it.isVerified }
|
||||
if (hasWarningLegacy) {
|
||||
RoomEncryptionTrustLevel.Warning
|
||||
} else {
|
||||
RoomEncryptionTrustLevel.Trusted
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -200,6 +200,9 @@ internal interface IMXCryptoStore {
|
|||
fun getUserDeviceList(userId: String): List<CryptoDeviceInfo>?
|
||||
|
||||
fun getLiveDeviceList(userId: String): LiveData<List<CryptoDeviceInfo>>
|
||||
|
||||
fun getLiveDeviceList(userIds: List<String>): LiveData<List<CryptoDeviceInfo>>
|
||||
|
||||
//TODO temp
|
||||
fun getLiveDeviceList(): LiveData<List<CryptoDeviceInfo>>
|
||||
/**
|
||||
|
|
|
@ -398,6 +398,22 @@ internal class RealmCryptoStore(private val realmConfiguration: RealmConfigurati
|
|||
}
|
||||
}
|
||||
|
||||
override fun getLiveDeviceList(userIds: List<String>): LiveData<List<CryptoDeviceInfo>> {
|
||||
val liveData = monarchy.findAllMappedWithChanges(
|
||||
{ realm: Realm ->
|
||||
realm
|
||||
.where<UserEntity>()
|
||||
.`in`(UserEntityFields.USER_ID, userIds.toTypedArray())
|
||||
},
|
||||
{ entity ->
|
||||
entity.devices.map { CryptoMapper.mapToModel(it) }
|
||||
}
|
||||
)
|
||||
return Transformations.map(liveData) {
|
||||
it.firstOrNull() ?: emptyList()
|
||||
}
|
||||
}
|
||||
|
||||
override fun getLiveDeviceList(): LiveData<List<CryptoDeviceInfo>> {
|
||||
val liveData = monarchy.findAllMappedWithChanges(
|
||||
{ realm: Realm ->
|
||||
|
|
Loading…
Reference in New Issue