converting task running to the engine

This commit is contained in:
Adam Brown 2022-10-09 21:54:55 +01:00
parent 489f45056c
commit 2b55c7dffa
4 changed files with 55 additions and 241 deletions

View File

@ -23,33 +23,19 @@ import app.dapk.st.home.HomeModule
import app.dapk.st.home.MainActivity
import app.dapk.st.imageloader.ImageLoaderModule
import app.dapk.st.login.LoginModule
import app.dapk.st.matrix.MatrixClient
import app.dapk.st.matrix.auth.DeviceDisplayNameGenerator
import app.dapk.st.matrix.auth.installAuthService
import app.dapk.st.matrix.common.*
import app.dapk.st.matrix.crypto.RoomMembersProvider
import app.dapk.st.matrix.crypto.Verification
import app.dapk.st.matrix.crypto.cryptoService
import app.dapk.st.matrix.crypto.installCryptoService
import app.dapk.st.matrix.device.deviceService
import app.dapk.st.matrix.device.installEncryptionService
import app.dapk.st.matrix.http.ktor.KtorMatrixHttpClientFactory
import app.dapk.st.matrix.message.*
import app.dapk.st.matrix.common.EventId
import app.dapk.st.matrix.common.JsonString
import app.dapk.st.matrix.common.MatrixLogger
import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.message.internal.ImageContentReader
import app.dapk.st.matrix.push.installPushService
import app.dapk.st.matrix.room.*
import app.dapk.st.matrix.sync.*
import app.dapk.st.matrix.sync.internal.request.ApiToDeviceEvent
import app.dapk.st.matrix.sync.internal.room.MessageDecrypter
import app.dapk.st.messenger.MessengerActivity
import app.dapk.st.messenger.MessengerModule
import app.dapk.st.messenger.gallery.ImageGalleryModule
import app.dapk.st.navigator.IntentFactory
import app.dapk.st.navigator.MessageAttachment
import app.dapk.st.notifications.NotificationsModule
import app.dapk.st.olm.DeviceKeyFactory
import app.dapk.st.olm.OlmPersistenceWrapper
import app.dapk.st.olm.OlmWrapper
import app.dapk.st.profile.ProfileModule
import app.dapk.st.push.PushHandler
import app.dapk.st.push.PushModule
@ -64,7 +50,6 @@ import com.squareup.sqldelight.android.AndroidSqliteDriver
import kotlinx.coroutines.Dispatchers
import kotlinx.serialization.json.Json
import java.io.InputStream
import java.time.Clock
internal class AppModule(context: Application, logger: MatrixLogger) {
@ -79,7 +64,7 @@ internal class AppModule(context: Application, logger: MatrixLogger) {
private val driver = AndroidSqliteDriver(DapkDb.Schema, context, "dapk.db")
private val database = DapkDb(driver)
val coroutineDispatchers = CoroutineDispatchers(Dispatchers.IO)
val base64 = AndroidBase64()
private val base64 = AndroidBase64()
val storeModule = unsafeLazy {
StoreModule(
@ -95,9 +80,10 @@ internal class AppModule(context: Application, logger: MatrixLogger) {
private val imageLoaderModule = ImageLoaderModule(context)
private val imageContentReader by unsafeLazy { AndroidImageContentReader(context.contentResolver) }
private val matrixModules = MatrixModules(storeModule, trackingModule, workModule, logger, coroutineDispatchers, imageContentReader, base64, buildMeta)
private val chatEngineModule =
ChatEngineModule(storeModule, trackingModule, workModule, logger, coroutineDispatchers, imageContentReader, base64, buildMeta)
val domainModules = DomainModules(matrixModules, trackingModule.errorTracker, context, coroutineDispatchers)
val domainModules = DomainModules(chatEngineModule, trackingModule.errorTracker, context, coroutineDispatchers)
val coreAndroidModule = CoreAndroidModule(
intentFactory = object : IntentFactory {
@ -131,7 +117,7 @@ internal class AppModule(context: Application, logger: MatrixLogger) {
val featureModules = FeatureModules(
storeModule,
matrixModules,
chatEngineModule,
domainModules,
trackingModule,
coreAndroidModule,
@ -145,7 +131,7 @@ internal class AppModule(context: Application, logger: MatrixLogger) {
internal class FeatureModules internal constructor(
private val storeModule: Lazy<StoreModule>,
private val matrixModules: MatrixModules,
private val chatEngineModule: ChatEngineModule,
private val domainModules: DomainModules,
private val trackingModule: TrackingModule,
private val coreAndroidModule: CoreAndroidModule,
@ -159,27 +145,27 @@ internal class FeatureModules internal constructor(
val directoryModule by unsafeLazy {
DirectoryModule(
context = context,
chatEngine = matrixModules.engine,
chatEngine = chatEngineModule.engine,
)
}
val loginModule by unsafeLazy {
LoginModule(
matrixModules.engine,
chatEngineModule.engine,
domainModules.pushModule,
trackingModule.errorTracker
)
}
val messengerModule by unsafeLazy {
MessengerModule(
matrixModules.engine,
chatEngineModule.engine,
context,
storeModule.value.messageStore(),
)
}
val homeModule by unsafeLazy { HomeModule(matrixModules.engine, storeModule.value, buildMeta) }
val homeModule by unsafeLazy { HomeModule(chatEngineModule.engine, storeModule.value, buildMeta) }
val settingsModule by unsafeLazy {
SettingsModule(
matrixModules.engine,
chatEngineModule.engine,
storeModule.value,
pushModule,
context.contentResolver,
@ -191,7 +177,7 @@ internal class FeatureModules internal constructor(
storeModule.value.messageStore(),
)
}
val profileModule by unsafeLazy { ProfileModule(matrixModules.engine, trackingModule.errorTracker) }
val profileModule by unsafeLazy { ProfileModule(chatEngineModule.engine, trackingModule.errorTracker) }
val notificationsModule by unsafeLazy {
NotificationsModule(
imageLoaderModule.iconLoader(),
@ -205,7 +191,7 @@ internal class FeatureModules internal constructor(
}
val shareEntryModule by unsafeLazy {
ShareEntryModule(matrixModules.engine)
ShareEntryModule(chatEngineModule.engine)
}
val imageGalleryModule by unsafeLazy {
@ -222,7 +208,7 @@ internal class FeatureModules internal constructor(
}
internal class MatrixModules(
internal class ChatEngineModule(
private val storeModule: Lazy<StoreModule>,
private val trackingModule: TrackingModule,
private val workModule: WorkModule,
@ -257,217 +243,17 @@ internal class MatrixModules(
)
}
val matrix by unsafeLazy {
val store = storeModule.value
val credentialsStore = store.credentialsStore()
MatrixClient(
KtorMatrixHttpClientFactory(
credentialsStore,
includeLogging = buildMeta.isDebug,
),
logger
).also {
it.install {
installAuthService(credentialsStore, SmallTalkDeviceNameGenerator())
installEncryptionService(store.knownDevicesStore())
val olmAccountStore = OlmPersistenceWrapper(store.olmStore(), base64)
val singletonFlows = SingletonFlows(coroutineDispatchers)
val olm = OlmWrapper(
olmStore = olmAccountStore,
singletonFlows = singletonFlows,
jsonCanonicalizer = JsonCanonicalizer(),
deviceKeyFactory = DeviceKeyFactory(JsonCanonicalizer()),
errorTracker = trackingModule.errorTracker,
logger = logger,
clock = Clock.systemUTC(),
coroutineDispatchers = coroutineDispatchers,
)
installCryptoService(
credentialsStore,
olm,
roomMembersProvider = { services ->
RoomMembersProvider {
services.roomService().joinedMembers(it).map { it.userId }
}
},
base64 = base64,
coroutineDispatchers = coroutineDispatchers,
)
installMessageService(
store.localEchoStore,
BackgroundWorkAdapter(workModule.workScheduler()),
imageContentReader,
messageEncrypter = {
val cryptoService = it.cryptoService()
MessageEncrypter { message ->
val result = cryptoService.encrypt(
roomId = message.roomId,
credentials = credentialsStore.credentials()!!,
messageJson = message.contents,
)
MessageEncrypter.EncryptedMessagePayload(
result.algorithmName,
result.senderKey,
result.cipherText,
result.sessionId,
result.deviceId,
)
}
},
mediaEncrypter = {
val cryptoService = it.cryptoService()
MediaEncrypter { input ->
val result = cryptoService.encrypt(input)
MediaEncrypter.Result(
uri = result.uri,
contentLength = result.contentLength,
algorithm = result.algorithm,
ext = result.ext,
keyOperations = result.keyOperations,
kty = result.kty,
k = result.k,
iv = result.iv,
hashes = result.hashes,
v = result.v,
)
}
},
)
val overviewStore = store.overviewStore()
installRoomService(
storeModule.value.memberStore(),
roomMessenger = {
val messageService = it.messageService()
object : RoomMessenger {
override suspend fun enableEncryption(roomId: RoomId) {
messageService.sendEventMessage(
roomId, MessageService.EventMessage.Encryption(
algorithm = AlgorithmName("m.megolm.v1.aes-sha2")
)
)
}
}
},
roomInviteRemover = {
overviewStore.removeInvites(listOf(it))
}
)
installProfileService(storeModule.value.profileStore(), singletonFlows, credentialsStore)
installSyncService(
credentialsStore,
overviewStore,
store.roomStore(),
store.syncStore(),
store.filterStore(),
deviceNotifier = { services ->
val encryption = services.deviceService()
val crypto = services.cryptoService()
DeviceNotifier { userIds, syncToken ->
encryption.updateStaleDevices(userIds)
crypto.updateOlmSession(userIds, syncToken)
}
},
messageDecrypter = { serviceProvider ->
val cryptoService = serviceProvider.cryptoService()
MessageDecrypter {
cryptoService.decrypt(it)
}
},
keySharer = { serviceProvider ->
val cryptoService = serviceProvider.cryptoService()
KeySharer { sharedRoomKeys ->
cryptoService.importRoomKeys(sharedRoomKeys)
}
},
verificationHandler = { services ->
val cryptoService = services.cryptoService()
VerificationHandler { apiEvent ->
logger.matrixLog(MatrixLogTag.VERIFICATION, "got a verification request $it")
cryptoService.onVerificationEvent(
when (apiEvent) {
is ApiToDeviceEvent.VerificationRequest -> Verification.Event.Requested(
apiEvent.sender,
apiEvent.content.fromDevice,
apiEvent.content.transactionId,
apiEvent.content.methods,
apiEvent.content.timestampPosix,
)
is ApiToDeviceEvent.VerificationReady -> Verification.Event.Ready(
apiEvent.sender,
apiEvent.content.fromDevice,
apiEvent.content.transactionId,
apiEvent.content.methods,
)
is ApiToDeviceEvent.VerificationStart -> Verification.Event.Started(
apiEvent.sender,
apiEvent.content.fromDevice,
apiEvent.content.method,
apiEvent.content.protocols,
apiEvent.content.hashes,
apiEvent.content.codes,
apiEvent.content.short,
apiEvent.content.transactionId,
)
is ApiToDeviceEvent.VerificationCancel -> TODO()
is ApiToDeviceEvent.VerificationAccept -> TODO()
is ApiToDeviceEvent.VerificationKey -> Verification.Event.Key(
apiEvent.sender,
apiEvent.content.transactionId,
apiEvent.content.key
)
is ApiToDeviceEvent.VerificationMac -> Verification.Event.Mac(
apiEvent.sender,
apiEvent.content.transactionId,
apiEvent.content.keys,
apiEvent.content.mac,
)
}
)
}
},
oneTimeKeyProducer = { services ->
val cryptoService = services.cryptoService()
MaybeCreateMoreKeys {
cryptoService.maybeCreateMoreKeys(it)
}
},
roomMembersService = { services ->
val roomService = services.roomService()
object : RoomMembersService {
override suspend fun find(roomId: RoomId, userIds: List<UserId>) = roomService.findMembers(roomId, userIds)
override suspend fun findSummary(roomId: RoomId) = roomService.findMembersSummary(roomId)
override suspend fun insert(roomId: RoomId, members: List<RoomMember>) = roomService.insertMembers(roomId, members)
}
},
errorTracker = trackingModule.errorTracker,
coroutineDispatchers = coroutineDispatchers,
)
installPushService(credentialsStore)
}
}
}
}
internal class DomainModules(
private val matrixModules: MatrixModules,
private val chatEngineModule: ChatEngineModule,
private val errorTracker: ErrorTracker,
private val context: Application,
private val dispatchers: CoroutineDispatchers,
) {
private val pushHandler by unsafeLazy {
val enginePushHandler = matrixModules.engine.pushHandler()
val enginePushHandler = chatEngineModule.engine.pushHandler()
object : PushHandler {
override fun onNewToken(payload: PushTokenPayload) {
enginePushHandler.onNewToken(JsonString(Json.encodeToString(PushTokenPayload.serializer(), payload)))
@ -489,7 +275,9 @@ internal class DomainModules(
messaging.messaging,
)
}
val taskRunnerModule by unsafeLazy { TaskRunnerModule(TaskRunnerAdapter(matrixModules.matrix::run, AppTaskRunner(matrixModules.engine))) }
val taskRunnerModule by unsafeLazy {
TaskRunnerModule(TaskRunnerAdapter(chatEngineModule.engine, AppTaskRunner(chatEngineModule.engine)))
}
}

View File

@ -1,10 +1,11 @@
package app.dapk.st.graph
import app.dapk.st.matrix.MatrixTaskRunner
import app.dapk.st.engine.ChatEngine
import app.dapk.st.engine.ChatEngineTask
import app.dapk.st.work.TaskRunner
class TaskRunnerAdapter(
private val matrixTaskRunner: suspend (MatrixTaskRunner.MatrixTask) -> MatrixTaskRunner.TaskResult,
private val chatEngine: ChatEngine,
private val appTaskRunner: AppTaskRunner,
) : TaskRunner {
@ -12,11 +13,12 @@ class TaskRunnerAdapter(
return tasks.map {
when {
it.task.type.startsWith("matrix") -> {
when (val result = matrixTaskRunner(MatrixTaskRunner.MatrixTask(it.task.type, it.task.jsonPayload))) {
is MatrixTaskRunner.TaskResult.Failure -> TaskRunner.TaskResult.Failure(it.source, canRetry = result.canRetry)
MatrixTaskRunner.TaskResult.Success -> TaskRunner.TaskResult.Success(it.source)
when (val result = chatEngine.runTask(ChatEngineTask(it.task.type, it.task.jsonPayload))) {
is app.dapk.st.engine.TaskRunner.TaskResult.Failure -> TaskRunner.TaskResult.Failure(it.source, canRetry = result.canRetry)
app.dapk.st.engine.TaskRunner.TaskResult.Success -> TaskRunner.TaskResult.Success(it.source)
}
}
else -> appTaskRunner.run(it)
}
}

View File

@ -7,7 +7,7 @@ import app.dapk.st.matrix.common.RoomMember
import kotlinx.coroutines.flow.Flow
import java.io.InputStream
interface ChatEngine {
interface ChatEngine: TaskRunner {
fun directory(): Flow<DirectoryState>
@ -34,8 +34,24 @@ interface ChatEngine {
fun mediaDecrypter(): MediaDecrypter
fun pushHandler(): PushHandler
}
interface TaskRunner {
suspend fun runTask(task: ChatEngineTask): TaskResult
sealed interface TaskResult {
object Success : TaskResult
data class Failure(val canRetry: Boolean) : TaskResult
}
}
data class ChatEngineTask(val type: String, val jsonPayload: String)
interface MediaDecrypter {
fun decrypt(input: InputStream, k: String, iv: String): Collector

View File

@ -6,6 +6,7 @@ import app.dapk.st.core.CoroutineDispatchers
import app.dapk.st.core.SingletonFlows
import app.dapk.st.core.extensions.ErrorTracker
import app.dapk.st.matrix.MatrixClient
import app.dapk.st.matrix.MatrixTaskRunner
import app.dapk.st.matrix.auth.DeviceDisplayNameGenerator
import app.dapk.st.matrix.auth.authService
import app.dapk.st.matrix.auth.installAuthService
@ -103,6 +104,13 @@ class MatrixEngine internal constructor(
override fun pushHandler() = matrixPushHandler.value
override suspend fun runTask(task: ChatEngineTask): TaskRunner.TaskResult {
return when (val result = matrix.value.run(MatrixTaskRunner.MatrixTask(task.type, task.jsonPayload))) {
is MatrixTaskRunner.TaskResult.Failure -> TaskRunner.TaskResult.Failure(result.canRetry)
MatrixTaskRunner.TaskResult.Success -> TaskRunner.TaskResult.Success
}
}
class Factory {
fun create(