Merge pull request #2135 from vector-im/feature/safe_workers

Create parent class for all MatrixWorker
This commit is contained in:
Benoit Marty 2020-09-22 14:35:15 +02:00 committed by GitHub
commit 23d911cc2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 315 additions and 252 deletions

View File

@ -18,10 +18,9 @@
package org.matrix.android.sdk.internal.crypto
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.auth.data.Credentials
import org.matrix.android.sdk.api.failure.shouldBeRetried
import org.matrix.android.sdk.api.session.events.model.Event
@ -32,28 +31,29 @@ import org.matrix.android.sdk.internal.crypto.model.MXUsersDevicesMap
import org.matrix.android.sdk.internal.crypto.model.rest.ShareRequestCancellation
import org.matrix.android.sdk.internal.crypto.store.IMXCryptoStore
import org.matrix.android.sdk.internal.crypto.tasks.SendToDeviceTask
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import org.greenrobot.eventbus.EventBus
import timber.log.Timber
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import javax.inject.Inject
internal class CancelGossipRequestWorker(context: Context,
params: WorkerParameters)
: CoroutineWorker(context, params) {
: SessionSafeCoroutineWorker<CancelGossipRequestWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
val sessionId: String,
override val sessionId: String,
val requestId: String,
val recipients: Map<String, List<String>>
) {
val recipients: Map<String, List<String>>,
override val lastFailureMessage: String? = null
) : SessionWorkerParams {
companion object {
fun fromRequest(sessionId: String, request: OutgoingGossipingRequest): Params {
return Params(
sessionId = sessionId,
requestId = request.requestId,
recipients = request.recipients
recipients = request.recipients,
lastFailureMessage = null
)
}
}
@ -64,18 +64,11 @@ internal class CancelGossipRequestWorker(context: Context,
@Inject lateinit var eventBus: EventBus
@Inject lateinit var credentials: Credentials
override suspend fun doWork(): Result {
val errorOutputData = Data.Builder().putBoolean("failed", true).build()
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success(errorOutputData)
val sessionComponent = getSessionComponent(params.sessionId)
?: return Result.success(errorOutputData).also {
// TODO, can this happen? should I update local echo?
Timber.e("Unknown Session, cannot send message, sessionId: ${params.sessionId}")
}
sessionComponent.inject(this)
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
val localId = LocalEcho.createLocalEchoId()
val contentMap = MXUsersDevicesMap<Any>()
val toDeviceContent = ShareRequestCancellation(
@ -107,13 +100,17 @@ internal class CancelGossipRequestWorker(context: Context,
)
cryptoStore.updateOutgoingGossipingRequestState(params.requestId, OutgoingGossipingRequestState.CANCELLED)
return Result.success()
} catch (exception: Throwable) {
return if (exception.shouldBeRetried()) {
} catch (throwable: Throwable) {
return if (throwable.shouldBeRetried()) {
Result.retry()
} else {
cryptoStore.updateOutgoingGossipingRequestState(params.requestId, OutgoingGossipingRequestState.FAILED_TO_CANCEL)
Result.success(errorOutputData)
buildErrorResult(params, throwable.localizedMessage ?: "error")
}
}
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
}

View File

@ -18,10 +18,9 @@
package org.matrix.android.sdk.internal.crypto
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.auth.data.Credentials
import org.matrix.android.sdk.api.failure.shouldBeRetried
import org.matrix.android.sdk.api.session.events.model.Event
@ -34,40 +33,34 @@ import org.matrix.android.sdk.internal.crypto.model.rest.RoomKeyShareRequest
import org.matrix.android.sdk.internal.crypto.model.rest.SecretShareRequest
import org.matrix.android.sdk.internal.crypto.store.IMXCryptoStore
import org.matrix.android.sdk.internal.crypto.tasks.SendToDeviceTask
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import timber.log.Timber
import javax.inject.Inject
internal class SendGossipRequestWorker(context: Context,
params: WorkerParameters)
: CoroutineWorker(context, params) {
: SessionSafeCoroutineWorker<SendGossipRequestWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
val sessionId: String,
override val sessionId: String,
val keyShareRequest: OutgoingRoomKeyRequest? = null,
val secretShareRequest: OutgoingSecretRequest? = null
)
val secretShareRequest: OutgoingSecretRequest? = null,
override val lastFailureMessage: String? = null
) : SessionWorkerParams
@Inject lateinit var sendToDeviceTask: SendToDeviceTask
@Inject lateinit var cryptoStore: IMXCryptoStore
@Inject lateinit var eventBus: EventBus
@Inject lateinit var credentials: Credentials
override suspend fun doWork(): Result {
val errorOutputData = Data.Builder().putBoolean("failed", true).build()
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success(errorOutputData)
val sessionComponent = getSessionComponent(params.sessionId)
?: return Result.success(errorOutputData).also {
// TODO, can this happen? should I update local echo?
Timber.e("Unknown Session, cannot send message, sessionId: ${params.sessionId}")
}
sessionComponent.inject(this)
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
val localId = LocalEcho.createLocalEchoId()
val contentMap = MXUsersDevicesMap<Any>()
val eventType: String
@ -121,7 +114,7 @@ internal class SendGossipRequestWorker(context: Context,
}
}
else -> {
return Result.success(errorOutputData).also {
return buildErrorResult(params, "Unknown empty gossiping request").also {
Timber.e("Unknown empty gossiping request: $params")
}
}
@ -137,13 +130,17 @@ internal class SendGossipRequestWorker(context: Context,
)
cryptoStore.updateOutgoingGossipingRequestState(requestId, OutgoingGossipingRequestState.SENT)
return Result.success()
} catch (exception: Throwable) {
return if (exception.shouldBeRetried()) {
} catch (throwable: Throwable) {
return if (throwable.shouldBeRetried()) {
Result.retry()
} else {
cryptoStore.updateOutgoingGossipingRequestState(requestId, OutgoingGossipingRequestState.FAILED_TO_SEND)
Result.success(errorOutputData)
buildErrorResult(params, throwable.localizedMessage ?: "error")
}
}
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
}

View File

@ -18,10 +18,9 @@
package org.matrix.android.sdk.internal.crypto
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.auth.data.Credentials
import org.matrix.android.sdk.api.failure.shouldBeRetried
import org.matrix.android.sdk.api.session.events.model.Event
@ -34,22 +33,23 @@ import org.matrix.android.sdk.internal.crypto.model.MXUsersDevicesMap
import org.matrix.android.sdk.internal.crypto.model.event.SecretSendEventContent
import org.matrix.android.sdk.internal.crypto.store.IMXCryptoStore
import org.matrix.android.sdk.internal.crypto.tasks.SendToDeviceTask
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import timber.log.Timber
import javax.inject.Inject
internal class SendGossipWorker(context: Context,
params: WorkerParameters)
: CoroutineWorker(context, params) {
: SessionSafeCoroutineWorker<SendGossipWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
val sessionId: String,
override val sessionId: String,
val secretValue: String,
val request: IncomingSecretShareRequest
)
val request: IncomingSecretShareRequest,
override val lastFailureMessage: String? = null
) : SessionWorkerParams
@Inject lateinit var sendToDeviceTask: SendToDeviceTask
@Inject lateinit var cryptoStore: IMXCryptoStore
@ -58,18 +58,11 @@ internal class SendGossipWorker(context: Context,
@Inject lateinit var messageEncrypter: MessageEncrypter
@Inject lateinit var ensureOlmSessionsForDevicesAction: EnsureOlmSessionsForDevicesAction
override suspend fun doWork(): Result {
val errorOutputData = Data.Builder().putBoolean("failed", true).build()
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success(errorOutputData)
val sessionComponent = getSessionComponent(params.sessionId)
?: return Result.success(errorOutputData).also {
// TODO, can this happen? should I update local echo?
Timber.e("Unknown Session, cannot send message, sessionId: ${params.sessionId}")
}
sessionComponent.inject(this)
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
val localId = LocalEcho.createLocalEchoId()
val eventType: String = EventType.SEND_SECRET
@ -81,7 +74,7 @@ internal class SendGossipWorker(context: Context,
val requestingUserId = params.request.userId ?: ""
val requestingDeviceId = params.request.deviceId ?: ""
val deviceInfo = cryptoStore.getUserDevice(requestingUserId, requestingDeviceId)
?: return Result.success(errorOutputData).also {
?: return buildErrorResult(params, "Unknown deviceInfo, cannot send message").also {
cryptoStore.updateGossipingRequestState(params.request, GossipingRequestState.FAILED_TO_ACCEPTED)
Timber.e("Unknown deviceInfo, cannot send message, sessionId: ${params.request.deviceId}")
}
@ -94,7 +87,7 @@ internal class SendGossipWorker(context: Context,
if (olmSessionResult?.sessionId == null) {
// no session with this device, probably because there
// were no one-time keys.
return Result.success(errorOutputData).also {
return buildErrorResult(params, "no session with this device").also {
cryptoStore.updateGossipingRequestState(params.request, GossipingRequestState.FAILED_TO_ACCEPTED)
Timber.e("no session with this device, probably because there were no one-time keys.")
}
@ -130,13 +123,17 @@ internal class SendGossipWorker(context: Context,
)
cryptoStore.updateGossipingRequestState(params.request, GossipingRequestState.ACCEPTED)
return Result.success()
} catch (exception: Throwable) {
return if (exception.shouldBeRetried()) {
} catch (throwable: Throwable) {
return if (throwable.shouldBeRetried()) {
Result.retry()
} else {
cryptoStore.updateGossipingRequestState(params.request, GossipingRequestState.FAILED_TO_ACCEPTED)
Result.success(errorOutputData)
buildErrorResult(params, throwable.localizedMessage ?: "error")
}
}
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
}

View File

@ -17,18 +17,17 @@
package org.matrix.android.sdk.internal.crypto.verification
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.failure.shouldBeRetried
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.internal.crypto.tasks.SendVerificationMessageTask
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import timber.log.Timber
import javax.inject.Inject
@ -38,7 +37,7 @@ import javax.inject.Inject
*/
internal class SendVerificationMessageWorker(context: Context,
params: WorkerParameters)
: CoroutineWorker(context, params) {
: SessionSafeCoroutineWorker<SendVerificationMessageWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
@ -47,30 +46,17 @@ internal class SendVerificationMessageWorker(context: Context,
override val lastFailureMessage: String? = null
) : SessionWorkerParams
@Inject
lateinit var sendVerificationMessageTask: SendVerificationMessageTask
@Inject
lateinit var localEchoRepository: LocalEchoRepository
@Inject
lateinit var cryptoService: CryptoService
@Inject lateinit var sendVerificationMessageTask: SendVerificationMessageTask
@Inject lateinit var localEchoRepository: LocalEchoRepository
@Inject lateinit var cryptoService: CryptoService
@Inject lateinit var cancelSendTracker: CancelSendTracker
override suspend fun doWork(): Result {
val errorOutputData = Data.Builder().putBoolean(OUTPUT_KEY_FAILED, true).build()
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success(errorOutputData)
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
val sessionComponent = getSessionComponent(params.sessionId)
?: return Result.success(errorOutputData).also {
// TODO, can this happen? should I update local echo?
Timber.e("Unknown Session, cannot send message, sessionId: ${params.sessionId}")
}
sessionComponent.inject(this)
val localEvent = localEchoRepository.getUpToDateEcho(params.eventId) ?: return Result.success(errorOutputData)
override suspend fun doSafeWork(params: Params): Result {
val localEvent = localEchoRepository.getUpToDateEcho(params.eventId) ?: return buildErrorResult(params, "Event not found")
val localEventId = localEvent.eventId ?: ""
val roomId = localEvent.roomId ?: ""
@ -91,20 +77,16 @@ internal class SendVerificationMessageWorker(context: Context,
)
Result.success(Data.Builder().putString(localEventId, resultEventId).build())
} catch (exception: Throwable) {
if (exception.shouldBeRetried()) {
} catch (throwable: Throwable) {
if (throwable.shouldBeRetried()) {
Result.retry()
} else {
Result.success(errorOutputData)
buildErrorResult(params, throwable.localizedMessage ?: "error")
}
}
}
companion object {
private const val OUTPUT_KEY_FAILED = "failed"
fun hasFailed(outputData: Data): Boolean {
return outputData.getBoolean(OUTPUT_KEY_FAILED, false)
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
}

View File

@ -54,6 +54,7 @@ import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.util.StringProvider
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import timber.log.Timber
import java.util.UUID
@ -117,14 +118,13 @@ internal class VerificationTransportRoomMessage(
workInfoList
?.firstOrNull { it.id == enqueueInfo.second }
?.let { wInfo ->
when (wInfo.state) {
WorkInfo.State.FAILED -> {
tx?.cancel(onErrorReason)
workLiveData.removeObserver(this)
}
WorkInfo.State.SUCCEEDED -> {
if (SendVerificationMessageWorker.hasFailed(wInfo.outputData)) {
if (SessionSafeCoroutineWorker.hasFailed(wInfo.outputData)) {
Timber.e("## SAS verification [${tx?.transactionId}] failed to send verification message in state : ${tx?.state}")
tx?.cancel(onErrorReason)
} else {
@ -210,7 +210,7 @@ internal class VerificationTransportRoomMessage(
?.filter { it.state == WorkInfo.State.SUCCEEDED }
?.firstOrNull { it.id == workRequest.id }
?.let { wInfo ->
if (SendVerificationMessageWorker.hasFailed(wInfo.outputData)) {
if (SessionSafeCoroutineWorker.hasFailed(wInfo.outputData)) {
callback(null, null)
} else {
val eventId = wInfo.outputData.getString(localId)

View File

@ -19,7 +19,6 @@ package org.matrix.android.sdk.internal.session.content
import android.content.Context
import android.graphics.BitmapFactory
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.extensions.tryOrNull
@ -37,13 +36,14 @@ import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.database.mapper.asDomain
import org.matrix.android.sdk.internal.network.ProgressRequestBody
import org.matrix.android.sdk.internal.session.DefaultFileService
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker
import org.matrix.android.sdk.internal.session.room.send.LocalEchoIdentifiers
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.session.room.send.MultipleEventSendingDispatcherWorker
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import timber.log.Timber
import java.io.File
import java.util.UUID
@ -59,7 +59,8 @@ private data class NewImageAttributes(
* Possible previous worker: None
* Possible next worker : Always [MultipleEventSendingDispatcherWorker]
*/
internal class UploadContentWorker(val context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {
internal class UploadContentWorker(val context: Context, params: WorkerParameters)
: SessionSafeCoroutineWorker<UploadContentWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
@ -78,19 +79,12 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
@Inject lateinit var imageCompressor: ImageCompressor
@Inject lateinit var localEchoRepository: LocalEchoRepository
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()
.also { Timber.e("Unable to parse work parameters") }
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
Timber.v("Starting upload media work with params $params")
if (params.lastFailureMessage != null) {
// Transmit the error
return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
}
// Just defensive code to ensure that we never have an uncaught exception that could break the queue
return try {
internalDoWork(params)
@ -100,10 +94,11 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
}
}
private suspend fun internalDoWork(params: Params): Result {
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
private suspend fun internalDoWork(params: Params): Result {
val allCancelled = params.localEchoIds.all { cancelSendTracker.isCancelRequestedFor(it.eventId, it.roomId) }
if (allCancelled) {
// there is no point in uploading the image!
@ -218,14 +213,7 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
}
} catch (e: Exception) {
Timber.e(e, "## FileService: ERROR")
notifyTracker(params) { contentUploadStateTracker.setFailure(it, e) }
return Result.success(
WorkerParamsFactory.toData(
params.copy(
lastFailureMessage = e.localizedMessage
)
)
)
return handleFailure(params, e)
} finally {
// Delete all temporary files
filesToDelete.forEach {

View File

@ -18,19 +18,19 @@
package org.matrix.android.sdk.internal.session.group
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import javax.inject.Inject
/**
* Possible previous worker: None
* Possible next worker : None
*/
internal class GetGroupDataWorker(context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {
internal class GetGroupDataWorker(context: Context, params: WorkerParameters)
: SessionSafeCoroutineWorker<GetGroupDataWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
@ -40,12 +40,11 @@ internal class GetGroupDataWorker(context: Context, params: WorkerParameters) :
@Inject lateinit var getGroupDataTask: GetGroupDataTask
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure()
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
override suspend fun doSafeWork(params: Params): Result {
return runCatching {
getGroupDataTask.execute(GetGroupDataTask.Params.FetchAllActive)
}.fold(
@ -53,4 +52,8 @@ internal class GetGroupDataWorker(context: Context, params: WorkerParameters) :
{ Result.retry() }
)
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
}

View File

@ -17,10 +17,10 @@
package org.matrix.android.sdk.internal.session.pushers
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import com.zhuinden.monarchy.Monarchy
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.session.pushers.PusherState
import org.matrix.android.sdk.internal.database.mapper.toEntity
@ -28,16 +28,14 @@ import org.matrix.android.sdk.internal.database.model.PusherEntity
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.util.awaitTransaction
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import org.greenrobot.eventbus.EventBus
import timber.log.Timber
import javax.inject.Inject
internal class AddHttpPusherWorker(context: Context, params: WorkerParameters)
: CoroutineWorker(context, params) {
: SessionSafeCoroutineWorker<AddHttpPusherWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
@ -50,14 +48,11 @@ internal class AddHttpPusherWorker(context: Context, params: WorkerParameters)
@Inject @SessionDatabase lateinit var monarchy: Monarchy
@Inject lateinit var eventBus: EventBus
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
val pusher = params.pusher
if (pusher.pushKey.isBlank()) {
@ -82,6 +77,10 @@ internal class AddHttpPusherWorker(context: Context, params: WorkerParameters)
}
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
private suspend fun setPusher(pusher: JsonPusher) {
executeRequest<Unit>(eventBus) {
apiCall = pushersAPI.setPusher(pusher)

View File

@ -17,7 +17,6 @@
package org.matrix.android.sdk.internal.session.room.relation
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.greenrobot.eventbus.EventBus
@ -27,17 +26,17 @@ import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.relation.ReactionContent
import org.matrix.android.sdk.api.session.room.model.relation.ReactionInfo
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.session.room.send.SendResponse
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import timber.log.Timber
import javax.inject.Inject
// TODO This is not used. Delete?
internal class SendRelationWorker(context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {
internal class SendRelationWorker(context: Context, params: WorkerParameters)
: SessionSafeCoroutineWorker<SendRelationWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
@ -52,19 +51,11 @@ internal class SendRelationWorker(context: Context, params: WorkerParameters) :
@Inject lateinit var eventBus: EventBus
@Inject lateinit var localEchoRepository: LocalEchoRepository
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure()
if (params.lastFailureMessage != null) {
// Transmit the error
return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
}
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
val localEvent = localEchoRepository.getUpToDateEcho(params.eventId)
if (localEvent?.eventId == null) {
return Result.failure()
@ -89,6 +80,10 @@ internal class SendRelationWorker(context: Context, params: WorkerParameters) :
}
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
private suspend fun sendRelation(roomId: String, relationType: String, relatedEventId: String, localEvent: Event) {
executeRequest<SendResponse>(eventBus) {
apiCall = roomAPI.sendRelation(

View File

@ -18,7 +18,6 @@
package org.matrix.android.sdk.internal.session.room.send
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.failure.Failure
@ -31,10 +30,11 @@ import org.matrix.android.sdk.internal.crypto.MXCRYPTO_ALGORITHM_MEGOLM
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import org.matrix.android.sdk.internal.crypto.model.MXEncryptEventContentResult
import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.util.awaitCallback
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import timber.log.Timber
import javax.inject.Inject
@ -43,7 +43,7 @@ import javax.inject.Inject
* Possible next worker : Always [SendEventWorker]
*/
internal class EncryptEventWorker(context: Context, params: WorkerParameters)
: CoroutineWorker(context, params) {
: SessionSafeCoroutineWorker<EncryptEventWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
@ -58,20 +58,12 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
@Inject lateinit var localEchoRepository: LocalEchoRepository
@Inject lateinit var cancelSendTracker: CancelSendTracker
override suspend fun doWork(): Result {
Timber.v("Start Encrypt work")
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()
if (params.lastFailureMessage != null) {
// Transmit the error
return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
}
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
Timber.v("## SendEvent: Start Encrypt work for event ${params.eventId}")
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
val localEvent = localEchoRepository.getUpToDateEcho(params.eventId)
if (localEvent?.eventId == null) {
@ -148,4 +140,8 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
return Result.success(WorkerParamsFactory.toData(nextWorkerParams))
}
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
}

View File

@ -19,17 +19,17 @@ package org.matrix.android.sdk.internal.session.room.send
import android.content.Context
import androidx.work.BackoffPolicy
import androidx.work.CoroutineWorker
import androidx.work.OneTimeWorkRequest
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.session.content.UploadContentWorker
import org.matrix.android.sdk.internal.session.room.timeline.TimelineSendEventWorkCommon
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import org.matrix.android.sdk.internal.worker.startChain
import timber.log.Timber
import java.util.concurrent.TimeUnit
@ -42,7 +42,7 @@ import javax.inject.Inject
* Possible next worker : None, but it will post new work to send events, encrypted or not
*/
internal class MultipleEventSendingDispatcherWorker(context: Context, params: WorkerParameters)
: CoroutineWorker(context, params) {
: SessionSafeCoroutineWorker<MultipleEventSendingDispatcherWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
@ -56,22 +56,20 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
@Inject lateinit var timelineSendEventWorkCommon: TimelineSendEventWorkCommon
@Inject lateinit var localEchoRepository: LocalEchoRepository
override suspend fun doWork(): Result {
Timber.v("## SendEvent: Start dispatch sending multiple event work")
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
if (params.lastFailureMessage != null) {
params.localEchoIds.forEach { localEchoIds ->
localEchoRepository.updateSendState(localEchoIds.eventId, SendState.UNDELIVERED)
}
// Transmit the error if needed?
return Result.success(inputData)
.also { Timber.e("## SendEvent: Work cancelled due to input error from parent ${params.lastFailureMessage}") }
override fun doOnError(params: Params): Result {
params.localEchoIds.forEach { localEchoIds ->
localEchoRepository.updateSendState(localEchoIds.eventId, SendState.UNDELIVERED)
}
return super.doOnError(params)
}
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
Timber.v("## SendEvent: Start dispatch sending multiple event work")
// Create a work for every event
params.localEchoIds.forEach { localEchoIds ->
val roomId = localEchoIds.roomId
@ -94,6 +92,10 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
return Result.success()
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
private fun createEncryptEventWork(sessionId: String, eventId: String, startChain: Boolean): OneTimeWorkRequest {
val params = EncryptEventWorker.Params(sessionId, eventId)
val sendWorkData = WorkerParamsFactory.toData(params)

View File

@ -17,24 +17,24 @@
package org.matrix.android.sdk.internal.session.room.send
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import org.greenrobot.eventbus.EventBus
import timber.log.Timber
import javax.inject.Inject
/**
* Possible previous worker: None
* Possible next worker : None
*/
internal class RedactEventWorker(context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {
internal class RedactEventWorker(context: Context, params: WorkerParameters)
: SessionSafeCoroutineWorker<RedactEventWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
@ -49,19 +49,11 @@ internal class RedactEventWorker(context: Context, params: WorkerParameters) : C
@Inject lateinit var roomAPI: RoomAPI
@Inject lateinit var eventBus: EventBus
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure()
if (params.lastFailureMessage != null) {
// Transmit the error
return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
}
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
val eventId = params.eventId
return runCatching {
executeRequest<SendResponse>(eventBus) {
@ -90,4 +82,8 @@ internal class RedactEventWorker(context: Context, params: WorkerParameters) : C
}
)
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
}

View File

@ -18,7 +18,6 @@
package org.matrix.android.sdk.internal.session.room.send
import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import io.realm.RealmConfiguration
@ -28,10 +27,10 @@ import org.matrix.android.sdk.api.session.events.model.Content
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import timber.log.Timber
import javax.inject.Inject
@ -43,7 +42,7 @@ import javax.inject.Inject
*/
internal class SendEventWorker(context: Context,
params: WorkerParameters)
: CoroutineWorker(context, params) {
: SessionSafeCoroutineWorker<SendEventWorker.Params>(context, params, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
@ -58,13 +57,11 @@ internal class SendEventWorker(context: Context,
@Inject lateinit var cancelSendTracker: CancelSendTracker
@SessionDatabase @Inject lateinit var realmConfiguration: RealmConfiguration
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
val event = localEchoRepository.getUpToDateEcho(params.eventId)
if (event?.eventId == null || event.roomId == null) {
localEchoRepository.updateSendState(params.eventId, SendState.UNDELIVERED)
@ -103,6 +100,10 @@ internal class SendEventWorker(context: Context,
}
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
private suspend fun sendEvent(eventId: String, roomId: String, type: String, content: Content?) {
localEchoRepository.updateSendState(eventId, SendState.SENDING)
executeRequest<SendResponse>(eventBus) {

View File

@ -131,7 +131,7 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
/**
* At the moment we don't get any group data through the sync, so we poll where every hour.
You can also force to refetch group data using [Group] API.
* You can also force to refetch group data using [Group] API.
*/
private fun scheduleGroupDataFetchingIfNeeded(groupsSyncResponse: GroupsSyncResponse) {
val groupIds = ArrayList<String>()

View File

@ -18,18 +18,18 @@ package org.matrix.android.sdk.internal.session.sync.job
import android.content.Context
import androidx.work.BackoffPolicy
import androidx.work.CoroutineWorker
import androidx.work.ExistingWorkPolicy
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.network.NetworkConnectivityChecker
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.session.sync.SyncTask
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import timber.log.Timber
import java.util.concurrent.TimeUnit
import javax.inject.Inject
@ -43,7 +43,7 @@ private const val DEFAULT_DELAY_TIMEOUT = 30_000L
*/
internal class SyncWorker(context: Context,
workerParameters: WorkerParameters
) : CoroutineWorker(context, workerParameters) {
) : SessionSafeCoroutineWorker<SyncWorker.Params>(context, workerParameters, Params::class.java) {
@JsonClass(generateAdapter = true)
internal data class Params(
@ -59,14 +59,13 @@ internal class SyncWorker(context: Context,
@Inject lateinit var networkConnectivityChecker: NetworkConnectivityChecker
@Inject lateinit var workManagerProvider: WorkManagerProvider
override suspend fun doWork(): Result {
Timber.i("Sync work starting")
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()
.also { Timber.e("Unable to parse work parameters") }
override fun injectWith(injector: SessionComponent) {
injector.inject(this)
}
override suspend fun doSafeWork(params: Params): Result {
Timber.i("Sync work starting")
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
return runCatching {
doSync(params.timeout)
}.fold(
@ -91,6 +90,10 @@ internal class SyncWorker(context: Context,
)
}
override fun buildErrorParams(params: Params, message: String): Params {
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}
private suspend fun doSync(timeout: Long) {
val taskParams = SyncTask.Params(timeout * 1000)
syncTask.execute(taskParams)

View File

@ -0,0 +1,102 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.worker
import android.content.Context
import androidx.annotation.CallSuper
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.internal.session.SessionComponent
import timber.log.Timber
/**
* This worker should only sends Result.Success when added to a unique queue to avoid breaking the unique queue.
* This abstract class handle the cases of problem when parsing parameter, and forward the error if any to
* the next workers.
*/
internal abstract class SessionSafeCoroutineWorker<PARAM : SessionWorkerParams>(
context: Context,
workerParameters: WorkerParameters,
private val paramClass: Class<PARAM>
) : CoroutineWorker(context, workerParameters) {
@JsonClass(generateAdapter = true)
internal data class ErrorData(
override val sessionId: String,
override val lastFailureMessage: String? = null
) : SessionWorkerParams
final override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData(paramClass, inputData)
?: return buildErrorResult(null, "Unable to parse work parameters")
.also { Timber.e("Unable to parse work parameters") }
return try {
val sessionComponent = getSessionComponent(params.sessionId)
?: return buildErrorResult(params, "No session")
// Make sure to inject before handling error as you may need some dependencies to process them.
injectWith(sessionComponent)
if (params.lastFailureMessage != null) {
// Forward error to the next workers
doOnError(params)
} else {
doSafeWork(params)
}
} catch (throwable: Throwable) {
buildErrorResult(params, throwable.localizedMessage ?: "error")
}
}
abstract fun injectWith(injector: SessionComponent)
/**
* Should only return Result.Success for workers added to a unique queue
*/
abstract suspend fun doSafeWork(params: PARAM): Result
protected fun buildErrorResult(params: PARAM?, message: String): Result {
return Result.success(
if (params != null) {
WorkerParamsFactory.toData(paramClass, buildErrorParams(params, message))
} else {
WorkerParamsFactory.toData(ErrorData::class.java, ErrorData(sessionId = "", lastFailureMessage = message))
}
)
}
abstract fun buildErrorParams(params: PARAM, message: String): PARAM
/**
* This is called when the input parameters are correct, but contain an error from the previous worker.
*/
@CallSuper
open fun doOnError(params: PARAM): Result {
// Forward the error
return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
}
companion object {
fun hasFailed(outputData: Data): Boolean {
return WorkerParamsFactory.fromData(ErrorData::class.java, outputData)
.let { it?.lastFailureMessage != null }
}
}
}

View File

@ -18,13 +18,14 @@
package org.matrix.android.sdk.internal.worker
import androidx.work.Data
import com.squareup.moshi.Moshi
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.internal.di.MoshiProvider
import org.matrix.android.sdk.internal.network.parsing.CheckNumberType
internal object WorkerParamsFactory {
val moshi by lazy {
private val moshi: Moshi by lazy {
// We are adding the CheckNumberType as we are serializing/deserializing multiple time in a row
// and we lost typing information doing so.
// We don't want this check to be done on all adapters, so we just add it here.
@ -34,20 +35,24 @@ internal object WorkerParamsFactory {
.build()
}
const val KEY = "WORKER_PARAMS_JSON"
private const val KEY = "WORKER_PARAMS_JSON"
inline fun <reified T> toData(params: T): Data {
val adapter = moshi.adapter(T::class.java)
inline fun <reified T> toData(params: T) = toData(T::class.java, params)
fun <T> toData(clazz: Class<T>, params: T): Data {
val adapter = moshi.adapter(clazz)
val json = adapter.toJson(params)
return Data.Builder().putString(KEY, json).build()
}
inline fun <reified T> fromData(data: Data): T? = tryOrNull("Unable to parse work parameters") {
inline fun <reified T> fromData(data: Data) = fromData(T::class.java, data)
fun <T> fromData(clazz: Class<T>, data: Data): T? = tryOrNull("Unable to parse work parameters") {
val json = data.getString(KEY)
return if (json == null) {
null
} else {
val adapter = moshi.adapter(T::class.java)
val adapter = moshi.adapter(clazz)
adapter.fromJson(json)
}
}