Merge pull request #2129 from vector-im/feature/fix_sending_too_long

Feature/fix sending too long
This commit is contained in:
Benoit Marty 2020-09-21 12:09:13 +02:00 committed by GitHub
commit 6486b9e5cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 248 additions and 161 deletions

View File

@ -8,6 +8,7 @@ Improvements 🙌:
- Add "show password" in import Megolm keys dialog - Add "show password" in import Megolm keys dialog
Bugfix 🐛: Bugfix 🐛:
- Long message cannot be sent/takes infinite time & blocks other messages #1397
- User Verification in DM not working - User Verification in DM not working
- Manual import of Megolm keys does back up the imported keys - Manual import of Megolm keys does back up the imported keys

View File

@ -20,6 +20,10 @@ package org.matrix.android.sdk.internal.crypto.store.db
import androidx.lifecycle.LiveData import androidx.lifecycle.LiveData
import androidx.lifecycle.Transformations import androidx.lifecycle.Transformations
import com.zhuinden.monarchy.Monarchy import com.zhuinden.monarchy.Monarchy
import io.realm.Realm
import io.realm.RealmConfiguration
import io.realm.Sort
import io.realm.kotlin.where
import org.matrix.android.sdk.api.auth.data.Credentials import org.matrix.android.sdk.api.auth.data.Credentials
import org.matrix.android.sdk.api.session.crypto.crosssigning.MXCrossSigningInfo import org.matrix.android.sdk.api.session.crypto.crosssigning.MXCrossSigningInfo
import org.matrix.android.sdk.api.session.events.model.Event import org.matrix.android.sdk.api.session.events.model.Event
@ -85,10 +89,6 @@ import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.di.CryptoDatabase import org.matrix.android.sdk.internal.di.CryptoDatabase
import org.matrix.android.sdk.internal.di.MoshiProvider import org.matrix.android.sdk.internal.di.MoshiProvider
import org.matrix.android.sdk.internal.session.SessionScope import org.matrix.android.sdk.internal.session.SessionScope
import io.realm.Realm
import io.realm.RealmConfiguration
import io.realm.Sort
import io.realm.kotlin.where
import org.matrix.olm.OlmAccount import org.matrix.olm.OlmAccount
import org.matrix.olm.OlmException import org.matrix.olm.OlmException
import timber.log.Timber import timber.log.Timber
@ -541,7 +541,7 @@ internal class RealmCryptoStore @Inject constructor(
deviceId = it.deviceId deviceId = it.deviceId
) )
} }
monarchy.writeAsync { realm -> doRealmTransactionAsync(realmConfiguration) { realm ->
realm.where<MyDeviceLastSeenInfoEntity>().findAll().deleteAllFromRealm() realm.where<MyDeviceLastSeenInfoEntity>().findAll().deleteAllFromRealm()
entities.forEach { entities.forEach {
realm.insertOrUpdate(it) realm.insertOrUpdate(it)
@ -1191,7 +1191,7 @@ internal class RealmCryptoStore @Inject constructor(
.findAll() .findAll()
.mapNotNull { entity -> .mapNotNull { entity ->
when (entity.type) { when (entity.type) {
GossipRequestType.KEY -> { GossipRequestType.KEY -> {
IncomingRoomKeyRequest( IncomingRoomKeyRequest(
userId = entity.otherUserId, userId = entity.otherUserId,
deviceId = entity.otherDeviceId, deviceId = entity.otherDeviceId,

View File

@ -23,9 +23,9 @@ import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.failure.shouldBeRetried import org.matrix.android.sdk.api.failure.shouldBeRetried
import org.matrix.android.sdk.api.session.crypto.CryptoService import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.internal.crypto.tasks.SendVerificationMessageTask import org.matrix.android.sdk.internal.crypto.tasks.SendVerificationMessageTask
import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.worker.SessionWorkerParams import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent import org.matrix.android.sdk.internal.worker.getSessionComponent
@ -43,13 +43,16 @@ internal class SendVerificationMessageWorker(context: Context,
@JsonClass(generateAdapter = true) @JsonClass(generateAdapter = true)
internal data class Params( internal data class Params(
override val sessionId: String, override val sessionId: String,
val event: Event, val eventId: String,
override val lastFailureMessage: String? = null override val lastFailureMessage: String? = null
) : SessionWorkerParams ) : SessionWorkerParams
@Inject @Inject
lateinit var sendVerificationMessageTask: SendVerificationMessageTask lateinit var sendVerificationMessageTask: SendVerificationMessageTask
@Inject
lateinit var localEchoRepository: LocalEchoRepository
@Inject @Inject
lateinit var cryptoService: CryptoService lateinit var cryptoService: CryptoService
@ -67,25 +70,27 @@ internal class SendVerificationMessageWorker(context: Context,
} }
sessionComponent.inject(this) sessionComponent.inject(this)
val localId = params.event.eventId ?: "" val localEvent = localEchoRepository.getUpToDateEcho(params.eventId) ?: return Result.success(errorOutputData)
val localEventId = localEvent.eventId ?: ""
val roomId = localEvent.roomId ?: ""
if (cancelSendTracker.isCancelRequestedFor(localId, params.event.roomId)) { if (cancelSendTracker.isCancelRequestedFor(localEventId, roomId)) {
return Result.success() return Result.success()
.also { .also {
cancelSendTracker.markCancelled(localId, params.event.roomId ?: "") cancelSendTracker.markCancelled(localEventId, roomId)
Timber.e("## SendEvent: Event sending has been cancelled $localId") Timber.e("## SendEvent: Event sending has been cancelled $localEventId")
} }
} }
return try { return try {
val eventId = sendVerificationMessageTask.execute( val resultEventId = sendVerificationMessageTask.execute(
SendVerificationMessageTask.Params( SendVerificationMessageTask.Params(
event = params.event, event = localEvent,
cryptoService = cryptoService cryptoService = cryptoService
) )
) )
Result.success(Data.Builder().putString(localId, eventId).build()) Result.success(Data.Builder().putString(localEventId, resultEventId).build())
} catch (exception: Throwable) { } catch (exception: Throwable) {
if (exception.shouldBeRetried()) { if (exception.shouldBeRetried()) {
Result.retry() Result.retry()

View File

@ -22,6 +22,9 @@ import androidx.work.Data
import androidx.work.ExistingWorkPolicy import androidx.work.ExistingWorkPolicy
import androidx.work.Operation import androidx.work.Operation
import androidx.work.WorkInfo import androidx.work.WorkInfo
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.matrix.android.sdk.R import org.matrix.android.sdk.R
import org.matrix.android.sdk.api.session.crypto.verification.CancelCode import org.matrix.android.sdk.api.session.crypto.verification.CancelCode
import org.matrix.android.sdk.api.session.crypto.verification.ValidVerificationInfoRequest import org.matrix.android.sdk.api.session.crypto.verification.ValidVerificationInfoRequest
@ -52,9 +55,6 @@ import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
import org.matrix.android.sdk.internal.task.TaskExecutor import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.util.StringProvider import org.matrix.android.sdk.internal.util.StringProvider
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import timber.log.Timber import timber.log.Timber
import java.util.UUID import java.util.UUID
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -87,7 +87,7 @@ internal class VerificationTransportRoomMessage(
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params( val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
sessionId = sessionId, sessionId = sessionId,
event = event eventId = event.eventId ?: ""
)) ))
val enqueueInfo = enqueueSendWork(workerParams) val enqueueInfo = enqueueSendWork(workerParams)
@ -185,7 +185,7 @@ internal class VerificationTransportRoomMessage(
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params( val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
sessionId = sessionId, sessionId = sessionId,
event = event eventId = event.eventId ?: ""
)) ))
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SendVerificationMessageWorker>() val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SendVerificationMessageWorker>()
@ -240,7 +240,7 @@ internal class VerificationTransportRoomMessage(
) )
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params( val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
sessionId = sessionId, sessionId = sessionId,
event = event eventId = event.eventId ?: ""
)) ))
enqueueSendWork(workerParams) enqueueSendWork(workerParams)
} }
@ -260,7 +260,7 @@ internal class VerificationTransportRoomMessage(
) )
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params( val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
sessionId = sessionId, sessionId = sessionId,
event = event eventId = event.eventId ?: ""
)) ))
val enqueueInfo = enqueueSendWork(workerParams) val enqueueInfo = enqueueSendWork(workerParams)

View File

@ -16,31 +16,52 @@
*/ */
package org.matrix.android.sdk.internal.database package org.matrix.android.sdk.internal.database
import com.zhuinden.monarchy.Monarchy
import io.realm.Realm import io.realm.Realm
import io.realm.RealmConfiguration import io.realm.RealmConfiguration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import timber.log.Timber import timber.log.Timber
suspend fun <T> awaitTransaction(config: RealmConfiguration, transaction: suspend (realm: Realm) -> T) = withContext(Dispatchers.Default) { internal fun <T> CoroutineScope.asyncTransaction(monarchy: Monarchy, transaction: suspend (realm: Realm) -> T) {
Realm.getInstance(config).use { bgRealm -> asyncTransaction(monarchy.realmConfiguration, transaction)
bgRealm.beginTransaction() }
val result: T
try { internal fun <T> CoroutineScope.asyncTransaction(realmConfiguration: RealmConfiguration, transaction: suspend (realm: Realm) -> T) {
val start = System.currentTimeMillis() launch {
result = transaction(bgRealm) awaitTransaction(realmConfiguration, transaction)
if (isActive) { }
bgRealm.commitTransaction() }
val end = System.currentTimeMillis()
val time = end - start private val realmSemaphore = Semaphore(1)
Timber.v("Execute transaction in $time millis")
} suspend fun <T> awaitTransaction(config: RealmConfiguration, transaction: suspend (realm: Realm) -> T): T {
} finally { return realmSemaphore.withPermit {
if (bgRealm.isInTransaction) { withContext(Dispatchers.IO) {
bgRealm.cancelTransaction() Realm.getInstance(config).use { bgRealm ->
} bgRealm.beginTransaction()
} val result: T
result try {
val start = System.currentTimeMillis()
result = transaction(bgRealm)
if (isActive) {
bgRealm.commitTransaction()
val end = System.currentTimeMillis()
val time = end - start
Timber.v("Execute transaction in $time millis")
}
} finally {
if (bgRealm.isInTransaction) {
bgRealm.cancelTransaction()
}
}
result
}
}
} }
} }

View File

@ -20,21 +20,34 @@ package org.matrix.android.sdk.internal.database.mapper
import org.matrix.android.sdk.api.session.events.model.Content import org.matrix.android.sdk.api.session.events.model.Content
import org.matrix.android.sdk.api.util.JSON_DICT_PARAMETERIZED_TYPE import org.matrix.android.sdk.api.util.JSON_DICT_PARAMETERIZED_TYPE
import org.matrix.android.sdk.internal.di.MoshiProvider import org.matrix.android.sdk.internal.di.MoshiProvider
import org.matrix.android.sdk.internal.network.parsing.CheckNumberType
internal object ContentMapper { internal object ContentMapper {
private val moshi = MoshiProvider.providesMoshi() private val moshi = MoshiProvider.providesMoshi()
private val adapter = moshi.adapter<Content>(JSON_DICT_PARAMETERIZED_TYPE) private val castJsonNumberMoshi by lazy {
// We are adding the CheckNumberType as we are serializing/deserializing multiple time in a row
// and we lost typing information doing so.
// We don't want this check to be done on all adapters, so we create a new moshi just for that.
MoshiProvider.providesMoshi()
.newBuilder()
.add(CheckNumberType.JSON_ADAPTER_FACTORY)
.build()
}
fun map(content: String?): Content? { fun map(content: String?, castJsonNumbers: Boolean = false): Content? {
return content?.let { return content?.let {
adapter.fromJson(it) if (castJsonNumbers) {
castJsonNumberMoshi
} else {
moshi
}.adapter<Content>(JSON_DICT_PARAMETERIZED_TYPE).fromJson(it)
} }
} }
fun map(content: Content?): String? { fun map(content: Content?): String? {
return content?.let { return content?.let {
adapter.toJson(it) moshi.adapter<Content>(JSON_DICT_PARAMETERIZED_TYPE).toJson(it)
} }
} }
} }

View File

@ -54,7 +54,7 @@ internal object EventMapper {
return eventEntity return eventEntity
} }
fun map(eventEntity: EventEntity): Event { fun map(eventEntity: EventEntity, castJsonNumbers: Boolean = false): Event {
val ud = eventEntity.unsignedData val ud = eventEntity.unsignedData
?.takeIf { it.isNotBlank() } ?.takeIf { it.isNotBlank() }
?.let { ?.let {
@ -69,8 +69,8 @@ internal object EventMapper {
return Event( return Event(
type = eventEntity.type, type = eventEntity.type,
eventId = eventEntity.eventId, eventId = eventEntity.eventId,
content = ContentMapper.map(eventEntity.content), content = ContentMapper.map(eventEntity.content, castJsonNumbers),
prevContent = ContentMapper.map(eventEntity.prevContent), prevContent = ContentMapper.map(eventEntity.prevContent, castJsonNumbers),
originServerTs = eventEntity.originServerTs, originServerTs = eventEntity.originServerTs,
senderId = eventEntity.sender, senderId = eventEntity.sender,
stateKey = eventEntity.stateKey, stateKey = eventEntity.stateKey,
@ -96,8 +96,8 @@ internal object EventMapper {
} }
} }
internal fun EventEntity.asDomain(): Event { internal fun EventEntity.asDomain(castJsonNumbers: Boolean = false): Event {
return EventMapper.map(this) return EventMapper.map(this, castJsonNumbers)
} }
internal fun Event.toEntity(roomId: String, sendState: SendState, ageLocalTs: Long?): EventEntity { internal fun Event.toEntity(roomId: String, sendState: SendState, ageLocalTs: Long?): EventEntity {

View File

@ -24,7 +24,6 @@ import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.extensions.tryThis import org.matrix.android.sdk.api.extensions.tryThis
import org.matrix.android.sdk.api.session.content.ContentAttachmentData import org.matrix.android.sdk.api.session.content.ContentAttachmentData
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.toContent import org.matrix.android.sdk.api.session.events.model.toContent
import org.matrix.android.sdk.api.session.events.model.toModel import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.message.MessageAudioContent import org.matrix.android.sdk.api.session.room.model.message.MessageAudioContent
@ -34,9 +33,13 @@ import org.matrix.android.sdk.api.session.room.model.message.MessageImageContent
import org.matrix.android.sdk.api.session.room.model.message.MessageVideoContent import org.matrix.android.sdk.api.session.room.model.message.MessageVideoContent
import org.matrix.android.sdk.internal.crypto.attachments.MXEncryptedAttachments import org.matrix.android.sdk.internal.crypto.attachments.MXEncryptedAttachments
import org.matrix.android.sdk.internal.crypto.model.rest.EncryptedFileInfo import org.matrix.android.sdk.internal.crypto.model.rest.EncryptedFileInfo
import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.database.mapper.asDomain
import org.matrix.android.sdk.internal.network.ProgressRequestBody import org.matrix.android.sdk.internal.network.ProgressRequestBody
import org.matrix.android.sdk.internal.session.DefaultFileService import org.matrix.android.sdk.internal.session.DefaultFileService
import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker
import org.matrix.android.sdk.internal.session.room.send.LocalEchoIdentifiers
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.session.room.send.MultipleEventSendingDispatcherWorker import org.matrix.android.sdk.internal.session.room.send.MultipleEventSendingDispatcherWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
@ -61,7 +64,7 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
@JsonClass(generateAdapter = true) @JsonClass(generateAdapter = true)
internal data class Params( internal data class Params(
override val sessionId: String, override val sessionId: String,
val events: List<Event>, val localEchoIds: List<LocalEchoIdentifiers>,
val attachment: ContentAttachmentData, val attachment: ContentAttachmentData,
val isEncrypted: Boolean, val isEncrypted: Boolean,
val compressBeforeSending: Boolean, val compressBeforeSending: Boolean,
@ -73,6 +76,7 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
@Inject lateinit var fileService: DefaultFileService @Inject lateinit var fileService: DefaultFileService
@Inject lateinit var cancelSendTracker: CancelSendTracker @Inject lateinit var cancelSendTracker: CancelSendTracker
@Inject lateinit var imageCompressor: ImageCompressor @Inject lateinit var imageCompressor: ImageCompressor
@Inject lateinit var localEchoRepository: LocalEchoRepository
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
@ -100,7 +104,7 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)
val allCancelled = params.events.all { cancelSendTracker.isCancelRequestedFor(it.eventId, it.roomId) } val allCancelled = params.localEchoIds.all { cancelSendTracker.isCancelRequestedFor(it.eventId, it.roomId) }
if (allCancelled) { if (allCancelled) {
// there is no point in uploading the image! // there is no point in uploading the image!
return Result.success(inputData) return Result.success(inputData)
@ -289,46 +293,48 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
) )
} }
private fun handleSuccess(params: Params, private suspend fun handleSuccess(params: Params,
attachmentUrl: String, attachmentUrl: String,
encryptedFileInfo: EncryptedFileInfo?, encryptedFileInfo: EncryptedFileInfo?,
thumbnailUrl: String?, thumbnailUrl: String?,
thumbnailEncryptedFileInfo: EncryptedFileInfo?, thumbnailEncryptedFileInfo: EncryptedFileInfo?,
newImageAttributes: NewImageAttributes?): Result { newImageAttributes: NewImageAttributes?): Result {
notifyTracker(params) { contentUploadStateTracker.setSuccess(it) } notifyTracker(params) { contentUploadStateTracker.setSuccess(it) }
params.localEchoIds.forEach {
updateEvent(it.eventId, attachmentUrl, encryptedFileInfo, thumbnailUrl, thumbnailEncryptedFileInfo, newImageAttributes)
}
val updatedEvents = params.events val sendParams = MultipleEventSendingDispatcherWorker.Params(
.map { sessionId = params.sessionId,
updateEvent(it, attachmentUrl, encryptedFileInfo, thumbnailUrl, thumbnailEncryptedFileInfo, newImageAttributes) localEchoIds = params.localEchoIds,
} isEncrypted = params.isEncrypted
)
val sendParams = MultipleEventSendingDispatcherWorker.Params(params.sessionId, updatedEvents, params.isEncrypted)
return Result.success(WorkerParamsFactory.toData(sendParams)).also { return Result.success(WorkerParamsFactory.toData(sendParams)).also {
Timber.v("## handleSuccess $attachmentUrl, work is stopped $isStopped") Timber.v("## handleSuccess $attachmentUrl, work is stopped $isStopped")
} }
} }
private fun updateEvent(event: Event, private suspend fun updateEvent(eventId: String,
url: String, url: String,
encryptedFileInfo: EncryptedFileInfo?, encryptedFileInfo: EncryptedFileInfo?,
thumbnailUrl: String? = null, thumbnailUrl: String? = null,
thumbnailEncryptedFileInfo: EncryptedFileInfo?, thumbnailEncryptedFileInfo: EncryptedFileInfo?,
newImageAttributes: NewImageAttributes?): Event { newImageAttributes: NewImageAttributes?) {
val messageContent: MessageContent = event.content.toModel() ?: return event localEchoRepository.updateEcho(eventId) { _, event ->
val updatedContent = when (messageContent) { val messageContent: MessageContent? = event.asDomain().content.toModel()
is MessageImageContent -> messageContent.update(url, encryptedFileInfo, newImageAttributes) val updatedContent = when (messageContent) {
is MessageVideoContent -> messageContent.update(url, encryptedFileInfo, thumbnailUrl, thumbnailEncryptedFileInfo) is MessageImageContent -> messageContent.update(url, encryptedFileInfo, newImageAttributes)
is MessageFileContent -> messageContent.update(url, encryptedFileInfo) is MessageVideoContent -> messageContent.update(url, encryptedFileInfo, thumbnailUrl, thumbnailEncryptedFileInfo)
is MessageAudioContent -> messageContent.update(url, encryptedFileInfo) is MessageFileContent -> messageContent.update(url, encryptedFileInfo)
else -> messageContent is MessageAudioContent -> messageContent.update(url, encryptedFileInfo)
else -> messageContent
}
event.content = ContentMapper.map(updatedContent.toContent())
} }
return event.copy(content = updatedContent.toContent())
} }
private fun notifyTracker(params: Params, function: (String) -> Unit) { private fun notifyTracker(params: Params, function: (String) -> Unit) {
params.events params.localEchoIds.forEach { function.invoke(it.eventId) }
.mapNotNull { it.eventId }
.forEach { eventId -> function.invoke(eventId) }
} }
private fun MessageImageContent.update(url: String, private fun MessageImageContent.update(url: String,

View File

@ -24,7 +24,6 @@ import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.internal.worker.SessionWorkerParams import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent import org.matrix.android.sdk.internal.worker.getSessionComponent
import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
/** /**
@ -44,7 +43,6 @@ internal class GetGroupDataWorker(context: Context, params: WorkerParameters) :
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure() ?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)

View File

@ -23,6 +23,7 @@ import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.executeRequest import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.task.Task import org.matrix.android.sdk.internal.task.Task
import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.internal.util.awaitTransaction
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
@ -39,7 +40,7 @@ internal class DefaultRefreshUserThreePidsTask @Inject constructor(private val p
Timber.d("Get ${accountThreePidsResponse.threePids?.size} threePids") Timber.d("Get ${accountThreePidsResponse.threePids?.size} threePids")
// Store the list in DB // Store the list in DB
monarchy.writeAsync { realm -> monarchy.awaitTransaction { realm ->
realm.where(UserThreePidEntity::class.java).findAll().deleteAllFromRealm() realm.where(UserThreePidEntity::class.java).findAll().deleteAllFromRealm()
accountThreePidsResponse.threePids?.forEach { accountThreePidsResponse.threePids?.forEach {
val entity = UserThreePidEntity( val entity = UserThreePidEntity(

View File

@ -202,13 +202,13 @@ internal class DefaultRelationService @AssistedInject constructor(
private fun createEncryptEventWork(event: Event, keepKeys: List<String>?): OneTimeWorkRequest { private fun createEncryptEventWork(event: Event, keepKeys: List<String>?): OneTimeWorkRequest {
// Same parameter // Same parameter
val params = EncryptEventWorker.Params(sessionId, event, keepKeys) val params = EncryptEventWorker.Params(sessionId, event.eventId!!, keepKeys)
val sendWorkData = WorkerParamsFactory.toData(params) val sendWorkData = WorkerParamsFactory.toData(params)
return timeLineSendEventWorkCommon.createWork<EncryptEventWorker>(sendWorkData, true) return timeLineSendEventWorkCommon.createWork<EncryptEventWorker>(sendWorkData, true)
} }
private fun createSendEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest { private fun createSendEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, event = event) val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = event.eventId!!)
val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams) val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)
return timeLineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain) return timeLineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain)
} }

View File

@ -28,6 +28,7 @@ import org.matrix.android.sdk.api.session.room.model.relation.ReactionContent
import org.matrix.android.sdk.api.session.room.model.relation.ReactionInfo import org.matrix.android.sdk.api.session.room.model.relation.ReactionInfo
import org.matrix.android.sdk.internal.network.executeRequest import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.room.RoomAPI import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.session.room.send.SendResponse import org.matrix.android.sdk.internal.session.room.send.SendResponse
import org.matrix.android.sdk.internal.worker.SessionWorkerParams import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
@ -42,18 +43,18 @@ internal class SendRelationWorker(context: Context, params: WorkerParameters) :
internal data class Params( internal data class Params(
override val sessionId: String, override val sessionId: String,
val roomId: String, val roomId: String,
val event: Event, val eventId: String,
val relationType: String? = null, val relationType: String? = null,
override val lastFailureMessage: String? = null override val lastFailureMessage: String? = null
) : SessionWorkerParams ) : SessionWorkerParams
@Inject lateinit var roomAPI: RoomAPI @Inject lateinit var roomAPI: RoomAPI
@Inject lateinit var eventBus: EventBus @Inject lateinit var eventBus: EventBus
@Inject lateinit var localEchoRepository: LocalEchoRepository
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure() ?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
if (params.lastFailureMessage != null) { if (params.lastFailureMessage != null) {
// Transmit the error // Transmit the error
@ -64,8 +65,8 @@ internal class SendRelationWorker(context: Context, params: WorkerParameters) :
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)
val localEvent = params.event val localEvent = localEchoRepository.getUpToDateEcho(params.eventId)
if (localEvent.eventId == null) { if (localEvent?.eventId == null) {
return Result.failure() return Result.failure()
} }
val relationContent = localEvent.content.toModel<ReactionContent>() val relationContent = localEvent.content.toModel<ReactionContent>()

View File

@ -336,7 +336,7 @@ internal class DefaultSendService @AssistedInject constructor(
private fun createEncryptEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest { private fun createEncryptEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
// Same parameter // Same parameter
return EncryptEventWorker.Params(sessionId, event) return EncryptEventWorker.Params(sessionId, event.eventId ?: "")
.let { WorkerParamsFactory.toData(it) } .let { WorkerParamsFactory.toData(it) }
.let { .let {
workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>() workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>()
@ -360,7 +360,10 @@ internal class DefaultSendService @AssistedInject constructor(
attachment: ContentAttachmentData, attachment: ContentAttachmentData,
isRoomEncrypted: Boolean, isRoomEncrypted: Boolean,
compressBeforeSending: Boolean): OneTimeWorkRequest { compressBeforeSending: Boolean): OneTimeWorkRequest {
val uploadMediaWorkerParams = UploadContentWorker.Params(sessionId, allLocalEchos, attachment, isRoomEncrypted, compressBeforeSending) val localEchoIds = allLocalEchos.map {
LocalEchoIdentifiers(it.roomId!!, it.eventId!!)
}
val uploadMediaWorkerParams = UploadContentWorker.Params(sessionId, localEchoIds, attachment, isRoomEncrypted, compressBeforeSending)
val uploadWorkData = WorkerParamsFactory.toData(uploadMediaWorkerParams) val uploadWorkData = WorkerParamsFactory.toData(uploadMediaWorkerParams)
return workManagerProvider.matrixOneTimeWorkRequestBuilder<UploadContentWorker>() return workManagerProvider.matrixOneTimeWorkRequestBuilder<UploadContentWorker>()

View File

@ -24,11 +24,13 @@ import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.failure.Failure import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.session.crypto.CryptoService import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toContent import org.matrix.android.sdk.api.session.events.model.toContent
import org.matrix.android.sdk.api.session.room.send.SendState import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.MXCRYPTO_ALGORITHM_MEGOLM import org.matrix.android.sdk.internal.crypto.MXCRYPTO_ALGORITHM_MEGOLM
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import org.matrix.android.sdk.internal.crypto.model.MXEncryptEventContentResult import org.matrix.android.sdk.internal.crypto.model.MXEncryptEventContentResult
import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.util.awaitCallback import org.matrix.android.sdk.internal.util.awaitCallback
import org.matrix.android.sdk.internal.worker.SessionWorkerParams import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
@ -46,7 +48,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
@JsonClass(generateAdapter = true) @JsonClass(generateAdapter = true)
internal data class Params( internal data class Params(
override val sessionId: String, override val sessionId: String,
val event: Event, val eventId: String,
/** Do not encrypt these keys, keep them as is in encrypted content (e.g. m.relates_to) */ /** Do not encrypt these keys, keep them as is in encrypted content (e.g. m.relates_to) */
val keepKeys: List<String>? = null, val keepKeys: List<String>? = null,
override val lastFailureMessage: String? = null override val lastFailureMessage: String? = null
@ -60,20 +62,19 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
Timber.v("Start Encrypt work") Timber.v("Start Encrypt work")
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success() ?: return Result.success()
.also { Timber.e("Unable to parse work parameters") }
Timber.v("## SendEvent: Start Encrypt work for event ${params.event.eventId}")
if (params.lastFailureMessage != null) { if (params.lastFailureMessage != null) {
// Transmit the error // Transmit the error
return Result.success(inputData) return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") } .also { Timber.e("Work cancelled due to input error from parent") }
} }
Timber.v("## SendEvent: Start Encrypt work for event ${params.eventId}")
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)
val localEvent = params.event val localEvent = localEchoRepository.getUpToDateEcho(params.eventId)
if (localEvent.eventId == null) { if (localEvent?.eventId == null) {
return Result.success() return Result.success()
} }
@ -106,15 +107,10 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
modifiedContent[toKeep] = it modifiedContent[toKeep] = it
} }
} }
val safeResult = result.copy(eventContent = modifiedContent)
val encryptedEvent = localEvent.copy(
type = safeResult.eventType,
content = safeResult.eventContent
)
// Better handling of local echo, to avoid decrypting transition on remote echo // Better handling of local echo, to avoid decrypting transition on remote echo
// Should I only do it for text messages? // Should I only do it for text messages?
if (result.eventContent["algorithm"] == MXCRYPTO_ALGORITHM_MEGOLM) { val decryptionLocalEcho = if (result.eventContent["algorithm"] == MXCRYPTO_ALGORITHM_MEGOLM) {
val decryptionLocalEcho = MXEventDecryptionResult( MXEventDecryptionResult(
clearEvent = Event( clearEvent = Event(
type = localEvent.type, type = localEvent.type,
content = localEvent.content, content = localEvent.content,
@ -124,10 +120,18 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
senderCurve25519Key = result.eventContent["sender_key"] as? String, senderCurve25519Key = result.eventContent["sender_key"] as? String,
claimedEd25519Key = crypto.getMyDevice().fingerprint() claimedEd25519Key = crypto.getMyDevice().fingerprint()
) )
localEchoRepository.updateEncryptedEcho(localEvent.eventId, safeResult.eventContent, decryptionLocalEcho) } else {
null
}
localEchoRepository.updateEcho(localEvent.eventId) { _, localEcho ->
localEcho.type = EventType.ENCRYPTED
localEcho.content = ContentMapper.map(modifiedContent)
decryptionLocalEcho?.also {
localEcho.setDecryptionResult(it)
}
} }
val nextWorkerParams = SendEventWorker.Params(sessionId = params.sessionId, event = encryptedEvent) val nextWorkerParams = SendEventWorker.Params(sessionId = params.sessionId, eventId = params.eventId)
return Result.success(WorkerParamsFactory.toData(nextWorkerParams)) return Result.success(WorkerParamsFactory.toData(nextWorkerParams))
} else { } else {
val sendState = when (error) { val sendState = when (error) {
@ -138,7 +142,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
// always return success, or the chain will be stuck for ever! // always return success, or the chain will be stuck for ever!
val nextWorkerParams = SendEventWorker.Params( val nextWorkerParams = SendEventWorker.Params(
sessionId = params.sessionId, sessionId = params.sessionId,
event = localEvent, eventId = localEvent.eventId,
lastFailureMessage = error?.localizedMessage ?: "Error" lastFailureMessage = error?.localizedMessage ?: "Error"
) )
return Result.success(WorkerParamsFactory.toData(nextWorkerParams)) return Result.success(WorkerParamsFactory.toData(nextWorkerParams))

View File

@ -0,0 +1,25 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send
import com.squareup.moshi.JsonClass
/**
* This is used as a holder to pass necessary data to some workers params.
*/
@JsonClass(generateAdapter = true)
internal data class LocalEchoIdentifiers(val roomId: String, val eventId: String)

View File

@ -18,8 +18,8 @@
package org.matrix.android.sdk.internal.session.room.send package org.matrix.android.sdk.internal.session.room.send
import com.zhuinden.monarchy.Monarchy import com.zhuinden.monarchy.Monarchy
import io.realm.Realm
import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.session.events.model.Content
import org.matrix.android.sdk.api.session.events.model.Event import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toModel import org.matrix.android.sdk.api.session.events.model.toModel
@ -27,11 +27,11 @@ import org.matrix.android.sdk.api.session.room.model.message.MessageContent
import org.matrix.android.sdk.api.session.room.model.message.MessageType import org.matrix.android.sdk.api.session.room.model.message.MessageType
import org.matrix.android.sdk.api.session.room.send.SendState import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import org.matrix.android.sdk.internal.database.RealmSessionProvider import org.matrix.android.sdk.internal.database.RealmSessionProvider
import org.matrix.android.sdk.internal.database.asyncTransaction
import org.matrix.android.sdk.internal.database.helper.nextId import org.matrix.android.sdk.internal.database.helper.nextId
import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
import org.matrix.android.sdk.internal.database.mapper.asDomain
import org.matrix.android.sdk.internal.database.mapper.toEntity import org.matrix.android.sdk.internal.database.mapper.toEntity
import org.matrix.android.sdk.internal.database.model.EventEntity import org.matrix.android.sdk.internal.database.model.EventEntity
import org.matrix.android.sdk.internal.database.model.EventInsertEntity import org.matrix.android.sdk.internal.database.model.EventInsertEntity
@ -44,11 +44,13 @@ import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.session.room.membership.RoomMemberHelper import org.matrix.android.sdk.internal.session.room.membership.RoomMemberHelper
import org.matrix.android.sdk.internal.session.room.summary.RoomSummaryUpdater import org.matrix.android.sdk.internal.session.room.summary.RoomSummaryUpdater
import org.matrix.android.sdk.internal.session.room.timeline.DefaultTimeline import org.matrix.android.sdk.internal.session.room.timeline.DefaultTimeline
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.util.awaitTransaction import org.matrix.android.sdk.internal.util.awaitTransaction
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
internal class LocalEchoRepository @Inject constructor(@SessionDatabase private val monarchy: Monarchy, internal class LocalEchoRepository @Inject constructor(@SessionDatabase private val monarchy: Monarchy,
private val taskExecutor: TaskExecutor,
private val realmSessionProvider: RealmSessionProvider, private val realmSessionProvider: RealmSessionProvider,
private val roomSummaryUpdater: RoomSummaryUpdater, private val roomSummaryUpdater: RoomSummaryUpdater,
private val eventBus: EventBus, private val eventBus: EventBus,
@ -76,12 +78,12 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
} }
val timelineEvent = timelineEventMapper.map(timelineEventEntity) val timelineEvent = timelineEventMapper.map(timelineEventEntity)
eventBus.post(DefaultTimeline.OnLocalEchoCreated(roomId = roomId, timelineEvent = timelineEvent)) eventBus.post(DefaultTimeline.OnLocalEchoCreated(roomId = roomId, timelineEvent = timelineEvent))
monarchy.writeAsync { realm -> taskExecutor.executorScope.asyncTransaction(monarchy) { realm ->
val eventInsertEntity = EventInsertEntity(event.eventId, event.type).apply { val eventInsertEntity = EventInsertEntity(event.eventId, event.type).apply {
this.insertType = EventInsertType.LOCAL_ECHO this.insertType = EventInsertType.LOCAL_ECHO
} }
realm.insert(eventInsertEntity) realm.insert(eventInsertEntity)
val roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst() ?: return@writeAsync val roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst() ?: return@asyncTransaction
roomEntity.sendingTimelineEvents.add(0, timelineEventEntity) roomEntity.sendingTimelineEvents.add(0, timelineEventEntity)
roomSummaryUpdater.updateSendingInformation(realm, roomId) roomSummaryUpdater.updateSendingInformation(realm, roomId)
} }
@ -89,30 +91,41 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
fun updateSendState(eventId: String, sendState: SendState) { fun updateSendState(eventId: String, sendState: SendState) {
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Update local state of $eventId to ${sendState.name}") Timber.v("## SendEvent: [${System.currentTimeMillis()}] Update local state of $eventId to ${sendState.name}")
monarchy.writeAsync { realm -> updateEchoAsync(eventId) { realm, sendingEventEntity ->
if (sendState == SendState.SENT && sendingEventEntity.sendState == SendState.SYNCED) {
// If already synced, do not put as sent
} else {
sendingEventEntity.sendState = sendState
}
roomSummaryUpdater.updateSendingInformation(realm, sendingEventEntity.roomId)
}
}
suspend fun updateEcho(eventId: String, block: (realm: Realm, eventEntity: EventEntity) -> Unit) {
monarchy.awaitTransaction { realm ->
val sendingEventEntity = EventEntity.where(realm, eventId).findFirst() val sendingEventEntity = EventEntity.where(realm, eventId).findFirst()
if (sendingEventEntity != null) { if (sendingEventEntity != null) {
if (sendState == SendState.SENT && sendingEventEntity.sendState == SendState.SYNCED) { block(realm, sendingEventEntity)
// If already synced, do not put as sent
} else {
sendingEventEntity.sendState = sendState
}
roomSummaryUpdater.updateSendingInformation(realm, sendingEventEntity.roomId)
} }
} }
} }
fun updateEncryptedEcho(eventId: String, encryptedContent: Content, mxEventDecryptionResult: MXEventDecryptionResult) { fun updateEchoAsync(eventId: String, block: (realm: Realm, eventEntity: EventEntity) -> Unit) {
monarchy.writeAsync { realm -> taskExecutor.executorScope.asyncTransaction(monarchy) { realm ->
val sendingEventEntity = EventEntity.where(realm, eventId).findFirst() val sendingEventEntity = EventEntity.where(realm, eventId).findFirst()
if (sendingEventEntity != null) { if (sendingEventEntity != null) {
sendingEventEntity.type = EventType.ENCRYPTED block(realm, sendingEventEntity)
sendingEventEntity.content = ContentMapper.map(encryptedContent)
sendingEventEntity.setDecryptionResult(mxEventDecryptionResult)
} }
} }
} }
suspend fun getUpToDateEcho(eventId: String): Event? {
// We are using awaitTransaction here to make sure this executes after other transactions
return monarchy.awaitTransaction { realm ->
EventEntity.where(realm, eventId).findFirst()?.asDomain(castJsonNumbers = true)
}
}
suspend fun deleteFailedEcho(roomId: String, localEcho: TimelineEvent) { suspend fun deleteFailedEcho(roomId: String, localEcho: TimelineEvent) {
deleteFailedEcho(roomId, localEcho.eventId) deleteFailedEcho(roomId, localEcho.eventId)
} }
@ -150,7 +163,7 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
return getAllEventsWithStates(roomId, SendState.HAS_FAILED_STATES) return getAllEventsWithStates(roomId, SendState.HAS_FAILED_STATES)
} }
fun getAllEventsWithStates(roomId: String, states : List<SendState>): List<TimelineEvent> { fun getAllEventsWithStates(roomId: String, states: List<SendState>): List<TimelineEvent> {
return realmSessionProvider.withRealm { realm -> return realmSessionProvider.withRealm { realm ->
TimelineEventEntity TimelineEventEntity
.findAllInRoomWithSendStates(realm, roomId, states) .findAllInRoomWithSendStates(realm, roomId, states)

View File

@ -23,7 +23,6 @@ import androidx.work.CoroutineWorker
import androidx.work.OneTimeWorkRequest import androidx.work.OneTimeWorkRequest
import androidx.work.WorkerParameters import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.room.send.SendState import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.di.WorkManagerProvider import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.session.content.UploadContentWorker import org.matrix.android.sdk.internal.session.content.UploadContentWorker
@ -48,7 +47,7 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
@JsonClass(generateAdapter = true) @JsonClass(generateAdapter = true)
internal data class Params( internal data class Params(
override val sessionId: String, override val sessionId: String,
val events: List<Event>, val localEchoIds: List<LocalEchoIdentifiers>,
val isEncrypted: Boolean, val isEncrypted: Boolean,
override val lastFailureMessage: String? = null override val lastFailureMessage: String? = null
) : SessionWorkerParams ) : SessionWorkerParams
@ -61,42 +60,42 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
Timber.v("## SendEvent: Start dispatch sending multiple event work") Timber.v("## SendEvent: Start dispatch sending multiple event work")
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success() ?: return Result.success()
.also { Timber.e("Unable to parse work parameters") }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)
if (params.lastFailureMessage != null) { if (params.lastFailureMessage != null) {
params.events.forEach { event -> params.localEchoIds.forEach { localEchoIds ->
event.eventId?.let { localEchoRepository.updateSendState(it, SendState.UNDELIVERED) } localEchoRepository.updateSendState(localEchoIds.eventId, SendState.UNDELIVERED)
} }
// Transmit the error if needed? // Transmit the error if needed?
return Result.success(inputData) return Result.success(inputData)
.also { Timber.e("## SendEvent: Work cancelled due to input error from parent ${params.lastFailureMessage}") } .also { Timber.e("## SendEvent: Work cancelled due to input error from parent ${params.lastFailureMessage}") }
} }
// Create a work for every event // Create a work for every event
params.events.forEach { event -> params.localEchoIds.forEach { localEchoIds ->
val roomId = localEchoIds.roomId
val eventId = localEchoIds.eventId
if (params.isEncrypted) { if (params.isEncrypted) {
localEchoRepository.updateSendState(event.eventId ?: "", SendState.ENCRYPTING) localEchoRepository.updateSendState(eventId, SendState.ENCRYPTING)
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule encrypt and send event ${event.eventId}") Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule encrypt and send event $eventId")
val encryptWork = createEncryptEventWork(params.sessionId, event, true) val encryptWork = createEncryptEventWork(params.sessionId, eventId, true)
// Note that event will be replaced by the result of the previous work // Note that event will be replaced by the result of the previous work
val sendWork = createSendEventWork(params.sessionId, event, false) val sendWork = createSendEventWork(params.sessionId, eventId, false)
timelineSendEventWorkCommon.postSequentialWorks(event.roomId!!, encryptWork, sendWork) timelineSendEventWorkCommon.postSequentialWorks(roomId, encryptWork, sendWork)
} else { } else {
localEchoRepository.updateSendState(event.eventId ?: "", SendState.SENDING) localEchoRepository.updateSendState(eventId, SendState.SENDING)
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule send event ${event.eventId}") Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule send event $eventId")
val sendWork = createSendEventWork(params.sessionId, event, true) val sendWork = createSendEventWork(params.sessionId, eventId, true)
timelineSendEventWorkCommon.postWork(event.roomId!!, sendWork) timelineSendEventWorkCommon.postWork(roomId, sendWork)
} }
} }
return Result.success() return Result.success()
} }
private fun createEncryptEventWork(sessionId: String, event: Event, startChain: Boolean): OneTimeWorkRequest { private fun createEncryptEventWork(sessionId: String, eventId: String, startChain: Boolean): OneTimeWorkRequest {
val params = EncryptEventWorker.Params(sessionId, event) val params = EncryptEventWorker.Params(sessionId, eventId)
val sendWorkData = WorkerParamsFactory.toData(params) val sendWorkData = WorkerParamsFactory.toData(params)
return workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>() return workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>()
@ -107,8 +106,8 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
.build() .build()
} }
private fun createSendEventWork(sessionId: String, event: Event, startChain: Boolean): OneTimeWorkRequest { private fun createSendEventWork(sessionId: String, eventId: String, startChain: Boolean): OneTimeWorkRequest {
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, event = event) val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = eventId)
val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams) val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)
return timelineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain) return timelineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain)

View File

@ -52,7 +52,6 @@ internal class RedactEventWorker(context: Context, params: WorkerParameters) : C
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure() ?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
if (params.lastFailureMessage != null) { if (params.lastFailureMessage != null) {
// Transmit the error // Transmit the error

View File

@ -56,7 +56,7 @@ internal class RoomEventSender @Inject constructor(
private fun createEncryptEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest { private fun createEncryptEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
// Same parameter // Same parameter
val params = EncryptEventWorker.Params(sessionId, event) val params = EncryptEventWorker.Params(sessionId, event.eventId!!)
val sendWorkData = WorkerParamsFactory.toData(params) val sendWorkData = WorkerParamsFactory.toData(params)
return workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>() return workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>()
@ -68,7 +68,7 @@ internal class RoomEventSender @Inject constructor(
} }
private fun createSendEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest { private fun createSendEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, event = event) val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = event.eventId!!)
val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams) val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)
return timelineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain) return timelineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain)

View File

@ -21,11 +21,12 @@ import android.content.Context
import androidx.work.CoroutineWorker import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass import com.squareup.moshi.JsonClass
import io.realm.RealmConfiguration
import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.failure.shouldBeRetried import org.matrix.android.sdk.api.failure.shouldBeRetried
import org.matrix.android.sdk.api.session.events.model.Content import org.matrix.android.sdk.api.session.events.model.Content
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.room.send.SendState import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.executeRequest import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.room.RoomAPI import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.worker.SessionWorkerParams import org.matrix.android.sdk.internal.worker.SessionWorkerParams
@ -48,29 +49,25 @@ internal class SendEventWorker(context: Context,
internal data class Params( internal data class Params(
override val sessionId: String, override val sessionId: String,
override val lastFailureMessage: String? = null, override val lastFailureMessage: String? = null,
val event: Event? = null, val eventId: String
// Keep for compat at the moment, will be removed later
val eventId: String? = null
) : SessionWorkerParams ) : SessionWorkerParams
@Inject lateinit var localEchoRepository: LocalEchoRepository @Inject lateinit var localEchoRepository: LocalEchoRepository
@Inject lateinit var roomAPI: RoomAPI @Inject lateinit var roomAPI: RoomAPI
@Inject lateinit var eventBus: EventBus @Inject lateinit var eventBus: EventBus
@Inject lateinit var cancelSendTracker: CancelSendTracker @Inject lateinit var cancelSendTracker: CancelSendTracker
@SessionDatabase @Inject lateinit var realmConfiguration: RealmConfiguration
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success() ?: return Result.success()
.also { Timber.e("## SendEvent: Unable to parse work parameters") }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)
val event = params.event val event = localEchoRepository.getUpToDateEcho(params.eventId)
if (event?.eventId == null || event.roomId == null) { if (event?.eventId == null || event.roomId == null) {
// Old way of sending localEchoRepository.updateSendState(params.eventId, SendState.UNDELIVERED)
if (params.eventId != null) {
localEchoRepository.updateSendState(params.eventId, SendState.UNDELIVERED)
}
return Result.success() return Result.success()
.also { Timber.e("Work cancelled due to bad input data") } .also { Timber.e("Work cancelled due to bad input data") }
} }

View File

@ -18,9 +18,9 @@
package org.matrix.android.sdk.internal.util package org.matrix.android.sdk.internal.util
import com.zhuinden.monarchy.Monarchy import com.zhuinden.monarchy.Monarchy
import org.matrix.android.sdk.internal.database.awaitTransaction
import io.realm.Realm import io.realm.Realm
import io.realm.RealmModel import io.realm.RealmModel
import org.matrix.android.sdk.internal.database.awaitTransaction
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
internal suspend fun <T> Monarchy.awaitTransaction(transaction: suspend (realm: Realm) -> T): T { internal suspend fun <T> Monarchy.awaitTransaction(transaction: suspend (realm: Realm) -> T): T {

View File

@ -18,6 +18,7 @@
package org.matrix.android.sdk.internal.worker package org.matrix.android.sdk.internal.worker
import androidx.work.Data import androidx.work.Data
import org.matrix.android.sdk.api.extensions.tryThis
import org.matrix.android.sdk.internal.di.MoshiProvider import org.matrix.android.sdk.internal.di.MoshiProvider
import org.matrix.android.sdk.internal.network.parsing.CheckNumberType import org.matrix.android.sdk.internal.network.parsing.CheckNumberType
@ -41,7 +42,7 @@ internal object WorkerParamsFactory {
return Data.Builder().putString(KEY, json).build() return Data.Builder().putString(KEY, json).build()
} }
inline fun <reified T> fromData(data: Data): T? { inline fun <reified T> fromData(data: Data): T? = tryThis("Unable to parse work parameters") {
val json = data.getString(KEY) val json = data.getString(KEY)
return if (json == null) { return if (json == null) {
null null