Stop using workers for interactive verification
This commit is contained in:
parent
54fb4ae8db
commit
f9dd3b96d6
|
@ -97,6 +97,7 @@ import org.matrix.android.sdk.internal.crypto.tasks.GetDevicesTask
|
|||
import org.matrix.android.sdk.internal.crypto.tasks.SetDeviceNameTask
|
||||
import org.matrix.android.sdk.internal.crypto.tasks.UploadKeysTask
|
||||
import org.matrix.android.sdk.internal.crypto.verification.DefaultVerificationService
|
||||
import org.matrix.android.sdk.internal.crypto.verification.VerificationMessageProcessor
|
||||
import org.matrix.android.sdk.internal.di.DeviceId
|
||||
import org.matrix.android.sdk.internal.di.MoshiProvider
|
||||
import org.matrix.android.sdk.internal.di.UserId
|
||||
|
@ -183,6 +184,7 @@ internal class DefaultCryptoService @Inject constructor(
|
|||
private val taskExecutor: TaskExecutor,
|
||||
private val cryptoCoroutineScope: CoroutineScope,
|
||||
private val eventDecryptor: EventDecryptor,
|
||||
private val verificationMessageProcessor: VerificationMessageProcessor,
|
||||
private val liveEventManager: Lazy<StreamEventsManager>
|
||||
) : CryptoService {
|
||||
|
||||
|
@ -197,7 +199,7 @@ internal class DefaultCryptoService @Inject constructor(
|
|||
}
|
||||
}
|
||||
|
||||
fun onLiveEvent(roomId: String, event: Event) {
|
||||
fun onLiveEvent(roomId: String, event: Event, isInitialSync: Boolean) {
|
||||
// handle state events
|
||||
if (event.isStateEvent()) {
|
||||
when (event.type) {
|
||||
|
@ -206,6 +208,15 @@ internal class DefaultCryptoService @Inject constructor(
|
|||
EventType.STATE_ROOM_HISTORY_VISIBILITY -> onRoomHistoryVisibilityEvent(roomId, event)
|
||||
}
|
||||
}
|
||||
|
||||
// handle verification
|
||||
if (!isInitialSync) {
|
||||
if (event.type != null && verificationMessageProcessor.shouldProcess(event.type)) {
|
||||
cryptoCoroutineScope.launch(coroutineDispatchers.dmVerif) {
|
||||
verificationMessageProcessor.process(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// val gossipingBuffer = mutableListOf<Event>()
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
package org.matrix.android.sdk.internal.crypto.tasks
|
||||
|
||||
import org.matrix.android.sdk.api.crypto.MXCRYPTO_ALGORITHM_MEGOLM
|
||||
import dagger.Lazy
|
||||
import org.matrix.android.sdk.api.session.crypto.CryptoService
|
||||
import org.matrix.android.sdk.api.session.crypto.model.MXEncryptEventContentResult
|
||||
import org.matrix.android.sdk.api.session.crypto.model.MXEventDecryptionResult
|
||||
|
@ -39,7 +40,7 @@ internal interface EncryptEventTask : Task<EncryptEventTask.Params, Event> {
|
|||
|
||||
internal class DefaultEncryptEventTask @Inject constructor(
|
||||
private val localEchoRepository: LocalEchoRepository,
|
||||
private val cryptoService: CryptoService
|
||||
private val cryptoService: Lazy<CryptoService>
|
||||
) : EncryptEventTask {
|
||||
override suspend fun execute(params: EncryptEventTask.Params): Event {
|
||||
// don't want to wait for any query
|
||||
|
@ -59,7 +60,7 @@ internal class DefaultEncryptEventTask @Inject constructor(
|
|||
// try {
|
||||
// let it throws
|
||||
awaitCallback<MXEncryptEventContentResult> {
|
||||
cryptoService.encryptEventContent(localMutableContent, localEvent.type, params.roomId, it)
|
||||
cryptoService.get().encryptEventContent(localMutableContent, localEvent.type, params.roomId, it)
|
||||
}.let { result ->
|
||||
val modifiedContent = HashMap(result.eventContent)
|
||||
params.keepKeys?.forEach { toKeep ->
|
||||
|
@ -80,7 +81,7 @@ internal class DefaultEncryptEventTask @Inject constructor(
|
|||
).toContent(),
|
||||
forwardingCurve25519KeyChain = emptyList(),
|
||||
senderCurve25519Key = result.eventContent["sender_key"] as? String,
|
||||
claimedEd25519Key = cryptoService.getMyDevice().fingerprint()
|
||||
claimedEd25519Key = cryptoService.get().getMyDevice().fingerprint()
|
||||
)
|
||||
} else {
|
||||
null
|
||||
|
|
|
@ -47,7 +47,7 @@ internal class DefaultSendVerificationMessageTask @Inject constructor(
|
|||
localEchoRepository.updateSendState(localId, event.roomId, SendState.SENDING)
|
||||
val response = executeRequest(globalErrorReceiver) {
|
||||
roomAPI.send(
|
||||
localId,
|
||||
txId = localId,
|
||||
roomId = event.roomId ?: "",
|
||||
content = event.content,
|
||||
eventType = event.type ?: ""
|
||||
|
|
|
@ -764,8 +764,15 @@ internal class DefaultVerificationService @Inject constructor(
|
|||
return
|
||||
}
|
||||
|
||||
val roomId = event.roomId
|
||||
if (roomId == null) {
|
||||
Timber.e("## SAS Verification missing roomId for event")
|
||||
// TODO cancel?
|
||||
return
|
||||
}
|
||||
|
||||
handleReadyReceived(event.senderId, readyReq) {
|
||||
verificationTransportRoomMessageFactory.createTransport(event.roomId!!, it)
|
||||
verificationTransportRoomMessageFactory.createTransport(roomId, it)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1171,6 +1178,7 @@ internal class DefaultVerificationService @Inject constructor(
|
|||
}
|
||||
.distinct()
|
||||
|
||||
requestsForUser.add(verificationRequest)
|
||||
transport.sendVerificationRequest(methodValues, validLocalId, otherUserId, roomId, null) { syncedId, info ->
|
||||
// We need to update with the syncedID
|
||||
updatePendingRequest(verificationRequest.copy(
|
||||
|
@ -1180,7 +1188,6 @@ internal class DefaultVerificationService @Inject constructor(
|
|||
))
|
||||
}
|
||||
|
||||
requestsForUser.add(verificationRequest)
|
||||
dispatchRequestAdded(verificationRequest)
|
||||
|
||||
return verificationRequest
|
||||
|
|
|
@ -1,88 +0,0 @@
|
|||
/*
|
||||
* Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.matrix.android.sdk.internal.crypto.verification
|
||||
|
||||
import android.content.Context
|
||||
import androidx.work.Data
|
||||
import androidx.work.WorkerParameters
|
||||
import com.squareup.moshi.JsonClass
|
||||
import org.matrix.android.sdk.api.failure.shouldBeRetried
|
||||
import org.matrix.android.sdk.internal.SessionManager
|
||||
import org.matrix.android.sdk.internal.crypto.tasks.SendVerificationMessageTask
|
||||
import org.matrix.android.sdk.internal.session.SessionComponent
|
||||
import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker
|
||||
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
|
||||
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
|
||||
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
/**
|
||||
* Possible previous worker: None
|
||||
* Possible next worker : None
|
||||
*/
|
||||
internal class SendVerificationMessageWorker(context: Context, params: WorkerParameters, sessionManager: SessionManager) :
|
||||
SessionSafeCoroutineWorker<SendVerificationMessageWorker.Params>(context, params, sessionManager, Params::class.java) {
|
||||
|
||||
@JsonClass(generateAdapter = true)
|
||||
internal data class Params(
|
||||
override val sessionId: String,
|
||||
val eventId: String,
|
||||
override val lastFailureMessage: String? = null
|
||||
) : SessionWorkerParams
|
||||
|
||||
@Inject lateinit var sendVerificationMessageTask: SendVerificationMessageTask
|
||||
@Inject lateinit var localEchoRepository: LocalEchoRepository
|
||||
@Inject lateinit var cancelSendTracker: CancelSendTracker
|
||||
|
||||
override fun injectWith(injector: SessionComponent) {
|
||||
injector.inject(this)
|
||||
}
|
||||
|
||||
override suspend fun doSafeWork(params: Params): Result {
|
||||
val localEvent = localEchoRepository.getUpToDateEcho(params.eventId) ?: return buildErrorResult(params, "Event not found")
|
||||
val localEventId = localEvent.eventId ?: ""
|
||||
val roomId = localEvent.roomId ?: ""
|
||||
|
||||
if (cancelSendTracker.isCancelRequestedFor(localEventId, roomId)) {
|
||||
return Result.success()
|
||||
.also {
|
||||
cancelSendTracker.markCancelled(localEventId, roomId)
|
||||
Timber.e("## SendEvent: Event sending has been cancelled $localEventId")
|
||||
}
|
||||
}
|
||||
|
||||
return try {
|
||||
val resultEventId = sendVerificationMessageTask.execute(
|
||||
SendVerificationMessageTask.Params(
|
||||
event = localEvent
|
||||
)
|
||||
)
|
||||
|
||||
Result.success(Data.Builder().putString(localEventId, resultEventId).build())
|
||||
} catch (throwable: Throwable) {
|
||||
if (throwable.shouldBeRetried()) {
|
||||
Result.retry()
|
||||
} else {
|
||||
buildErrorResult(params, throwable.localizedMessage ?: "error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun buildErrorParams(params: Params, message: String): Params {
|
||||
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
|
||||
}
|
||||
}
|
|
@ -15,13 +15,9 @@
|
|||
*/
|
||||
package org.matrix.android.sdk.internal.crypto.verification
|
||||
|
||||
import io.realm.Realm
|
||||
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
|
||||
import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult
|
||||
import org.matrix.android.sdk.api.session.crypto.verification.VerificationService
|
||||
import org.matrix.android.sdk.api.session.events.model.Event
|
||||
import org.matrix.android.sdk.api.session.events.model.EventType
|
||||
import org.matrix.android.sdk.api.session.events.model.LocalEcho
|
||||
import org.matrix.android.sdk.api.session.events.model.toModel
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageContent
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageRelationContent
|
||||
|
@ -29,20 +25,16 @@ import org.matrix.android.sdk.api.session.room.model.message.MessageType
|
|||
import org.matrix.android.sdk.api.session.room.model.message.MessageVerificationReadyContent
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageVerificationRequestContent
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageVerificationStartContent
|
||||
import org.matrix.android.sdk.internal.crypto.EventDecryptor
|
||||
import org.matrix.android.sdk.internal.database.model.EventInsertType
|
||||
import org.matrix.android.sdk.internal.di.DeviceId
|
||||
import org.matrix.android.sdk.internal.di.UserId
|
||||
import org.matrix.android.sdk.internal.session.EventInsertLiveProcessor
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class VerificationMessageProcessor @Inject constructor(
|
||||
private val eventDecryptor: EventDecryptor,
|
||||
private val verificationService: DefaultVerificationService,
|
||||
@UserId private val userId: String,
|
||||
@DeviceId private val deviceId: String?
|
||||
) : EventInsertLiveProcessor {
|
||||
) {
|
||||
|
||||
private val transactionsHandledByOtherDevice = ArrayList<String>()
|
||||
|
||||
|
@ -58,41 +50,20 @@ internal class VerificationMessageProcessor @Inject constructor(
|
|||
EventType.ENCRYPTED
|
||||
)
|
||||
|
||||
override fun shouldProcess(eventId: String, eventType: String, insertType: EventInsertType): Boolean {
|
||||
if (insertType != EventInsertType.INCREMENTAL_SYNC) {
|
||||
return false
|
||||
}
|
||||
return allowedTypes.contains(eventType) && !LocalEcho.isLocalEchoId(eventId)
|
||||
fun shouldProcess(eventType: String): Boolean {
|
||||
return allowedTypes.contains(eventType)
|
||||
}
|
||||
|
||||
override suspend fun process(realm: Realm, event: Event) {
|
||||
Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} msgtype: ${event.type} from ${event.senderId}")
|
||||
suspend fun process(event: Event) {
|
||||
Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} msgtype: ${event.getClearType()} from ${event.senderId}")
|
||||
|
||||
// If the request is in the future by more than 5 minutes or more than 10 minutes in the past,
|
||||
// the message should be ignored by the receiver.
|
||||
|
||||
if (!VerificationService.isValidRequest(event.ageLocalTs
|
||||
?: event.originServerTs)) return Unit.also {
|
||||
Timber.d("## SAS Verification live observer: msgId: ${event.eventId} is outdated")
|
||||
if (event.ageLocalTs != null && !VerificationService.isValidRequest(event.ageLocalTs)) return Unit.also {
|
||||
Timber.d("## SAS Verification live observer: msgId: ${event.eventId} is outdated age:$event.ageLocalTs ms")
|
||||
}
|
||||
|
||||
// decrypt if needed?
|
||||
if (event.isEncrypted() && event.mxDecryptionResult == null) {
|
||||
// TODO use a global event decryptor? attache to session and that listen to new sessionId?
|
||||
// for now decrypt sync
|
||||
try {
|
||||
val result = eventDecryptor.decryptEvent(event, "")
|
||||
event.mxDecryptionResult = OlmDecryptionResult(
|
||||
payload = result.clearEvent,
|
||||
senderKey = result.senderCurve25519Key,
|
||||
keysClaimed = result.claimedEd25519Key?.let { mapOf("ed25519" to it) },
|
||||
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
|
||||
)
|
||||
} catch (e: MXCryptoError) {
|
||||
Timber.e("## SAS Failed to decrypt event: ${event.eventId}")
|
||||
verificationService.onPotentiallyInterestingEventRoomFailToDecrypt(event)
|
||||
}
|
||||
}
|
||||
Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} type: ${event.getClearType()}")
|
||||
|
||||
// Relates to is not encrypted
|
||||
|
@ -101,7 +72,6 @@ internal class VerificationMessageProcessor @Inject constructor(
|
|||
if (event.senderId == userId) {
|
||||
// If it's send from me, we need to keep track of Requests or Start
|
||||
// done from another device of mine
|
||||
|
||||
if (EventType.MESSAGE == event.getClearType()) {
|
||||
val msgType = event.getClearContent().toModel<MessageContent>()?.msgType
|
||||
if (MessageType.MSGTYPE_VERIFICATION_REQUEST == msgType) {
|
||||
|
@ -136,6 +106,8 @@ internal class VerificationMessageProcessor @Inject constructor(
|
|||
transactionsHandledByOtherDevice.remove(it)
|
||||
verificationService.onRoomRequestHandledByOtherDevice(event)
|
||||
}
|
||||
} else if (EventType.ENCRYPTED == event.getClearType()) {
|
||||
verificationService.onPotentiallyInterestingEventRoomFailToDecrypt(event)
|
||||
}
|
||||
|
||||
Timber.v("## SAS Verification ignoring message sent by me: ${event.eventId} type: ${event.getClearType()}")
|
||||
|
|
|
@ -15,14 +15,9 @@
|
|||
*/
|
||||
package org.matrix.android.sdk.internal.crypto.verification
|
||||
|
||||
import androidx.lifecycle.Observer
|
||||
import androidx.work.BackoffPolicy
|
||||
import androidx.work.Data
|
||||
import androidx.work.ExistingWorkPolicy
|
||||
import androidx.work.Operation
|
||||
import androidx.work.WorkInfo
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.asCoroutineDispatcher
|
||||
import kotlinx.coroutines.launch
|
||||
import org.matrix.android.sdk.api.session.crypto.verification.CancelCode
|
||||
import org.matrix.android.sdk.api.session.crypto.verification.ValidVerificationInfoRequest
|
||||
|
@ -45,25 +40,25 @@ import org.matrix.android.sdk.api.session.room.model.message.MessageVerification
|
|||
import org.matrix.android.sdk.api.session.room.model.relation.RelationDefaultContent
|
||||
import org.matrix.android.sdk.internal.crypto.model.rest.VERIFICATION_METHOD_RECIPROCATE
|
||||
import org.matrix.android.sdk.internal.crypto.model.rest.VERIFICATION_METHOD_SAS
|
||||
import org.matrix.android.sdk.internal.di.WorkManagerProvider
|
||||
import org.matrix.android.sdk.internal.crypto.tasks.SendVerificationMessageTask
|
||||
import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
|
||||
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
|
||||
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
|
||||
import org.matrix.android.sdk.internal.task.SemaphoreCoroutineSequencer
|
||||
import timber.log.Timber
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
internal class VerificationTransportRoomMessage(
|
||||
private val workManagerProvider: WorkManagerProvider,
|
||||
private val sessionId: String,
|
||||
private val sendVerificationMessageTask: SendVerificationMessageTask,
|
||||
private val userId: String,
|
||||
private val userDeviceId: String?,
|
||||
private val roomId: String,
|
||||
private val localEchoEventFactory: LocalEchoEventFactory,
|
||||
private val tx: DefaultVerificationTransaction?,
|
||||
private val coroutineScope: CoroutineScope
|
||||
private val tx: DefaultVerificationTransaction?
|
||||
) : VerificationTransport {
|
||||
|
||||
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
private val verificationSenderScope = CoroutineScope(SupervisorJob() + dispatcher)
|
||||
private val sequencer = SemaphoreCoroutineSequencer()
|
||||
|
||||
override fun <T> sendToOther(type: String,
|
||||
verificationInfo: VerificationInfo<T>,
|
||||
nextState: VerificationTxState,
|
||||
|
@ -77,68 +72,22 @@ internal class VerificationTransportRoomMessage(
|
|||
content = verificationInfo.toEventContent()!!
|
||||
)
|
||||
|
||||
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
|
||||
sessionId = sessionId,
|
||||
eventId = event.eventId ?: ""
|
||||
))
|
||||
val enqueueInfo = enqueueSendWork(workerParams)
|
||||
|
||||
// I cannot just listen to the given work request, because when used in a uniqueWork,
|
||||
// The callback is called while it is still Running ...
|
||||
|
||||
// Futures.addCallback(enqueueInfo.first.result, object : FutureCallback<Operation.State.SUCCESS> {
|
||||
// override fun onSuccess(result: Operation.State.SUCCESS?) {
|
||||
// if (onDone != null) {
|
||||
// onDone()
|
||||
// } else {
|
||||
// tx?.state = nextState
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// override fun onFailure(t: Throwable) {
|
||||
// Timber.e("## SAS verification [${tx?.transactionId}] failed to send toDevice in state : ${tx?.state}, reason: ${t.localizedMessage}")
|
||||
// tx?.cancel(onErrorReason)
|
||||
// }
|
||||
// }, listenerExecutor)
|
||||
|
||||
val workLiveData = workManagerProvider.workManager
|
||||
.getWorkInfosForUniqueWorkLiveData(uniqueQueueName())
|
||||
|
||||
val observer = object : Observer<List<WorkInfo>> {
|
||||
override fun onChanged(workInfoList: List<WorkInfo>?) {
|
||||
workInfoList
|
||||
?.firstOrNull { it.id == enqueueInfo.second }
|
||||
?.let { wInfo ->
|
||||
when (wInfo.state) {
|
||||
WorkInfo.State.FAILED -> {
|
||||
tx?.cancel(onErrorReason)
|
||||
workLiveData.removeObserver(this)
|
||||
}
|
||||
WorkInfo.State.SUCCEEDED -> {
|
||||
if (SessionSafeCoroutineWorker.hasFailed(wInfo.outputData)) {
|
||||
Timber.e("## SAS verification [${tx?.transactionId}] failed to send verification message in state : ${tx?.state}")
|
||||
tx?.cancel(onErrorReason)
|
||||
} else {
|
||||
if (onDone != null) {
|
||||
onDone()
|
||||
} else {
|
||||
tx?.state = nextState
|
||||
}
|
||||
}
|
||||
workLiveData.removeObserver(this)
|
||||
}
|
||||
else -> {
|
||||
// nop
|
||||
}
|
||||
}
|
||||
}
|
||||
verificationSenderScope.launch {
|
||||
sequencer.post {
|
||||
try {
|
||||
val params = SendVerificationMessageTask.Params(event)
|
||||
sendVerificationMessageTask.executeRetry(params, 5)
|
||||
// Do I need to update local echo state to sent?
|
||||
if (onDone != null) {
|
||||
onDone()
|
||||
} else {
|
||||
tx?.state = nextState
|
||||
}
|
||||
} catch (failure: Throwable) {
|
||||
tx?.cancel(onErrorReason)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO listen to DB to get synced info
|
||||
coroutineScope.launch(Dispatchers.Main) {
|
||||
workLiveData.observeForever(observer)
|
||||
}
|
||||
}
|
||||
|
||||
override fun sendVerificationRequest(supportedMethods: List<String>,
|
||||
|
@ -169,58 +118,24 @@ internal class VerificationTransportRoomMessage(
|
|||
val content = info.toContent()
|
||||
|
||||
val event = createEventAndLocalEcho(
|
||||
localId,
|
||||
EventType.MESSAGE,
|
||||
roomId,
|
||||
content
|
||||
localId = localId,
|
||||
type = EventType.MESSAGE,
|
||||
roomId = roomId,
|
||||
content = content
|
||||
)
|
||||
|
||||
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
|
||||
sessionId = sessionId,
|
||||
eventId = event.eventId ?: ""
|
||||
))
|
||||
|
||||
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SendVerificationMessageWorker>()
|
||||
.setConstraints(WorkManagerProvider.workConstraints)
|
||||
.setInputData(workerParams)
|
||||
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
|
||||
.build()
|
||||
|
||||
workManagerProvider.workManager
|
||||
.beginUniqueWork("${roomId}_VerificationWork", ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest)
|
||||
.enqueue()
|
||||
|
||||
// I cannot just listen to the given work request, because when used in a uniqueWork,
|
||||
// The callback is called while it is still Running ...
|
||||
|
||||
val workLiveData = workManagerProvider.workManager
|
||||
.getWorkInfosForUniqueWorkLiveData("${roomId}_VerificationWork")
|
||||
|
||||
val observer = object : Observer<List<WorkInfo>> {
|
||||
override fun onChanged(workInfoList: List<WorkInfo>?) {
|
||||
workInfoList
|
||||
?.filter { it.state == WorkInfo.State.SUCCEEDED }
|
||||
?.firstOrNull { it.id == workRequest.id }
|
||||
?.let { wInfo ->
|
||||
if (SessionSafeCoroutineWorker.hasFailed(wInfo.outputData)) {
|
||||
callback(null, null)
|
||||
} else {
|
||||
val eventId = wInfo.outputData.getString(localId)
|
||||
if (eventId != null) {
|
||||
callback(eventId, validInfo)
|
||||
} else {
|
||||
callback(null, null)
|
||||
}
|
||||
}
|
||||
workLiveData.removeObserver(this)
|
||||
}
|
||||
verificationSenderScope.launch {
|
||||
val params = SendVerificationMessageTask.Params(event)
|
||||
sequencer.post {
|
||||
try {
|
||||
val eventId = sendVerificationMessageTask.executeRetry(params, 5)
|
||||
// Do I need to update local echo state to sent?
|
||||
callback(eventId, validInfo)
|
||||
} catch (failure: Throwable) {
|
||||
callback(null, null)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO listen to DB to get synced info
|
||||
coroutineScope.launch(Dispatchers.Main) {
|
||||
workLiveData.observeForever(observer)
|
||||
}
|
||||
}
|
||||
|
||||
override fun cancelTransaction(transactionId: String, otherUserId: String, otherUserDeviceId: String?, code: CancelCode) {
|
||||
|
@ -230,11 +145,17 @@ internal class VerificationTransportRoomMessage(
|
|||
roomId = roomId,
|
||||
content = MessageVerificationCancelContent.create(transactionId, code).toContent()
|
||||
)
|
||||
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
|
||||
sessionId = sessionId,
|
||||
eventId = event.eventId ?: ""
|
||||
))
|
||||
enqueueSendWork(workerParams)
|
||||
|
||||
verificationSenderScope.launch {
|
||||
sequencer.post {
|
||||
try {
|
||||
val params = SendVerificationMessageTask.Params(event)
|
||||
sendVerificationMessageTask.executeRetry(params, 5)
|
||||
} catch (failure: Throwable) {
|
||||
Timber.w("")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun done(transactionId: String,
|
||||
|
@ -250,44 +171,56 @@ internal class VerificationTransportRoomMessage(
|
|||
)
|
||||
).toContent()
|
||||
)
|
||||
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
|
||||
sessionId = sessionId,
|
||||
eventId = event.eventId ?: ""
|
||||
))
|
||||
val enqueueInfo = enqueueSendWork(workerParams)
|
||||
|
||||
val workLiveData = workManagerProvider.workManager
|
||||
.getWorkInfosForUniqueWorkLiveData(uniqueQueueName())
|
||||
val observer = object : Observer<List<WorkInfo>> {
|
||||
override fun onChanged(workInfoList: List<WorkInfo>?) {
|
||||
workInfoList
|
||||
?.filter { it.state == WorkInfo.State.SUCCEEDED }
|
||||
?.firstOrNull { it.id == enqueueInfo.second }
|
||||
?.let { _ ->
|
||||
onDone?.invoke()
|
||||
workLiveData.removeObserver(this)
|
||||
}
|
||||
verificationSenderScope.launch {
|
||||
sequencer.post {
|
||||
try {
|
||||
val params = SendVerificationMessageTask.Params(event)
|
||||
sendVerificationMessageTask.executeRetry(params, 5)
|
||||
} catch (failure: Throwable) {
|
||||
Timber.w("")
|
||||
} finally {
|
||||
onDone?.invoke()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO listen to DB to get synced info
|
||||
coroutineScope.launch(Dispatchers.Main) {
|
||||
workLiveData.observeForever(observer)
|
||||
}
|
||||
// val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
|
||||
// sessionId = sessionId,
|
||||
// eventId = event.eventId ?: ""
|
||||
// ))
|
||||
// val enqueueInfo = enqueueSendWork(workerParams)
|
||||
//
|
||||
// val workLiveData = workManagerProvider.workManager
|
||||
// .getWorkInfosForUniqueWorkLiveData(uniqueQueueName())
|
||||
// val observer = object : Observer<List<WorkInfo>> {
|
||||
// override fun onChanged(workInfoList: List<WorkInfo>?) {
|
||||
// workInfoList
|
||||
// ?.filter { it.state == WorkInfo.State.SUCCEEDED }
|
||||
// ?.firstOrNull { it.id == enqueueInfo.second }
|
||||
// ?.let { _ ->
|
||||
// onDone?.invoke()
|
||||
// workLiveData.removeObserver(this)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // TODO listen to DB to get synced info
|
||||
// coroutineScope.launch(Dispatchers.Main) {
|
||||
// workLiveData.observeForever(observer)
|
||||
// }
|
||||
}
|
||||
|
||||
private fun enqueueSendWork(workerParams: Data): Pair<Operation, UUID> {
|
||||
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SendVerificationMessageWorker>()
|
||||
.setConstraints(WorkManagerProvider.workConstraints)
|
||||
.setInputData(workerParams)
|
||||
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
|
||||
.build()
|
||||
return workManagerProvider.workManager
|
||||
.beginUniqueWork(uniqueQueueName(), ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest)
|
||||
.enqueue() to workRequest.id
|
||||
}
|
||||
// private fun enqueueSendWork(workerParams: Data): Pair<Operation, UUID> {
|
||||
// val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SendVerificationMessageWorker>()
|
||||
// .setConstraints(WorkManagerProvider.workConstraints)
|
||||
// .setInputData(workerParams)
|
||||
// .setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
|
||||
// .build()
|
||||
// return workManagerProvider.workManager
|
||||
// .beginUniqueWork(uniqueQueueName(), ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest)
|
||||
// .enqueue() to workRequest.id
|
||||
// }
|
||||
|
||||
private fun uniqueQueueName() = "${roomId}_VerificationWork"
|
||||
// private fun uniqueQueueName() = "${roomId}_VerificationWork"
|
||||
|
||||
override fun createAccept(tid: String,
|
||||
keyAgreementProtocol: String,
|
||||
|
|
|
@ -16,34 +16,28 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.crypto.verification
|
||||
|
||||
import org.matrix.android.sdk.internal.crypto.tasks.SendVerificationMessageTask
|
||||
import org.matrix.android.sdk.internal.di.DeviceId
|
||||
import org.matrix.android.sdk.internal.di.SessionId
|
||||
import org.matrix.android.sdk.internal.di.UserId
|
||||
import org.matrix.android.sdk.internal.di.WorkManagerProvider
|
||||
import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class VerificationTransportRoomMessageFactory @Inject constructor(
|
||||
private val workManagerProvider: WorkManagerProvider,
|
||||
@SessionId
|
||||
private val sessionId: String,
|
||||
private val sendVerificationMessageTask: SendVerificationMessageTask,
|
||||
@UserId
|
||||
private val userId: String,
|
||||
@DeviceId
|
||||
private val deviceId: String?,
|
||||
private val localEchoEventFactory: LocalEchoEventFactory,
|
||||
private val taskExecutor: TaskExecutor
|
||||
private val localEchoEventFactory: LocalEchoEventFactory
|
||||
) {
|
||||
|
||||
fun createTransport(roomId: String, tx: DefaultVerificationTransaction?): VerificationTransportRoomMessage {
|
||||
return VerificationTransportRoomMessage(workManagerProvider,
|
||||
sessionId,
|
||||
userId,
|
||||
deviceId,
|
||||
roomId,
|
||||
localEchoEventFactory,
|
||||
tx,
|
||||
taskExecutor.executorScope)
|
||||
return VerificationTransportRoomMessage(
|
||||
sendVerificationMessageTask = sendVerificationMessageTask,
|
||||
userId = userId,
|
||||
userDeviceId = deviceId,
|
||||
roomId = roomId,
|
||||
localEchoEventFactory = localEchoEventFactory,
|
||||
tx = tx)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.matrix.android.sdk.api.auth.data.SessionParams
|
|||
import org.matrix.android.sdk.api.session.Session
|
||||
import org.matrix.android.sdk.internal.crypto.CryptoModule
|
||||
import org.matrix.android.sdk.internal.crypto.crosssigning.UpdateTrustWorker
|
||||
import org.matrix.android.sdk.internal.crypto.verification.SendVerificationMessageWorker
|
||||
import org.matrix.android.sdk.internal.di.MatrixComponent
|
||||
import org.matrix.android.sdk.internal.federation.FederationModule
|
||||
import org.matrix.android.sdk.internal.network.NetworkConnectivityChecker
|
||||
|
@ -129,8 +128,6 @@ internal interface SessionComponent {
|
|||
|
||||
fun inject(worker: AddPusherWorker)
|
||||
|
||||
fun inject(worker: SendVerificationMessageWorker)
|
||||
|
||||
fun inject(worker: UpdateTrustWorker)
|
||||
|
||||
@Component.Factory
|
||||
|
|
|
@ -49,7 +49,6 @@ import org.matrix.android.sdk.api.util.md5
|
|||
import org.matrix.android.sdk.internal.crypto.secrets.DefaultSharedSecretStorageService
|
||||
import org.matrix.android.sdk.internal.crypto.tasks.DefaultRedactEventTask
|
||||
import org.matrix.android.sdk.internal.crypto.tasks.RedactEventTask
|
||||
import org.matrix.android.sdk.internal.crypto.verification.VerificationMessageProcessor
|
||||
import org.matrix.android.sdk.internal.database.EventInsertLiveObserver
|
||||
import org.matrix.android.sdk.internal.database.RealmSessionProvider
|
||||
import org.matrix.android.sdk.internal.database.SessionRealmConfigurationFactory
|
||||
|
@ -318,10 +317,6 @@ internal abstract class SessionModule {
|
|||
@IntoSet
|
||||
abstract fun bindRoomCreateEventProcessor(processor: RoomCreateEventProcessor): EventInsertLiveProcessor
|
||||
|
||||
@Binds
|
||||
@IntoSet
|
||||
abstract fun bindVerificationMessageProcessor(processor: VerificationMessageProcessor): EventInsertLiveProcessor
|
||||
|
||||
@Binds
|
||||
@IntoSet
|
||||
abstract fun bindCallEventProcessor(processor: CallEventProcessor): EventInsertLiveProcessor
|
||||
|
|
|
@ -378,7 +378,10 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
val roomMemberContentsByUser = HashMap<String, RoomMemberContent?>()
|
||||
|
||||
val optimizedThreadSummaryMap = hashMapOf<String, EventEntity>()
|
||||
for (event in eventList) {
|
||||
for (rawEvent in eventList) {
|
||||
// It's annoying roomId is not there, but lot of code rely on it.
|
||||
// And had to do it now as copy would delete all decryption results..
|
||||
val event = rawEvent.copy(roomId = roomId)
|
||||
if (event.eventId == null || event.senderId == null || event.type == null) {
|
||||
continue
|
||||
}
|
||||
|
@ -445,7 +448,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
}
|
||||
}
|
||||
// Give info to crypto module
|
||||
cryptoService.onLiveEvent(roomEntity.roomId, event)
|
||||
cryptoService.onLiveEvent(roomEntity.roomId, event, isInitialSync)
|
||||
|
||||
// Try to remove local echo
|
||||
event.unsignedData?.transactionId?.also {
|
||||
|
|
|
@ -23,7 +23,6 @@ import androidx.work.WorkerFactory
|
|||
import androidx.work.WorkerParameters
|
||||
import org.matrix.android.sdk.internal.SessionManager
|
||||
import org.matrix.android.sdk.internal.crypto.crosssigning.UpdateTrustWorker
|
||||
import org.matrix.android.sdk.internal.crypto.verification.SendVerificationMessageWorker
|
||||
import org.matrix.android.sdk.internal.di.MatrixScope
|
||||
import org.matrix.android.sdk.internal.session.content.UploadContentWorker
|
||||
import org.matrix.android.sdk.internal.session.group.GetGroupDataWorker
|
||||
|
@ -61,8 +60,6 @@ internal class MatrixWorkerFactory @Inject constructor(private val sessionManage
|
|||
RedactEventWorker(appContext, workerParameters, sessionManager)
|
||||
SendEventWorker::class.java.name ->
|
||||
SendEventWorker(appContext, workerParameters, sessionManager)
|
||||
SendVerificationMessageWorker::class.java.name ->
|
||||
SendVerificationMessageWorker(appContext, workerParameters, sessionManager)
|
||||
SyncWorker::class.java.name ->
|
||||
SyncWorker(appContext, workerParameters, sessionManager)
|
||||
UpdateTrustWorker::class.java.name ->
|
||||
|
|
Loading…
Reference in New Issue