diff --git a/common/src/commonMain/kotlin/com/artemchep/keyguard/common/usecase/impl/WatchtowerSyncerImpl.kt b/common/src/commonMain/kotlin/com/artemchep/keyguard/common/usecase/impl/WatchtowerSyncerImpl.kt index 9a098295..29a6fb5e 100644 --- a/common/src/commonMain/kotlin/com/artemchep/keyguard/common/usecase/impl/WatchtowerSyncerImpl.kt +++ b/common/src/commonMain/kotlin/com/artemchep/keyguard/common/usecase/impl/WatchtowerSyncerImpl.kt @@ -26,13 +26,19 @@ import com.artemchep.keyguard.common.usecase.CipherIncompleteCheck import com.artemchep.keyguard.common.usecase.CipherUnsecureUrlCheck import com.artemchep.keyguard.common.usecase.CipherUrlDuplicateCheck import com.artemchep.keyguard.common.usecase.GetBreaches +import com.artemchep.keyguard.common.usecase.GetCheckPasskeys +import com.artemchep.keyguard.common.usecase.GetCheckPwnedPasswords +import com.artemchep.keyguard.common.usecase.GetCheckPwnedServices +import com.artemchep.keyguard.common.usecase.GetCheckTwoFA import com.artemchep.keyguard.common.usecase.GetCiphers import com.artemchep.keyguard.common.usecase.GetPasskeys import com.artemchep.keyguard.common.usecase.GetTwoFa import com.artemchep.keyguard.common.usecase.GetVaultSession import com.artemchep.keyguard.common.usecase.WatchtowerSyncer +import com.artemchep.keyguard.common.util.int import com.artemchep.keyguard.core.store.DatabaseDispatcher import com.artemchep.keyguard.core.store.DatabaseManager +import com.artemchep.keyguard.data.Database import com.artemchep.keyguard.feature.crashlytics.crashlyticsTap import com.artemchep.keyguard.platform.lifecycle.LeLifecycleState import com.artemchep.keyguard.platform.lifecycle.onState @@ -45,6 +51,10 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach @@ -110,27 +120,22 @@ private class WatchtowerClient( .bind() list.forEach { processor -> val type = processor.type - val version = processor.version() + "_v2" - val cipherIdsFlow = db.watchtowerThreatQueries - .getPendingCipherIds( - type = type, - version = version, - ) - .asFlow() - .mapToList(dispatcher) - .map { ids -> - ids.toSet() + val versionFlow = processor.version() + val requestsFlow = versionFlow + .distinctUntilChanged() + .flatMapLatest { version -> + getPendingCiphersFlow( + db = db, + type = type, + version = version, + ) + .map { ciphers -> version to ciphers } } - val ciphersFlow = getCiphers() - .combine(cipherIdsFlow) { ciphers, ids -> - ciphers - .filter { it.id in ids } - } - ciphersFlow + requestsFlow .debounce(1000L) - .onEach { ciphers -> - val message = "Processing watchtower alert [$type/${version}]: " + + .onEach { (version, ciphers) -> + val message = "Processing watchtower alert [$type/$version]: " + ciphers.joinToString { it.id } logRepository.add(TAG, message) @@ -142,7 +147,7 @@ private class WatchtowerClient( value = r.value, threat = r.threat && !r.cipher.deleted, cipherId = r.cipher.id, - type = processor.type, + type = type, reportedAt = now, version = version, ) @@ -152,6 +157,28 @@ private class WatchtowerClient( .launchIn(this) } } + + private fun getPendingCiphersFlow( + db: Database, + type: Long, + version: String, + ): Flow> { + val cipherIdsFlow = db.watchtowerThreatQueries + .getPendingCipherIds( + type = type, + version = version, + ) + .asFlow() + .mapToList(dispatcher) + .map { ids -> + ids.toSet() + } + return getCiphers() + .combine(cipherIdsFlow) { ciphers, ids -> + ciphers + .filter { it.id in ids } + } + } } data class WatchtowerClientResult( @@ -163,7 +190,7 @@ data class WatchtowerClientResult( interface WatchtowerClientTyped { val type: Long - suspend fun version(): String + fun version(): Flow suspend fun process( ciphers: List, @@ -178,7 +205,7 @@ class WatchtowerPasswordStrength( constructor(directDI: DirectDI) : this( ) - override suspend fun version(): String = "1" + override fun version() = flowOf("1") override suspend fun process( ciphers: List, @@ -198,19 +225,29 @@ class WatchtowerPasswordStrength( class WatchtowerPasswordPwned( private val checkPasswordSetLeak: CheckPasswordSetLeak, + private val getCheckPwnedPasswords: GetCheckPwnedPasswords, ) : WatchtowerClientTyped { override val type: Long get() = DWatchtowerAlertType.PWNED_PASSWORD.value constructor(directDI: DirectDI) : this( checkPasswordSetLeak = directDI.instance(), + getCheckPwnedPasswords = directDI.instance(), ) - override suspend fun version(): String = kotlin.run { + override fun version() = combineJoinToVersion( + getDatabaseVersionFlow(), + getCheckPwnedPasswords() + .map { + it.int.toString() + }, + ) + + private fun getDatabaseVersionFlow() = flow { // Refresh weekly val seconds = Clock.System.now().epochSeconds val weeks = seconds / 604800L - weeks.toString() + emit(weeks.toString()) } override suspend fun process( @@ -276,6 +313,7 @@ class WatchtowerPasswordPwned( class WatchtowerWebsitePwned( private val cipherBreachCheck: CipherBreachCheck, private val getBreaches: GetBreaches, + private val getCheckPwnedServices: GetCheckPwnedServices, ) : WatchtowerClientTyped { override val type: Long get() = DWatchtowerAlertType.PWNED_WEBSITE.value @@ -283,13 +321,22 @@ class WatchtowerWebsitePwned( constructor(directDI: DirectDI) : this( cipherBreachCheck = directDI.instance(), getBreaches = directDI.instance(), + getCheckPwnedServices = directDI.instance(), ) - override suspend fun version(): String = kotlin.run { + override fun version() = combineJoinToVersion( + getDatabaseVersionFlow(), + getCheckPwnedServices() + .map { + it.int.toString() + }, + ) + + private fun getDatabaseVersionFlow() = flow { // Refresh weekly val seconds = Clock.System.now().epochSeconds val weeks = seconds / 604800L - weeks.toString() + emit(weeks.toString()) } override suspend fun process( @@ -354,15 +401,25 @@ class WatchtowerWebsitePwned( class WatchtowerInactivePasskey( private val getPasskeys: GetPasskeys, + private val getCheckPasskeys: GetCheckPasskeys, ) : WatchtowerClientTyped { override val type: Long get() = DWatchtowerAlertType.PASSKEY_WEBSITE.value constructor(directDI: DirectDI) : this( getPasskeys = directDI.instance(), + getCheckPasskeys = directDI.instance(), ) - override suspend fun version(): String = FileHashes.passkeys + override fun version() = combineJoinToVersion( + getDatabaseVersionFlow(), + getCheckPasskeys() + .map { + it.int.toString() + }, + ) + + private fun getDatabaseVersionFlow() = flowOf(FileHashes.passkeys) override suspend fun process( ciphers: List, @@ -454,7 +511,7 @@ class WatchtowerIncomplete( cipherIncompleteCheck = directDI.instance(), ) - override suspend fun version(): String = "1" + override fun version() = flowOf("1") override suspend fun process( ciphers: List, @@ -509,7 +566,12 @@ class WatchtowerExpiring( cipherExpiringCheck = directDI.instance(), ) - override suspend fun version(): String = "1" + override fun version() = flow { + // Refresh daily + val seconds = Clock.System.now().epochSeconds + val days = seconds / 86400L + emit(days.toString()) + } override suspend fun process( ciphers: List, @@ -567,7 +629,7 @@ class WatchtowerUnsecureWebsite( cipherUnsecureUrlCheck = directDI.instance(), ) - override suspend fun version(): String = FileHashes.public_suffix_list + override fun version() = flowOf(FileHashes.public_suffix_list) override suspend fun process( ciphers: List, @@ -608,15 +670,25 @@ class WatchtowerUnsecureWebsite( class WatchtowerInactiveTfa( private val tfaService: GetTwoFa, + private val getCheckTwoFA: GetCheckTwoFA, ) : WatchtowerClientTyped { override val type: Long get() = DWatchtowerAlertType.TWO_FA_WEBSITE.value constructor(directDI: DirectDI) : this( tfaService = directDI.instance(), + getCheckTwoFA = directDI.instance(), ) - override suspend fun version(): String = FileHashes.tfa + override fun version() = combineJoinToVersion( + getDatabaseVersionFlow(), + getCheckTwoFA() + .map { + it.int.toString() + }, + ) + + private fun getDatabaseVersionFlow() = flowOf(FileHashes.tfa) override suspend fun process( ciphers: List, @@ -696,7 +768,7 @@ class WatchtowerDuplicateUris( cipherUrlDuplicateCheck = directDI.instance(), ) - override suspend fun version(): String = "1" + override fun version() = flowOf("1") override suspend fun process( ciphers: List, @@ -784,3 +856,11 @@ private fun parseHost(uri: DSecret.Uri) = if ( // can not get the domain null } + +private fun combineJoinToVersion( + vararg flows: Flow, +): Flow = combine( + flows = flows, +) { + it.joinToString(separator = "|") +}