Merge branch 'main' of github.com:ouchadam/helium into feature/share-images-via-small-talk
This commit is contained in:
commit
1450a70283
|
@ -129,13 +129,13 @@ ext.Dependencies.with {
|
||||||
ktorContentNegotiation = "io.ktor:ktor-client-content-negotiation:${ktorVer}"
|
ktorContentNegotiation = "io.ktor:ktor-client-content-negotiation:${ktorVer}"
|
||||||
|
|
||||||
coil = "io.coil-kt:coil-compose:2.1.0"
|
coil = "io.coil-kt:coil-compose:2.1.0"
|
||||||
accompanistSystemuicontroller = "com.google.accompanist:accompanist-systemuicontroller:0.24.9-beta"
|
accompanistSystemuicontroller = "com.google.accompanist:accompanist-systemuicontroller:0.24.10-beta"
|
||||||
|
|
||||||
junit = "junit:junit:4.13.2"
|
junit = "junit:junit:4.13.2"
|
||||||
kluent = "org.amshove.kluent:kluent:1.68"
|
kluent = "org.amshove.kluent:kluent:1.68"
|
||||||
mockk = 'io.mockk:mockk:1.12.4'
|
mockk = 'io.mockk:mockk:1.12.4'
|
||||||
|
|
||||||
matrixOlm = "org.matrix.android:olm-sdk:3.2.11"
|
matrixOlm = "org.matrix.android:olm-sdk:3.2.12"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
plugins { id 'kotlin' }
|
applyAndroidLibraryModule(project)
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
compileOnly project(":domains:android:stub")
|
|
||||||
implementation project(':core')
|
implementation project(':core')
|
||||||
implementation project(':domains:android:core')
|
implementation project(':domains:android:core')
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,9 +8,7 @@ import app.dapk.st.matrix.message.MessageService
|
||||||
import app.dapk.st.matrix.room.RoomService
|
import app.dapk.st.matrix.room.RoomService
|
||||||
import app.dapk.st.matrix.sync.*
|
import app.dapk.st.matrix.sync.*
|
||||||
import app.dapk.st.matrix.sync.SyncService.SyncEvent.Typing
|
import app.dapk.st.matrix.sync.SyncService.SyncEvent.Typing
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.coroutines.flow.combine
|
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
|
|
||||||
@JvmInline
|
@JvmInline
|
||||||
value class UnreadCount(val value: Int)
|
value class UnreadCount(val value: Int)
|
||||||
|
@ -32,23 +30,34 @@ class DirectoryUseCase(
|
||||||
) {
|
) {
|
||||||
|
|
||||||
fun state(): Flow<DirectoryState> {
|
fun state(): Flow<DirectoryState> {
|
||||||
return combine(
|
return flow { emit(credentialsStore.credentials()!!.userId) }.flatMapMerge { userId ->
|
||||||
syncService.startSyncing().map { credentialsStore.credentials()!!.userId },
|
combine(
|
||||||
syncService.overview(),
|
overviewDatasource(),
|
||||||
messageService.localEchos(),
|
messageService.localEchos(),
|
||||||
roomStore.observeUnreadCountById(),
|
roomStore.observeUnreadCountById(),
|
||||||
syncService.events()
|
syncService.events()
|
||||||
) { userId, overviewState, localEchos, unread, events ->
|
) { overviewState, localEchos, unread, events ->
|
||||||
overviewState.mergeWithLocalEchos(localEchos, userId).map { roomOverview ->
|
overviewState.mergeWithLocalEchos(localEchos, userId).map { roomOverview ->
|
||||||
RoomFoo(
|
RoomFoo(
|
||||||
overview = roomOverview,
|
overview = roomOverview,
|
||||||
unreadCount = UnreadCount(unread[roomOverview.roomId] ?: 0),
|
unreadCount = UnreadCount(unread[roomOverview.roomId] ?: 0),
|
||||||
typing = events.filterIsInstance<Typing>().firstOrNull { it.roomId == roomOverview.roomId }
|
typing = events.filterIsInstance<Typing>().firstOrNull { it.roomId == roomOverview.roomId }
|
||||||
)
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun overviewDatasource() = combine(
|
||||||
|
syncService.startSyncing().map { false }.onStart { emit(true) },
|
||||||
|
syncService.overview()
|
||||||
|
) { isFirstLoad, overview ->
|
||||||
|
when {
|
||||||
|
isFirstLoad && overview.isEmpty() -> null
|
||||||
|
else -> overview
|
||||||
|
}
|
||||||
|
}.filterNotNull()
|
||||||
|
|
||||||
private suspend fun OverviewState.mergeWithLocalEchos(localEchos: Map<RoomId, List<MessageService.LocalEcho>>, userId: UserId): OverviewState {
|
private suspend fun OverviewState.mergeWithLocalEchos(localEchos: Map<RoomId, List<MessageService.LocalEcho>>, userId: UserId): OverviewState {
|
||||||
return when {
|
return when {
|
||||||
localEchos.isEmpty() -> this
|
localEchos.isEmpty() -> this
|
||||||
|
|
|
@ -9,6 +9,8 @@ import app.dapk.st.matrix.sync.RoomState
|
||||||
import app.dapk.st.matrix.sync.SyncService
|
import app.dapk.st.matrix.sync.SyncService
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.combine
|
import kotlinx.coroutines.flow.combine
|
||||||
|
import kotlinx.coroutines.flow.map
|
||||||
|
import kotlinx.coroutines.flow.onStart
|
||||||
|
|
||||||
internal typealias ObserveTimelineUseCase = (RoomId, UserId) -> Flow<MessengerState>
|
internal typealias ObserveTimelineUseCase = (RoomId, UserId) -> Flow<MessengerState>
|
||||||
|
|
||||||
|
@ -21,11 +23,10 @@ internal class TimelineUseCaseImpl(
|
||||||
|
|
||||||
override fun invoke(roomId: RoomId, userId: UserId): Flow<MessengerState> {
|
override fun invoke(roomId: RoomId, userId: UserId): Flow<MessengerState> {
|
||||||
return combine(
|
return combine(
|
||||||
syncService.startSyncing(),
|
roomDatasource(roomId),
|
||||||
syncService.room(roomId),
|
|
||||||
messageService.localEchos(roomId),
|
messageService.localEchos(roomId),
|
||||||
syncService.events()
|
syncService.events()
|
||||||
) { _, roomState, localEchos, events ->
|
) { roomState, localEchos, events ->
|
||||||
MessengerState(
|
MessengerState(
|
||||||
roomState = when {
|
roomState = when {
|
||||||
localEchos.isEmpty() -> roomState
|
localEchos.isEmpty() -> roomState
|
||||||
|
@ -43,6 +44,10 @@ internal class TimelineUseCaseImpl(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun roomDatasource(roomId: RoomId) = combine(
|
||||||
|
syncService.startSyncing().map { false }.onStart { emit(true) },
|
||||||
|
syncService.room(roomId)
|
||||||
|
) { _, room -> room }
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun UserId.toFallbackMember() = RoomMember(this, displayName = null, avatarUrl = null)
|
private fun UserId.toFallbackMember() = RoomMember(this, displayName = null, avatarUrl = null)
|
||||||
|
|
|
@ -91,7 +91,7 @@ class TimelineUseCaseTest {
|
||||||
echos: List<MessageService.LocalEcho> = emptyList(),
|
echos: List<MessageService.LocalEcho> = emptyList(),
|
||||||
events: List<SyncService.SyncEvent> = emptyList()
|
events: List<SyncService.SyncEvent> = emptyList()
|
||||||
) {
|
) {
|
||||||
fakeSyncService.givenSyncs()
|
fakeSyncService.givenStartsSyncing()
|
||||||
fakeSyncService.givenRoom(A_ROOM_ID).returns(flowOf(roomState))
|
fakeSyncService.givenRoom(A_ROOM_ID).returns(flowOf(roomState))
|
||||||
fakeMessageService.givenEchos(A_ROOM_ID).returns(flowOf(echos))
|
fakeMessageService.givenEchos(A_ROOM_ID).returns(flowOf(echos))
|
||||||
fakeSyncService.givenEvents(A_ROOM_ID).returns(flowOf(events))
|
fakeSyncService.givenEvents(A_ROOM_ID).returns(flowOf(events))
|
||||||
|
|
|
@ -11,9 +11,7 @@ import app.dapk.st.work.WorkScheduler
|
||||||
import com.google.firebase.messaging.FirebaseMessagingService
|
import com.google.firebase.messaging.FirebaseMessagingService
|
||||||
import com.google.firebase.messaging.RemoteMessage
|
import com.google.firebase.messaging.RemoteMessage
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import kotlinx.coroutines.flow.combine
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.coroutines.flow.first
|
|
||||||
import kotlinx.coroutines.flow.firstOrNull
|
|
||||||
|
|
||||||
private var previousJob: Job? = null
|
private var previousJob: Job? = null
|
||||||
|
|
||||||
|
@ -72,7 +70,7 @@ class PushAndroidService : FirebaseMessagingService() {
|
||||||
|
|
||||||
private suspend fun waitForEvent(timeout: Long, eventId: EventId): EventId? {
|
private suspend fun waitForEvent(timeout: Long, eventId: EventId): EventId? {
|
||||||
return withTimeoutOrNull(timeout) {
|
return withTimeoutOrNull(timeout) {
|
||||||
combine(module.syncService().startSyncing(), module.syncService().observeEvent(eventId)) { _, event -> event }
|
combine(module.syncService().startSyncing().startInstantly(), module.syncService().observeEvent(eventId)) { _, event -> event }
|
||||||
.firstOrNull {
|
.firstOrNull {
|
||||||
it == eventId
|
it == eventId
|
||||||
}
|
}
|
||||||
|
@ -81,10 +79,12 @@ class PushAndroidService : FirebaseMessagingService() {
|
||||||
|
|
||||||
private suspend fun waitForUnreadChange(timeout: Long): String? {
|
private suspend fun waitForUnreadChange(timeout: Long): String? {
|
||||||
return withTimeoutOrNull(timeout) {
|
return withTimeoutOrNull(timeout) {
|
||||||
combine(module.syncService().startSyncing(), module.roomStore().observeUnread()) { _, unread -> unread }
|
combine(module.syncService().startSyncing().startInstantly(), module.roomStore().observeUnread()) { _, unread -> unread }
|
||||||
.first()
|
.first()
|
||||||
"ignored"
|
"ignored"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun Flow<Unit>.startInstantly() = this.onStart { emit(Unit) }
|
|
@ -95,7 +95,6 @@ internal class DefaultSyncService(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.onStart {
|
.onStart {
|
||||||
emit(Unit)
|
|
||||||
val subscriptions = syncSubscriptionCount.incrementAndGet()
|
val subscriptions = syncSubscriptionCount.incrementAndGet()
|
||||||
logger.matrixLog(MatrixLogTag.SYNC, "flow onStart - count: $subscriptions")
|
logger.matrixLog(MatrixLogTag.SYNC, "flow onStart - count: $subscriptions")
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,15 +6,16 @@ import app.dapk.st.matrix.common.matrixLog
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
|
|
||||||
internal class SideEffectFlowIterator(private val logger: MatrixLogger) {
|
internal class SideEffectFlowIterator(private val logger: MatrixLogger) {
|
||||||
suspend fun <T> loop(initial: T?, action: suspend (T?) -> T?) {
|
suspend fun <T> loop(initial: T?, onPost: suspend () -> Unit, onIteration: suspend (T?) -> T?) {
|
||||||
var previousState = initial
|
var previousState = initial
|
||||||
|
|
||||||
while (currentCoroutineContext().isActive) {
|
while (currentCoroutineContext().isActive) {
|
||||||
logger.matrixLog(SYNC, "loop iteration")
|
logger.matrixLog(SYNC, "loop iteration")
|
||||||
try {
|
try {
|
||||||
previousState = withContext(NonCancellable) {
|
previousState = withContext(NonCancellable) {
|
||||||
action(previousState)
|
onIteration(previousState)
|
||||||
}
|
}
|
||||||
|
onPost()
|
||||||
} catch (error: Throwable) {
|
} catch (error: Throwable) {
|
||||||
logger.matrixLog(SYNC, "on loop error: ${error.message}")
|
logger.matrixLog(SYNC, "on loop error: ${error.message}")
|
||||||
error.printStackTrace()
|
error.printStackTrace()
|
||||||
|
|
|
@ -8,12 +8,9 @@ import app.dapk.st.matrix.sync.internal.SideEffectFlowIterator
|
||||||
import app.dapk.st.matrix.sync.internal.overview.ReducedSyncFilterUseCase
|
import app.dapk.st.matrix.sync.internal.overview.ReducedSyncFilterUseCase
|
||||||
import app.dapk.st.matrix.sync.internal.request.syncRequest
|
import app.dapk.st.matrix.sync.internal.request.syncRequest
|
||||||
import app.dapk.st.matrix.sync.internal.room.SyncSideEffects
|
import app.dapk.st.matrix.sync.internal.room.SyncSideEffects
|
||||||
import kotlinx.coroutines.currentCoroutineContext
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.cancellable
|
import kotlinx.coroutines.flow.cancellable
|
||||||
import kotlinx.coroutines.flow.flow
|
import kotlinx.coroutines.flow.flow
|
||||||
import kotlinx.coroutines.flow.isActive
|
|
||||||
import kotlinx.coroutines.isActive
|
|
||||||
|
|
||||||
internal class SyncUseCase(
|
internal class SyncUseCase(
|
||||||
private val persistence: OverviewStore,
|
private val persistence: OverviewStore,
|
||||||
|
@ -29,50 +26,55 @@ internal class SyncUseCase(
|
||||||
) {
|
) {
|
||||||
|
|
||||||
fun sync(): Flow<Unit> {
|
fun sync(): Flow<Unit> {
|
||||||
return flow<Unit> {
|
return flow {
|
||||||
val credentials = credentialsStore.credentials()!!
|
val credentials = credentialsStore.credentials()!!
|
||||||
val filterId = filterUseCase.reducedFilter(credentials.userId)
|
val filterId = filterUseCase.reducedFilter(credentials.userId)
|
||||||
|
|
||||||
with(flowIterator) {
|
with(flowIterator) {
|
||||||
loop<OverviewState>(initial = null) { previousState ->
|
loop<OverviewState>(
|
||||||
val syncToken = syncStore.read(key = SyncStore.SyncKey.Overview)
|
initial = null,
|
||||||
val response = doSyncRequest(filterId, syncToken)
|
onPost = { emit(Unit) },
|
||||||
if (credentialsStore.isSignedIn()) {
|
onIteration = { onEachSyncIteration(filterId, credentials, previousState = it) }
|
||||||
logger.logP("sync processing") {
|
)
|
||||||
syncStore.store(key = SyncStore.SyncKey.Overview, syncToken = response.nextBatch)
|
|
||||||
val sideEffects = logger.logP("side effects processing") {
|
|
||||||
syncSideEffects.blockingSideEffects(credentials.userId, response, syncToken)
|
|
||||||
}
|
|
||||||
|
|
||||||
val isInitialSync = syncToken == null
|
|
||||||
val nextState = logger.logP("reducing") { syncReducer.reduce(isInitialSync, sideEffects, response, credentials) }
|
|
||||||
val overview = nextState.roomState.map { it.roomOverview }
|
|
||||||
|
|
||||||
if (nextState.roomsLeft.isNotEmpty()) {
|
|
||||||
persistence.removeRooms(nextState.roomsLeft)
|
|
||||||
}
|
|
||||||
if (nextState.invites.isNotEmpty()) {
|
|
||||||
persistence.persistInvites(nextState.invites)
|
|
||||||
}
|
|
||||||
if (nextState.newRoomsJoined.isNotEmpty()) {
|
|
||||||
persistence.removeInvites(nextState.newRoomsJoined)
|
|
||||||
}
|
|
||||||
|
|
||||||
when {
|
|
||||||
previousState == overview -> previousState.also { logger.matrixLog(SYNC, "no changes, not persisting new state") }
|
|
||||||
overview.isNotEmpty() -> overview.also { persistence.persist(overview) }
|
|
||||||
else -> previousState.also { logger.matrixLog(SYNC, "nothing to do") }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.matrixLog(SYNC, "sync processing skipped due to being signed out")
|
|
||||||
null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}.cancellable()
|
}.cancellable()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private suspend fun onEachSyncIteration(filterId: SyncService.FilterId, credentials: UserCredentials, previousState: OverviewState?): OverviewState? {
|
||||||
|
val syncToken = syncStore.read(key = SyncStore.SyncKey.Overview)
|
||||||
|
val response = doSyncRequest(filterId, syncToken)
|
||||||
|
return if (credentialsStore.isSignedIn()) {
|
||||||
|
logger.logP("sync processing") {
|
||||||
|
syncStore.store(key = SyncStore.SyncKey.Overview, syncToken = response.nextBatch)
|
||||||
|
val sideEffects = logger.logP("side effects processing") {
|
||||||
|
syncSideEffects.blockingSideEffects(credentials.userId, response, syncToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
val isInitialSync = syncToken == null
|
||||||
|
val nextState = logger.logP("reducing") { syncReducer.reduce(isInitialSync, sideEffects, response, credentials) }
|
||||||
|
val overview = nextState.roomState.map { it.roomOverview }
|
||||||
|
|
||||||
|
if (nextState.roomsLeft.isNotEmpty()) {
|
||||||
|
persistence.removeRooms(nextState.roomsLeft)
|
||||||
|
}
|
||||||
|
if (nextState.invites.isNotEmpty()) {
|
||||||
|
persistence.persistInvites(nextState.invites)
|
||||||
|
}
|
||||||
|
if (nextState.newRoomsJoined.isNotEmpty()) {
|
||||||
|
persistence.removeInvites(nextState.newRoomsJoined)
|
||||||
|
}
|
||||||
|
|
||||||
|
when {
|
||||||
|
previousState == overview -> previousState.also { logger.matrixLog(SYNC, "no changes, not persisting new state") }
|
||||||
|
overview.isNotEmpty() -> overview.also { persistence.persist(overview) }
|
||||||
|
else -> previousState.also { logger.matrixLog(SYNC, "nothing to do") }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.matrixLog(SYNC, "sync processing skipped due to being signed out")
|
||||||
|
null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private suspend fun doSyncRequest(filterId: SyncService.FilterId, syncToken: SyncToken?) = logger.logP("sync api") {
|
private suspend fun doSyncRequest(filterId: SyncService.FilterId, syncToken: SyncToken?) = logger.logP("sync api") {
|
||||||
client.execute(
|
client.execute(
|
||||||
syncRequest(
|
syncRequest(
|
||||||
|
|
|
@ -4,12 +4,12 @@ import app.dapk.st.matrix.common.RoomId
|
||||||
import app.dapk.st.matrix.sync.SyncService
|
import app.dapk.st.matrix.sync.SyncService
|
||||||
import io.mockk.every
|
import io.mockk.every
|
||||||
import io.mockk.mockk
|
import io.mockk.mockk
|
||||||
import kotlinx.coroutines.flow.flowOf
|
import kotlinx.coroutines.flow.emptyFlow
|
||||||
import test.delegateReturn
|
import test.delegateReturn
|
||||||
|
|
||||||
class FakeSyncService : SyncService by mockk() {
|
class FakeSyncService : SyncService by mockk() {
|
||||||
fun givenSyncs() {
|
fun givenStartsSyncing() {
|
||||||
every { startSyncing() }.returns(flowOf(Unit))
|
every { startSyncing() }.returns(emptyFlow())
|
||||||
}
|
}
|
||||||
|
|
||||||
fun givenRoom(roomId: RoomId) = every { room(roomId) }.delegateReturn()
|
fun givenRoom(roomId: RoomId) = every { room(roomId) }.delegateReturn()
|
||||||
|
|
|
@ -65,31 +65,14 @@ class SmokeTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Order(4)
|
@Order(4)
|
||||||
fun `can send and receive encrypted messages`() = testAfterInitialSync { alice, bob ->
|
fun `can send and receive clear text messages`() = testTextMessaging(isEncrypted = false)
|
||||||
val message = "from alice to bob : ${System.currentTimeMillis()}".from(SharedState.alice.roomMember)
|
|
||||||
alice.sendEncryptedMessage(SharedState.sharedRoom, message.content)
|
|
||||||
bob.expectMessage(SharedState.sharedRoom, message)
|
|
||||||
|
|
||||||
val message2 = "from bob to alice : ${System.currentTimeMillis()}".from(SharedState.bob.roomMember)
|
|
||||||
bob.sendEncryptedMessage(SharedState.sharedRoom, message2.content)
|
|
||||||
alice.expectMessage(SharedState.sharedRoom, message2)
|
|
||||||
|
|
||||||
val aliceSecondDevice = TestMatrix(SharedState.alice).also { it.newlogin() }
|
|
||||||
aliceSecondDevice.client.syncService().startSyncing().collectAsync {
|
|
||||||
val message3 = "from alice to bob and alice's second device : ${System.currentTimeMillis()}".from(SharedState.alice.roomMember)
|
|
||||||
alice.sendEncryptedMessage(SharedState.sharedRoom, message3.content)
|
|
||||||
aliceSecondDevice.expectMessage(SharedState.sharedRoom, message3)
|
|
||||||
bob.expectMessage(SharedState.sharedRoom, message3)
|
|
||||||
|
|
||||||
val message4 = "from alice's second device to bob and alice's first device : ${System.currentTimeMillis()}".from(SharedState.alice.roomMember)
|
|
||||||
aliceSecondDevice.sendEncryptedMessage(SharedState.sharedRoom, message4.content)
|
|
||||||
alice.expectMessage(SharedState.sharedRoom, message4)
|
|
||||||
bob.expectMessage(SharedState.sharedRoom, message4)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Order(5)
|
@Order(5)
|
||||||
|
fun `can send and receive encrypted text messages`() = testTextMessaging(isEncrypted = true)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Order(6)
|
||||||
fun `can request and verify devices`() = testAfterInitialSync { alice, bob ->
|
fun `can request and verify devices`() = testAfterInitialSync { alice, bob ->
|
||||||
alice.client.cryptoService().verificationAction(Verification.Action.Request(bob.userId(), bob.deviceId()))
|
alice.client.cryptoService().verificationAction(Verification.Action.Request(bob.userId(), bob.deviceId()))
|
||||||
alice.client.cryptoService().verificationState().automaticVerification(alice).expectAsync { it == Verification.State.Done }
|
alice.client.cryptoService().verificationState().automaticVerification(alice).expectAsync { it == Verification.State.Done }
|
||||||
|
@ -110,6 +93,30 @@ class SmokeTest {
|
||||||
|
|
||||||
result shouldBeEqualTo listOf(RoomId(value = "!qOSENTtFUuCEKJSVzl:matrix.org"))
|
result shouldBeEqualTo listOf(RoomId(value = "!qOSENTtFUuCEKJSVzl:matrix.org"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun testTextMessaging(isEncrypted: Boolean) = testAfterInitialSync { alice, bob ->
|
||||||
|
val message = "from alice to bob".from(SharedState.alice.roomMember)
|
||||||
|
alice.sendTextMessage(SharedState.sharedRoom, message.content, isEncrypted)
|
||||||
|
bob.expectTextMessage(SharedState.sharedRoom, message)
|
||||||
|
|
||||||
|
val message2 = "from bob to alice".from(SharedState.bob.roomMember)
|
||||||
|
bob.sendTextMessage(SharedState.sharedRoom, message2.content, isEncrypted)
|
||||||
|
alice.expectTextMessage(SharedState.sharedRoom, message2)
|
||||||
|
|
||||||
|
// Needs investigation
|
||||||
|
// val aliceSecondDevice = testMatrix(SharedState.alice, isTemp = true, withLogging = true).also { it.newlogin() }
|
||||||
|
// aliceSecondDevice.client.syncService().startSyncing().collectAsync {
|
||||||
|
// val message3 = "from alice to bob and alice's second device".from(SharedState.alice.roomMember)
|
||||||
|
// alice.sendTextMessage(SharedState.sharedRoom, message3.content, isEncrypted)
|
||||||
|
// aliceSecondDevice.expectTextMessage(SharedState.sharedRoom, message3)
|
||||||
|
// bob.expectTextMessage(SharedState.sharedRoom, message3)
|
||||||
|
//
|
||||||
|
// val message4 = "from alice's second device to bob and alice's first device".from(SharedState.alice.roomMember)
|
||||||
|
// aliceSecondDevice.sendTextMessage(SharedState.sharedRoom, message4.content, isEncrypted)
|
||||||
|
// alice.expectTextMessage(SharedState.sharedRoom, message4)
|
||||||
|
// bob.expectTextMessage(SharedState.sharedRoom, message4)
|
||||||
|
// }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun createAndRegisterAccount(): TestUser {
|
private suspend fun createAndRegisterAccount(): TestUser {
|
||||||
|
@ -135,7 +142,6 @@ private suspend fun login(user: TestUser) {
|
||||||
.authService()
|
.authService()
|
||||||
.login(AuthService.LoginRequest(userName = user.roomMember.id.value, password = user.password, serverUrl = null))
|
.login(AuthService.LoginRequest(userName = user.roomMember.id.value, password = user.password, serverUrl = null))
|
||||||
|
|
||||||
|
|
||||||
result shouldBeInstanceOf AuthService.LoginResult.Success::class.java
|
result shouldBeInstanceOf AuthService.LoginResult.Success::class.java
|
||||||
(result as AuthService.LoginResult.Success).userCredentials.let { credentials ->
|
(result as AuthService.LoginResult.Success).userCredentials.let { credentials ->
|
||||||
credentials.accessToken shouldNotBeEqualTo null
|
credentials.accessToken shouldNotBeEqualTo null
|
||||||
|
@ -163,7 +169,7 @@ object SharedState {
|
||||||
data class TestUser(val password: String, val roomMember: RoomMember, val homeServer: String)
|
data class TestUser(val password: String, val roomMember: RoomMember, val homeServer: String)
|
||||||
data class TestMessage(val content: String, val author: RoomMember)
|
data class TestMessage(val content: String, val author: RoomMember)
|
||||||
|
|
||||||
fun String.from(roomMember: RoomMember) = TestMessage(this, roomMember)
|
fun String.from(roomMember: RoomMember) = TestMessage("$this - ${UUID.randomUUID()}", roomMember)
|
||||||
|
|
||||||
fun testAfterInitialSync(block: suspend MatrixTestScope.(TestMatrix, TestMatrix) -> Unit) {
|
fun testAfterInitialSync(block: suspend MatrixTestScope.(TestMatrix, TestMatrix) -> Unit) {
|
||||||
restoreLoginAndInitialSync(TestMatrix(SharedState.alice, includeLogging = false), TestMatrix(SharedState.bob, includeLogging = false), block)
|
restoreLoginAndInitialSync(TestMatrix(SharedState.alice, includeLogging = false), TestMatrix(SharedState.bob, includeLogging = false), block)
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
package test
|
package test
|
||||||
|
|
||||||
import TestMessage
|
import TestMessage
|
||||||
|
import TestUser
|
||||||
import app.dapk.st.core.extensions.ifNull
|
import app.dapk.st.core.extensions.ifNull
|
||||||
import app.dapk.st.matrix.common.RoomId
|
import app.dapk.st.matrix.common.RoomId
|
||||||
import app.dapk.st.matrix.message.MessageService
|
import app.dapk.st.matrix.message.MessageService
|
||||||
|
@ -31,6 +32,8 @@ fun restoreLoginAndInitialSync(m1: TestMatrix, m2: TestMatrix, testBody: suspend
|
||||||
println("restore login 2")
|
println("restore login 2")
|
||||||
m2.restoreLogin()
|
m2.restoreLogin()
|
||||||
val testHelper = MatrixTestScope(this)
|
val testHelper = MatrixTestScope(this)
|
||||||
|
testHelper.testMatrix(m1)
|
||||||
|
testHelper.testMatrix(m2)
|
||||||
with(testHelper) {
|
with(testHelper) {
|
||||||
combine(m1.client.syncService().startSyncing(), m2.client.syncService().startSyncing()) { _, _ -> }.collectAsync {
|
combine(m1.client.syncService().startSyncing(), m2.client.syncService().startSyncing()) { _, _ -> }.collectAsync {
|
||||||
m1.client.syncService().overview().first()
|
m1.client.syncService().overview().first()
|
||||||
|
@ -38,6 +41,7 @@ fun restoreLoginAndInitialSync(m1: TestMatrix, m2: TestMatrix, testBody: suspend
|
||||||
testBody(testHelper, m1, m2)
|
testBody(testHelper, m1, m2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
testHelper.release()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,6 +58,7 @@ suspend fun <T> Flow<T>.collectAsync(scope: CoroutineScope, block: suspend () ->
|
||||||
class MatrixTestScope(private val testScope: TestScope) {
|
class MatrixTestScope(private val testScope: TestScope) {
|
||||||
|
|
||||||
private val inProgressExpects = mutableListOf<Deferred<*>>()
|
private val inProgressExpects = mutableListOf<Deferred<*>>()
|
||||||
|
private val inProgressInstances = mutableListOf<TestMatrix>()
|
||||||
|
|
||||||
suspend fun <T> Flow<T>.collectAsync(block: suspend () -> Unit) {
|
suspend fun <T> Flow<T>.collectAsync(block: suspend () -> Unit) {
|
||||||
collectAsync(testScope, block)
|
collectAsync(testScope, block)
|
||||||
|
@ -99,6 +104,7 @@ class MatrixTestScope(private val testScope: TestScope) {
|
||||||
val collected = mutableListOf<T>()
|
val collected = mutableListOf<T>()
|
||||||
val work = testScope.async {
|
val work = testScope.async {
|
||||||
flow.onEach {
|
flow.onEach {
|
||||||
|
println("found: $it")
|
||||||
collected.add(it)
|
collected.add(it)
|
||||||
}.first { it == expected }
|
}.first { it == expected }
|
||||||
}
|
}
|
||||||
|
@ -118,18 +124,20 @@ class MatrixTestScope(private val testScope: TestScope) {
|
||||||
.expect { it.any { it.roomId == roomId } }
|
.expect { it.any { it.roomId == roomId } }
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun TestMatrix.expectMessage(roomId: RoomId, message: TestMessage) {
|
suspend fun TestMatrix.expectTextMessage(roomId: RoomId, message: TestMessage) {
|
||||||
|
println("expecting ${message.content}")
|
||||||
this.client.syncService().room(roomId)
|
this.client.syncService().room(roomId)
|
||||||
.map { it.events.filterIsInstance<RoomEvent.Message>().map { TestMessage(it.content, it.author) }.firstOrNull() }
|
.map { it.events.filterIsInstance<RoomEvent.Message>().map { TestMessage(it.content, it.author) }.firstOrNull() }
|
||||||
.assert(message)
|
.assert(message)
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun TestMatrix.sendEncryptedMessage(roomId: RoomId, content: String) {
|
suspend fun TestMatrix.sendTextMessage(roomId: RoomId, content: String, isEncrypted: Boolean) {
|
||||||
|
println("sending $content")
|
||||||
this.client.messageService().scheduleMessage(
|
this.client.messageService().scheduleMessage(
|
||||||
MessageService.Message.TextMessage(
|
MessageService.Message.TextMessage(
|
||||||
content = MessageService.Message.Content.TextContent(body = content),
|
content = MessageService.Message.Content.TextContent(body = content),
|
||||||
roomId = roomId,
|
roomId = roomId,
|
||||||
sendEncrypted = true,
|
sendEncrypted = isEncrypted,
|
||||||
localId = "local.${UUID.randomUUID()}",
|
localId = "local.${UUID.randomUUID()}",
|
||||||
timestampUtc = System.currentTimeMillis(),
|
timestampUtc = System.currentTimeMillis(),
|
||||||
)
|
)
|
||||||
|
@ -143,4 +151,16 @@ class MatrixTestScope(private val testScope: TestScope) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun testMatrix(user: TestUser, isTemp: Boolean, withLogging: Boolean = false) = TestMatrix(
|
||||||
|
user,
|
||||||
|
temporaryDatabase = isTemp,
|
||||||
|
includeLogging = withLogging
|
||||||
|
).also { inProgressInstances.add(it) }
|
||||||
|
|
||||||
|
fun testMatrix(testMatrix: TestMatrix) = inProgressInstances.add(testMatrix)
|
||||||
|
|
||||||
|
suspend fun release() {
|
||||||
|
inProgressInstances.forEach { it.release() }
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -33,10 +33,10 @@ import app.dapk.st.matrix.sync.internal.room.MessageDecrypter
|
||||||
import app.dapk.st.olm.DeviceKeyFactory
|
import app.dapk.st.olm.DeviceKeyFactory
|
||||||
import app.dapk.st.olm.OlmPersistenceWrapper
|
import app.dapk.st.olm.OlmPersistenceWrapper
|
||||||
import app.dapk.st.olm.OlmWrapper
|
import app.dapk.st.olm.OlmWrapper
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.*
|
||||||
import kotlinx.coroutines.Dispatchers
|
|
||||||
import kotlinx.serialization.encodeToString
|
import kotlinx.serialization.encodeToString
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
|
import org.amshove.kluent.fail
|
||||||
import test.impl.InMemoryDatabase
|
import test.impl.InMemoryDatabase
|
||||||
import test.impl.InMemoryPreferences
|
import test.impl.InMemoryPreferences
|
||||||
import test.impl.InstantScheduler
|
import test.impl.InstantScheduler
|
||||||
|
@ -45,6 +45,7 @@ import java.time.Clock
|
||||||
|
|
||||||
class TestMatrix(
|
class TestMatrix(
|
||||||
private val user: TestUser,
|
private val user: TestUser,
|
||||||
|
temporaryDatabase: Boolean = false,
|
||||||
includeHttpLogging: Boolean = false,
|
includeHttpLogging: Boolean = false,
|
||||||
includeLogging: Boolean = false,
|
includeLogging: Boolean = false,
|
||||||
) {
|
) {
|
||||||
|
@ -57,7 +58,10 @@ class TestMatrix(
|
||||||
}
|
}
|
||||||
|
|
||||||
private val preferences = InMemoryPreferences()
|
private val preferences = InMemoryPreferences()
|
||||||
private val database = InMemoryDatabase.realInstance(user.roomMember.id.value)
|
private val database = when (temporaryDatabase) {
|
||||||
|
true -> InMemoryDatabase.temp()
|
||||||
|
false -> InMemoryDatabase.realInstance(user.roomMember.id.value)
|
||||||
|
}
|
||||||
private val coroutineDispatchers = CoroutineDispatchers(
|
private val coroutineDispatchers = CoroutineDispatchers(
|
||||||
Dispatchers.Unconfined,
|
Dispatchers.Unconfined,
|
||||||
main = Dispatchers.Unconfined,
|
main = Dispatchers.Unconfined,
|
||||||
|
@ -261,8 +265,12 @@ class TestMatrix(
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun newlogin() {
|
suspend fun newlogin() {
|
||||||
client.authService()
|
val result = client.authService()
|
||||||
.login(AuthService.LoginRequest(user.roomMember.id.value, user.password, null))
|
.login(AuthService.LoginRequest(user.roomMember.id.value, user.password, null))
|
||||||
|
|
||||||
|
if (result !is AuthService.LoginResult.Success) {
|
||||||
|
fail("Login failed: $result")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun restoreLogin() {
|
suspend fun restoreLogin() {
|
||||||
|
@ -280,6 +288,24 @@ class TestMatrix(
|
||||||
|
|
||||||
suspend fun deviceId() = storeModule.credentialsStore().credentials()!!.deviceId
|
suspend fun deviceId() = storeModule.credentialsStore().credentials()!!.deviceId
|
||||||
suspend fun userId() = storeModule.credentialsStore().credentials()!!.userId
|
suspend fun userId() = storeModule.credentialsStore().credentials()!!.userId
|
||||||
|
|
||||||
|
suspend fun release() {
|
||||||
|
coroutineDispatchers.global.waitForCancel()
|
||||||
|
coroutineDispatchers.io.waitForCancel()
|
||||||
|
coroutineDispatchers.main.waitForCancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun CoroutineDispatcher.waitForCancel() {
|
||||||
|
if (this.isActive) {
|
||||||
|
this.job.cancelAndJoin()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun CoroutineScope.waitForCancel() {
|
||||||
|
if (this.isActive) {
|
||||||
|
this.coroutineContext.job.cancelAndJoin()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class JavaBase64 : Base64 {
|
class JavaBase64 : Base64 {
|
||||||
|
|
|
@ -19,4 +19,11 @@ object InMemoryDatabase {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun temp(): DapkDb {
|
||||||
|
val driver = JdbcSqliteDriver(JdbcSqliteDriver.IN_MEMORY)
|
||||||
|
return DapkDb(driver).also {
|
||||||
|
DapkDb.Schema.create(driver)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue