Fix cancellation of sending event (#2438)
This commit is contained in:
parent
f6cc05634f
commit
cd983de058
@ -8,7 +8,7 @@ Improvements 🙌:
|
||||
-
|
||||
|
||||
Bugfix 🐛:
|
||||
-
|
||||
- Fix cancellation of sending event (#2438)
|
||||
|
||||
Translations 🗣:
|
||||
-
|
||||
|
@ -210,6 +210,8 @@ internal class DefaultSendService @AssistedInject constructor(
|
||||
|
||||
override fun cancelSend(eventId: String) {
|
||||
cancelSendTracker.markLocalEchoForCancel(eventId, roomId)
|
||||
// This is maybe the current task, so cancel it too
|
||||
eventSenderProcessor.cancel(eventId, roomId)
|
||||
taskExecutor.executorScope.launch {
|
||||
localEchoRepository.deleteFailedEcho(roomId, eventId)
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package org.matrix.android.sdk.internal.session.room.send.queue
|
||||
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.matrix.android.sdk.api.auth.data.SessionParams
|
||||
@ -109,10 +110,18 @@ internal class EventSenderProcessor @Inject constructor(
|
||||
return task
|
||||
}
|
||||
|
||||
fun cancel(eventId: String, roomId: String) {
|
||||
(currentTask as? SendEventQueuedTask)
|
||||
?.takeIf { it -> it.event.eventId == eventId && it.event.roomId == roomId }
|
||||
?.cancel()
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val RETRY_WAIT_TIME_MS = 10_000L
|
||||
}
|
||||
|
||||
private var currentTask: QueuedTask? = null
|
||||
|
||||
private var sendingQueue = LinkedBlockingQueue<QueuedTask>()
|
||||
|
||||
private var networkAvailableLock = Object()
|
||||
@ -125,6 +134,7 @@ internal class EventSenderProcessor @Inject constructor(
|
||||
while (!isInterrupted) {
|
||||
Timber.v("## SendThread wait for task to process")
|
||||
val task = sendingQueue.take()
|
||||
.also { currentTask = it }
|
||||
Timber.v("## SendThread Found task to process $task")
|
||||
|
||||
if (task.isCancelled()) {
|
||||
@ -179,6 +189,10 @@ internal class EventSenderProcessor @Inject constructor(
|
||||
task.onTaskFailed()
|
||||
throw InterruptedException()
|
||||
}
|
||||
exception is CancellationException -> {
|
||||
Timber.v("## SendThread task has been cancelled")
|
||||
break@retryLoop
|
||||
}
|
||||
else -> {
|
||||
Timber.v("## SendThread retryLoop Un-Retryable error, try next task")
|
||||
// this task is in error, check next one?
|
||||
|
@ -23,7 +23,13 @@ abstract class QueuedTask : Cancelable {
|
||||
|
||||
private var hasBeenCancelled: Boolean = false
|
||||
|
||||
abstract suspend fun execute()
|
||||
suspend fun execute() {
|
||||
if (!isCancelled()) {
|
||||
doExecute()
|
||||
}
|
||||
}
|
||||
|
||||
abstract suspend fun doExecute()
|
||||
|
||||
abstract fun onTaskFailed()
|
||||
|
||||
|
@ -33,7 +33,7 @@ internal class RedactQueuedTask(
|
||||
|
||||
override fun toString() = "[RedactQueuedTask $redactionLocalEchoId]"
|
||||
|
||||
override suspend fun execute() {
|
||||
override suspend fun doExecute() {
|
||||
redactEventTask.execute(RedactEventTask.Params(redactionLocalEchoId, roomId, toRedactEventId, reason))
|
||||
}
|
||||
|
||||
|
@ -35,7 +35,7 @@ internal class SendEventQueuedTask(
|
||||
|
||||
override fun toString() = "[SendEventQueuedTask ${event.eventId}]"
|
||||
|
||||
override suspend fun execute() {
|
||||
override suspend fun doExecute() {
|
||||
sendEventTask.execute(SendEventTask.Params(event, encrypt))
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user