porting message observing to the chat engine

This commit is contained in:
Adam Brown 2022-10-09 18:58:03 +01:00
parent d2e8a29af8
commit baf7cfc17a
15 changed files with 263 additions and 85 deletions

View File

@ -177,11 +177,8 @@ internal class FeatureModules internal constructor(
}
val messengerModule by unsafeLazy {
MessengerModule(
matrixModules.sync,
matrixModules.engine,
matrixModules.message,
matrixModules.room,
storeModule.value.credentialsStore(),
storeModule.value.roomStore(),
clock,
context,
base64,

View File

@ -10,6 +10,8 @@ interface ChatEngine {
fun invites(): Flow<InviteState>
suspend fun messages(roomId: RoomId, disableReadReceipts: Boolean): Flow<MessengerState>
suspend fun login(request: LoginRequest): LoginResult
suspend fun me(forceRefresh: Boolean): Me

View File

@ -1,6 +1,10 @@
package app.dapk.st.engine
import app.dapk.st.matrix.common.*
import java.time.Instant
import java.time.ZoneId
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
typealias DirectoryState = List<DirectoryItem>
typealias OverviewState = List<RoomOverview>
@ -79,3 +83,118 @@ sealed interface ImportResult {
data class Update(val importedKeysCount: Long) : ImportResult
}
data class MessengerState(
val self: UserId,
val roomState: RoomState,
val typing: Typing?
)
data class RoomState(
val roomOverview: RoomOverview,
val events: List<RoomEvent>,
)
internal val DEFAULT_ZONE = ZoneId.systemDefault()
internal val MESSAGE_TIME_FORMAT = DateTimeFormatter.ofPattern("HH:mm")
sealed class RoomEvent {
abstract val eventId: EventId
abstract val utcTimestamp: Long
abstract val author: RoomMember
abstract val meta: MessageMeta
data class Message(
override val eventId: EventId,
override val utcTimestamp: Long,
val content: String,
override val author: RoomMember,
override val meta: MessageMeta,
val edited: Boolean = false,
val redacted: Boolean = false,
) : RoomEvent() {
val time: String by lazy(mode = LazyThreadSafetyMode.NONE) {
val instant = Instant.ofEpochMilli(utcTimestamp)
ZonedDateTime.ofInstant(instant, DEFAULT_ZONE).toLocalTime().format(MESSAGE_TIME_FORMAT)
}
}
data class Reply(
val message: RoomEvent,
val replyingTo: RoomEvent,
) : RoomEvent() {
override val eventId: EventId = message.eventId
override val utcTimestamp: Long = message.utcTimestamp
override val author: RoomMember = message.author
override val meta: MessageMeta = message.meta
val replyingToSelf = replyingTo.author == message.author
val time: String by lazy(mode = LazyThreadSafetyMode.NONE) {
val instant = Instant.ofEpochMilli(utcTimestamp)
ZonedDateTime.ofInstant(instant, DEFAULT_ZONE).toLocalTime().format(MESSAGE_TIME_FORMAT)
}
}
data class Image(
override val eventId: EventId,
override val utcTimestamp: Long,
val imageMeta: ImageMeta,
override val author: RoomMember,
override val meta: MessageMeta,
val edited: Boolean = false,
) : RoomEvent() {
val time: String by lazy(mode = LazyThreadSafetyMode.NONE) {
val instant = Instant.ofEpochMilli(utcTimestamp)
ZonedDateTime.ofInstant(instant, DEFAULT_ZONE).toLocalTime().format(MESSAGE_TIME_FORMAT)
}
data class ImageMeta(
val width: Int?,
val height: Int?,
val url: String,
val keys: Keys?,
) {
data class Keys(
val k: String,
val iv: String,
val v: String,
val hashes: Map<String, String>,
)
}
}
}
sealed class MessageMeta {
object FromServer : MessageMeta()
data class LocalEcho(
val echoId: String,
val state: State
) : MessageMeta() {
sealed class State {
object Sending : State()
object Sent : State()
data class Error(
val message: String,
val type: Type,
) : State() {
enum class Type {
UNKNOWN
}
}
}
}
}

View File

@ -2,6 +2,7 @@ applyAndroidComposeLibraryModule(project)
apply plugin: 'kotlin-parcelize'
dependencies {
implementation project(":chat-engine")
implementation project(":matrix:services:sync")
implementation project(":matrix:services:message")
implementation project(":matrix:services:crypto")

View File

@ -4,6 +4,7 @@ import android.content.Context
import app.dapk.st.core.Base64
import app.dapk.st.core.ProvidableModule
import app.dapk.st.domain.application.message.MessageOptionsStore
import app.dapk.st.engine.ChatEngine
import app.dapk.st.matrix.common.CredentialsStore
import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.message.MessageService
@ -14,11 +15,8 @@ import app.dapk.st.matrix.sync.SyncService
import java.time.Clock
class MessengerModule(
private val syncService: SyncService,
private val chatEngine: ChatEngine,
private val messageService: MessageService,
private val roomService: RoomService,
private val credentialsStore: CredentialsStore,
private val roomStore: RoomStore,
private val clock: Clock,
private val context: Context,
private val base64: Base64,
@ -28,11 +26,8 @@ class MessengerModule(
internal fun messengerViewModel(): MessengerViewModel {
return MessengerViewModel(
chatEngine,
messageService,
roomService,
roomStore,
credentialsStore,
timelineUseCase(),
LocalIdFactory(),
imageMetaReader,
messageOptionsStore,
@ -40,10 +35,5 @@ class MessengerModule(
)
}
private fun timelineUseCase(): TimelineUseCaseImpl {
val mergeWithLocalEchosUseCase = MergeWithLocalEchosUseCaseImpl(LocalEchoMapper(MetaMapper()))
return TimelineUseCaseImpl(syncService, messageService, roomService, mergeWithLocalEchosUseCase)
}
internal fun decryptingFetcherFactory(roomId: RoomId) = DecryptingFetcherFactory(context, base64, roomId)
}

View File

@ -44,12 +44,12 @@ import app.dapk.st.core.StartObserving
import app.dapk.st.core.components.CenteredLoading
import app.dapk.st.core.extensions.takeIfContent
import app.dapk.st.design.components.*
import app.dapk.st.engine.MessageMeta
import app.dapk.st.engine.MessengerState
import app.dapk.st.engine.RoomEvent
import app.dapk.st.engine.RoomState
import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.common.UserId
import app.dapk.st.matrix.sync.MessageMeta
import app.dapk.st.matrix.sync.RoomEvent
import app.dapk.st.matrix.sync.RoomEvent.Message
import app.dapk.st.matrix.sync.RoomState
import app.dapk.st.messenger.gallery.ImageGalleryActivityPayload
import app.dapk.st.navigator.MessageAttachment
import app.dapk.st.navigator.Navigator
@ -196,7 +196,7 @@ private fun ColumnScope.RoomContent(self: UserId, state: RoomState, replyActions
AlignedBubble(item, self, wasPreviousMessageSameSender, replyActions) {
when (item) {
is RoomEvent.Image -> MessageImage(it as BubbleContent<RoomEvent.Image>)
is Message -> TextBubbleContent(it as BubbleContent<RoomEvent.Message>)
is RoomEvent.Message -> TextBubbleContent(it as BubbleContent<RoomEvent.Message>)
is RoomEvent.Reply -> ReplyBubbleContent(it as BubbleContent<RoomEvent.Reply>)
}
}
@ -482,7 +482,7 @@ private fun ReplyBubbleContent(content: BubbleContent<RoomEvent.Reply>) {
)
Spacer(modifier = Modifier.height(2.dp))
when (val replyingTo = content.message.replyingTo) {
is Message -> {
is RoomEvent.Message -> {
Text(
text = replyingTo.content,
color = content.textColor().copy(alpha = 0.8f),
@ -525,7 +525,7 @@ private fun ReplyBubbleContent(content: BubbleContent<RoomEvent.Reply>) {
)
}
when (val message = content.message.message) {
is Message -> {
is RoomEvent.Message -> {
Text(
text = message.content,
color = content.textColor(),
@ -642,7 +642,7 @@ private fun TextComposer(state: ComposerState.Text, onTextChange: (String) -> Un
)
}
) {
if (it is Message) {
if (it is RoomEvent.Message) {
Box(Modifier.padding(12.dp)) {
Box(Modifier.padding(8.dp).clickable { replyActions.onDismiss() }.wrapContentWidth().align(Alignment.TopEnd)) {
Icon(

View File

@ -1,8 +1,9 @@
package app.dapk.st.messenger
import app.dapk.st.core.Lce
import app.dapk.st.engine.MessengerState
import app.dapk.st.engine.RoomEvent
import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.sync.RoomEvent
import app.dapk.st.navigator.MessageAttachment
data class MessengerScreenState(

View File

@ -4,31 +4,24 @@ import androidx.lifecycle.viewModelScope
import app.dapk.st.core.Lce
import app.dapk.st.core.extensions.takeIfContent
import app.dapk.st.domain.application.message.MessageOptionsStore
import app.dapk.st.matrix.common.CredentialsStore
import app.dapk.st.matrix.common.EventId
import app.dapk.st.engine.ChatEngine
import app.dapk.st.engine.RoomEvent
import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.common.UserId
import app.dapk.st.matrix.message.MessageService
import app.dapk.st.matrix.message.internal.ImageContentReader
import app.dapk.st.matrix.room.RoomService
import app.dapk.st.matrix.sync.RoomEvent
import app.dapk.st.matrix.sync.RoomStore
import app.dapk.st.navigator.MessageAttachment
import app.dapk.st.viewmodel.DapkViewModel
import app.dapk.st.viewmodel.MutableStateFactory
import app.dapk.st.viewmodel.defaultStateFactory
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import java.time.Clock
internal class MessengerViewModel(
private val chatEngine: ChatEngine,
private val messageService: MessageService,
private val roomService: RoomService,
private val roomStore: RoomStore,
private val credentialsStore: CredentialsStore,
private val observeTimeline: ObserveTimelineUseCase,
private val localIdFactory: LocalIdFactory,
private val imageContentReader: ImageContentReader,
private val messageOptionsStore: MessageOptionsStore,
@ -83,29 +76,10 @@ internal class MessengerViewModel(
private fun start(action: MessengerAction.OnMessengerVisible) {
updateState { copy(roomId = action.roomId, composerState = action.attachments?.let { ComposerState.Attachments(it, null) } ?: composerState) }
syncJob = viewModelScope.launch {
roomStore.markRead(action.roomId)
val credentials = credentialsStore.credentials()!!
var lastKnownReadEvent: EventId? = null
observeTimeline.invoke(action.roomId, credentials.userId).distinctUntilChanged().onEach { state ->
state.latestMessageEventFromOthers(self = credentials.userId)?.let {
if (lastKnownReadEvent != it) {
updateRoomReadStateAsync(latestReadEvent = it, state)
lastKnownReadEvent = it
}
}
updateState { copy(roomState = Lce.Content(state)) }
}.collect()
}
}
private fun CoroutineScope.updateRoomReadStateAsync(latestReadEvent: EventId, state: MessengerState): Deferred<Unit> {
return async {
runCatching {
roomService.markFullyRead(state.roomState.roomOverview.roomId, latestReadEvent, isPrivate = messageOptionsStore.isReadReceiptsDisabled())
roomStore.markRead(state.roomState.roomOverview.roomId)
}
viewModelScope.launch {
syncJob = chatEngine.messages(action.roomId, disableReadReceipts = messageOptionsStore.isReadReceiptsDisabled())
.onEach { updateState { copy(roomState = Lce.Content(it)) } }
.launchIn(this)
}
}
@ -190,12 +164,6 @@ internal class MessengerViewModel(
}
private fun MessengerState.latestMessageEventFromOthers(self: UserId) = this.roomState.events
.filterIsInstance<RoomEvent.Message>()
.filterNot { it.author.id == self }
.firstOrNull()
?.eventId
sealed interface MessengerAction {
data class ComposerTextUpdate(val newValue: String) : MessengerAction
data class ComposerEnterReplyMode(val replyingTo: RoomEvent) : MessengerAction

View File

@ -1,10 +1,8 @@
package app.dapk.st.messenger
package app.dapk.st.engine
import app.dapk.st.matrix.common.EventId
import app.dapk.st.matrix.common.RoomMember
import app.dapk.st.matrix.message.MessageService
import app.dapk.st.matrix.sync.MessageMeta
import app.dapk.st.matrix.sync.RoomEvent
internal class LocalEchoMapper(private val metaMapper: MetaMapper) {

View File

@ -0,0 +1,7 @@
package app.dapk.st.engine
import java.util.*
internal class LocalIdFactory {
fun create() = "local.${UUID.randomUUID()}"
}

View File

@ -7,8 +7,11 @@ import app.dapk.st.matrix.auth.AuthService.LoginResult as MatrixLoginResult
import app.dapk.st.matrix.crypto.ImportResult as MatrixImportResult
import app.dapk.st.matrix.room.ProfileService.Me as MatrixMe
import app.dapk.st.matrix.sync.LastMessage as MatrixLastMessage
import app.dapk.st.matrix.sync.MessageMeta as MatrixMessageMeta
import app.dapk.st.matrix.sync.RoomEvent as MatrixRoomEvent
import app.dapk.st.matrix.sync.RoomInvite as MatrixRoomInvite
import app.dapk.st.matrix.sync.RoomOverview as MatrixRoomOverview
import app.dapk.st.matrix.sync.RoomState as MatrixRoomState
import app.dapk.st.matrix.sync.SyncService.SyncEvent.Typing as MatrixTyping
fun MatrixRoomOverview.engine() = RoomOverview(
@ -76,4 +79,38 @@ fun MatrixImportResult.engine() = when (this) {
is MatrixImportResult.Success -> ImportResult.Success(this.roomIds, this.totalImportedKeysCount)
is MatrixImportResult.Update -> ImportResult.Update(this.importedKeysCount)
}
fun MatrixRoomState.engine() = RoomState(
this.roomOverview.engine(),
this.events.map { it.engine() }
)
fun MatrixRoomEvent.engine(): RoomEvent = when (this) {
is MatrixRoomEvent.Image -> RoomEvent.Image(this.eventId, this.utcTimestamp, this.imageMeta.engine(), this.author, this.meta.engine(), this.edited)
is MatrixRoomEvent.Message -> RoomEvent.Message(this.eventId, this.utcTimestamp, this.content, this.author, this.meta.engine(), this.edited, this.redacted)
is MatrixRoomEvent.Reply -> RoomEvent.Reply(this.message.engine(), this.replyingTo.engine())
}
fun MatrixRoomEvent.Image.ImageMeta.engine() = RoomEvent.Image.ImageMeta(
this.width,
this.height,
this.url,
this.keys?.let { RoomEvent.Image.ImageMeta.Keys(it.k, it.iv, it.v, it.hashes) }
)
fun MatrixMessageMeta.engine() = when (this) {
MatrixMessageMeta.FromServer -> MessageMeta.FromServer
is MatrixMessageMeta.LocalEcho -> MessageMeta.LocalEcho(
this.echoId, when (val echo = this.state) {
is MatrixMessageMeta.LocalEcho.State.Error -> MessageMeta.LocalEcho.State.Error(
echo.message, when (echo.type) {
MatrixMessageMeta.LocalEcho.State.Error.Type.UNKNOWN -> MessageMeta.LocalEcho.State.Error.Type.UNKNOWN
}
)
MatrixMessageMeta.LocalEcho.State.Sending -> MessageMeta.LocalEcho.State.Sending
MatrixMessageMeta.LocalEcho.State.Sent -> MessageMeta.LocalEcho.State.Sent
}
)
}

View File

@ -36,6 +36,7 @@ import java.time.Clock
class MatrixEngine internal constructor(
private val directoryUseCase: Lazy<DirectoryUseCase>,
private val matrix: Lazy<MatrixClient>,
private val timelineUseCase: Lazy<ReadMarkingTimeline>,
) : ChatEngine {
override fun directory() = directoryUseCase.value.state()
@ -43,6 +44,10 @@ class MatrixEngine internal constructor(
return matrix.value.syncService().invites().map { it.map { it.engine() } }
}
override suspend fun messages(roomId: RoomId, disableReadReceipts: Boolean): Flow<MessengerState> {
return timelineUseCase.value.foo(roomId, isReadReceiptsDisabled = disableReadReceipts)
}
override suspend fun login(request: LoginRequest): LoginResult {
return matrix.value.authService().login(request.engine()).engine()
}
@ -116,8 +121,14 @@ class MatrixEngine internal constructor(
roomStore
)
}
val timelineUseCase = unsafeLazy {
val matrix = lazyMatrix.value
val mergeWithLocalEchosUseCase = MergeWithLocalEchosUseCaseImpl(LocalEchoMapper(MetaMapper()))
val timeline = TimelineUseCaseImpl(matrix.syncService(), matrix.messageService(), matrix.roomService(), mergeWithLocalEchosUseCase)
ReadMarkingTimeline(roomStore, credentialsStore, timeline, matrix.roomService())
}
return MatrixEngine(directoryUseCase, lazyMatrix)
return MatrixEngine(directoryUseCase, lazyMatrix, timelineUseCase)
}

View File

@ -1,10 +1,8 @@
package app.dapk.st.messenger
package app.dapk.st.engine
import app.dapk.st.matrix.common.EventId
import app.dapk.st.matrix.common.RoomMember
import app.dapk.st.matrix.message.MessageService
import app.dapk.st.matrix.sync.RoomEvent
import app.dapk.st.matrix.sync.RoomState
internal typealias MergeWithLocalEchosUseCase = (RoomState, RoomMember, List<MessageService.LocalEcho>) -> RoomState

View File

@ -0,0 +1,55 @@
package app.dapk.st.engine
import app.dapk.st.matrix.common.CredentialsStore
import app.dapk.st.matrix.common.EventId
import app.dapk.st.matrix.common.RoomId
import app.dapk.st.matrix.common.UserId
import app.dapk.st.matrix.room.RoomService
import app.dapk.st.matrix.sync.RoomEvent
import app.dapk.st.matrix.sync.RoomStore
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.onEach
class ReadMarkingTimeline(
private val roomStore: RoomStore,
private val credentialsStore: CredentialsStore,
private val observeTimelineUseCase: ObserveTimelineUseCase,
private val roomService: RoomService,
) {
suspend fun foo(roomId: RoomId, isReadReceiptsDisabled: Boolean): Flow<MessengerState> {
var lastKnownReadEvent: EventId? = null
val credentials = credentialsStore.credentials()!!
roomStore.markRead(roomId)
return observeTimelineUseCase.invoke(roomId, credentials.userId).distinctUntilChanged().onEach { state ->
state.latestMessageEventFromOthers(self = credentials.userId)?.let {
if (lastKnownReadEvent != it) {
updateRoomReadStateAsync(latestReadEvent = it, state, isReadReceiptsDisabled)
lastKnownReadEvent = it
}
}
}
}
private suspend fun updateRoomReadStateAsync(latestReadEvent: EventId, state: MessengerState, isReadReceiptsDisabled: Boolean): Deferred<*> {
return coroutineScope {
async {
runCatching {
roomService.markFullyRead(state.roomState.roomOverview.roomId, latestReadEvent, isPrivate = isReadReceiptsDisabled)
roomStore.markRead(state.roomState.roomOverview.roomId)
}
}
}
}
}
private fun MessengerState.latestMessageEventFromOthers(self: UserId) = this.roomState.events
.filterIsInstance<RoomEvent.Message>()
.filterNot { it.author.id == self }
.firstOrNull()
?.eventId

View File

@ -1,4 +1,4 @@
package app.dapk.st.messenger
package app.dapk.st.engine
import app.dapk.st.core.extensions.startAndIgnoreEmissions
import app.dapk.st.matrix.common.RoomId
@ -6,10 +6,10 @@ import app.dapk.st.matrix.common.RoomMember
import app.dapk.st.matrix.common.UserId
import app.dapk.st.matrix.message.MessageService
import app.dapk.st.matrix.room.RoomService
import app.dapk.st.matrix.sync.RoomState
import app.dapk.st.matrix.sync.SyncService
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.map
internal typealias ObserveTimelineUseCase = (RoomId, UserId) -> Flow<MessengerState>
@ -37,7 +37,7 @@ internal class TimelineUseCaseImpl(
)
}
},
typing = events.filterIsInstance<SyncService.SyncEvent.Typing>().firstOrNull { it.roomId == roomId },
typing = events.filterIsInstance<SyncService.SyncEvent.Typing>().firstOrNull { it.roomId == roomId }?.engine(),
self = userId,
)
}
@ -45,14 +45,8 @@ internal class TimelineUseCaseImpl(
private fun roomDatasource(roomId: RoomId) = combine(
syncService.startSyncing().startAndIgnoreEmissions(),
syncService.room(roomId)
syncService.room(roomId).map { it.engine() }
) { _, room -> room }
}
private fun UserId.toFallbackMember() = RoomMember(this, displayName = null, avatarUrl = null)
data class MessengerState(
val self: UserId,
val roomState: RoomState,
val typing: SyncService.SyncEvent.Typing?
)