Merge pull request #2235 from vector-im/feature/bca/perf_better_send_time

Feature/bca/perf better send time
This commit is contained in:
Benoit Marty 2020-10-20 22:12:19 +02:00 committed by GitHub
commit c88c2a17ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 995 additions and 528 deletions

View File

@ -5,7 +5,7 @@ Features ✨:
-
Improvements 🙌:
-
- Rework sending Event management (#154)
Bugfix 🐛:
-

View File

@ -23,7 +23,7 @@ import com.squareup.moshi.JsonClass
data class ReactionInfo(
@Json(name = "rel_type") override val type: String?,
@Json(name = "event_id") override val eventId: String,
val key: String,
@Json(name = "key") val key: String,
// always null for reaction
@Json(name = "m.in_reply_to") override val inReplyTo: ReplyToContent? = null,
@Json(name = "option") override val option: Int? = null

View File

@ -123,11 +123,6 @@ interface SendService {
*/
fun deleteFailedEcho(localEcho: TimelineEvent)
/**
* Delete all the events in one of the sending states
*/
fun clearSendingQueue()
/**
* Cancel sending a specific event. It has to be in one of the sending states
*/

View File

@ -420,7 +420,7 @@ internal class MXMegolmEncryption(
sendToDeviceTask.execute(sendToDeviceParams)
true
} catch (failure: Throwable) {
Timber.v("## CRYPTO | CRYPTO | reshareKey() : fail to send <$sessionId> to $userId:$deviceId")
Timber.e(failure, "## CRYPTO | CRYPTO | reshareKey() : fail to send <$sessionId> to $userId:$deviceId")
false
}
}

View File

@ -17,8 +17,13 @@ package org.matrix.android.sdk.internal.crypto.tasks
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toContent
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.MXCRYPTO_ALGORITHM_MEGOLM
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import org.matrix.android.sdk.internal.crypto.model.MXEncryptEventContentResult
import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.task.Task
import org.matrix.android.sdk.internal.util.awaitCallback
@ -38,13 +43,14 @@ internal class DefaultEncryptEventTask @Inject constructor(
private val localEchoRepository: LocalEchoRepository
) : EncryptEventTask {
override suspend fun execute(params: EncryptEventTask.Params): Event {
if (!params.crypto.isRoomEncrypted(params.roomId)) return params.event
// don't want to wait for any query
// if (!params.crypto.isRoomEncrypted(params.roomId)) return params.event
val localEvent = params.event
if (localEvent.eventId == null) {
throw IllegalArgumentException()
}
localEchoRepository.updateSendState(localEvent.eventId, SendState.ENCRYPTING)
localEchoRepository.updateSendState(localEvent.eventId, localEvent.roomId, SendState.ENCRYPTING)
val localMutableContent = localEvent.content?.toMutableMap() ?: mutableMapOf()
params.keepKeys?.forEach {
@ -52,6 +58,7 @@ internal class DefaultEncryptEventTask @Inject constructor(
}
// try {
// let it throws
awaitCallback<MXEncryptEventContentResult> {
params.crypto.encryptEventContent(localMutableContent, localEvent.type, params.roomId, it)
}.let { result ->
@ -63,18 +70,34 @@ internal class DefaultEncryptEventTask @Inject constructor(
}
}
val safeResult = result.copy(eventContent = modifiedContent)
// Better handling of local echo, to avoid decrypting transition on remote echo
// Should I only do it for text messages?
val decryptionLocalEcho = if (result.eventContent["algorithm"] == MXCRYPTO_ALGORITHM_MEGOLM) {
MXEventDecryptionResult(
clearEvent = Event(
type = localEvent.type,
content = localEvent.content,
roomId = localEvent.roomId
).toContent(),
forwardingCurve25519KeyChain = emptyList(),
senderCurve25519Key = result.eventContent["sender_key"] as? String,
claimedEd25519Key = params.crypto.getMyDevice().fingerprint()
)
} else {
null
}
localEchoRepository.updateEcho(localEvent.eventId) { _, localEcho ->
localEcho.type = EventType.ENCRYPTED
localEcho.content = ContentMapper.map(modifiedContent)
decryptionLocalEcho?.also {
localEcho.setDecryptionResult(it)
}
}
return localEvent.copy(
type = safeResult.eventType,
content = safeResult.eventContent
)
}
// } catch (throwable: Throwable) {
// val sendState = when (throwable) {
// is Failure.CryptoError -> SendState.FAILED_UNKNOWN_DEVICES
// else -> SendState.UNDELIVERED
// }
// localEchoUpdater.updateSendState(localEvent.eventId, sendState)
// throw throwable
// }
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.crypto.tasks
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.session.room.send.SendResponse
import org.matrix.android.sdk.internal.task.Task
import javax.inject.Inject
internal interface RedactEventTask : Task<RedactEventTask.Params, String> {
data class Params(
val txID: String,
val roomId: String,
val eventId: String,
val reason: String?
)
}
internal class DefaultRedactEventTask @Inject constructor(
private val roomAPI: RoomAPI,
private val eventBus: EventBus) : RedactEventTask {
override suspend fun execute(params: RedactEventTask.Params): String {
val executeRequest = executeRequest<SendResponse>(eventBus) {
apiCall = roomAPI.redactEvent(
txId = params.txID,
roomId = params.roomId,
eventId = params.eventId,
reason = if (params.reason == null) emptyMap() else mapOf("reason" to params.reason)
)
}
return executeRequest.eventId
}
}

View File

@ -15,6 +15,7 @@
*/
package org.matrix.android.sdk.internal.crypto.tasks
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.room.send.SendState
@ -23,12 +24,12 @@ import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.session.room.send.SendResponse
import org.matrix.android.sdk.internal.task.Task
import org.greenrobot.eventbus.EventBus
import javax.inject.Inject
internal interface SendEventTask : Task<SendEventTask.Params, String> {
data class Params(
val event: Event,
val encrypt: Boolean,
val cryptoService: CryptoService?
)
}
@ -40,11 +41,11 @@ internal class DefaultSendEventTask @Inject constructor(
private val eventBus: EventBus) : SendEventTask {
override suspend fun execute(params: SendEventTask.Params): String {
val event = handleEncryption(params)
val localId = event.eventId!!
try {
localEchoRepository.updateSendState(localId, SendState.SENDING)
val event = handleEncryption(params)
val localId = event.eventId!!
localEchoRepository.updateSendState(localId, params.event.roomId, SendState.SENDING)
val executeRequest = executeRequest<SendResponse>(eventBus) {
apiCall = roomAPI.send(
localId,
@ -53,26 +54,23 @@ internal class DefaultSendEventTask @Inject constructor(
eventType = event.type
)
}
localEchoRepository.updateSendState(localId, SendState.SENT)
localEchoRepository.updateSendState(localId, params.event.roomId, SendState.SENT)
return executeRequest.eventId
} catch (e: Throwable) {
localEchoRepository.updateSendState(localId, SendState.UNDELIVERED)
// localEchoRepository.updateSendState(params.event.eventId!!, SendState.UNDELIVERED)
throw e
}
}
@Throws
private suspend fun handleEncryption(params: SendEventTask.Params): Event {
if (params.cryptoService?.isRoomEncrypted(params.event.roomId ?: "") == true) {
try {
return encryptEventTask.execute(EncryptEventTask.Params(
params.event.roomId ?: "",
params.event,
listOf("m.relates_to"),
params.cryptoService
))
} catch (throwable: Throwable) {
// We said it's ok to send verification request in clear
}
if (params.encrypt && !params.event.isEncrypted()) {
return encryptEventTask.execute(EncryptEventTask.Params(
params.event.roomId ?: "",
params.event,
listOf("m.relates_to"),
params.cryptoService!!
))
}
return params.event
}

View File

@ -44,7 +44,7 @@ internal class DefaultSendVerificationMessageTask @Inject constructor(
val localId = event.eventId!!
try {
localEchoRepository.updateSendState(localId, SendState.SENDING)
localEchoRepository.updateSendState(localId, event.roomId, SendState.SENDING)
val executeRequest = executeRequest<SendResponse>(eventBus) {
apiCall = roomAPI.send(
localId,
@ -53,10 +53,10 @@ internal class DefaultSendVerificationMessageTask @Inject constructor(
eventType = event.type
)
}
localEchoRepository.updateSendState(localId, SendState.SENT)
localEchoRepository.updateSendState(localId, event.roomId, SendState.SENT)
return executeRequest.eventId
} catch (e: Throwable) {
localEchoRepository.updateSendState(localId, SendState.UNDELIVERED)
localEchoRepository.updateSendState(localId, event.roomId, SendState.UNDELIVERED)
throw e
}
}

View File

@ -64,6 +64,7 @@ import org.matrix.android.sdk.internal.di.SessionId
import org.matrix.android.sdk.internal.di.UnauthenticatedWithCertificate
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.session.identity.DefaultIdentityService
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
import org.matrix.android.sdk.internal.session.room.timeline.TimelineEventDecryptor
import org.matrix.android.sdk.internal.session.sync.SyncTokenStore
import org.matrix.android.sdk.internal.session.sync.job.SyncThread
@ -121,7 +122,8 @@ internal class DefaultSession @Inject constructor(
private val taskExecutor: TaskExecutor,
private val callSignalingService: Lazy<CallSignalingService>,
@UnauthenticatedWithCertificate
private val unauthenticatedWithCertificateOkHttpClient: Lazy<OkHttpClient>
private val unauthenticatedWithCertificateOkHttpClient: Lazy<OkHttpClient>,
private val eventSenderProcessor: EventSenderProcessor
) : Session,
RoomService by roomService.get(),
RoomDirectoryService by roomDirectoryService.get(),
@ -161,6 +163,7 @@ internal class DefaultSession @Inject constructor(
}
eventBus.register(this)
timelineEventDecryptor.start()
eventSenderProcessor.start()
}
override fun requireBackgroundSync() {
@ -204,6 +207,7 @@ internal class DefaultSession @Inject constructor(
cryptoService.get().close()
isOpen = false
eventBus.unregister(this)
eventSenderProcessor.interrupt()
}
override fun getSyncStateLive() = getSyncThread().liveState()

View File

@ -45,7 +45,6 @@ import org.matrix.android.sdk.internal.session.pushers.AddHttpPusherWorker
import org.matrix.android.sdk.internal.session.pushers.PushersModule
import org.matrix.android.sdk.internal.session.room.RoomModule
import org.matrix.android.sdk.internal.session.room.relation.SendRelationWorker
import org.matrix.android.sdk.internal.session.room.send.EncryptEventWorker
import org.matrix.android.sdk.internal.session.room.send.MultipleEventSendingDispatcherWorker
import org.matrix.android.sdk.internal.session.room.send.RedactEventWorker
import org.matrix.android.sdk.internal.session.room.send.SendEventWorker
@ -109,8 +108,6 @@ internal interface SessionComponent {
fun inject(worker: SendRelationWorker)
fun inject(worker: EncryptEventWorker)
fun inject(worker: MultipleEventSendingDispatcherWorker)
fun inject(worker: RedactEventWorker)

View File

@ -43,6 +43,8 @@ import org.matrix.android.sdk.api.session.securestorage.SharedSecretStorageServi
import org.matrix.android.sdk.api.session.typing.TypingUsersTracker
import org.matrix.android.sdk.internal.crypto.crosssigning.ShieldTrustUpdater
import org.matrix.android.sdk.internal.crypto.secrets.DefaultSharedSecretStorageService
import org.matrix.android.sdk.internal.crypto.tasks.DefaultRedactEventTask
import org.matrix.android.sdk.internal.crypto.tasks.RedactEventTask
import org.matrix.android.sdk.internal.crypto.verification.VerificationMessageProcessor
import org.matrix.android.sdk.internal.database.DatabaseCleaner
import org.matrix.android.sdk.internal.database.EventInsertLiveObserver
@ -367,4 +369,7 @@ internal abstract class SessionModule {
@Binds
abstract fun bindTypingUsersTracker(tracker: DefaultTypingUsersTracker): TypingUsersTracker
@Binds
abstract fun bindRedactEventTask(task: DefaultRedactEventTask): RedactEventTask
}

View File

@ -36,8 +36,8 @@ import org.matrix.android.sdk.api.util.NoOpCancellable
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.session.SessionScope
import org.matrix.android.sdk.internal.session.call.model.MxCallImpl
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
import org.matrix.android.sdk.internal.session.room.send.RoomEventSender
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.task.configureWith
import timber.log.Timber
@ -50,7 +50,7 @@ internal class DefaultCallSignalingService @Inject constructor(
private val userId: String,
private val activeCallHandler: ActiveCallHandler,
private val localEchoEventFactory: LocalEchoEventFactory,
private val roomEventSender: RoomEventSender,
private val eventSenderProcessor: EventSenderProcessor,
private val taskExecutor: TaskExecutor,
private val turnServerTask: GetTurnServerTask
) : CallSignalingService {
@ -103,7 +103,7 @@ internal class DefaultCallSignalingService @Inject constructor(
otherUserId = otherUserId,
isVideoCall = isVideoCall,
localEchoEventFactory = localEchoEventFactory,
roomEventSender = roomEventSender
eventSenderProcessor = eventSenderProcessor
)
activeCallHandler.addCall(call).also {
return call
@ -165,7 +165,7 @@ internal class DefaultCallSignalingService @Inject constructor(
otherUserId = event.senderId ?: return@let,
isVideoCall = content.isVideo(),
localEchoEventFactory = localEchoEventFactory,
roomEventSender = roomEventSender
eventSenderProcessor = eventSenderProcessor
)
activeCallHandler.addCall(incomingCall)
onCallInvite(incomingCall, content)

View File

@ -29,8 +29,8 @@ import org.matrix.android.sdk.api.session.room.model.call.CallCandidatesContent
import org.matrix.android.sdk.api.session.room.model.call.CallHangupContent
import org.matrix.android.sdk.api.session.room.model.call.CallInviteContent
import org.matrix.android.sdk.internal.session.call.DefaultCallSignalingService
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
import org.matrix.android.sdk.internal.session.room.send.RoomEventSender
import org.webrtc.IceCandidate
import org.webrtc.SessionDescription
import timber.log.Timber
@ -43,7 +43,7 @@ internal class MxCallImpl(
override val otherUserId: String,
override val isVideoCall: Boolean,
private val localEchoEventFactory: LocalEchoEventFactory,
private val roomEventSender: RoomEventSender
private val eventSenderProcessor: EventSenderProcessor
) : MxCall {
override var state: CallState = CallState.Idle
@ -91,7 +91,7 @@ internal class MxCallImpl(
offer = CallInviteContent.Offer(sdp = sdp.description)
)
.let { createEventAndLocalEcho(type = EventType.CALL_INVITE, roomId = roomId, content = it.toContent()) }
.also { roomEventSender.sendEvent(it) }
.also { eventSenderProcessor.postEvent(it) }
}
override fun sendLocalIceCandidates(candidates: List<IceCandidate>) {
@ -106,7 +106,7 @@ internal class MxCallImpl(
}
)
.let { createEventAndLocalEcho(type = EventType.CALL_CANDIDATES, roomId = roomId, content = it.toContent()) }
.also { roomEventSender.sendEvent(it) }
.also { eventSenderProcessor.postEvent(it) }
}
override fun sendLocalIceCandidateRemovals(candidates: List<IceCandidate>) {
@ -119,7 +119,7 @@ internal class MxCallImpl(
callId = callId
)
.let { createEventAndLocalEcho(type = EventType.CALL_HANGUP, roomId = roomId, content = it.toContent()) }
.also { roomEventSender.sendEvent(it) }
.also { eventSenderProcessor.postEvent(it) }
state = CallState.Terminated
}
@ -132,7 +132,7 @@ internal class MxCallImpl(
answer = CallAnswerContent.Answer(sdp = sdp.description)
)
.let { createEventAndLocalEcho(type = EventType.CALL_ANSWER, roomId = roomId, content = it.toContent()) }
.also { roomEventSender.sendEvent(it) }
.also { eventSenderProcessor.postEvent(it) }
}
private fun createEventAndLocalEcho(localId: String = LocalEcho.createLocalEchoId(), type: String, roomId: String, content: Content): Event {

View File

@ -17,7 +17,6 @@ package org.matrix.android.sdk.internal.session.room.relation
import androidx.lifecycle.LiveData
import androidx.lifecycle.Transformations
import androidx.work.OneTimeWorkRequest
import com.squareup.inject.assisted.Assisted
import com.squareup.inject.assisted.AssistedInject
import com.zhuinden.monarchy.Monarchy
@ -39,21 +38,18 @@ import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.di.SessionId
import org.matrix.android.sdk.internal.session.room.send.EncryptEventWorker
import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
import org.matrix.android.sdk.internal.session.room.send.RedactEventWorker
import org.matrix.android.sdk.internal.session.room.send.SendEventWorker
import org.matrix.android.sdk.internal.session.room.timeline.TimelineSendEventWorkCommon
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.task.configureWith
import org.matrix.android.sdk.internal.util.fetchCopyMap
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import timber.log.Timber
internal class DefaultRelationService @AssistedInject constructor(
@Assisted private val roomId: String,
@SessionId private val sessionId: String,
private val timeLineSendEventWorkCommon: TimelineSendEventWorkCommon,
// private val timeLineSendEventWorkCommon: TimelineSendEventWorkCommon,
private val eventSenderProcessor: EventSenderProcessor,
private val eventFactory: LocalEchoEventFactory,
private val cryptoService: CryptoService,
private val findReactionEventForUndoTask: FindReactionEventForUndoTask,
@ -83,8 +79,7 @@ internal class DefaultRelationService @AssistedInject constructor(
.none { it.addedByMe && it.key == reaction }) {
val event = eventFactory.createReactionEvent(roomId, targetEventId, reaction)
.also { saveLocalEcho(it) }
val sendRelationWork = createSendEventWork(event, true)
timeLineSendEventWorkCommon.postWork(roomId, sendRelationWork)
return eventSenderProcessor.postEvent(event, false /* reaction are not encrypted*/)
} else {
Timber.w("Reaction already added")
NoOpCancellable
@ -107,9 +102,7 @@ internal class DefaultRelationService @AssistedInject constructor(
data.redactEventId?.let { toRedact ->
val redactEvent = eventFactory.createRedactEvent(roomId, toRedact, null)
.also { saveLocalEcho(it) }
val redactWork = createRedactEventWork(redactEvent, toRedact, null)
timeLineSendEventWorkCommon.postWork(roomId, redactWork)
eventSenderProcessor.postRedaction(redactEvent, null)
}
}
}
@ -121,18 +114,6 @@ internal class DefaultRelationService @AssistedInject constructor(
.executeBy(taskExecutor)
}
// TODO duplicate with send service?
private fun createRedactEventWork(localEvent: Event, eventId: String, reason: String?): OneTimeWorkRequest {
val sendContentWorkerParams = RedactEventWorker.Params(
sessionId,
localEvent.eventId!!,
roomId,
eventId,
reason)
val redactWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)
return timeLineSendEventWorkCommon.createWork<RedactEventWorker>(redactWorkData, true)
}
override fun editTextMessage(targetEventId: String,
msgType: String,
newBodyText: CharSequence,
@ -141,14 +122,7 @@ internal class DefaultRelationService @AssistedInject constructor(
val event = eventFactory
.createReplaceTextEvent(roomId, targetEventId, newBodyText, newBodyAutoMarkdown, msgType, compatibilityBodyText)
.also { saveLocalEcho(it) }
return if (cryptoService.isRoomEncrypted(roomId)) {
val encryptWork = createEncryptEventWork(event, listOf("m.relates_to"))
val workRequest = createSendEventWork(event, false)
timeLineSendEventWorkCommon.postSequentialWorks(roomId, encryptWork, workRequest)
} else {
val workRequest = createSendEventWork(event, true)
timeLineSendEventWorkCommon.postWork(roomId, workRequest)
}
return eventSenderProcessor.postEvent(event, cryptoService.isRoomEncrypted(roomId))
}
override fun editReply(replyToEdit: TimelineEvent,
@ -165,14 +139,7 @@ internal class DefaultRelationService @AssistedInject constructor(
compatibilityBodyText
)
.also { saveLocalEcho(it) }
return if (cryptoService.isRoomEncrypted(roomId)) {
val encryptWork = createEncryptEventWork(event, listOf("m.relates_to"))
val workRequest = createSendEventWork(event, false)
timeLineSendEventWorkCommon.postSequentialWorks(roomId, encryptWork, workRequest)
} else {
val workRequest = createSendEventWork(event, true)
timeLineSendEventWorkCommon.postWork(roomId, workRequest)
}
return eventSenderProcessor.postEvent(event, cryptoService.isRoomEncrypted(roomId))
}
override fun fetchEditHistory(eventId: String, callback: MatrixCallback<List<Event>>) {
@ -189,27 +156,7 @@ internal class DefaultRelationService @AssistedInject constructor(
?.also { saveLocalEcho(it) }
?: return null
return if (cryptoService.isRoomEncrypted(roomId)) {
val encryptWork = createEncryptEventWork(event, listOf("m.relates_to"))
val workRequest = createSendEventWork(event, false)
timeLineSendEventWorkCommon.postSequentialWorks(roomId, encryptWork, workRequest)
} else {
val workRequest = createSendEventWork(event, true)
timeLineSendEventWorkCommon.postWork(roomId, workRequest)
}
}
private fun createEncryptEventWork(event: Event, keepKeys: List<String>?): OneTimeWorkRequest {
// Same parameter
val params = EncryptEventWorker.Params(sessionId, event.eventId!!, keepKeys)
val sendWorkData = WorkerParamsFactory.toData(params)
return timeLineSendEventWorkCommon.createWork<EncryptEventWorker>(sendWorkData, true)
}
private fun createSendEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = event.eventId!!)
val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)
return timeLineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain)
return eventSenderProcessor.postEvent(event, cryptoService.isRoomEncrypted(roomId))
}
override fun getEventAnnotationsSummary(eventId: String): EventAnnotationsSummary? {

View File

@ -48,10 +48,9 @@ import org.matrix.android.sdk.api.util.NoOpCancellable
import org.matrix.android.sdk.internal.di.SessionId
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.session.content.UploadContentWorker
import org.matrix.android.sdk.internal.session.room.timeline.TimelineSendEventWorkCommon
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.util.CancelableWork
import org.matrix.android.sdk.internal.worker.AlwaysSuccessfulWorker
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.startChain
import timber.log.Timber
@ -63,13 +62,12 @@ private const val UPLOAD_WORK = "UPLOAD_WORK"
internal class DefaultSendService @AssistedInject constructor(
@Assisted private val roomId: String,
private val workManagerProvider: WorkManagerProvider,
private val timelineSendEventWorkCommon: TimelineSendEventWorkCommon,
@SessionId private val sessionId: String,
private val localEchoEventFactory: LocalEchoEventFactory,
private val cryptoService: CryptoService,
private val taskExecutor: TaskExecutor,
private val localEchoRepository: LocalEchoRepository,
private val roomEventSender: RoomEventSender,
private val eventSenderProcessor: EventSenderProcessor,
private val cancelSendTracker: CancelSendTracker
) : SendService {
@ -92,19 +90,6 @@ internal class DefaultSendService @AssistedInject constructor(
.let { sendEvent(it) }
}
// For test only
private fun sendTextMessages(text: CharSequence, msgType: String, autoMarkdown: Boolean, times: Int): Cancelable {
return CancelableBag().apply {
// Send the event several times
repeat(times) { i ->
localEchoEventFactory.createTextEvent(roomId, msgType, "$text - $i", autoMarkdown)
.also { createLocalEcho(it) }
.let { sendEvent(it) }
.also { add(it) }
}
}
}
override fun sendFormattedTextMessage(text: String, formattedText: String, msgType: String): Cancelable {
return localEchoEventFactory.createFormattedTextEvent(roomId, TextContent(text, formattedText), msgType)
.also { createLocalEcho(it) }
@ -133,13 +118,14 @@ internal class DefaultSendService @AssistedInject constructor(
override fun redactEvent(event: Event, reason: String?): Cancelable {
// TODO manage media/attachements?
return createRedactEventWork(event, reason)
.let { timelineSendEventWorkCommon.postWork(roomId, it) }
val redactionEcho = localEchoEventFactory.createRedactEvent(roomId, event.eventId!!, reason)
.also { createLocalEcho(it) }
return eventSenderProcessor.postRedaction(redactionEcho, reason)
}
override fun resendTextMessage(localEcho: TimelineEvent): Cancelable {
if (localEcho.root.isTextMessage() && localEcho.root.sendState.hasFailed()) {
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT)
return sendEvent(localEcho.root)
}
return NoOpCancellable
@ -153,7 +139,7 @@ internal class DefaultSendService @AssistedInject constructor(
val url = messageContent.getFileUrl() ?: return NoOpCancellable
if (url.startsWith("mxc://")) {
// We need to resend only the message as the attachment is ok
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT)
return sendEvent(localEcho.root)
}
@ -170,7 +156,7 @@ internal class DefaultSendService @AssistedInject constructor(
queryUri = Uri.parse(messageContent.url),
type = ContentAttachmentData.Type.IMAGE
)
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT)
internalSendMedia(listOf(localEcho.root), attachmentData, true)
}
is MessageVideoContent -> {
@ -184,7 +170,7 @@ internal class DefaultSendService @AssistedInject constructor(
queryUri = Uri.parse(messageContent.url),
type = ContentAttachmentData.Type.VIDEO
)
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT)
internalSendMedia(listOf(localEcho.root), attachmentData, true)
}
is MessageFileContent -> {
@ -195,7 +181,7 @@ internal class DefaultSendService @AssistedInject constructor(
queryUri = Uri.parse(messageContent.url),
type = ContentAttachmentData.Type.FILE
)
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT)
internalSendMedia(listOf(localEcho.root), attachmentData, true)
}
is MessageAudioContent -> {
@ -207,7 +193,7 @@ internal class DefaultSendService @AssistedInject constructor(
queryUri = Uri.parse(messageContent.url),
type = ContentAttachmentData.Type.AUDIO
)
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT)
internalSendMedia(listOf(localEcho.root), attachmentData, true)
}
else -> NoOpCancellable
@ -222,25 +208,6 @@ internal class DefaultSendService @AssistedInject constructor(
}
}
override fun clearSendingQueue() {
timelineSendEventWorkCommon.cancelAllWorks(roomId)
workManagerProvider.workManager.cancelUniqueWork(buildWorkName(UPLOAD_WORK))
// Replace the worker chains with a AlwaysSuccessfulWorker, to ensure the queues are well emptied
workManagerProvider.matrixOneTimeWorkRequestBuilder<AlwaysSuccessfulWorker>()
.build().let {
timelineSendEventWorkCommon.postWork(roomId, it, ExistingWorkPolicy.REPLACE)
// need to clear also image sending queue
workManagerProvider.workManager
.beginUniqueWork(buildWorkName(UPLOAD_WORK), ExistingWorkPolicy.REPLACE, it)
.enqueue()
}
taskExecutor.executorScope.launch {
localEchoRepository.clearSendingQueue(roomId)
}
}
override fun cancelSend(eventId: String) {
cancelSendTracker.markLocalEchoForCancel(eventId, roomId)
taskExecutor.executorScope.launch {
@ -262,13 +229,6 @@ internal class DefaultSendService @AssistedInject constructor(
}
}
// override fun failAllPendingMessages() {
// taskExecutor.executorScope.launch {
// val eventsToResend = localEchoRepository.getAllEventsWithStates(roomId, SendState.PENDING_STATES)
// localEchoRepository.updateSendState(roomId, eventsToResend.map { it.eventId }, SendState.UNDELIVERED)
// }
// }
override fun sendMedia(attachment: ContentAttachmentData,
compressBeforeSending: Boolean,
roomIds: Set<String>): Cancelable {
@ -301,7 +261,7 @@ internal class DefaultSendService @AssistedInject constructor(
val dispatcherWork = createMultipleEventDispatcherWork(isRoomEncrypted)
workManagerProvider.workManager
.beginUniqueWork(buildWorkName(UPLOAD_WORK), ExistingWorkPolicy.APPEND, uploadWork)
.beginUniqueWork(buildWorkName(UPLOAD_WORK), ExistingWorkPolicy.APPEND_OR_REPLACE, uploadWork)
.then(dispatcherWork)
.enqueue()
.also { operation ->
@ -322,7 +282,7 @@ internal class DefaultSendService @AssistedInject constructor(
}
private fun sendEvent(event: Event): Cancelable {
return roomEventSender.sendEvent(event)
return eventSenderProcessor.postEvent(event, cryptoService.isRoomEncrypted(event.roomId!!))
}
private fun createLocalEcho(event: Event) {
@ -333,28 +293,6 @@ internal class DefaultSendService @AssistedInject constructor(
return "${roomId}_$identifier"
}
private fun createEncryptEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
// Same parameter
return EncryptEventWorker.Params(sessionId, event.eventId ?: "")
.let { WorkerParamsFactory.toData(it) }
.let {
workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>()
.setConstraints(WorkManagerProvider.workConstraints)
.setInputData(it)
.startChain(startChain)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY, TimeUnit.MILLISECONDS)
.build()
}
}
private fun createRedactEventWork(event: Event, reason: String?): OneTimeWorkRequest {
return localEchoEventFactory.createRedactEvent(roomId, event.eventId!!, reason)
.also { createLocalEcho(it) }
.let { RedactEventWorker.Params(sessionId, it.eventId!!, roomId, event.eventId, reason) }
.let { WorkerParamsFactory.toData(it) }
.let { timelineSendEventWorkCommon.createWork<RedactEventWorker>(it, true) }
}
private fun createUploadMediaWork(allLocalEchos: List<Event>,
attachment: ContentAttachmentData,
isRoomEncrypted: Boolean,

View File

@ -1,146 +0,0 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send
import android.content.Context
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toContent
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.MXCRYPTO_ALGORITHM_MEGOLM
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import org.matrix.android.sdk.internal.crypto.model.MXEncryptEventContentResult
import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.util.awaitCallback
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import timber.log.Timber
import javax.inject.Inject
/**
* Possible previous worker: None
* Possible next worker : Always [SendEventWorker]
*/
internal class EncryptEventWorker(context: Context, params: WorkerParameters)
: SessionSafeCoroutineWorker<EncryptEventWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
override val sessionId: String,
val eventId: String,
/** Do not encrypt these keys, keep them as is in encrypted content (e.g. m.relates_to) */
val keepKeys: List<String>? = null,
override val lastFailureMessage: String? = null
) : SessionWorkerParams
@Inject lateinit var crypto: CryptoService
@Inject lateinit var localEchoRepository: LocalEchoRepository
@Inject lateinit var cancelSendTracker: CancelSendTracker
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
Timber.v("## SendEvent: Start Encrypt work for event ${params.eventId}")
val localEvent = localEchoRepository.getUpToDateEcho(params.eventId)
if (localEvent?.eventId == null) {
return Result.success()
}
if (cancelSendTracker.isCancelRequestedFor(localEvent.eventId, localEvent.roomId)) {
return Result.success()
.also { Timber.e("## SendEvent: Event sending has been cancelled ${localEvent.eventId}") }
}
localEchoRepository.updateSendState(localEvent.eventId, SendState.ENCRYPTING)
val localMutableContent = localEvent.content?.toMutableMap() ?: mutableMapOf()
params.keepKeys?.forEach {
localMutableContent.remove(it)
}
var error: Throwable? = null
var result: MXEncryptEventContentResult? = null
try {
result = awaitCallback {
crypto.encryptEventContent(localMutableContent, localEvent.type, localEvent.roomId!!, it)
}
} catch (throwable: Throwable) {
error = throwable
}
if (result != null) {
val modifiedContent = HashMap(result.eventContent)
params.keepKeys?.forEach { toKeep ->
localEvent.content?.get(toKeep)?.let {
// put it back in the encrypted thing
modifiedContent[toKeep] = it
}
}
// Better handling of local echo, to avoid decrypting transition on remote echo
// Should I only do it for text messages?
val decryptionLocalEcho = if (result.eventContent["algorithm"] == MXCRYPTO_ALGORITHM_MEGOLM) {
MXEventDecryptionResult(
clearEvent = Event(
type = localEvent.type,
content = localEvent.content,
roomId = localEvent.roomId
).toContent(),
forwardingCurve25519KeyChain = emptyList(),
senderCurve25519Key = result.eventContent["sender_key"] as? String,
claimedEd25519Key = crypto.getMyDevice().fingerprint()
)
} else {
null
}
localEchoRepository.updateEcho(localEvent.eventId) { _, localEcho ->
localEcho.type = EventType.ENCRYPTED
localEcho.content = ContentMapper.map(modifiedContent)
decryptionLocalEcho?.also {
localEcho.setDecryptionResult(it)
}
}
val nextWorkerParams = SendEventWorker.Params(sessionId = params.sessionId, eventId = params.eventId)
return Result.success(WorkerParamsFactory.toData(nextWorkerParams))
} else {
val sendState = when (error) {
is Failure.CryptoError -> SendState.FAILED_UNKNOWN_DEVICES
else -> SendState.UNDELIVERED
}
localEchoRepository.updateSendState(localEvent.eventId, sendState)
// always return success, or the chain will be stuck for ever!
val nextWorkerParams = SendEventWorker.Params(
sessionId = params.sessionId,
eventId = localEvent.eventId,
lastFailureMessage = error?.localizedMessage ?: "Error"
)
return Result.success(WorkerParamsFactory.toData(nextWorkerParams))
}
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
}

View File

@ -88,8 +88,9 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
}
}
fun updateSendState(eventId: String, sendState: SendState) {
fun updateSendState(eventId: String, roomId: String?, sendState: SendState) {
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Update local state of $eventId to ${sendState.name}")
eventBus.post(DefaultTimeline.OnLocalEchoUpdated(roomId ?: "", eventId, sendState))
updateEchoAsync(eventId) { realm, sendingEventEntity ->
if (sendState == SendState.SENT && sendingEventEntity.sendState == SendState.SYNCED) {
// If already synced, do not put as sent
@ -137,6 +138,14 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
}
}
fun deleteFailedEchoAsync(roomId: String, eventId: String?) {
monarchy.runTransactionSync { realm ->
TimelineEventEntity.where(realm, roomId = roomId, eventId = eventId ?: "").findFirst()?.deleteFromRealm()
EventEntity.where(realm, eventId = eventId ?: "").findFirst()?.deleteFromRealm()
roomSummaryUpdater.updateSendingInformation(realm, roomId)
}
}
suspend fun clearSendingQueue(roomId: String) {
monarchy.awaitTransaction { realm ->
TimelineEventEntity

View File

@ -17,7 +17,6 @@
package org.matrix.android.sdk.internal.session.room.send
import android.content.Context
import androidx.work.BackoffPolicy
import androidx.work.OneTimeWorkRequest
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
@ -31,7 +30,6 @@ import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.startChain
import timber.log.Timber
import java.util.concurrent.TimeUnit
import javax.inject.Inject
/**
@ -57,7 +55,7 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
override fun doOnError(params: Params): Result {
params.localEchoIds.forEach { localEchoIds ->
localEchoRepository.updateSendState(localEchoIds.eventId, SendState.UNDELIVERED)
localEchoRepository.updateSendState(localEchoIds.eventId, localEchoIds.roomId, SendState.UNDELIVERED)
}
return super.doOnError(params)
@ -73,19 +71,10 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
params.localEchoIds.forEach { localEchoIds ->
val roomId = localEchoIds.roomId
val eventId = localEchoIds.eventId
if (params.isEncrypted) {
localEchoRepository.updateSendState(eventId, SendState.ENCRYPTING)
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule encrypt and send event $eventId")
val encryptWork = createEncryptEventWork(params.sessionId, eventId, true)
// Note that event will be replaced by the result of the previous work
val sendWork = createSendEventWork(params.sessionId, eventId, false)
timelineSendEventWorkCommon.postSequentialWorks(roomId, encryptWork, sendWork)
} else {
localEchoRepository.updateSendState(eventId, SendState.SENDING)
localEchoRepository.updateSendState(eventId, roomId, SendState.SENDING)
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule send event $eventId")
val sendWork = createSendEventWork(params.sessionId, eventId, true)
timelineSendEventWorkCommon.postWork(roomId, sendWork)
}
}
return Result.success()
@ -95,18 +84,6 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
private fun createEncryptEventWork(sessionId: String, eventId: String, startChain: Boolean): OneTimeWorkRequest {
val params = EncryptEventWorker.Params(sessionId, eventId)
val sendWorkData = WorkerParamsFactory.toData(params)
return workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>()
.setConstraints(WorkManagerProvider.workConstraints)
.setInputData(sendWorkData)
.startChain(startChain)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY, TimeUnit.MILLISECONDS)
.build()
}
private fun createSendEventWork(sessionId: String, eventId: String, startChain: Boolean): OneTimeWorkRequest {
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = eventId)
val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)

View File

@ -1,75 +0,0 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send
import androidx.work.BackoffPolicy
import androidx.work.OneTimeWorkRequest
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.util.Cancelable
import org.matrix.android.sdk.internal.di.SessionId
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.session.room.timeline.TimelineSendEventWorkCommon
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.startChain
import timber.log.Timber
import java.util.concurrent.TimeUnit
import javax.inject.Inject
internal class RoomEventSender @Inject constructor(
private val workManagerProvider: WorkManagerProvider,
private val timelineSendEventWorkCommon: TimelineSendEventWorkCommon,
@SessionId private val sessionId: String,
private val cryptoService: CryptoService
) {
fun sendEvent(event: Event): Cancelable {
// Encrypted room handling
return if (cryptoService.isRoomEncrypted(event.roomId ?: "")
&& !event.isEncrypted() // In case of resend where it's already encrypted so skip to send
) {
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule encrypt and send event ${event.eventId}")
val encryptWork = createEncryptEventWork(event, true)
// Note that event will be replaced by the result of the previous work
val sendWork = createSendEventWork(event, false)
timelineSendEventWorkCommon.postSequentialWorks(event.roomId ?: "", encryptWork, sendWork)
} else {
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule send event ${event.eventId}")
val sendWork = createSendEventWork(event, true)
timelineSendEventWorkCommon.postWork(event.roomId ?: "", sendWork)
}
}
private fun createEncryptEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
// Same parameter
val params = EncryptEventWorker.Params(sessionId, event.eventId!!)
val sendWorkData = WorkerParamsFactory.toData(params)
return workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>()
.setConstraints(WorkManagerProvider.workConstraints)
.setInputData(sendWorkData)
.startChain(startChain)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY, TimeUnit.MILLISECONDS)
.build()
}
private fun createSendEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = event.eventId!!)
val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)
return timelineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain)
}
}

View File

@ -22,12 +22,11 @@ import com.squareup.moshi.JsonClass
import io.realm.RealmConfiguration
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.failure.shouldBeRetried
import org.matrix.android.sdk.api.session.events.model.Content
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.tasks.SendEventTask
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import timber.log.Timber
@ -47,11 +46,14 @@ internal class SendEventWorker(context: Context,
internal data class Params(
override val sessionId: String,
override val lastFailureMessage: String? = null,
val eventId: String
val eventId: String,
// use this as an override if you want to send in clear in encrypted room
val isEncrypted: Boolean? = null
) : SessionWorkerParams
@Inject lateinit var localEchoRepository: LocalEchoRepository
@Inject lateinit var roomAPI: RoomAPI
@Inject lateinit var sendEventTask: SendEventTask
@Inject lateinit var cryptoService: CryptoService
@Inject lateinit var eventBus: EventBus
@Inject lateinit var cancelSendTracker: CancelSendTracker
@SessionDatabase @Inject lateinit var realmConfiguration: RealmConfiguration
@ -63,7 +65,7 @@ internal class SendEventWorker(context: Context,
override suspend fun doSafeWork(params: Params): Result {
val event = localEchoRepository.getUpToDateEcho(params.eventId)
if (event?.eventId == null || event.roomId == null) {
localEchoRepository.updateSendState(params.eventId, SendState.UNDELIVERED)
localEchoRepository.updateSendState(params.eventId, event?.roomId, SendState.UNDELIVERED)
return Result.success()
.also { Timber.e("Work cancelled due to bad input data") }
}
@ -77,7 +79,7 @@ internal class SendEventWorker(context: Context,
}
if (params.lastFailureMessage != null) {
localEchoRepository.updateSendState(event.eventId, SendState.UNDELIVERED)
localEchoRepository.updateSendState(event.eventId, event.roomId, SendState.UNDELIVERED)
// Transmit the error
return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
@ -85,12 +87,12 @@ internal class SendEventWorker(context: Context,
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Send event ${params.eventId}")
return try {
sendEvent(event.eventId, event.roomId, event.type, event.content)
sendEventTask.execute(SendEventTask.Params(event, params.isEncrypted ?: cryptoService.isRoomEncrypted(event.roomId), cryptoService))
Result.success()
} catch (exception: Throwable) {
if (/*currentAttemptCount >= MAX_NUMBER_OF_RETRY_BEFORE_FAILING ||**/ !exception.shouldBeRetried()) {
Timber.e("## SendEvent: [${System.currentTimeMillis()}] Send event Failed cannot retry ${params.eventId} > ${exception.localizedMessage}")
localEchoRepository.updateSendState(event.eventId, SendState.UNDELIVERED)
localEchoRepository.updateSendState(event.eventId, event.roomId, SendState.UNDELIVERED)
return Result.success()
} else {
Timber.e("## SendEvent: [${System.currentTimeMillis()}] Send event Failed schedule retry ${params.eventId} > ${exception.localizedMessage}")
@ -102,12 +104,4 @@ internal class SendEventWorker(context: Context,
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
private suspend fun sendEvent(eventId: String, roomId: String, type: String, content: Content?) {
localEchoRepository.updateSendState(eventId, SendState.SENDING)
executeRequest<SendResponse>(eventBus) {
apiCall = roomAPI.send(eventId, roomId, type, content)
}
localEchoRepository.updateSendState(eventId, SendState.SENT)
}
}

View File

@ -0,0 +1,239 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send.queue
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.auth.data.SessionParams
import org.matrix.android.sdk.api.auth.data.sessionId
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.failure.MatrixError
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.sync.SyncState
import org.matrix.android.sdk.api.util.Cancelable
import org.matrix.android.sdk.internal.session.SessionScope
import org.matrix.android.sdk.internal.task.TaskExecutor
import timber.log.Timber
import java.io.IOException
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.Socket
import java.util.Timer
import java.util.TimerTask
import java.util.concurrent.LinkedBlockingQueue
import javax.inject.Inject
import kotlin.concurrent.schedule
/**
* A simple ever running thread unique for that session responsible of sending events in order.
* Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and
* periodically test reachability before resume (does not count as a retry)
*
* If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted
*/
@SessionScope
internal class EventSenderProcessor @Inject constructor(
private val cryptoService: CryptoService,
private val sessionParams: SessionParams,
private val queuedTaskFactory: QueuedTaskFactory,
private val taskExecutor: TaskExecutor,
private val memento: QueueMemento
) : Thread("SENDER_THREAD_SID_${sessionParams.credentials.sessionId()}") {
private fun markAsManaged(task: QueuedTask) {
memento.track(task)
}
private fun markAsFinished(task: QueuedTask) {
memento.unTrack(task)
}
// API
fun postEvent(event: Event): Cancelable {
return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false)
}
override fun start() {
super.start()
// We should check for sending events not handled because app was killed
// But we should be careful of only took those that was submitted to us, because if it's
// for example it's a media event it is handled by some worker and he will handle it
// This is a bit fragile :/
// also some events cannot be retried manually by users, e.g reactions
// they were previously relying on workers to do the work :/ and was expected to always finally succeed
// Also some echos are not to be resent like redaction echos (fake event created for aggregation)
tryOrNull {
taskExecutor.executorScope.launch {
Timber.d("## Send relaunched pending events on restart")
memento.restoreTasks(this@EventSenderProcessor)
}
}
}
fun postEvent(event: Event, encrypt: Boolean): Cancelable {
val task = queuedTaskFactory.createSendTask(event, encrypt)
return postTask(task)
}
fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable {
return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason)
}
fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable {
val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason)
return postTask(task)
}
fun postTask(task: QueuedTask): Cancelable {
// non blocking add to queue
sendingQueue.add(task)
markAsManaged(task)
return object : Cancelable {
override fun cancel() {
task.cancel()
}
}
}
companion object {
private const val RETRY_WAIT_TIME_MS = 10_000L
}
private var sendingQueue = LinkedBlockingQueue<QueuedTask>()
private var networkAvailableLock = Object()
private var canReachServer = true
private var retryNoNetworkTask: TimerTask? = null
override fun run() {
Timber.v("## SendThread started ts:${System.currentTimeMillis()}")
try {
while (!isInterrupted) {
Timber.v("## SendThread wait for task to process")
val task = sendingQueue.take()
Timber.v("## SendThread Found task to process $task")
if (task.isCancelled()) {
Timber.v("## SendThread send cancelled for $task")
// we do not execute this one
continue
}
// we check for network connectivity
while (!canReachServer) {
Timber.v("## SendThread cannot reach server, wait ts:${System.currentTimeMillis()}")
// schedule to retry
waitForNetwork()
// if thread as been killed meanwhile
// if (state == State.KILLING) break
}
Timber.v("## Server is Reachable")
// so network is available
runBlocking {
retryLoop@ while (task.retryCount < 3) {
try {
// SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER)
Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}")
task.execute()
// sendEventTask.execute(SendEventTask.Params(task.event, task.encrypt, cryptoService))
// SendPerformanceProfiler.stopStage(task.event.eventId, SendPerformanceProfiler.Stages.SEND_WORKER)
break@retryLoop
} catch (exception: Throwable) {
when {
exception is IOException || exception is Failure.NetworkConnection -> {
canReachServer = false
task.retryCount++
if (task.retryCount >= 3) task.onTaskFailed()
while (!canReachServer) {
Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}")
// schedule to retry
waitForNetwork()
}
}
(exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
task.retryCount++
if (task.retryCount >= 3) task.onTaskFailed()
Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}")
// wait a bit
// Todo if its a quota exception can we get timout?
sleep(3_000)
continue@retryLoop
}
exception.isTokenError() -> {
Timber.v("## SendThread retryLoop retryable TOKEN error, interrupt")
// we can exit the loop
task.onTaskFailed()
throw InterruptedException()
}
else -> {
Timber.v("## SendThread retryLoop Un-Retryable error, try next task")
// this task is in error, check next one?
break@retryLoop
}
}
}
}
}
markAsFinished(task)
}
} catch (interruptionException: InterruptedException) {
// will be thrown is thread is interrupted while seeping
interrupt()
Timber.v("## InterruptedException!! ${interruptionException.localizedMessage}")
}
// state = State.KILLED
// is this needed?
retryNoNetworkTask?.cancel()
Timber.w("## SendThread finished ${System.currentTimeMillis()}")
}
private fun waitForNetwork() {
retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) {
synchronized(networkAvailableLock) {
canReachServer = checkHostAvailable().also {
Timber.v("## SendThread checkHostAvailable $it")
}
networkAvailableLock.notify()
}
}
synchronized(networkAvailableLock) { networkAvailableLock.wait() }
}
/**
* Check if homeserver is reachable.
*/
private fun checkHostAvailable(): Boolean {
val host = sessionParams.homeServerConnectionConfig.homeServerUri.host ?: return false
val port = sessionParams.homeServerConnectionConfig.homeServerUri.port.takeIf { it != -1 } ?: 80
val timeout = 30_000
try {
Socket().use { socket ->
val inetAddress: InetAddress = InetAddress.getByName(host)
val inetSocketAddress = InetSocketAddress(inetAddress, port)
socket.connect(inetSocketAddress, timeout)
return true
}
} catch (e: IOException) {
Timber.v("## EventSender isHostAvailable failure ${e.localizedMessage}")
return false
}
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send.queue
import android.content.Context
import org.matrix.android.sdk.api.auth.data.sessionId
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.di.SessionId
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import timber.log.Timber
import javax.inject.Inject
/**
* Simple lightweight persistence
* Don't want to go in DB due to current issues
* Will never manage lots of events, it simply uses sharedPreferences.
* It is just used to remember what events/localEchos was managed by the event sender in order to
* reschedule them (and only them) on next restart
*/
internal class QueueMemento @Inject constructor(context: Context,
@SessionId sessionId: String,
private val queuedTaskFactory: QueuedTaskFactory,
private val localEchoRepository: LocalEchoRepository,
private val cryptoService: CryptoService) {
private val storage = context.getSharedPreferences("QueueMemento_$sessionId", Context.MODE_PRIVATE)
private val managedTaskInfos = mutableListOf<QueuedTask>()
fun track(task: QueuedTask) {
synchronized(managedTaskInfos) {
managedTaskInfos.add(task)
persist()
}
}
fun unTrack(task: QueuedTask) {
managedTaskInfos.remove(task)
persist()
}
private fun persist() {
managedTaskInfos.mapIndexedNotNull { index, queuedTask ->
toTaskInfo(queuedTask, index)?.let { TaskInfo.map(it) }
}.toSet().let { set ->
storage.edit()
.putStringSet("ManagedBySender", set)
.apply()
}
}
private fun toTaskInfo(task: QueuedTask, order: Int): TaskInfo? {
synchronized(managedTaskInfos) {
return when (task) {
is SendEventQueuedTask -> SendEventTaskInfo(
localEchoId = task.event.eventId ?: "",
encrypt = task.encrypt,
order = order
)
is RedactQueuedTask -> RedactEventTaskInfo(
redactionLocalEcho = task.redactionLocalEchoId,
order = order
)
else -> null
}
}
}
suspend fun restoreTasks(eventProcessor: EventSenderProcessor) {
// events should be restarted in correct order
storage.getStringSet("ManagedBySender", null)?.let { pending ->
Timber.d("## Send - Recovering unsent events $pending")
pending.mapNotNull { tryOrNull { TaskInfo.map(it) } }
}
?.sortedBy { it.order }
?.forEach { info ->
try {
when (info) {
is SendEventTaskInfo -> {
localEchoRepository.getUpToDateEcho(info.localEchoId)?.let {
if (it.sendState.isSending() && it.eventId != null && it.roomId != null) {
localEchoRepository.updateSendState(it.eventId, it.roomId, SendState.UNSENT)
Timber.d("## Send -Reschedule send $info")
eventProcessor.postTask(queuedTaskFactory.createSendTask(it, info.encrypt ?: cryptoService.isRoomEncrypted(it.roomId)))
}
}
}
is RedactEventTaskInfo -> {
info.redactionLocalEcho?.let { localEchoRepository.getUpToDateEcho(it) }?.let {
localEchoRepository.updateSendState(it.eventId!!, it.roomId, SendState.UNSENT)
// try to get reason
val reason = it.content?.get("reason") as? String
if (it.redacts != null && it.roomId != null) {
Timber.d("## Send -Reschedule redact $info")
eventProcessor.postTask(queuedTaskFactory.createRedactTask(it.eventId, it.redacts, it.roomId, reason))
}
}
// postTask(queuedTaskFactory.createRedactTask(info.eventToRedactId, info.)
}
}
} catch (failure: Throwable) {
Timber.e("failed to restore task $info")
}
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send.queue
abstract class QueuedTask {
var retryCount = 0
abstract suspend fun execute()
abstract fun onTaskFailed()
abstract fun isCancelled() : Boolean
abstract fun cancel()
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send.queue
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.internal.crypto.tasks.RedactEventTask
import org.matrix.android.sdk.internal.crypto.tasks.SendEventTask
import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import javax.inject.Inject
internal class QueuedTaskFactory @Inject constructor(
private val sendEventTask: SendEventTask,
private val cryptoService: CryptoService,
private val localEchoRepository: LocalEchoRepository,
private val redactEventTask: RedactEventTask,
private val cancelSendTracker: CancelSendTracker
) {
fun createSendTask(event: Event, encrypt: Boolean): QueuedTask {
return SendEventQueuedTask(
event = event,
encrypt = encrypt,
cryptoService = cryptoService,
localEchoRepository = localEchoRepository,
sendEventTask = sendEventTask,
cancelSendTracker = cancelSendTracker
)
}
fun createRedactTask(redactionLocalEcho: String, eventId: String, roomId: String, reason: String?): QueuedTask {
return RedactQueuedTask(
redactionLocalEchoId = redactionLocalEcho,
toRedactEventId = eventId,
roomId = roomId,
reason = reason,
redactEventTask = redactEventTask,
localEchoRepository = localEchoRepository,
cancelSendTracker = cancelSendTracker
)
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send.queue
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.tasks.RedactEventTask
import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
internal class RedactQueuedTask(
val toRedactEventId: String,
val redactionLocalEchoId: String,
val roomId: String,
val reason: String?,
val redactEventTask: RedactEventTask,
val localEchoRepository: LocalEchoRepository,
val cancelSendTracker: CancelSendTracker
) : QueuedTask() {
private var _isCancelled: Boolean = false
override fun toString() = "[RedactEventRunnableTask $redactionLocalEchoId]"
override suspend fun execute() {
redactEventTask.execute(RedactEventTask.Params(redactionLocalEchoId, roomId, toRedactEventId, reason))
}
override fun onTaskFailed() {
localEchoRepository.updateSendState(redactionLocalEchoId, roomId, SendState.UNDELIVERED)
}
override fun isCancelled(): Boolean {
return _isCancelled || cancelSendTracker.isCancelRequestedFor(redactionLocalEchoId, roomId)
}
override fun cancel() {
_isCancelled = true
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send.queue
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.tasks.SendEventTask
import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
internal class SendEventQueuedTask(
val event: Event,
val encrypt: Boolean,
val sendEventTask: SendEventTask,
val cryptoService: CryptoService,
val localEchoRepository: LocalEchoRepository,
val cancelSendTracker: CancelSendTracker
) : QueuedTask() {
private var _isCancelled: Boolean = false
override fun toString() = "[SendEventRunnableTask ${event.eventId}]"
override suspend fun execute() {
sendEventTask.execute(SendEventTask.Params(event, encrypt, cryptoService))
}
override fun onTaskFailed() {
when (event.getClearType()) {
EventType.REDACTION,
EventType.REACTION -> {
// we just delete? it will not be present on timeline and no ux to retry
localEchoRepository.deleteFailedEchoAsync(eventId = event.eventId, roomId = event.roomId ?: "")
// TODO update aggregation :/ or it will stay locally
}
else -> {
localEchoRepository.updateSendState(event.eventId!!, event.roomId, SendState.UNDELIVERED)
}
}
}
override fun isCancelled(): Boolean {
return _isCancelled || cancelSendTracker.isCancelRequestedFor(event.eventId, event.roomId)
}
override fun cancel() {
_isCancelled = true
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send.queue
import com.squareup.moshi.Json
import com.squareup.moshi.JsonClass
import com.squareup.moshi.Moshi
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.internal.di.SerializeNulls
import org.matrix.android.sdk.internal.network.parsing.RuntimeJsonAdapterFactory
/**
* Info that need to be persisted by the sender thread
* With polymorphic moshi parsing
*/
internal interface TaskInfo {
val type: String
val order: Int
companion object {
const val TYPE_UNKNOWN = "TYPE_UNKNOWN"
const val TYPE_SEND = "TYPE_SEND"
const val TYPE_REDACT = "TYPE_REDACT"
val moshi = Moshi.Builder()
.add(RuntimeJsonAdapterFactory.of(TaskInfo::class.java, "type", FallbackTaskInfo::class.java)
.registerSubtype(SendEventTaskInfo::class.java, TYPE_SEND)
.registerSubtype(RedactEventTaskInfo::class.java, TYPE_REDACT)
)
.add(SerializeNulls.JSON_ADAPTER_FACTORY)
.build()
fun map(info: TaskInfo): String {
return moshi.adapter(TaskInfo::class.java).toJson(info)
}
fun map(string: String): TaskInfo? {
return tryOrNull { moshi.adapter(TaskInfo::class.java).fromJson(string) }
}
}
}
@JsonClass(generateAdapter = true)
internal data class SendEventTaskInfo(
@Json(name = "type") override val type: String = TaskInfo.TYPE_SEND,
@Json(name = "localEchoId") val localEchoId: String,
@Json(name = "encrypt") val encrypt: Boolean?,
@Json(name = "order") override val order: Int
) : TaskInfo
@JsonClass(generateAdapter = true)
internal data class RedactEventTaskInfo(
@Json(name = "type") override val type: String = TaskInfo.TYPE_REDACT,
@Json(name = "redactionLocalEcho") val redactionLocalEcho: String?,
@Json(name = "order") override val order: Int
) : TaskInfo
@JsonClass(generateAdapter = true)
internal data class FallbackTaskInfo(
@Json(name = "type") override val type: String = TaskInfo.TYPE_REDACT,
@Json(name = "order") override val order: Int
) : TaskInfo

View File

@ -32,8 +32,11 @@ import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.RelationType
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.EventAnnotationsSummary
import org.matrix.android.sdk.api.session.room.model.ReactionAggregatedSummary
import org.matrix.android.sdk.api.session.room.model.ReadReceipt
import org.matrix.android.sdk.api.session.room.model.message.MessageContent
import org.matrix.android.sdk.api.session.room.model.relation.ReactionContent
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
@ -83,6 +86,7 @@ internal class DefaultTimeline(
data class OnNewTimelineEvents(val roomId: String, val eventIds: List<String>)
data class OnLocalEchoCreated(val roomId: String, val timelineEvent: TimelineEvent)
data class OnLocalEchoUpdated(val roomId: String, val eventId: String, val sendState: SendState)
companion object {
val BACKGROUND_HANDLER = createBackgroundHandler("TIMELINE_DB_THREAD")
@ -102,7 +106,9 @@ internal class DefaultTimeline(
private var prevDisplayIndex: Int? = null
private var nextDisplayIndex: Int? = null
private val inMemorySendingEvents = Collections.synchronizedList<TimelineEvent>(ArrayList())
private val uiEchoManager = UIEchoManager()
private val builtEvents = Collections.synchronizedList<TimelineEvent>(ArrayList())
private val builtEventsIdMap = Collections.synchronizedMap(HashMap<String, Int>())
private val backwardsState = AtomicReference(State())
@ -163,10 +169,7 @@ internal class DefaultTimeline(
sendingEvents = roomEntity.sendingTimelineEvents.where().filterEventsWithSettings().findAll()
sendingEvents.addChangeListener { events ->
// Remove in memory as soon as they are known by database
events.forEach { te ->
inMemorySendingEvents.removeAll { te.eventId == it.eventId }
}
uiEchoManager.sentEventsUpdated(events)
postSnapshot()
}
nonFilteredEvents = buildEventQuery(realm).sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING).findAll()
@ -318,16 +321,15 @@ internal class DefaultTimeline(
@Subscribe(threadMode = ThreadMode.MAIN)
fun onLocalEchoCreated(onLocalEchoCreated: OnLocalEchoCreated) {
if (isLive && onLocalEchoCreated.roomId == roomId) {
// do not add events that would have been filtered
if (listOf(onLocalEchoCreated.timelineEvent).filterEventsWithSettings().isNotEmpty()) {
listeners.forEach {
it.onNewTimelineEvents(listOf(onLocalEchoCreated.timelineEvent.eventId))
}
Timber.v("On local echo created: ${onLocalEchoCreated.timelineEvent.eventId}")
inMemorySendingEvents.add(0, onLocalEchoCreated.timelineEvent)
postSnapshot()
}
if (uiEchoManager.onLocalEchoCreated(onLocalEchoCreated)) {
postSnapshot()
}
}
@Subscribe(threadMode = ThreadMode.MAIN)
fun onLocalEchoUpdated(onLocalEchoUpdated: OnLocalEchoUpdated) {
if (uiEchoManager.onLocalEchoUpdated(onLocalEchoUpdated)) {
postSnapshot()
}
}
@ -407,10 +409,12 @@ internal class DefaultTimeline(
private fun buildSendingEvents(): List<TimelineEvent> {
val builtSendingEvents = ArrayList<TimelineEvent>()
if (hasReachedEnd(Timeline.Direction.FORWARDS) && !hasMoreInCache(Timeline.Direction.FORWARDS)) {
builtSendingEvents.addAll(inMemorySendingEvents.filterEventsWithSettings())
builtSendingEvents.addAll(uiEchoManager.getInMemorySendingEvents().filterEventsWithSettings())
sendingEvents.forEach { timelineEventEntity ->
if (builtSendingEvents.find { it.eventId == timelineEventEntity.eventId } == null) {
builtSendingEvents.add(timelineEventMapper.map(timelineEventEntity))
val element = timelineEventMapper.map(timelineEventEntity)
uiEchoManager.updateSentStateWithUiEcho(element)
builtSendingEvents.add(element)
}
}
}
@ -622,10 +626,7 @@ internal class DefaultTimeline(
val timelineEvent = buildTimelineEvent(eventEntity)
val transactionId = timelineEvent.root.unsignedData?.transactionId
val sendingEvent = inMemorySendingEvents.find {
it.eventId == transactionId
}
inMemorySendingEvents.remove(sendingEvent)
uiEchoManager.onSyncedEvent(transactionId)
if (timelineEvent.isEncrypted()
&& timelineEvent.root.mxDecryptionResult == null) {
@ -649,7 +650,10 @@ internal class DefaultTimeline(
timelineEventEntity = eventEntity,
buildReadReceipts = settings.buildReadReceipts,
correctedReadReceipts = hiddenReadReceipts.correctedReadReceipts(eventEntity.eventId)
)
).let {
// eventually enhance with ui echo?
(uiEchoManager.decorateEventWithReactionUiEcho(it) ?: it)
}
/**
* This has to be called on TimelineThread as it accesses realm live results
@ -797,4 +801,155 @@ internal class DefaultTimeline(
val isPaginating: Boolean = false,
val requestedPaginationCount: Int = 0
)
private data class ReactionUiEchoData(
val localEchoId: String,
val reactedOnEventId: String,
val reaction: String
)
inner class UIEchoManager {
private val inMemorySendingEvents = Collections.synchronizedList<TimelineEvent>(ArrayList())
fun getInMemorySendingEvents(): List<TimelineEvent> {
return inMemorySendingEvents
}
/**
* Due to lag of DB updates, we keep some UI echo of some properties to update timeline faster
*/
private val inMemorySendingStates = Collections.synchronizedMap<String, SendState>(HashMap())
private val inMemoryReactions = Collections.synchronizedMap<String, MutableList<ReactionUiEchoData>>(HashMap())
fun sentEventsUpdated(events: RealmResults<TimelineEventEntity>) {
// Remove in memory as soon as they are known by database
events.forEach { te ->
inMemorySendingEvents.removeAll { te.eventId == it.eventId }
}
inMemorySendingStates.keys.removeAll { key ->
events.find { it.eventId == key } == null
}
inMemoryReactions.keys.removeAll { key ->
events.find { it.eventId == key } == null
}
}
fun onLocalEchoUpdated(onLocalEchoUpdated: OnLocalEchoUpdated): Boolean {
if (isLive && onLocalEchoUpdated.roomId == roomId) {
val existingState = inMemorySendingStates[onLocalEchoUpdated.eventId]
inMemorySendingStates[onLocalEchoUpdated.eventId] = onLocalEchoUpdated.sendState
if (existingState != onLocalEchoUpdated.sendState) {
return true
}
}
return false
}
// return true if should update
fun onLocalEchoCreated(onLocalEchoCreated: OnLocalEchoCreated): Boolean {
var postSnapshot = false
if (isLive && onLocalEchoCreated.roomId == roomId) {
// Manage some ui echos (do it before filter because actual event could be filtered out)
when (onLocalEchoCreated.timelineEvent.root.getClearType()) {
EventType.REDACTION -> {
}
EventType.REACTION -> {
val content = onLocalEchoCreated.timelineEvent.root.content?.toModel<ReactionContent>()
if (RelationType.ANNOTATION == content?.relatesTo?.type) {
val reaction = content.relatesTo.key
val relatedEventID = content.relatesTo.eventId
inMemoryReactions.getOrPut(relatedEventID) { mutableListOf() }
.add(
ReactionUiEchoData(
localEchoId = onLocalEchoCreated.timelineEvent.eventId,
reactedOnEventId = relatedEventID,
reaction = reaction
)
)
postSnapshot = rebuildEvent(relatedEventID) {
decorateEventWithReactionUiEcho(it)
} || postSnapshot
}
}
}
// do not add events that would have been filtered
if (listOf(onLocalEchoCreated.timelineEvent).filterEventsWithSettings().isNotEmpty()) {
listeners.forEach {
it.onNewTimelineEvents(listOf(onLocalEchoCreated.timelineEvent.eventId))
}
Timber.v("On local echo created: ${onLocalEchoCreated.timelineEvent.eventId}")
inMemorySendingEvents.add(0, onLocalEchoCreated.timelineEvent)
postSnapshot = true
}
}
return postSnapshot
}
fun decorateEventWithReactionUiEcho(timelineEvent: TimelineEvent): TimelineEvent? {
val relatedEventID = timelineEvent.eventId
val contents = inMemoryReactions[relatedEventID] ?: return null
var existingAnnotationSummary = timelineEvent.annotations ?: EventAnnotationsSummary(
relatedEventID
)
val updateReactions = existingAnnotationSummary.reactionsSummary.toMutableList()
contents.forEach { uiEchoReaction ->
val existing = updateReactions.firstOrNull { it.key == uiEchoReaction.reaction }
if (existing == null) {
// just add the new key
ReactionAggregatedSummary(
key = uiEchoReaction.reaction,
count = 1,
addedByMe = true,
firstTimestamp = System.currentTimeMillis(),
sourceEvents = emptyList(),
localEchoEvents = listOf(uiEchoReaction.localEchoId)
).let { updateReactions.add(it) }
} else {
// update Existing Key
if (!existing.localEchoEvents.contains(uiEchoReaction.localEchoId)) {
updateReactions.remove(existing)
// only update if echo is not yet there
ReactionAggregatedSummary(
key = existing.key,
count = existing.count + 1,
addedByMe = true,
firstTimestamp = existing.firstTimestamp,
sourceEvents = existing.sourceEvents,
localEchoEvents = existing.localEchoEvents + uiEchoReaction.localEchoId
).let { updateReactions.add(it) }
}
}
}
existingAnnotationSummary = existingAnnotationSummary.copy(
reactionsSummary = updateReactions
)
return timelineEvent.copy(
annotations = existingAnnotationSummary
)
}
fun updateSentStateWithUiEcho(element: TimelineEvent) {
inMemorySendingStates[element.eventId]?.let {
// Timber.v("## ${System.currentTimeMillis()} Send event refresh echo with live state ${it} from state ${element.root.sendState}")
element.root.sendState = element.root.sendState.takeIf { it == SendState.SENT } ?: it
}
}
fun onSyncedEvent(transactionId: String?) {
val sendingEvent = inMemorySendingEvents.find {
it.eventId == transactionId
}
inMemorySendingEvents.remove(sendingEvent)
// Is it too early to clear it? will be done when removed from sending anyway?
inMemoryReactions.forEach { (_, u) ->
u.filterNot { it.localEchoId == transactionId }
}
}
}
}

View File

@ -21,7 +21,6 @@ import androidx.work.ExistingWorkPolicy
import androidx.work.ListenableWorker
import androidx.work.OneTimeWorkRequest
import org.matrix.android.sdk.api.util.Cancelable
import org.matrix.android.sdk.api.util.NoOpCancellable
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.util.CancelableWork
import org.matrix.android.sdk.internal.worker.startChain
@ -38,24 +37,6 @@ internal class TimelineSendEventWorkCommon @Inject constructor(
private val workManagerProvider: WorkManagerProvider
) {
fun postSequentialWorks(roomId: String, vararg workRequests: OneTimeWorkRequest): Cancelable {
return when {
workRequests.isEmpty() -> NoOpCancellable
workRequests.size == 1 -> postWork(roomId, workRequests.first())
else -> {
val firstWork = workRequests.first()
var continuation = workManagerProvider.workManager
.beginUniqueWork(buildWorkName(roomId), ExistingWorkPolicy.APPEND, firstWork)
for (i in 1 until workRequests.size) {
val workRequest = workRequests[i]
continuation = continuation.then(workRequest)
}
continuation.enqueue()
CancelableWork(workManagerProvider.workManager, firstWork.id)
}
}
}
fun postWork(roomId: String, workRequest: OneTimeWorkRequest, policy: ExistingWorkPolicy = ExistingWorkPolicy.APPEND_OR_REPLACE): Cancelable {
workManagerProvider.workManager
.beginUniqueWork(buildWorkName(roomId), policy, workRequest)
@ -77,11 +58,6 @@ internal class TimelineSendEventWorkCommon @Inject constructor(
return "${roomId}_$SEND_WORK"
}
fun cancelAllWorks(roomId: String) {
workManagerProvider.workManager
.cancelUniqueWork(buildWorkName(roomId))
}
companion object {
private const val SEND_WORK = "SEND_WORK"
private const val BACKOFF_DELAY = 10_000L

View File

@ -68,7 +68,6 @@ sealed class RoomDetailAction : VectorViewModelAction {
data class IgnoreUser(val userId: String?) : RoomDetailAction()
object ClearSendQueue : RoomDetailAction()
object ResendAll : RoomDetailAction()
data class StartCall(val isVideo: Boolean) : RoomDetailAction()
object EndCall : RoomDetailAction()

View File

@ -645,13 +645,6 @@ class RoomDetailFragment @Inject constructor(
override fun onOptionsItemSelected(item: MenuItem): Boolean {
return when (item.itemId) {
R.id.clear_message_queue -> {
// This a temporary option during dev as it is not super stable
// Cancel all pending actions in room queue and post a dummy
// Then mark all sending events as undelivered
roomDetailViewModel.handle(RoomDetailAction.ClearSendQueue)
true
}
R.id.invite -> {
navigator.openInviteUsersToRoom(requireActivity(), roomDetailArgs.roomId)
true

View File

@ -251,7 +251,6 @@ class RoomDetailViewModel @AssistedInject constructor(
is RoomDetailAction.HandleTombstoneEvent -> handleTombstoneEvent(action)
is RoomDetailAction.ResendMessage -> handleResendEvent(action)
is RoomDetailAction.RemoveFailedEcho -> handleRemove(action)
is RoomDetailAction.ClearSendQueue -> handleClearSendQueue()
is RoomDetailAction.ResendAll -> handleResendAll()
is RoomDetailAction.MarkAllAsRead -> handleMarkAllAsRead()
is RoomDetailAction.ReportContent -> handleReportContent(action)
@ -542,9 +541,6 @@ class RoomDetailViewModel @AssistedInject constructor(
return@withState false
}
when (itemId) {
R.id.clear_message_queue ->
// For now always disable when not in developer mode, worker cancellation is not working properly
timeline.pendingEventCount() > 0 && vectorPreferences.developerMode()
R.id.resend_all -> state.asyncRoomSummary()?.hasFailedSending == true
R.id.timeline_setting -> true
R.id.invite -> state.canInvite
@ -1065,10 +1061,6 @@ class RoomDetailViewModel @AssistedInject constructor(
}
}
private fun handleClearSendQueue() {
room.clearSendingQueue()
}
private fun handleResendAll() {
room.resendAllFailedMessages()
}

View File

@ -41,23 +41,27 @@ abstract class MessageTextItem : AbsMessageItem<MessageTextItem.Holder>() {
var movementMethod: MovementMethod? = null
override fun bind(holder: Holder) {
super.bind(holder)
holder.messageView.movementMethod = movementMethod
if (useBigFont) {
holder.messageView.textSize = 44F
} else {
holder.messageView.textSize = 14F
}
renderSendState(holder.messageView, holder.messageView)
holder.messageView.setOnClickListener(attributes.itemClickListener)
holder.messageView.setOnLongClickListener(attributes.itemLongClickListener)
if (searchForPills) {
message?.findPillsAndProcess(coroutineScope) { it.bind(holder.messageView) }
message?.findPillsAndProcess(coroutineScope) {
// mmm.. not sure this is so safe in regards to cell reuse
it.bind(holder.messageView)
}
}
val textFuture = PrecomputedTextCompat.getTextFuture(
message ?: "",
TextViewCompat.getTextMetricsParams(holder.messageView),
null)
super.bind(holder)
holder.messageView.movementMethod = movementMethod
renderSendState(holder.messageView, holder.messageView)
holder.messageView.setOnClickListener(attributes.itemClickListener)
holder.messageView.setOnLongClickListener(attributes.itemLongClickListener)
holder.messageView.setTextFuture(textFuture)
}

View File

@ -68,11 +68,4 @@
app:showAsAction="never"
tools:visible="true" />
<item
android:id="@+id/clear_message_queue"
android:title="@string/clear_timeline_send_queue"
android:visible="@bool/debug_mode"
app:showAsAction="never"
tools:visible="true" />
</menu>