taking into account redactions and updating the message list
This commit is contained in:
parent
5a67275b89
commit
0a0f998785
|
@ -27,10 +27,10 @@ internal class RoomPersistence(
|
||||||
private val coroutineDispatchers: CoroutineDispatchers,
|
private val coroutineDispatchers: CoroutineDispatchers,
|
||||||
) : RoomStore {
|
) : RoomStore {
|
||||||
|
|
||||||
override suspend fun persist(roomId: RoomId, state: RoomState) {
|
override suspend fun persist(roomId: RoomId, events: List<RoomEvent>) {
|
||||||
coroutineDispatchers.withIoContext {
|
coroutineDispatchers.withIoContext {
|
||||||
database.transaction {
|
database.transaction {
|
||||||
state.events.forEach {
|
events.forEach {
|
||||||
database.roomEventQueries.insertRoomEvent(roomId, it)
|
database.roomEventQueries.insertRoomEvent(roomId, it)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -38,9 +38,16 @@ internal class RoomPersistence(
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun remove(rooms: List<RoomId>) {
|
override suspend fun remove(rooms: List<RoomId>) {
|
||||||
coroutineDispatchers
|
coroutineDispatchers.withIoContext {
|
||||||
database.roomEventQueries.transaction {
|
database.roomEventQueries.transaction {
|
||||||
rooms.forEach { database.roomEventQueries.remove(it.value) }
|
rooms.forEach { database.roomEventQueries.remove(it.value) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun remove(eventId: EventId) {
|
||||||
|
coroutineDispatchers.withIoContext {
|
||||||
|
database.roomEventQueries.removeEvent(eventId.value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,4 +36,8 @@ LIMIT 100;
|
||||||
|
|
||||||
remove:
|
remove:
|
||||||
DELETE FROM dbRoomEvent
|
DELETE FROM dbRoomEvent
|
||||||
WHERE room_id = ?;
|
WHERE room_id = ?;
|
||||||
|
|
||||||
|
removeEvent:
|
||||||
|
DELETE FROM dbRoomEvent
|
||||||
|
WHERE event_id = ?;
|
|
@ -35,6 +35,7 @@ sealed class RoomEvent {
|
||||||
@SerialName("meta") override val meta: MessageMeta,
|
@SerialName("meta") override val meta: MessageMeta,
|
||||||
@SerialName("encrypted_content") val encryptedContent: MegOlmV1? = null,
|
@SerialName("encrypted_content") val encryptedContent: MegOlmV1? = null,
|
||||||
@SerialName("edited") val edited: Boolean = false,
|
@SerialName("edited") val edited: Boolean = false,
|
||||||
|
@SerialName("redacted") val redacted: Boolean = false,
|
||||||
) : RoomEvent() {
|
) : RoomEvent() {
|
||||||
|
|
||||||
@Serializable
|
@Serializable
|
||||||
|
|
|
@ -7,8 +7,9 @@ import kotlinx.coroutines.flow.Flow
|
||||||
|
|
||||||
interface RoomStore {
|
interface RoomStore {
|
||||||
|
|
||||||
suspend fun persist(roomId: RoomId, state: RoomState)
|
suspend fun persist(roomId: RoomId, events: List<RoomEvent>)
|
||||||
suspend fun remove(rooms: List<RoomId>)
|
suspend fun remove(rooms: List<RoomId>)
|
||||||
|
suspend fun remove(eventId: EventId)
|
||||||
suspend fun retrieve(roomId: RoomId): RoomState?
|
suspend fun retrieve(roomId: RoomId): RoomState?
|
||||||
fun latest(roomId: RoomId): Flow<RoomState>
|
fun latest(roomId: RoomId): Flow<RoomState>
|
||||||
suspend fun insertUnread(roomId: RoomId, eventIds: List<EventId>)
|
suspend fun insertUnread(roomId: RoomId, eventIds: List<EventId>)
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
package app.dapk.st.matrix.sync.internal.sync
|
package app.dapk.st.matrix.sync.internal.sync
|
||||||
|
|
||||||
import app.dapk.st.matrix.common.MatrixLogTag
|
import app.dapk.st.matrix.common.*
|
||||||
import app.dapk.st.matrix.common.MatrixLogger
|
import app.dapk.st.matrix.sync.RoomEvent
|
||||||
import app.dapk.st.matrix.common.RoomId
|
|
||||||
import app.dapk.st.matrix.common.matrixLog
|
|
||||||
import app.dapk.st.matrix.sync.RoomState
|
import app.dapk.st.matrix.sync.RoomState
|
||||||
import app.dapk.st.matrix.sync.RoomStore
|
import app.dapk.st.matrix.sync.RoomStore
|
||||||
|
|
||||||
|
@ -26,7 +24,7 @@ class RoomDataSource(
|
||||||
logger.matrixLog(MatrixLogTag.SYNC, "no changes, not persisting")
|
logger.matrixLog(MatrixLogTag.SYNC, "no changes, not persisting")
|
||||||
} else {
|
} else {
|
||||||
roomCache[roomId] = newState
|
roomCache[roomId] = newState
|
||||||
roomStore.persist(roomId, newState)
|
roomStore.persist(roomId, newState.events)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,4 +32,35 @@ class RoomDataSource(
|
||||||
roomsLeft.forEach { roomCache.remove(it) }
|
roomsLeft.forEach { roomCache.remove(it) }
|
||||||
roomStore.remove(roomsLeft)
|
roomStore.remove(roomsLeft)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
suspend fun redact(roomId: RoomId, event: EventId) {
|
||||||
|
val eventToRedactFromCache = roomCache[roomId]?.events?.find { it.eventId == event }
|
||||||
|
val redactedEvent = when {
|
||||||
|
eventToRedactFromCache != null -> {
|
||||||
|
eventToRedactFromCache.redact().also { redacted ->
|
||||||
|
val cachedRoomState = roomCache[roomId]
|
||||||
|
requireNotNull(cachedRoomState)
|
||||||
|
roomCache[roomId] = cachedRoomState.replaceEvent(eventToRedactFromCache, redacted)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
else -> roomStore.findEvent(event)?.redact()
|
||||||
|
}
|
||||||
|
|
||||||
|
redactedEvent?.let { roomStore.persist(roomId, listOf(it)) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun RoomEvent.redact() = when (this) {
|
||||||
|
is RoomEvent.Image -> RoomEvent.Message(this.eventId, this.utcTimestamp, "Redacted", this.author, this.meta, redacted = true)
|
||||||
|
is RoomEvent.Message -> RoomEvent.Message(this.eventId, this.utcTimestamp, "Redacted", this.author, this.meta, redacted = true)
|
||||||
|
is RoomEvent.Reply -> RoomEvent.Message(this.eventId, this.utcTimestamp, "Redacted", this.author, this.meta, redacted = true)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun RoomState.replaceEvent(old: RoomEvent, new: RoomEvent): RoomState {
|
||||||
|
val updatedEvents = this.events.toMutableList().apply {
|
||||||
|
remove(old)
|
||||||
|
add(new)
|
||||||
|
}
|
||||||
|
return this.copy(events = updatedEvents)
|
||||||
}
|
}
|
|
@ -21,6 +21,10 @@ internal class RoomProcessor(
|
||||||
val members = roomToProcess.apiSyncRoom.collectMembers(roomToProcess.userCredentials)
|
val members = roomToProcess.apiSyncRoom.collectMembers(roomToProcess.userCredentials)
|
||||||
roomMembersService.insert(roomToProcess.roomId, members)
|
roomMembersService.insert(roomToProcess.roomId, members)
|
||||||
|
|
||||||
|
roomToProcess.apiSyncRoom.timeline.apiTimelineEvents.filterIsInstance<ApiTimelineEvent.RoomRedcation>().forEach {
|
||||||
|
roomDataSource.redact(roomToProcess.roomId, it.redactedId)
|
||||||
|
}
|
||||||
|
|
||||||
val previousState = roomDataSource.read(roomToProcess.roomId)
|
val previousState = roomDataSource.read(roomToProcess.roomId)
|
||||||
|
|
||||||
val (newEvents, distinctEvents) = timelineEventsProcessor.process(
|
val (newEvents, distinctEvents) = timelineEventsProcessor.process(
|
||||||
|
|
|
@ -61,8 +61,6 @@ internal class SyncReducer(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
roomDataSource.remove(roomsLeft)
|
|
||||||
|
|
||||||
return ReducerResult(
|
return ReducerResult(
|
||||||
newRooms,
|
newRooms,
|
||||||
(apiRoomsToProcess + roomsWithSideEffects).awaitAll().filterNotNull(),
|
(apiRoomsToProcess + roomsWithSideEffects).awaitAll().filterNotNull(),
|
||||||
|
|
Loading…
Reference in New Issue