fixing sign out not resetting database or inmemory caches
This commit is contained in:
parent
0b65fd7766
commit
91cf19baad
|
@ -33,7 +33,7 @@ internal class SharedPreferencesDelegate(
|
||||||
|
|
||||||
override suspend fun clear() {
|
override suspend fun clear() {
|
||||||
coroutineDispatchers.withIoContext {
|
coroutineDispatchers.withIoContext {
|
||||||
preferences.edit().clear().apply()
|
preferences.edit().clear().commit()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,26 +1,27 @@
|
||||||
package app.dapk.st
|
package app.dapk.st
|
||||||
|
|
||||||
import android.app.Application
|
import android.app.Application
|
||||||
|
import android.content.Intent
|
||||||
import android.util.Log
|
import android.util.Log
|
||||||
import app.dapk.st.core.CoreAndroidModule
|
import app.dapk.st.core.CoreAndroidModule
|
||||||
import app.dapk.st.core.ModuleProvider
|
import app.dapk.st.core.ModuleProvider
|
||||||
import app.dapk.st.core.ProvidableModule
|
import app.dapk.st.core.ProvidableModule
|
||||||
import app.dapk.st.core.attachAppLogger
|
import app.dapk.st.core.attachAppLogger
|
||||||
|
import app.dapk.st.core.extensions.ResettableUnsafeLazy
|
||||||
import app.dapk.st.core.extensions.Scope
|
import app.dapk.st.core.extensions.Scope
|
||||||
import app.dapk.st.core.extensions.unsafeLazy
|
|
||||||
import app.dapk.st.directory.DirectoryModule
|
import app.dapk.st.directory.DirectoryModule
|
||||||
import app.dapk.st.messenger.MessengerModule
|
import app.dapk.st.domain.StoreModule
|
||||||
import app.dapk.st.graph.AppModule
|
import app.dapk.st.graph.AppModule
|
||||||
import app.dapk.st.graph.FeatureModules
|
|
||||||
import app.dapk.st.home.HomeModule
|
import app.dapk.st.home.HomeModule
|
||||||
import app.dapk.st.login.LoginModule
|
import app.dapk.st.login.LoginModule
|
||||||
|
import app.dapk.st.messenger.MessengerModule
|
||||||
import app.dapk.st.notifications.NotificationsModule
|
import app.dapk.st.notifications.NotificationsModule
|
||||||
|
import app.dapk.st.notifications.PushAndroidService
|
||||||
import app.dapk.st.profile.ProfileModule
|
import app.dapk.st.profile.ProfileModule
|
||||||
import app.dapk.st.settings.SettingsModule
|
import app.dapk.st.settings.SettingsModule
|
||||||
import app.dapk.st.work.TaskRunnerModule
|
import app.dapk.st.work.TaskRunnerModule
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.GlobalScope
|
import kotlinx.coroutines.cancel
|
||||||
import kotlinx.coroutines.launch
|
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
class SmallTalkApplication : Application(), ModuleProvider {
|
class SmallTalkApplication : Application(), ModuleProvider {
|
||||||
|
@ -28,8 +29,10 @@ class SmallTalkApplication : Application(), ModuleProvider {
|
||||||
private val appLogger: (String, String) -> Unit = { tag, message -> _appLogger?.invoke(tag, message) }
|
private val appLogger: (String, String) -> Unit = { tag, message -> _appLogger?.invoke(tag, message) }
|
||||||
private var _appLogger: ((String, String) -> Unit)? = null
|
private var _appLogger: ((String, String) -> Unit)? = null
|
||||||
|
|
||||||
private val appModule: AppModule by unsafeLazy { AppModule(this, appLogger) }
|
private val lazyAppModule = ResettableUnsafeLazy { AppModule(this, appLogger) }
|
||||||
private val featureModules: FeatureModules by unsafeLazy { appModule.featureModules }
|
private val lazyFeatureModules = ResettableUnsafeLazy { appModule.featureModules }
|
||||||
|
private val appModule by lazyAppModule
|
||||||
|
private val featureModules by lazyFeatureModules
|
||||||
private val applicationScope = Scope(Dispatchers.IO)
|
private val applicationScope = Scope(Dispatchers.IO)
|
||||||
|
|
||||||
override fun onCreate() {
|
override fun onCreate() {
|
||||||
|
@ -40,13 +43,15 @@ class SmallTalkApplication : Application(), ModuleProvider {
|
||||||
|
|
||||||
val logger: (String, String) -> Unit = { tag, message ->
|
val logger: (String, String) -> Unit = { tag, message ->
|
||||||
Log.e(tag, message)
|
Log.e(tag, message)
|
||||||
GlobalScope.launch {
|
applicationScope.launch { eventLogStore.insert(tag, message) }
|
||||||
eventLogStore.insert(tag, message)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
attachAppLogger(logger)
|
attachAppLogger(logger)
|
||||||
_appLogger = logger
|
_appLogger = logger
|
||||||
|
|
||||||
|
onApplicationLaunch(notificationsModule, storeModule)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun onApplicationLaunch(notificationsModule: NotificationsModule, storeModule: StoreModule) {
|
||||||
applicationScope.launch {
|
applicationScope.launch {
|
||||||
notificationsModule.firebasePushTokenUseCase().registerCurrentToken()
|
notificationsModule.firebasePushTokenUseCase().registerCurrentToken()
|
||||||
storeModule.localEchoStore.preload()
|
storeModule.localEchoStore.preload()
|
||||||
|
@ -73,4 +78,17 @@ class SmallTalkApplication : Application(), ModuleProvider {
|
||||||
else -> throw IllegalArgumentException("Unknown: $klass")
|
else -> throw IllegalArgumentException("Unknown: $klass")
|
||||||
} as T
|
} as T
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun reset() {
|
||||||
|
featureModules.notificationsModule.firebasePushTokenUseCase().unregister()
|
||||||
|
appModule.coroutineDispatchers.io.cancel()
|
||||||
|
applicationScope.cancel()
|
||||||
|
stopService(Intent(this, PushAndroidService::class.java))
|
||||||
|
lazyAppModule.reset()
|
||||||
|
lazyFeatureModules.reset()
|
||||||
|
|
||||||
|
val notificationsModule = featureModules.notificationsModule
|
||||||
|
val storeModule = appModule.storeModule.value
|
||||||
|
onApplicationLaunch(notificationsModule, storeModule)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
package app.dapk.st.graph
|
||||||
|
|
||||||
|
import app.dapk.st.core.Base64
|
||||||
|
|
||||||
|
class AndroidBase64 : Base64 {
|
||||||
|
override fun encode(input: ByteArray): String {
|
||||||
|
return android.util.Base64.encodeToString(input, android.util.Base64.DEFAULT)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun decode(input: String): ByteArray {
|
||||||
|
return android.util.Base64.decode(input, android.util.Base64.DEFAULT)
|
||||||
|
}
|
||||||
|
}
|
|
@ -69,7 +69,7 @@ internal class AppModule(context: Application, logger: MatrixLogger) {
|
||||||
private val driver = AndroidSqliteDriver(DapkDb.Schema, context, "dapk.db")
|
private val driver = AndroidSqliteDriver(DapkDb.Schema, context, "dapk.db")
|
||||||
private val database = DapkDb(driver)
|
private val database = DapkDb(driver)
|
||||||
private val clock = Clock.systemUTC()
|
private val clock = Clock.systemUTC()
|
||||||
private val coroutineDispatchers = CoroutineDispatchers(Dispatchers.IO)
|
val coroutineDispatchers = CoroutineDispatchers(Dispatchers.IO)
|
||||||
|
|
||||||
val storeModule = unsafeLazy {
|
val storeModule = unsafeLazy {
|
||||||
StoreModule(
|
StoreModule(
|
||||||
|
@ -83,9 +83,12 @@ internal class AppModule(context: Application, logger: MatrixLogger) {
|
||||||
sql = "SELECT name FROM sqlite_master WHERE type = 'table' ${if (includeCryptoAccount) "" else "AND name != 'dbCryptoAccount'"}",
|
sql = "SELECT name FROM sqlite_master WHERE type = 'table' ${if (includeCryptoAccount) "" else "AND name != 'dbCryptoAccount'"}",
|
||||||
parameters = 0
|
parameters = 0
|
||||||
)
|
)
|
||||||
while (cursor.next()) {
|
cursor.use {
|
||||||
cursor.getString(0)?.let {
|
while (cursor.next()) {
|
||||||
driver.execute(null, "DELETE FROM $it", 0)
|
cursor.getString(0)?.let {
|
||||||
|
log(AppLogTag.ERROR_NON_FATAL, "Deleting $it")
|
||||||
|
driver.execute(null, "DELETE FROM $it", 0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -283,6 +286,14 @@ internal class MatrixModules(
|
||||||
store.roomStore(),
|
store.roomStore(),
|
||||||
store.syncStore(),
|
store.syncStore(),
|
||||||
store.filterStore(),
|
store.filterStore(),
|
||||||
|
deviceNotifier = { services ->
|
||||||
|
val encryption = services.deviceService()
|
||||||
|
val crypto = services.cryptoService()
|
||||||
|
DeviceNotifier { userIds, syncToken ->
|
||||||
|
encryption.updateStaleDevices(userIds)
|
||||||
|
crypto.updateOlmSession(userIds, syncToken)
|
||||||
|
}
|
||||||
|
},
|
||||||
messageDecrypter = { serviceProvider ->
|
messageDecrypter = { serviceProvider ->
|
||||||
val cryptoService = serviceProvider.cryptoService()
|
val cryptoService = serviceProvider.cryptoService()
|
||||||
MessageDecrypter {
|
MessageDecrypter {
|
||||||
|
@ -296,9 +307,9 @@ internal class MatrixModules(
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
verificationHandler = { services ->
|
verificationHandler = { services ->
|
||||||
logger.matrixLog(MatrixLogTag.VERIFICATION, "got a verification request $it")
|
|
||||||
val cryptoService = services.cryptoService()
|
val cryptoService = services.cryptoService()
|
||||||
VerificationHandler { apiEvent ->
|
VerificationHandler { apiEvent ->
|
||||||
|
logger.matrixLog(MatrixLogTag.VERIFICATION, "got a verification request $it")
|
||||||
cryptoService.onVerificationEvent(
|
cryptoService.onVerificationEvent(
|
||||||
when (apiEvent) {
|
when (apiEvent) {
|
||||||
is ApiToDeviceEvent.VerificationRequest -> Verification.Event.Requested(
|
is ApiToDeviceEvent.VerificationRequest -> Verification.Event.Requested(
|
||||||
|
@ -341,14 +352,6 @@ internal class MatrixModules(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
deviceNotifier = { services ->
|
|
||||||
val encryption = services.deviceService()
|
|
||||||
val crypto = services.cryptoService()
|
|
||||||
DeviceNotifier { userIds, syncToken ->
|
|
||||||
encryption.updateStaleDevices(userIds)
|
|
||||||
crypto.updateOlmSession(userIds, syncToken)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
oneTimeKeyProducer = { services ->
|
oneTimeKeyProducer = { services ->
|
||||||
val cryptoService = services.cryptoService()
|
val cryptoService = services.cryptoService()
|
||||||
MaybeCreateMoreKeys {
|
MaybeCreateMoreKeys {
|
||||||
|
@ -367,8 +370,6 @@ internal class MatrixModules(
|
||||||
)
|
)
|
||||||
|
|
||||||
installPushService(credentialsStore)
|
installPushService(credentialsStore)
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -414,13 +415,3 @@ class TaskRunnerAdapter(private val matrixTaskRunner: suspend (MatrixTask) -> Ma
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class AndroidBase64 : Base64 {
|
|
||||||
override fun encode(input: ByteArray): String {
|
|
||||||
return android.util.Base64.encodeToString(input, android.util.Base64.DEFAULT)
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun decode(input: String): ByteArray {
|
|
||||||
return android.util.Base64.decode(input, android.util.Base64.DEFAULT)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -4,7 +4,8 @@ import kotlin.reflect.KClass
|
||||||
|
|
||||||
interface ModuleProvider {
|
interface ModuleProvider {
|
||||||
|
|
||||||
fun <T: ProvidableModule> provide(klass: KClass<T>): T
|
fun <T : ProvidableModule> provide(klass: KClass<T>): T
|
||||||
|
fun reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
interface ProvidableModule
|
interface ProvidableModule
|
|
@ -21,3 +21,25 @@ inline fun <T, T1 : T, T2 : T> Iterable<T>.firstOrNull(predicate: (T) -> Boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
fun <T> unsafeLazy(initializer: () -> T): Lazy<T> = lazy(mode = LazyThreadSafetyMode.NONE, initializer = initializer)
|
fun <T> unsafeLazy(initializer: () -> T): Lazy<T> = lazy(mode = LazyThreadSafetyMode.NONE, initializer = initializer)
|
||||||
|
|
||||||
|
class ResettableUnsafeLazy<T>(private val initializer: () -> T) : Lazy<T> {
|
||||||
|
|
||||||
|
private var _value: T? = null
|
||||||
|
|
||||||
|
override val value: T
|
||||||
|
get() {
|
||||||
|
return if (_value == null) {
|
||||||
|
initializer().also { _value = it }
|
||||||
|
} else {
|
||||||
|
_value!!
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun isInitialized(): Boolean {
|
||||||
|
return _value != null
|
||||||
|
}
|
||||||
|
|
||||||
|
fun reset() {
|
||||||
|
_value = null
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,4 +3,6 @@ package app.dapk.st.core
|
||||||
import android.content.Context
|
import android.content.Context
|
||||||
|
|
||||||
inline fun <reified T : ProvidableModule> Context.module() =
|
inline fun <reified T : ProvidableModule> Context.module() =
|
||||||
(this.applicationContext as ModuleProvider).provide(T::class)
|
(this.applicationContext as ModuleProvider).provide(T::class)
|
||||||
|
|
||||||
|
fun Context.resetModules() = (this.applicationContext as ModuleProvider).reset()
|
|
@ -12,6 +12,10 @@ class RegisterFirebasePushTokenUseCase(
|
||||||
override val errorTracker: ErrorTracker,
|
override val errorTracker: ErrorTracker,
|
||||||
) : CrashScope {
|
) : CrashScope {
|
||||||
|
|
||||||
|
fun unregister() {
|
||||||
|
FirebaseMessaging.getInstance().deleteToken()
|
||||||
|
}
|
||||||
|
|
||||||
suspend fun registerCurrentToken() {
|
suspend fun registerCurrentToken() {
|
||||||
kotlin.runCatching {
|
kotlin.runCatching {
|
||||||
FirebaseMessaging.getInstance().token().also {
|
FirebaseMessaging.getInstance().token().also {
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package app.dapk.st.domain
|
package app.dapk.st.domain
|
||||||
|
|
||||||
|
import app.dapk.st.core.AppLogTag
|
||||||
|
import app.dapk.st.core.log
|
||||||
import app.dapk.st.matrix.common.SyncToken
|
import app.dapk.st.matrix.common.SyncToken
|
||||||
import app.dapk.st.matrix.sync.SyncStore
|
import app.dapk.st.matrix.sync.SyncStore
|
||||||
import app.dapk.st.matrix.sync.SyncStore.SyncKey
|
import app.dapk.st.matrix.sync.SyncStore.SyncKey
|
||||||
|
@ -9,11 +11,13 @@ internal class SyncTokenPreferences(
|
||||||
) : SyncStore {
|
) : SyncStore {
|
||||||
|
|
||||||
override suspend fun store(key: SyncKey, syncToken: SyncToken) {
|
override suspend fun store(key: SyncKey, syncToken: SyncToken) {
|
||||||
|
log(AppLogTag.ERROR_NON_FATAL, "Store token :$syncToken")
|
||||||
preferences.store(key.value, syncToken.value)
|
preferences.store(key.value, syncToken.value)
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun read(key: SyncKey): SyncToken? {
|
override suspend fun read(key: SyncKey): SyncToken? {
|
||||||
return preferences.readString(key.value)?.let {
|
return preferences.readString(key.value)?.let {
|
||||||
|
log(AppLogTag.ERROR_NON_FATAL, "Read token :$it")
|
||||||
SyncToken(it)
|
SyncToken(it)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import androidx.compose.material.Surface
|
||||||
import androidx.compose.ui.Modifier
|
import androidx.compose.ui.Modifier
|
||||||
import app.dapk.st.core.DapkActivity
|
import app.dapk.st.core.DapkActivity
|
||||||
import app.dapk.st.core.module
|
import app.dapk.st.core.module
|
||||||
|
import app.dapk.st.core.resetModules
|
||||||
import app.dapk.st.core.viewModel
|
import app.dapk.st.core.viewModel
|
||||||
import app.dapk.st.design.components.SmallTalkTheme
|
import app.dapk.st.design.components.SmallTalkTheme
|
||||||
|
|
||||||
|
@ -20,6 +21,7 @@ class SettingsActivity : DapkActivity() {
|
||||||
SmallTalkTheme {
|
SmallTalkTheme {
|
||||||
Surface(Modifier.fillMaxSize()) {
|
Surface(Modifier.fillMaxSize()) {
|
||||||
SettingsScreen(settingsViewModel, onSignOut = {
|
SettingsScreen(settingsViewModel, onSignOut = {
|
||||||
|
resetModules()
|
||||||
navigator.navigate.toHome()
|
navigator.navigate.toHome()
|
||||||
finish()
|
finish()
|
||||||
}, navigator)
|
}, navigator)
|
||||||
|
|
|
@ -19,7 +19,6 @@ class SettingsModule(
|
||||||
) : ProvidableModule {
|
) : ProvidableModule {
|
||||||
|
|
||||||
internal fun settingsViewModel() = SettingsViewModel(
|
internal fun settingsViewModel() = SettingsViewModel(
|
||||||
storeModule.credentialsStore(),
|
|
||||||
storeModule.cacheCleaner(),
|
storeModule.cacheCleaner(),
|
||||||
contentResolver,
|
contentResolver,
|
||||||
cryptoService,
|
cryptoService,
|
||||||
|
|
|
@ -3,10 +3,11 @@ package app.dapk.st.settings
|
||||||
import android.content.ContentResolver
|
import android.content.ContentResolver
|
||||||
import android.net.Uri
|
import android.net.Uri
|
||||||
import androidx.lifecycle.viewModelScope
|
import androidx.lifecycle.viewModelScope
|
||||||
|
import app.dapk.st.core.AppLogTag
|
||||||
import app.dapk.st.core.Lce
|
import app.dapk.st.core.Lce
|
||||||
|
import app.dapk.st.core.log
|
||||||
import app.dapk.st.design.components.SpiderPage
|
import app.dapk.st.design.components.SpiderPage
|
||||||
import app.dapk.st.domain.StoreCleaner
|
import app.dapk.st.domain.StoreCleaner
|
||||||
import app.dapk.st.matrix.common.CredentialsStore
|
|
||||||
import app.dapk.st.matrix.crypto.CryptoService
|
import app.dapk.st.matrix.crypto.CryptoService
|
||||||
import app.dapk.st.matrix.sync.SyncService
|
import app.dapk.st.matrix.sync.SyncService
|
||||||
import app.dapk.st.settings.SettingItem.Id.*
|
import app.dapk.st.settings.SettingItem.Id.*
|
||||||
|
@ -19,7 +20,6 @@ import kotlinx.coroutines.launch
|
||||||
private const val PRIVACY_POLICY_URL = "https://ouchadam.github.io/small-talk/privacy/"
|
private const val PRIVACY_POLICY_URL = "https://ouchadam.github.io/small-talk/privacy/"
|
||||||
|
|
||||||
internal class SettingsViewModel(
|
internal class SettingsViewModel(
|
||||||
private val credentialsStore: CredentialsStore,
|
|
||||||
private val cacheCleaner: StoreCleaner,
|
private val cacheCleaner: StoreCleaner,
|
||||||
private val contentResolver: ContentResolver,
|
private val contentResolver: ContentResolver,
|
||||||
private val cryptoService: CryptoService,
|
private val cryptoService: CryptoService,
|
||||||
|
@ -49,9 +49,9 @@ internal class SettingsViewModel(
|
||||||
when (item.id) {
|
when (item.id) {
|
||||||
SignOut -> {
|
SignOut -> {
|
||||||
viewModelScope.launch {
|
viewModelScope.launch {
|
||||||
credentialsStore.clear()
|
log(AppLogTag.ERROR_NON_FATAL, "Sign out triggered")
|
||||||
|
cacheCleaner.cleanCache(removeCredentials = true)
|
||||||
_events.emit(SignedOut)
|
_events.emit(SignedOut)
|
||||||
println("emitted")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
AccessToken -> {
|
AccessToken -> {
|
||||||
|
|
|
@ -38,7 +38,6 @@ internal class SettingsViewModelTest {
|
||||||
private val fakeSettingsItemFactory = FakeSettingsItemFactory()
|
private val fakeSettingsItemFactory = FakeSettingsItemFactory()
|
||||||
|
|
||||||
private val viewModel = SettingsViewModel(
|
private val viewModel = SettingsViewModel(
|
||||||
fakeCredentialsStore,
|
|
||||||
fakeStoreCleaner,
|
fakeStoreCleaner,
|
||||||
fakeContentResolver.instance,
|
fakeContentResolver.instance,
|
||||||
fakeCryptoService,
|
fakeCryptoService,
|
||||||
|
@ -77,8 +76,8 @@ internal class SettingsViewModelTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `when sign out clicked, then clears credentials`() = runViewModelTest {
|
fun `when sign out clicked, then clears store`() = runViewModelTest {
|
||||||
fakeCredentialsStore.expectUnit { it.clear() }
|
fakeStoreCleaner.expectUnit { it.cleanCache(removeCredentials = true) }
|
||||||
val aSignOutItem = aSettingTextItem(id = SettingItem.Id.SignOut)
|
val aSignOutItem = aSettingTextItem(id = SettingItem.Id.SignOut)
|
||||||
|
|
||||||
viewModel.test().onClick(aSignOutItem)
|
viewModel.test().onClick(aSignOutItem)
|
||||||
|
|
|
@ -23,7 +23,6 @@ interface RoomStore {
|
||||||
interface FilterStore {
|
interface FilterStore {
|
||||||
|
|
||||||
suspend fun store(key: String, filterId: String)
|
suspend fun store(key: String, filterId: String)
|
||||||
|
|
||||||
suspend fun read(key: String): String?
|
suspend fun read(key: String): String?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package app.dapk.st.matrix.sync.internal
|
||||||
|
|
||||||
import app.dapk.st.core.CoroutineDispatchers
|
import app.dapk.st.core.CoroutineDispatchers
|
||||||
import app.dapk.st.core.extensions.ErrorTracker
|
import app.dapk.st.core.extensions.ErrorTracker
|
||||||
|
import app.dapk.st.core.withIoContext
|
||||||
import app.dapk.st.matrix.common.*
|
import app.dapk.st.matrix.common.*
|
||||||
import app.dapk.st.matrix.http.MatrixHttpClient
|
import app.dapk.st.matrix.http.MatrixHttpClient
|
||||||
import app.dapk.st.matrix.sync.*
|
import app.dapk.st.matrix.sync.*
|
||||||
|
@ -12,11 +13,15 @@ import app.dapk.st.matrix.sync.internal.room.RoomEventsDecrypter
|
||||||
import app.dapk.st.matrix.sync.internal.room.SyncEventDecrypter
|
import app.dapk.st.matrix.sync.internal.room.SyncEventDecrypter
|
||||||
import app.dapk.st.matrix.sync.internal.room.SyncSideEffects
|
import app.dapk.st.matrix.sync.internal.room.SyncSideEffects
|
||||||
import app.dapk.st.matrix.sync.internal.sync.*
|
import app.dapk.st.matrix.sync.internal.sync.*
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.CoroutineScope
|
||||||
|
import kotlinx.coroutines.async
|
||||||
|
import kotlinx.coroutines.awaitAll
|
||||||
import kotlinx.coroutines.flow.*
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
|
private val syncSubscriptionCount = AtomicInteger()
|
||||||
|
|
||||||
internal class DefaultSyncService(
|
internal class DefaultSyncService(
|
||||||
httpClient: MatrixHttpClient,
|
httpClient: MatrixHttpClient,
|
||||||
syncStore: SyncStore,
|
syncStore: SyncStore,
|
||||||
|
@ -34,11 +39,10 @@ internal class DefaultSyncService(
|
||||||
roomMembersService: RoomMembersService,
|
roomMembersService: RoomMembersService,
|
||||||
logger: MatrixLogger,
|
logger: MatrixLogger,
|
||||||
errorTracker: ErrorTracker,
|
errorTracker: ErrorTracker,
|
||||||
coroutineDispatchers: CoroutineDispatchers,
|
private val coroutineDispatchers: CoroutineDispatchers,
|
||||||
syncConfig: SyncConfig,
|
syncConfig: SyncConfig,
|
||||||
) : SyncService {
|
) : SyncService {
|
||||||
|
|
||||||
private val syncSubscriptionCount = AtomicInteger()
|
|
||||||
private val syncEventsFlow = MutableStateFlow<List<SyncService.SyncEvent>>(emptyList())
|
private val syncEventsFlow = MutableStateFlow<List<SyncService.SyncEvent>>(emptyList())
|
||||||
|
|
||||||
private val roomDataSource by lazy { RoomDataSource(roomStore, logger) }
|
private val roomDataSource by lazy { RoomDataSource(roomStore, logger) }
|
||||||
|
@ -107,7 +111,7 @@ internal class DefaultSyncService(
|
||||||
override fun events() = syncEventsFlow
|
override fun events() = syncEventsFlow
|
||||||
override suspend fun observeEvent(eventId: EventId) = roomStore.observeEvent(eventId)
|
override suspend fun observeEvent(eventId: EventId) = roomStore.observeEvent(eventId)
|
||||||
override suspend fun forceManualRefresh(roomIds: List<RoomId>) {
|
override suspend fun forceManualRefresh(roomIds: List<RoomId>) {
|
||||||
withContext(Dispatchers.IO) {
|
coroutineDispatchers.withIoContext {
|
||||||
roomIds.map {
|
roomIds.map {
|
||||||
async {
|
async {
|
||||||
roomRefresher.refreshRoomContent(it)?.also {
|
roomRefresher.refreshRoomContent(it)?.also {
|
||||||
|
|
|
@ -34,18 +34,22 @@ internal class SyncReducer(
|
||||||
val newRooms = response.rooms?.join?.keys?.filterNot { roomDataSource.contains(it) } ?: emptyList()
|
val newRooms = response.rooms?.join?.keys?.filterNot { roomDataSource.contains(it) } ?: emptyList()
|
||||||
|
|
||||||
val apiUpdatedRooms = response.rooms?.join?.keepRoomsWithChanges()
|
val apiUpdatedRooms = response.rooms?.join?.keepRoomsWithChanges()
|
||||||
val apiRoomsToProcess = apiUpdatedRooms?.map { (roomId, apiRoom) ->
|
val apiRoomsToProcess = apiUpdatedRooms?.mapNotNull { (roomId, apiRoom) ->
|
||||||
logger.matrixLog(SYNC, "reducing: $roomId")
|
logger.matrixLog(SYNC, "reducing: $roomId")
|
||||||
coroutineDispatchers.withIoContextAsync {
|
coroutineDispatchers.withIoContextAsync {
|
||||||
roomProcessor.processRoom(
|
runCatching {
|
||||||
roomToProcess = RoomToProcess(
|
roomProcessor.processRoom(
|
||||||
roomId = roomId,
|
roomToProcess = RoomToProcess(
|
||||||
apiSyncRoom = apiRoom,
|
roomId = roomId,
|
||||||
directMessage = directMessages[roomId],
|
apiSyncRoom = apiRoom,
|
||||||
userCredentials = userCredentials,
|
directMessage = directMessages[roomId],
|
||||||
),
|
userCredentials = userCredentials,
|
||||||
isInitialSync = isInitialSync
|
),
|
||||||
)
|
isInitialSync = isInitialSync
|
||||||
|
)
|
||||||
|
}
|
||||||
|
.onFailure { logger.matrixLog(SYNC, "failed to reduce: $roomId, skipping") }
|
||||||
|
.getOrNull()
|
||||||
}
|
}
|
||||||
} ?: emptyList()
|
} ?: emptyList()
|
||||||
|
|
||||||
|
|
|
@ -8,9 +8,12 @@ import app.dapk.st.matrix.sync.internal.SideEffectFlowIterator
|
||||||
import app.dapk.st.matrix.sync.internal.overview.ReducedSyncFilterUseCase
|
import app.dapk.st.matrix.sync.internal.overview.ReducedSyncFilterUseCase
|
||||||
import app.dapk.st.matrix.sync.internal.request.syncRequest
|
import app.dapk.st.matrix.sync.internal.request.syncRequest
|
||||||
import app.dapk.st.matrix.sync.internal.room.SyncSideEffects
|
import app.dapk.st.matrix.sync.internal.room.SyncSideEffects
|
||||||
|
import kotlinx.coroutines.currentCoroutineContext
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.cancellable
|
import kotlinx.coroutines.flow.cancellable
|
||||||
import kotlinx.coroutines.flow.flow
|
import kotlinx.coroutines.flow.flow
|
||||||
|
import kotlinx.coroutines.flow.isActive
|
||||||
|
import kotlinx.coroutines.isActive
|
||||||
|
|
||||||
internal class SyncUseCase(
|
internal class SyncUseCase(
|
||||||
private val persistence: OverviewStore,
|
private val persistence: OverviewStore,
|
||||||
|
@ -27,40 +30,43 @@ internal class SyncUseCase(
|
||||||
|
|
||||||
fun sync(): Flow<Unit> {
|
fun sync(): Flow<Unit> {
|
||||||
return flow<Unit> {
|
return flow<Unit> {
|
||||||
logger.matrixLog("flow instance: ${hashCode()}")
|
|
||||||
val credentials = credentialsStore.credentials()!!
|
val credentials = credentialsStore.credentials()!!
|
||||||
val filterId = filterUseCase.reducedFilter(credentials.userId)
|
val filterId = filterUseCase.reducedFilter(credentials.userId)
|
||||||
|
|
||||||
with(flowIterator) {
|
with(flowIterator) {
|
||||||
loop<OverviewState>(initial = null) { previousState ->
|
loop<OverviewState>(initial = null) { previousState ->
|
||||||
logger.matrixLog("looper : ${hashCode()}")
|
|
||||||
val syncToken = syncStore.read(key = SyncStore.SyncKey.Overview)
|
val syncToken = syncStore.read(key = SyncStore.SyncKey.Overview)
|
||||||
val response = doSyncRequest(filterId, syncToken)
|
val response = doSyncRequest(filterId, syncToken)
|
||||||
logger.logP("sync processing") {
|
if (credentialsStore.isSignedIn()) {
|
||||||
syncStore.store(key = SyncStore.SyncKey.Overview, syncToken = response.nextBatch)
|
logger.logP("sync processing") {
|
||||||
val sideEffects = logger.logP("side effects processing") {
|
syncStore.store(key = SyncStore.SyncKey.Overview, syncToken = response.nextBatch)
|
||||||
syncSideEffects.blockingSideEffects(credentials.userId, response, syncToken)
|
val sideEffects = logger.logP("side effects processing") {
|
||||||
}
|
syncSideEffects.blockingSideEffects(credentials.userId, response, syncToken)
|
||||||
|
}
|
||||||
|
|
||||||
val isInitialSync = syncToken == null
|
val isInitialSync = syncToken == null
|
||||||
val nextState = logger.logP("reducing") { syncReducer.reduce(isInitialSync, sideEffects, response, credentials) }
|
val nextState = logger.logP("reducing") { syncReducer.reduce(isInitialSync, sideEffects, response, credentials) }
|
||||||
val overview = nextState.roomState.map { it.roomOverview }
|
val overview = nextState.roomState.map { it.roomOverview }
|
||||||
|
|
||||||
if (nextState.roomsLeft.isNotEmpty()) {
|
if (nextState.roomsLeft.isNotEmpty()) {
|
||||||
persistence.removeRooms(nextState.roomsLeft)
|
persistence.removeRooms(nextState.roomsLeft)
|
||||||
}
|
}
|
||||||
if (nextState.invites.isNotEmpty()) {
|
if (nextState.invites.isNotEmpty()) {
|
||||||
persistence.persistInvites(nextState.invites)
|
persistence.persistInvites(nextState.invites)
|
||||||
}
|
}
|
||||||
if (nextState.newRoomsJoined.isNotEmpty()) {
|
if (nextState.newRoomsJoined.isNotEmpty()) {
|
||||||
persistence.removeInvites(nextState.newRoomsJoined)
|
persistence.removeInvites(nextState.newRoomsJoined)
|
||||||
}
|
}
|
||||||
|
|
||||||
when {
|
when {
|
||||||
previousState == overview -> previousState.also { logger.matrixLog(SYNC, "no changes, not persisting new state") }
|
previousState == overview -> previousState.also { logger.matrixLog(SYNC, "no changes, not persisting new state") }
|
||||||
overview.isNotEmpty() -> overview.also { persistence.persist(overview) }
|
overview.isNotEmpty() -> overview.also { persistence.persist(overview) }
|
||||||
else -> previousState.also { logger.matrixLog(SYNC, "nothing to do") }
|
else -> previousState.also { logger.matrixLog(SYNC, "nothing to do") }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
logger.matrixLog(SYNC, "sync processing skipped due to being signed out")
|
||||||
|
null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -157,6 +157,14 @@ class TestMatrix(
|
||||||
storeModule.roomStore(),
|
storeModule.roomStore(),
|
||||||
storeModule.syncStore(),
|
storeModule.syncStore(),
|
||||||
storeModule.filterStore(),
|
storeModule.filterStore(),
|
||||||
|
deviceNotifier = { services ->
|
||||||
|
val encryptionService = services.deviceService()
|
||||||
|
val cryptoService = services.cryptoService()
|
||||||
|
DeviceNotifier { userIds, syncToken ->
|
||||||
|
encryptionService.updateStaleDevices(userIds)
|
||||||
|
cryptoService.updateOlmSession(userIds, syncToken)
|
||||||
|
}
|
||||||
|
},
|
||||||
messageDecrypter = { serviceProvider ->
|
messageDecrypter = { serviceProvider ->
|
||||||
MessageDecrypter {
|
MessageDecrypter {
|
||||||
serviceProvider.cryptoService().decrypt(it)
|
serviceProvider.cryptoService().decrypt(it)
|
||||||
|
@ -222,14 +230,6 @@ class TestMatrix(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
deviceNotifier = { services ->
|
|
||||||
val encryptionService = services.deviceService()
|
|
||||||
val cryptoService = services.cryptoService()
|
|
||||||
DeviceNotifier { userIds, syncToken ->
|
|
||||||
encryptionService.updateStaleDevices(userIds)
|
|
||||||
cryptoService.updateOlmSession(userIds, syncToken)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
oneTimeKeyProducer = { services ->
|
oneTimeKeyProducer = { services ->
|
||||||
val cryptoService = services.cryptoService()
|
val cryptoService = services.cryptoService()
|
||||||
MaybeCreateMoreKeys {
|
MaybeCreateMoreKeys {
|
||||||
|
|
Loading…
Reference in New Issue