Rebase post matrix sdk package renaming
This commit is contained in:
parent
cc57a73f23
commit
bfcbb9ff4f
|
@ -29,6 +29,8 @@ import org.junit.runners.MethodSorters
|
|||
import org.matrix.android.sdk.internal.crypto.attachments.MXEncryptedAttachments
|
||||
import org.matrix.android.sdk.internal.crypto.model.rest.EncryptedFileInfo
|
||||
import org.matrix.android.sdk.internal.crypto.model.rest.EncryptedFileKey
|
||||
import org.matrix.android.sdk.internal.crypto.attachments.toElementToDecrypt
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.InputStream
|
||||
|
||||
/**
|
||||
|
@ -52,7 +54,7 @@ class AttachmentEncryptionTest {
|
|||
memoryFile.inputStream
|
||||
}
|
||||
|
||||
val decryptedStream = MXEncryptedAttachments.decryptAttachment(inputStream, encryptedFileInfo)
|
||||
val decryptedStream = MXEncryptedAttachments.decryptAttachment(inputStream, encryptedFileInfo.toElementToDecrypt()!!)
|
||||
|
||||
assertNotNull(decryptedStream)
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ interface ContentUploadStateTracker {
|
|||
object Idle : State()
|
||||
object EncryptingThumbnail : State()
|
||||
data class UploadingThumbnail(val current: Long, val total: Long) : State()
|
||||
object Encrypting : State()
|
||||
data class Encrypting(val current: Long, val total: Long) : State()
|
||||
data class Uploading(val current: Long, val total: Long) : State()
|
||||
object Success : State()
|
||||
data class Failure(val throwable: Throwable) : State()
|
||||
|
|
|
@ -239,6 +239,14 @@ fun Event.isVideoMessage(): Boolean {
|
|||
}
|
||||
}
|
||||
|
||||
fun Event.isAudioMessage(): Boolean {
|
||||
return getClearType() == EventType.MESSAGE
|
||||
&& when (getClearContent()?.toModel<MessageContent>()?.msgType) {
|
||||
MessageType.MSGTYPE_AUDIO -> true
|
||||
else -> false
|
||||
}
|
||||
}
|
||||
|
||||
fun Event.isFileMessage(): Boolean {
|
||||
return getClearType() == EventType.MESSAGE
|
||||
&& when (getClearContent()?.toModel<MessageContent>()?.msgType) {
|
||||
|
@ -246,6 +254,16 @@ fun Event.isFileMessage(): Boolean {
|
|||
else -> false
|
||||
}
|
||||
}
|
||||
fun Event.isAttachmentMessage(): Boolean {
|
||||
return getClearType() == EventType.MESSAGE
|
||||
&& when (getClearContent()?.toModel<MessageContent>()?.msgType) {
|
||||
MessageType.MSGTYPE_IMAGE,
|
||||
MessageType.MSGTYPE_AUDIO,
|
||||
MessageType.MSGTYPE_VIDEO,
|
||||
MessageType.MSGTYPE_FILE -> true
|
||||
else -> false
|
||||
}
|
||||
}
|
||||
|
||||
fun Event.getRelationContent(): RelationDefaultContent? {
|
||||
return if (isEncrypted()) {
|
||||
|
|
|
@ -126,6 +126,8 @@ interface SendService {
|
|||
|
||||
fun clearSendingQueue()
|
||||
|
||||
fun cancelSend(eventId: String)
|
||||
|
||||
/**
|
||||
* Resend all failed messages one by one (and keep order)
|
||||
*/
|
||||
|
|
|
@ -37,7 +37,8 @@ enum class SendState {
|
|||
internal companion object {
|
||||
val HAS_FAILED_STATES = listOf(UNDELIVERED, FAILED_UNKNOWN_DEVICES)
|
||||
val IS_SENT_STATES = listOf(SENT, SYNCED)
|
||||
val IS_SENDING_STATES = listOf(UNSENT, ENCRYPTING, SENDING)
|
||||
val IS_PROGRESSING_STATES = listOf(ENCRYPTING, SENDING)
|
||||
val IS_SENDING_STATES = IS_PROGRESSING_STATES + UNSENT
|
||||
val PENDING_STATES = IS_SENDING_STATES + HAS_FAILED_STATES
|
||||
}
|
||||
|
||||
|
@ -45,5 +46,7 @@ enum class SendState {
|
|||
|
||||
fun hasFailed() = HAS_FAILED_STATES.contains(this)
|
||||
|
||||
fun isInProgress() = IS_PROGRESSING_STATES.contains(this)
|
||||
|
||||
fun isSending() = IS_SENDING_STATES.contains(this)
|
||||
}
|
||||
|
|
|
@ -22,10 +22,13 @@ import org.matrix.android.sdk.internal.crypto.model.rest.EncryptedFileInfo
|
|||
import org.matrix.android.sdk.internal.crypto.model.rest.EncryptedFileKey
|
||||
import timber.log.Timber
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.File
|
||||
import java.io.InputStream
|
||||
import java.io.OutputStream
|
||||
import java.security.MessageDigest
|
||||
import java.security.SecureRandom
|
||||
import javax.crypto.Cipher
|
||||
import javax.crypto.CipherInputStream
|
||||
import javax.crypto.spec.IvParameterSpec
|
||||
import javax.crypto.spec.SecretKeySpec
|
||||
|
||||
|
@ -35,8 +38,121 @@ internal object MXEncryptedAttachments {
|
|||
private const val SECRET_KEY_SPEC_ALGORITHM = "AES"
|
||||
private const val MESSAGE_DIGEST_ALGORITHM = "SHA-256"
|
||||
|
||||
fun encrypt(clearStream: InputStream, mimetype: String?, outputFile: File, progress: ((current: Int, total: Int) -> Unit)): EncryptedFileInfo {
|
||||
val t0 = System.currentTimeMillis()
|
||||
val secureRandom = SecureRandom()
|
||||
val initVectorBytes = ByteArray(16) { 0.toByte() }
|
||||
|
||||
val ivRandomPart = ByteArray(8)
|
||||
secureRandom.nextBytes(ivRandomPart)
|
||||
|
||||
System.arraycopy(ivRandomPart, 0, initVectorBytes, 0, ivRandomPart.size)
|
||||
|
||||
val key = ByteArray(32)
|
||||
secureRandom.nextBytes(key)
|
||||
|
||||
val messageDigest = MessageDigest.getInstance(MESSAGE_DIGEST_ALGORITHM)
|
||||
|
||||
outputFile.outputStream().use { outputStream ->
|
||||
val encryptCipher = Cipher.getInstance(CIPHER_ALGORITHM)
|
||||
val secretKeySpec = SecretKeySpec(key, SECRET_KEY_SPEC_ALGORITHM)
|
||||
val ivParameterSpec = IvParameterSpec(initVectorBytes)
|
||||
encryptCipher.init(Cipher.ENCRYPT_MODE, secretKeySpec, ivParameterSpec)
|
||||
|
||||
val data = ByteArray(CRYPTO_BUFFER_SIZE)
|
||||
var read: Int
|
||||
var encodedBytes: ByteArray
|
||||
clearStream.use { inputStream ->
|
||||
val estimatedSize = inputStream.available()
|
||||
progress.invoke(0, estimatedSize)
|
||||
read = inputStream.read(data)
|
||||
var totalRead = read
|
||||
while (read != -1) {
|
||||
progress.invoke(totalRead, estimatedSize)
|
||||
encodedBytes = encryptCipher.update(data, 0, read)
|
||||
messageDigest.update(encodedBytes, 0, encodedBytes.size)
|
||||
outputStream.write(encodedBytes)
|
||||
read = inputStream.read(data)
|
||||
totalRead += read
|
||||
}
|
||||
}
|
||||
|
||||
// encrypt the latest chunk
|
||||
encodedBytes = encryptCipher.doFinal()
|
||||
messageDigest.update(encodedBytes, 0, encodedBytes.size)
|
||||
outputStream.write(encodedBytes)
|
||||
}
|
||||
|
||||
return EncryptedFileInfo(
|
||||
url = null,
|
||||
mimetype = mimetype,
|
||||
key = EncryptedFileKey(
|
||||
alg = "A256CTR",
|
||||
ext = true,
|
||||
key_ops = listOf("encrypt", "decrypt"),
|
||||
kty = "oct",
|
||||
k = base64ToBase64Url(Base64.encodeToString(key, Base64.DEFAULT))
|
||||
),
|
||||
iv = Base64.encodeToString(initVectorBytes, Base64.DEFAULT).replace("\n", "").replace("=", ""),
|
||||
hashes = mapOf("sha256" to base64ToUnpaddedBase64(Base64.encodeToString(messageDigest.digest(), Base64.DEFAULT))),
|
||||
v = "v2"
|
||||
)
|
||||
.also { Timber.v("Encrypt in ${System.currentTimeMillis() - t0}ms") }
|
||||
}
|
||||
|
||||
// fun cipherInputStream(attachmentStream: InputStream, mimetype: String?): Pair<DigestInputStream, EncryptedFileInfo> {
|
||||
// val secureRandom = SecureRandom()
|
||||
//
|
||||
// // generate a random iv key
|
||||
// // Half of the IV is random, the lower order bits are zeroed
|
||||
// // such that the counter never wraps.
|
||||
// // See https://github.com/matrix-org/matrix-ios-kit/blob/3dc0d8e46b4deb6669ed44f72ad79be56471354c/MatrixKit/Models/Room/MXEncryptedAttachments.m#L75
|
||||
// val initVectorBytes = ByteArray(16) { 0.toByte() }
|
||||
//
|
||||
// val ivRandomPart = ByteArray(8)
|
||||
// secureRandom.nextBytes(ivRandomPart)
|
||||
//
|
||||
// System.arraycopy(ivRandomPart, 0, initVectorBytes, 0, ivRandomPart.size)
|
||||
//
|
||||
// val key = ByteArray(32)
|
||||
// secureRandom.nextBytes(key)
|
||||
//
|
||||
// val encryptCipher = Cipher.getInstance(CIPHER_ALGORITHM)
|
||||
// val secretKeySpec = SecretKeySpec(key, SECRET_KEY_SPEC_ALGORITHM)
|
||||
// val ivParameterSpec = IvParameterSpec(initVectorBytes)
|
||||
// encryptCipher.init(Cipher.ENCRYPT_MODE, secretKeySpec, ivParameterSpec)
|
||||
//
|
||||
// val cipherInputStream = CipherInputStream(attachmentStream, encryptCipher)
|
||||
//
|
||||
// // Could it be possible to get the digest on the fly instead of
|
||||
// val info = EncryptedFileInfo(
|
||||
// url = null,
|
||||
// mimetype = mimetype,
|
||||
// key = EncryptedFileKey(
|
||||
// alg = "A256CTR",
|
||||
// ext = true,
|
||||
// key_ops = listOf("encrypt", "decrypt"),
|
||||
// kty = "oct",
|
||||
// k = base64ToBase64Url(Base64.encodeToString(key, Base64.DEFAULT))
|
||||
// ),
|
||||
// iv = Base64.encodeToString(initVectorBytes, Base64.DEFAULT).replace("\n", "").replace("=", ""),
|
||||
// //hashes = mapOf("sha256" to base64ToUnpaddedBase64(Base64.encodeToString(messageDigest.digest(), Base64.DEFAULT))),
|
||||
// v = "v2"
|
||||
// )
|
||||
//
|
||||
// val messageDigest = MessageDigest.getInstance(MESSAGE_DIGEST_ALGORITHM)
|
||||
// return DigestInputStream(cipherInputStream, messageDigest) to info
|
||||
// }
|
||||
//
|
||||
// fun updateInfoWithDigest(digestInputStream: DigestInputStream, info: EncryptedFileInfo): EncryptedFileInfo {
|
||||
// return info.copy(
|
||||
// hashes = mapOf("sha256" to base64ToUnpaddedBase64(Base64.encodeToString(digestInputStream.messageDigest.digest(), Base64.DEFAULT)))
|
||||
// )
|
||||
// }
|
||||
|
||||
/***
|
||||
* Encrypt an attachment stream.
|
||||
* DO NOT USE for big files, it will load all in memory
|
||||
* @param attachmentStream the attachment stream. Will be closed after this method call.
|
||||
* @param mimetype the mime type
|
||||
* @return the encryption file info
|
||||
|
@ -59,14 +175,14 @@ internal object MXEncryptedAttachments {
|
|||
val key = ByteArray(32)
|
||||
secureRandom.nextBytes(key)
|
||||
|
||||
ByteArrayOutputStream().use { outputStream ->
|
||||
val messageDigest = MessageDigest.getInstance(MESSAGE_DIGEST_ALGORITHM)
|
||||
val byteArrayOutputStream = ByteArrayOutputStream()
|
||||
byteArrayOutputStream.use { outputStream ->
|
||||
val encryptCipher = Cipher.getInstance(CIPHER_ALGORITHM)
|
||||
val secretKeySpec = SecretKeySpec(key, SECRET_KEY_SPEC_ALGORITHM)
|
||||
val ivParameterSpec = IvParameterSpec(initVectorBytes)
|
||||
encryptCipher.init(Cipher.ENCRYPT_MODE, secretKeySpec, ivParameterSpec)
|
||||
|
||||
val messageDigest = MessageDigest.getInstance(MESSAGE_DIGEST_ALGORITHM)
|
||||
|
||||
val data = ByteArray(CRYPTO_BUFFER_SIZE)
|
||||
var read: Int
|
||||
var encodedBytes: ByteArray
|
||||
|
@ -85,6 +201,7 @@ internal object MXEncryptedAttachments {
|
|||
encodedBytes = encryptCipher.doFinal()
|
||||
messageDigest.update(encodedBytes, 0, encodedBytes.size)
|
||||
outputStream.write(encodedBytes)
|
||||
}
|
||||
|
||||
return EncryptionResult(
|
||||
encryptedFileInfo = EncryptedFileInfo(
|
||||
|
@ -93,7 +210,7 @@ internal object MXEncryptedAttachments {
|
|||
key = EncryptedFileKey(
|
||||
alg = "A256CTR",
|
||||
ext = true,
|
||||
keyOps = listOf("encrypt", "decrypt"),
|
||||
key_ops = listOf("encrypt", "decrypt"),
|
||||
kty = "oct",
|
||||
k = base64ToBase64Url(Base64.encodeToString(key, Base64.DEFAULT))
|
||||
),
|
||||
|
@ -101,11 +218,10 @@ internal object MXEncryptedAttachments {
|
|||
hashes = mapOf("sha256" to base64ToUnpaddedBase64(Base64.encodeToString(messageDigest.digest(), Base64.DEFAULT))),
|
||||
v = "v2"
|
||||
),
|
||||
encryptedByteArray = outputStream.toByteArray()
|
||||
encryptedByteArray = byteArrayOutputStream.toByteArray()
|
||||
)
|
||||
.also { Timber.v("Encrypt in ${System.currentTimeMillis() - t0}ms") }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrypt an attachment
|
||||
|
@ -114,15 +230,23 @@ internal object MXEncryptedAttachments {
|
|||
* @param encryptedFileInfo the encryption file info
|
||||
* @return the decrypted attachment stream
|
||||
*/
|
||||
fun decryptAttachment(attachmentStream: InputStream?, encryptedFileInfo: EncryptedFileInfo?): InputStream? {
|
||||
if (encryptedFileInfo?.isValid() != true) {
|
||||
Timber.e("## decryptAttachment() : some fields are not defined, or invalid key fields")
|
||||
fun decryptAttachment(attachmentStream: InputStream?, elementToDecrypt: ElementToDecrypt): InputStream? {
|
||||
try {
|
||||
val digestCheckInputStream = MatrixDigestCheckInputStream(attachmentStream, elementToDecrypt.sha256)
|
||||
|
||||
val key = Base64.decode(base64UrlToBase64(elementToDecrypt.k), Base64.DEFAULT)
|
||||
val initVectorBytes = Base64.decode(elementToDecrypt.iv, Base64.DEFAULT)
|
||||
|
||||
val decryptCipher = Cipher.getInstance(CIPHER_ALGORITHM)
|
||||
val secretKeySpec = SecretKeySpec(key, SECRET_KEY_SPEC_ALGORITHM)
|
||||
val ivParameterSpec = IvParameterSpec(initVectorBytes)
|
||||
decryptCipher.init(Cipher.DECRYPT_MODE, secretKeySpec, ivParameterSpec)
|
||||
|
||||
return CipherInputStream(digestCheckInputStream, decryptCipher)
|
||||
} catch (failure: Throwable) {
|
||||
Timber.e(failure, "## decryptAttachment() : failed to create stream")
|
||||
return null
|
||||
}
|
||||
|
||||
val elementToDecrypt = encryptedFileInfo.toElementToDecrypt()
|
||||
|
||||
return decryptAttachment(attachmentStream, elementToDecrypt)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -132,16 +256,15 @@ internal object MXEncryptedAttachments {
|
|||
* @param elementToDecrypt the elementToDecrypt info
|
||||
* @return the decrypted attachment stream
|
||||
*/
|
||||
fun decryptAttachment(attachmentStream: InputStream?, elementToDecrypt: ElementToDecrypt?): InputStream? {
|
||||
fun decryptAttachment(attachmentStream: InputStream?, elementToDecrypt: ElementToDecrypt?, outputStream: OutputStream): Boolean {
|
||||
// sanity checks
|
||||
if (null == attachmentStream || elementToDecrypt == null) {
|
||||
Timber.e("## decryptAttachment() : null stream")
|
||||
return null
|
||||
return false
|
||||
}
|
||||
|
||||
val t0 = System.currentTimeMillis()
|
||||
|
||||
ByteArrayOutputStream().use { outputStream ->
|
||||
try {
|
||||
val key = Base64.decode(base64UrlToBase64(elementToDecrypt.k), Base64.DEFAULT)
|
||||
val initVectorBytes = Base64.decode(elementToDecrypt.iv, Base64.DEFAULT)
|
||||
|
@ -175,19 +298,17 @@ internal object MXEncryptedAttachments {
|
|||
|
||||
if (elementToDecrypt.sha256 != currentDigestValue) {
|
||||
Timber.e("## decryptAttachment() : Digest value mismatch")
|
||||
return null
|
||||
return false
|
||||
}
|
||||
|
||||
return outputStream.toByteArray().inputStream()
|
||||
.also { Timber.v("Decrypt in ${System.currentTimeMillis() - t0}ms") }
|
||||
return true.also { Timber.v("Decrypt in ${System.currentTimeMillis() - t0}ms") }
|
||||
} catch (oom: OutOfMemoryError) {
|
||||
Timber.e(oom, "## decryptAttachment() failed: OOM")
|
||||
} catch (e: Exception) {
|
||||
Timber.e(e, "## decryptAttachment() failed")
|
||||
}
|
||||
}
|
||||
|
||||
return null
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -206,7 +327,7 @@ internal object MXEncryptedAttachments {
|
|||
.replace("=", "")
|
||||
}
|
||||
|
||||
private fun base64ToUnpaddedBase64(base64: String): String {
|
||||
internal fun base64ToUnpaddedBase64(base64: String): String {
|
||||
return base64.replace("\n".toRegex(), "")
|
||||
.replace("=", "")
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package im.vector.matrix.android.internal.crypto.attachments
|
||||
package org.matrix.android.sdk.internal.crypto.attachments
|
||||
|
||||
import android.util.Base64
|
||||
import java.io.FilterInputStream
|
|
@ -35,6 +35,10 @@ internal class ProgressRequestBody(private val delegate: RequestBody,
|
|||
return delegate.contentType()
|
||||
}
|
||||
|
||||
override fun isOneShot() = delegate.isOneShot()
|
||||
|
||||
override fun isDuplex() = delegate.isDuplex()
|
||||
|
||||
override fun contentLength(): Long {
|
||||
try {
|
||||
return delegate.contentLength()
|
||||
|
|
|
@ -143,20 +143,22 @@ internal class DefaultFileService @Inject constructor(
|
|||
Timber.v("Response size ${response.body?.contentLength()} - Stream available: ${!source.exhausted()}")
|
||||
|
||||
if (elementToDecrypt != null) {
|
||||
Timber.v("## decrypt file")
|
||||
val decryptedStream = MXEncryptedAttachments.decryptAttachment(source.inputStream(), elementToDecrypt)
|
||||
Timber.v("## FileService: decrypt file")
|
||||
val decryptSuccess = MXEncryptedAttachments.decryptAttachment(
|
||||
source.inputStream(),
|
||||
elementToDecrypt,
|
||||
destFile.outputStream().buffered()
|
||||
)
|
||||
response.close()
|
||||
if (decryptedStream == null) {
|
||||
if (!decryptSuccess) {
|
||||
return@flatMap Try.Failure(IllegalStateException("Decryption error"))
|
||||
} else {
|
||||
decryptedStream.use {
|
||||
writeToFile(decryptedStream, destFile)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
writeToFile(source.inputStream(), destFile)
|
||||
response.close()
|
||||
}
|
||||
} else {
|
||||
Timber.v("## FileService: cache hit for $url")
|
||||
}
|
||||
|
||||
Try.just(copyFile(destFile, downloadMode))
|
||||
|
|
|
@ -74,8 +74,8 @@ internal class DefaultContentUploadStateTracker @Inject constructor() : ContentU
|
|||
updateState(key, progressData)
|
||||
}
|
||||
|
||||
internal fun setEncrypting(key: String) {
|
||||
val progressData = ContentUploadStateTracker.State.Encrypting
|
||||
internal fun setEncrypting(key: String, current: Long, total: Long) {
|
||||
val progressData = ContentUploadStateTracker.State.Encrypting(current, total)
|
||||
updateState(key, progressData)
|
||||
}
|
||||
|
||||
|
|
|
@ -23,12 +23,14 @@ import com.squareup.moshi.Moshi
|
|||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
|
||||
import okhttp3.MediaType
|
||||
import okhttp3.MediaType.Companion.toMediaTypeOrNull
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okhttp3.RequestBody
|
||||
import okhttp3.RequestBody.Companion.asRequestBody
|
||||
import okhttp3.RequestBody.Companion.toRequestBody
|
||||
import okio.BufferedSink
|
||||
import okio.source
|
||||
import org.greenrobot.eventbus.EventBus
|
||||
import org.matrix.android.sdk.api.session.content.ContentUrlResolver
|
||||
import org.matrix.android.sdk.internal.di.Authenticated
|
||||
|
@ -38,6 +40,7 @@ import org.matrix.android.sdk.internal.network.toFailure
|
|||
import java.io.File
|
||||
import java.io.FileNotFoundException
|
||||
import java.io.IOException
|
||||
import java.io.InputStream
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class FileUploader @Inject constructor(@Authenticated
|
||||
|
@ -54,7 +57,21 @@ internal class FileUploader @Inject constructor(@Authenticated
|
|||
filename: String?,
|
||||
mimeType: String?,
|
||||
progressListener: ProgressRequestBody.Listener? = null): ContentUploadResponse {
|
||||
val uploadBody = file.asRequestBody(mimeType?.toMediaTypeOrNull())
|
||||
val uploadBody = object : RequestBody() {
|
||||
override fun contentLength() = file.length()
|
||||
|
||||
// Disable okhttp auto resend for 'large files'
|
||||
override fun isOneShot() = contentLength() == 0L || contentLength() >= 1_000_000
|
||||
|
||||
override fun contentType(): MediaType? {
|
||||
return mimeType?.toMediaTypeOrNull()
|
||||
}
|
||||
|
||||
override fun writeTo(sink: BufferedSink) {
|
||||
file.source().use { sink.writeAll(it) }
|
||||
}
|
||||
}
|
||||
|
||||
return upload(uploadBody, filename, progressListener)
|
||||
}
|
||||
|
||||
|
@ -66,6 +83,28 @@ internal class FileUploader @Inject constructor(@Authenticated
|
|||
return upload(uploadBody, filename, progressListener)
|
||||
}
|
||||
|
||||
suspend fun uploadInputStream(inputStream: InputStream,
|
||||
filename: String?,
|
||||
mimeType: String?,
|
||||
progressListener: ProgressRequestBody.Listener? = null): ContentUploadResponse {
|
||||
val length = inputStream.available().toLong()
|
||||
val uploadBody = object : RequestBody() {
|
||||
override fun contentLength() = length
|
||||
|
||||
// Disable okhttp auto resend for 'large files'
|
||||
override fun isOneShot() = contentLength() == 0L || contentLength() >= 1_000_000
|
||||
|
||||
override fun contentType(): MediaType? {
|
||||
return mimeType?.toMediaTypeOrNull()
|
||||
}
|
||||
|
||||
override fun writeTo(sink: BufferedSink) {
|
||||
inputStream.source().use { sink.writeAll(it) }
|
||||
}
|
||||
}
|
||||
return upload(uploadBody, filename, progressListener)
|
||||
}
|
||||
|
||||
suspend fun uploadFromUri(uri: Uri,
|
||||
filename: String?,
|
||||
mimeType: String?,
|
||||
|
@ -73,10 +112,7 @@ internal class FileUploader @Inject constructor(@Authenticated
|
|||
return withContext(Dispatchers.IO) {
|
||||
val inputStream = context.contentResolver.openInputStream(uri) ?: throw FileNotFoundException()
|
||||
|
||||
inputStream.use {
|
||||
uploadByteArray(it.readBytes(), filename, mimeType, progressListener)
|
||||
}
|
||||
}
|
||||
return uploadInputStream(inputStream, filename, mimeType, progressListener)
|
||||
}
|
||||
private suspend fun upload(uploadBody: RequestBody, filename: String?, progressListener: ProgressRequestBody.Listener?): ContentUploadResponse {
|
||||
val urlBuilder = uploadUrl.toHttpUrlOrNull()?.newBuilder() ?: throw RuntimeException()
|
||||
|
|
|
@ -24,6 +24,7 @@ import androidx.work.WorkerParameters
|
|||
import com.squareup.moshi.JsonClass
|
||||
import id.zelory.compressor.Compressor
|
||||
import id.zelory.compressor.constraint.default
|
||||
import org.matrix.android.sdk.api.extensions.tryThis
|
||||
import org.matrix.android.sdk.api.session.content.ContentAttachmentData
|
||||
import org.matrix.android.sdk.api.session.events.model.Event
|
||||
import org.matrix.android.sdk.api.session.events.model.toContent
|
||||
|
@ -37,12 +38,15 @@ import org.matrix.android.sdk.internal.crypto.attachments.MXEncryptedAttachments
|
|||
import org.matrix.android.sdk.internal.crypto.model.rest.EncryptedFileInfo
|
||||
import org.matrix.android.sdk.internal.network.ProgressRequestBody
|
||||
import org.matrix.android.sdk.internal.session.DefaultFileService
|
||||
import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker
|
||||
import org.matrix.android.sdk.internal.session.room.send.MultipleEventSendingDispatcherWorker
|
||||
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
|
||||
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
|
||||
import org.matrix.android.sdk.internal.worker.getSessionComponent
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import java.io.FileOutputStream
|
||||
import java.io.InputStream
|
||||
import java.util.UUID
|
||||
import javax.inject.Inject
|
||||
|
||||
|
@ -71,6 +75,7 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
|
|||
@Inject lateinit var fileUploader: FileUploader
|
||||
@Inject lateinit var contentUploadStateTracker: DefaultContentUploadStateTracker
|
||||
@Inject lateinit var fileService: DefaultFileService
|
||||
@Inject lateinit var cancelSendTracker: CancelSendTracker
|
||||
|
||||
override suspend fun doWork(): Result {
|
||||
val params = WorkerParamsFactory.fromData<Params>(inputData)
|
||||
|
@ -102,6 +107,13 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
|
|||
|
||||
var newImageAttributes: NewImageAttributes? = null
|
||||
|
||||
val allCancelled = params.events.all { cancelSendTracker.isCancelRequestedFor(it.eventId, it.roomId) }
|
||||
if (allCancelled) {
|
||||
// there is no point in uploading the image!
|
||||
return Result.success(inputData)
|
||||
.also { Timber.e("## Send: Work cancelled by user") }
|
||||
}
|
||||
|
||||
try {
|
||||
val inputStream = context.contentResolver.openInputStream(attachment.queryUri)
|
||||
?: return Result.success(
|
||||
|
@ -112,7 +124,7 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
|
|||
)
|
||||
)
|
||||
|
||||
inputStream.use {
|
||||
// inputStream.use {
|
||||
var uploadedThumbnailUrl: String? = null
|
||||
var uploadedThumbnailEncryptedFileInfo: EncryptedFileInfo? = null
|
||||
|
||||
|
@ -161,6 +173,9 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
|
|||
var uploadedFileEncryptedFileInfo: EncryptedFileInfo? = null
|
||||
|
||||
return try {
|
||||
var modifiedStream: InputStream
|
||||
|
||||
if (attachment.type == ContentAttachmentData.Type.IMAGE && params.compressBeforeSending) {
|
||||
// Compressor library works with File instead of Uri for now. Since Scoped Storage doesn't allow us to access files directly, we should
|
||||
// copy it to a cache folder by using InputStream and OutputStream.
|
||||
// https://github.com/zetbaitsu/Compressor/pull/150
|
||||
|
@ -178,13 +193,13 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
|
|||
inputStream.copyTo(outputStream)
|
||||
}
|
||||
|
||||
if (attachment.type == ContentAttachmentData.Type.IMAGE && params.compressBeforeSending) {
|
||||
cacheFile = Compressor.compress(context, cacheFile) {
|
||||
val compressedFile = Compressor.compress(context, cacheFile) {
|
||||
default(
|
||||
width = MAX_IMAGE_SIZE,
|
||||
height = MAX_IMAGE_SIZE
|
||||
)
|
||||
}.also { compressedFile ->
|
||||
}
|
||||
|
||||
val options = BitmapFactory.Options().apply { inJustDecodeBounds = true }
|
||||
BitmapFactory.decodeFile(compressedFile.absolutePath, options)
|
||||
val fileSize = compressedFile.length().toInt()
|
||||
|
@ -193,29 +208,57 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
|
|||
options.outHeight,
|
||||
fileSize
|
||||
)
|
||||
modifiedStream = compressedFile.inputStream()
|
||||
} else {
|
||||
// Unfortunatly the original stream is not always able to provide content length
|
||||
// by passing by a temp copy it's working (better experience for upload progress..)
|
||||
modifiedStream = if (tryThis { inputStream.available() } ?: 0 <= 0) {
|
||||
val tmp = File.createTempFile(UUID.randomUUID().toString(), null, context.cacheDir)
|
||||
tmp.outputStream().use {
|
||||
inputStream.copyTo(it)
|
||||
}
|
||||
tmp.inputStream()
|
||||
} else inputStream
|
||||
}
|
||||
|
||||
val contentUploadResponse = if (params.isEncrypted) {
|
||||
Timber.v("Encrypt file")
|
||||
notifyTracker(params) { contentUploadStateTracker.setEncrypting(it) }
|
||||
Timber.v("## FileService: Encrypt file")
|
||||
|
||||
val encryptionResult = MXEncryptedAttachments.encryptAttachment(cacheFile.inputStream(), attachment.getSafeMimeType())
|
||||
uploadedFileEncryptedFileInfo = encryptionResult.encryptedFileInfo
|
||||
val tmpEncrypted = File.createTempFile(UUID.randomUUID().toString(), null, context.cacheDir)
|
||||
|
||||
uploadedFileEncryptedFileInfo =
|
||||
MXEncryptedAttachments.encrypt(modifiedStream, attachment.getSafeMimeType(), tmpEncrypted) { read, total ->
|
||||
notifyTracker(params) {
|
||||
contentUploadStateTracker.setEncrypting(it, read.toLong(), total.toLong())
|
||||
}
|
||||
}
|
||||
|
||||
Timber.v("## FileService: Uploading file")
|
||||
|
||||
fileUploader
|
||||
.uploadByteArray(encryptionResult.encryptedByteArray, attachment.name, "application/octet-stream", progressListener)
|
||||
.uploadFile(tmpEncrypted, attachment.name, "application/octet-stream", progressListener)
|
||||
.also {
|
||||
// we can delete?
|
||||
tryThis { tmpEncrypted.delete() }
|
||||
}
|
||||
} else {
|
||||
Timber.v("## FileService: Clear file")
|
||||
fileUploader
|
||||
.uploadFile(cacheFile, attachment.name, attachment.getSafeMimeType(), progressListener)
|
||||
.uploadInputStream(modifiedStream, attachment.name, attachment.getSafeMimeType(), progressListener)
|
||||
}
|
||||
|
||||
// If it's a file update the file service so that it does not redownload?
|
||||
if (params.attachment.type == ContentAttachmentData.Type.FILE) {
|
||||
// if (params.attachment.type == ContentAttachmentData.Type.FILE) {
|
||||
Timber.v("## FileService: Update cache storage for ${contentUploadResponse.contentUri}")
|
||||
try {
|
||||
context.contentResolver.openInputStream(attachment.queryUri)?.let {
|
||||
fileService.storeDataFor(contentUploadResponse.contentUri, params.attachment.getSafeMimeType(), it)
|
||||
}
|
||||
Timber.v("## FileService: cache storage updated")
|
||||
} catch (failure: Throwable) {
|
||||
Timber.e(failure, "## FileService: Failed to update fileservice cache")
|
||||
}
|
||||
// }
|
||||
|
||||
handleSuccess(params,
|
||||
contentUploadResponse.contentUri,
|
||||
|
@ -224,12 +267,12 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
|
|||
uploadedThumbnailEncryptedFileInfo,
|
||||
newImageAttributes)
|
||||
} catch (t: Throwable) {
|
||||
Timber.e(t)
|
||||
Timber.e(t, "## FileService: ERROR ${t.localizedMessage}")
|
||||
handleFailure(params, t)
|
||||
}
|
||||
}
|
||||
// }
|
||||
} catch (e: Exception) {
|
||||
Timber.e(e)
|
||||
Timber.e(e, "## FileService: ERROR")
|
||||
notifyTracker(params) { contentUploadStateTracker.setFailure(it, e) }
|
||||
return Result.success(
|
||||
WorkerParamsFactory.toData(
|
||||
|
@ -259,7 +302,6 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
|
|||
thumbnailUrl: String?,
|
||||
thumbnailEncryptedFileInfo: EncryptedFileInfo?,
|
||||
newImageAttributes: NewImageAttributes?): Result {
|
||||
Timber.v("handleSuccess $attachmentUrl, work is stopped $isStopped")
|
||||
notifyTracker(params) { contentUploadStateTracker.setSuccess(it) }
|
||||
|
||||
val updatedEvents = params.events
|
||||
|
@ -268,7 +310,9 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
|
|||
}
|
||||
|
||||
val sendParams = MultipleEventSendingDispatcherWorker.Params(params.sessionId, updatedEvents, params.isEncrypted)
|
||||
return Result.success(WorkerParamsFactory.toData(sendParams))
|
||||
return Result.success(WorkerParamsFactory.toData(sendParams)).also {
|
||||
Timber.v("## handleSuccess $attachmentUrl, work is stopped $isStopped")
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateEvent(event: Event,
|
||||
|
|
|
@ -61,6 +61,7 @@ internal class DefaultContentDownloadStateTracker @Inject constructor() : Progre
|
|||
// private fun URL.toKey() = toString()
|
||||
|
||||
override fun update(url: String, bytesRead: Long, contentLength: Long, done: Boolean) {
|
||||
mainHandler.post {
|
||||
Timber.v("## DL Progress url:$url read:$bytesRead total:$contentLength done:$done")
|
||||
if (done) {
|
||||
updateState(url, ContentDownloadStateTracker.State.Success)
|
||||
|
@ -68,14 +69,17 @@ internal class DefaultContentDownloadStateTracker @Inject constructor() : Progre
|
|||
updateState(url, ContentDownloadStateTracker.State.Downloading(bytesRead, contentLength, contentLength == -1L))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun error(url: String, errorCode: Int) {
|
||||
mainHandler.post {
|
||||
Timber.v("## DL Progress Error code:$errorCode")
|
||||
updateState(url, ContentDownloadStateTracker.State.Failure(errorCode))
|
||||
listeners[url]?.forEach {
|
||||
tryThis { it.onDownloadStateUpdate(ContentDownloadStateTracker.State.Failure(errorCode)) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateState(url: String, state: ContentDownloadStateTracker.State) {
|
||||
states[url] = state
|
||||
|
|
|
@ -14,9 +14,9 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package im.vector.matrix.android.internal.session.room.send
|
||||
package org.matrix.android.sdk.internal.session.room.send
|
||||
|
||||
import im.vector.matrix.android.internal.session.SessionScope
|
||||
import org.matrix.android.sdk.internal.session.SessionScope
|
||||
import javax.inject.Inject
|
||||
|
||||
/**
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.room.send
|
||||
|
||||
import android.net.Uri
|
||||
import androidx.work.BackoffPolicy
|
||||
import androidx.work.ExistingWorkPolicy
|
||||
import androidx.work.OneTimeWorkRequest
|
||||
|
@ -26,7 +27,6 @@ import com.squareup.inject.assisted.AssistedInject
|
|||
import org.matrix.android.sdk.api.session.content.ContentAttachmentData
|
||||
import org.matrix.android.sdk.api.session.crypto.CryptoService
|
||||
import org.matrix.android.sdk.api.session.events.model.Event
|
||||
import org.matrix.android.sdk.api.session.events.model.isImageMessage
|
||||
import org.matrix.android.sdk.api.session.events.model.isTextMessage
|
||||
import org.matrix.android.sdk.api.session.room.model.message.OptionItem
|
||||
import org.matrix.android.sdk.api.session.room.send.SendService
|
||||
|
@ -45,6 +45,15 @@ import org.matrix.android.sdk.internal.worker.AlwaysSuccessfulWorker
|
|||
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
|
||||
import org.matrix.android.sdk.internal.worker.startChain
|
||||
import kotlinx.coroutines.launch
|
||||
import org.matrix.android.sdk.api.session.events.model.isAttachmentMessage
|
||||
import org.matrix.android.sdk.api.session.events.model.toModel
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageAudioContent
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageContent
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageFileContent
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageImageContent
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageVideoContent
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageWithAttachmentContent
|
||||
import org.matrix.android.sdk.api.session.room.model.message.getFileUrl
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
@ -60,7 +69,8 @@ internal class DefaultSendService @AssistedInject constructor(
|
|||
private val cryptoService: CryptoService,
|
||||
private val taskExecutor: TaskExecutor,
|
||||
private val localEchoRepository: LocalEchoRepository,
|
||||
private val roomEventSender: RoomEventSender
|
||||
private val roomEventSender: RoomEventSender,
|
||||
private val cancelSendTracker: CancelSendTracker
|
||||
) : SendService {
|
||||
|
||||
@AssistedInject.Factory
|
||||
|
@ -136,36 +146,72 @@ internal class DefaultSendService @AssistedInject constructor(
|
|||
}
|
||||
|
||||
override fun resendMediaMessage(localEcho: TimelineEvent): Cancelable? {
|
||||
if (localEcho.root.isImageMessage() && localEcho.root.sendState.hasFailed()) {
|
||||
if (localEcho.root.sendState.hasFailed()) {
|
||||
// TODO this need a refactoring of attachement sending
|
||||
// val clearContent = localEcho.root.getClearContent()
|
||||
// val messageContent = clearContent?.toModel<MessageContent>() ?: return null
|
||||
// when (messageContent.type) {
|
||||
// MessageType.MSGTYPE_IMAGE -> {
|
||||
// val imageContent = clearContent.toModel<MessageImageContent>() ?: return null
|
||||
// val url = imageContent.url ?: return null
|
||||
// if (url.startsWith("mxc://")) {
|
||||
// //TODO
|
||||
// } else {
|
||||
// //The image has not yet been sent
|
||||
// val attachmentData = ContentAttachmentData(
|
||||
// size = imageContent.info!!.size.toLong(),
|
||||
// mimeType = imageContent.info.mimeType!!,
|
||||
// width = imageContent.info.width.toLong(),
|
||||
// height = imageContent.info.height.toLong(),
|
||||
// name = imageContent.body,
|
||||
// path = imageContent.url,
|
||||
// type = ContentAttachmentData.Type.IMAGE
|
||||
// )
|
||||
// monarchy.runTransactionSync {
|
||||
// EventEntity.where(it,eventId = localEcho.root.eventId ?: "").findFirst()?.let {
|
||||
// it.sendState = SendState.UNSENT
|
||||
// }
|
||||
// }
|
||||
// return internalSendMedia(localEcho.root,attachmentData)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
val clearContent = localEcho.root.getClearContent()
|
||||
val messageContent = clearContent?.toModel<MessageContent>() as? MessageWithAttachmentContent ?: return null
|
||||
|
||||
val url = messageContent.getFileUrl() ?: return null
|
||||
if (url.startsWith("mxc://")) {
|
||||
// We need to resend only the message as the attachment is ok
|
||||
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
|
||||
return sendEvent(localEcho.root)
|
||||
}
|
||||
// we need to resend the media
|
||||
|
||||
when (messageContent) {
|
||||
is MessageImageContent -> {
|
||||
// The image has not yet been sent
|
||||
val attachmentData = ContentAttachmentData(
|
||||
size = messageContent.info!!.size.toLong(),
|
||||
mimeType = messageContent.info.mimeType!!,
|
||||
width = messageContent.info.width.toLong(),
|
||||
height = messageContent.info.height.toLong(),
|
||||
name = messageContent.body,
|
||||
queryUri = Uri.parse(messageContent.url),
|
||||
type = ContentAttachmentData.Type.IMAGE
|
||||
)
|
||||
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
|
||||
return internalSendMedia(listOf(localEcho.root), attachmentData, true)
|
||||
}
|
||||
is MessageVideoContent -> {
|
||||
val attachmentData = ContentAttachmentData(
|
||||
size = messageContent.videoInfo?.size ?: 0L,
|
||||
mimeType = messageContent.mimeType,
|
||||
width = messageContent.videoInfo?.width?.toLong(),
|
||||
height = messageContent.videoInfo?.height?.toLong(),
|
||||
duration = messageContent.videoInfo?.duration?.toLong(),
|
||||
name = messageContent.body,
|
||||
queryUri = Uri.parse(messageContent.url),
|
||||
type = ContentAttachmentData.Type.VIDEO
|
||||
)
|
||||
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
|
||||
return internalSendMedia(listOf(localEcho.root), attachmentData, true)
|
||||
}
|
||||
is MessageFileContent -> {
|
||||
val attachmentData = ContentAttachmentData(
|
||||
size = messageContent.info!!.size,
|
||||
mimeType = messageContent.info.mimeType!!,
|
||||
name = messageContent.body,
|
||||
queryUri = Uri.parse(messageContent.url),
|
||||
type = ContentAttachmentData.Type.FILE
|
||||
)
|
||||
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
|
||||
return internalSendMedia(listOf(localEcho.root), attachmentData, true)
|
||||
}
|
||||
is MessageAudioContent -> {
|
||||
val attachmentData = ContentAttachmentData(
|
||||
size = messageContent.audioInfo?.size ?: 0,
|
||||
duration = messageContent.audioInfo?.duration?.toLong() ?: 0L,
|
||||
mimeType = messageContent.audioInfo?.mimeType,
|
||||
name = messageContent.body,
|
||||
queryUri = Uri.parse(messageContent.url),
|
||||
type = ContentAttachmentData.Type.AUDIO
|
||||
)
|
||||
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
|
||||
return internalSendMedia(listOf(localEcho.root), attachmentData, true)
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
return null
|
||||
|
@ -196,15 +242,33 @@ internal class DefaultSendService @AssistedInject constructor(
|
|||
}
|
||||
}
|
||||
|
||||
override fun cancelSend(eventId: String) {
|
||||
cancelSendTracker.markLocalEchoForCancel(eventId, roomId)
|
||||
taskExecutor.executorScope.launch {
|
||||
localEchoRepository.deleteFailedEcho(roomId, eventId)
|
||||
}
|
||||
}
|
||||
|
||||
override fun resendAllFailedMessages() {
|
||||
taskExecutor.executorScope.launch {
|
||||
val eventsToResend = localEchoRepository.getAllFailedEventsToResend(roomId)
|
||||
eventsToResend.forEach {
|
||||
sendEvent(it)
|
||||
}
|
||||
localEchoRepository.updateSendState(roomId, eventsToResend.mapNotNull { it.eventId }, SendState.UNSENT)
|
||||
if (it.root.isTextMessage()) {
|
||||
resendTextMessage(it)
|
||||
} else if (it.root.isAttachmentMessage()) {
|
||||
resendMediaMessage(it)
|
||||
}
|
||||
}
|
||||
localEchoRepository.updateSendState(roomId, eventsToResend.map { it.eventId }, SendState.UNSENT)
|
||||
}
|
||||
}
|
||||
|
||||
// override fun failAllPendingMessages() {
|
||||
// taskExecutor.executorScope.launch {
|
||||
// val eventsToResend = localEchoRepository.getAllEventsWithStates(roomId, SendState.PENDING_STATES)
|
||||
// localEchoRepository.updateSendState(roomId, eventsToResend.map { it.eventId }, SendState.UNDELIVERED)
|
||||
// }
|
||||
// }
|
||||
|
||||
override fun sendMedia(attachment: ContentAttachmentData,
|
||||
compressBeforeSending: Boolean,
|
||||
|
|
|
@ -54,6 +54,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
|
|||
|
||||
@Inject lateinit var crypto: CryptoService
|
||||
@Inject lateinit var localEchoRepository: LocalEchoRepository
|
||||
@Inject lateinit var cancelSendTracker: CancelSendTracker
|
||||
|
||||
override suspend fun doWork(): Result {
|
||||
Timber.v("Start Encrypt work")
|
||||
|
@ -61,7 +62,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
|
|||
?: return Result.success()
|
||||
.also { Timber.e("Unable to parse work parameters") }
|
||||
|
||||
Timber.v("Start Encrypt work for event ${params.event.eventId}")
|
||||
Timber.v("## SendEvent: Start Encrypt work for event ${params.event.eventId}")
|
||||
if (params.lastFailureMessage != null) {
|
||||
// Transmit the error
|
||||
return Result.success(inputData)
|
||||
|
@ -75,6 +76,12 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
|
|||
if (localEvent.eventId == null) {
|
||||
return Result.success()
|
||||
}
|
||||
|
||||
if (cancelSendTracker.isCancelRequestedFor(localEvent.eventId, localEvent.roomId)) {
|
||||
return Result.success()
|
||||
.also { Timber.e("## SendEvent: Event sending has been cancelled ${localEvent.eventId}") }
|
||||
}
|
||||
|
||||
localEchoRepository.updateSendState(localEvent.eventId, SendState.ENCRYPTING)
|
||||
|
||||
val localMutableContent = localEvent.content?.toMutableMap() ?: mutableMapOf()
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
|
|||
import org.matrix.android.sdk.internal.database.helper.nextId
|
||||
import org.matrix.android.sdk.internal.database.mapper.ContentMapper
|
||||
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
|
||||
import org.matrix.android.sdk.internal.database.mapper.asDomain
|
||||
import org.matrix.android.sdk.internal.database.mapper.toEntity
|
||||
import org.matrix.android.sdk.internal.database.model.EventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.EventInsertEntity
|
||||
|
@ -88,7 +87,7 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
|
|||
}
|
||||
|
||||
fun updateSendState(eventId: String, sendState: SendState) {
|
||||
Timber.v("Update local state of $eventId to ${sendState.name}")
|
||||
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Update local state of $eventId to ${sendState.name}")
|
||||
monarchy.writeAsync { realm ->
|
||||
val sendingEventEntity = EventEntity.where(realm, eventId).findFirst()
|
||||
if (sendingEventEntity != null) {
|
||||
|
@ -114,9 +113,13 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
|
|||
}
|
||||
|
||||
suspend fun deleteFailedEcho(roomId: String, localEcho: TimelineEvent) {
|
||||
deleteFailedEcho(roomId, localEcho.eventId)
|
||||
}
|
||||
|
||||
suspend fun deleteFailedEcho(roomId: String, eventId: String?) {
|
||||
monarchy.awaitTransaction { realm ->
|
||||
TimelineEventEntity.where(realm, roomId = roomId, eventId = localEcho.root.eventId ?: "").findFirst()?.deleteFromRealm()
|
||||
EventEntity.where(realm, eventId = localEcho.root.eventId ?: "").findFirst()?.deleteFromRealm()
|
||||
TimelineEventEntity.where(realm, roomId = roomId, eventId = eventId ?: "").findFirst()?.deleteFromRealm()
|
||||
EventEntity.where(realm, eventId = eventId ?: "").findFirst()?.deleteFromRealm()
|
||||
roomSummaryUpdater.updateSendingInformation(realm, roomId)
|
||||
}
|
||||
}
|
||||
|
@ -142,45 +145,47 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
|
|||
}
|
||||
}
|
||||
|
||||
fun getAllFailedEventsToResend(roomId: String): List<Event> {
|
||||
fun getAllFailedEventsToResend(roomId: String): List<TimelineEvent> {
|
||||
return getAllEventsWithStates(roomId, SendState.HAS_FAILED_STATES)
|
||||
}
|
||||
|
||||
fun getAllEventsWithStates(roomId: String, states : List<SendState>): List<TimelineEvent> {
|
||||
return Realm.getInstance(monarchy.realmConfiguration).use { realm ->
|
||||
TimelineEventEntity
|
||||
.findAllInRoomWithSendStates(realm, roomId, SendState.HAS_FAILED_STATES)
|
||||
.findAllInRoomWithSendStates(realm, roomId, states)
|
||||
.sortedByDescending { it.displayIndex }
|
||||
.mapNotNull { it.root?.asDomain() }
|
||||
.mapNotNull { it?.let { timelineEventMapper.map(it) } }
|
||||
.filter { event ->
|
||||
when (event.getClearType()) {
|
||||
when (event.root.getClearType()) {
|
||||
EventType.MESSAGE,
|
||||
EventType.REDACTION,
|
||||
EventType.REACTION -> {
|
||||
val content = event.getClearContent().toModel<MessageContent>()
|
||||
val content = event.root.getClearContent().toModel<MessageContent>()
|
||||
if (content != null) {
|
||||
when (content.msgType) {
|
||||
MessageType.MSGTYPE_EMOTE,
|
||||
MessageType.MSGTYPE_NOTICE,
|
||||
MessageType.MSGTYPE_LOCATION,
|
||||
MessageType.MSGTYPE_TEXT -> {
|
||||
true
|
||||
}
|
||||
MessageType.MSGTYPE_TEXT,
|
||||
MessageType.MSGTYPE_FILE,
|
||||
MessageType.MSGTYPE_VIDEO,
|
||||
MessageType.MSGTYPE_IMAGE,
|
||||
MessageType.MSGTYPE_AUDIO -> {
|
||||
// need to resend the attachment
|
||||
false
|
||||
true
|
||||
}
|
||||
else -> {
|
||||
Timber.e("Cannot resend message ${event.type} / ${content.msgType}")
|
||||
Timber.e("Cannot resend message ${event.root.getClearType()} / ${content.msgType}")
|
||||
false
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Timber.e("Unsupported message to resend ${event.type}")
|
||||
Timber.e("Unsupported message to resend ${event.root.getClearType()}")
|
||||
false
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
Timber.e("Unsupported message to resend ${event.type}")
|
||||
Timber.e("Unsupported message to resend ${event.root.getClearType()}")
|
||||
false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
|
|||
@Inject lateinit var localEchoRepository: LocalEchoRepository
|
||||
|
||||
override suspend fun doWork(): Result {
|
||||
Timber.v("Start dispatch sending multiple event work")
|
||||
Timber.v("## SendEvent: Start dispatch sending multiple event work")
|
||||
val params = WorkerParamsFactory.fromData<Params>(inputData)
|
||||
?: return Result.success()
|
||||
.also { Timber.e("Unable to parse work parameters") }
|
||||
|
@ -72,18 +72,21 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
|
|||
}
|
||||
// Transmit the error if needed?
|
||||
return Result.success(inputData)
|
||||
.also { Timber.e("Work cancelled due to input error from parent") }
|
||||
.also { Timber.e("## SendEvent: Work cancelled due to input error from parent ${params.lastFailureMessage}") }
|
||||
}
|
||||
|
||||
// Create a work for every event
|
||||
params.events.forEach { event ->
|
||||
if (params.isEncrypted) {
|
||||
Timber.v("Send event in encrypted room")
|
||||
localEchoRepository.updateSendState(event.eventId ?: "", SendState.ENCRYPTING)
|
||||
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule encrypt and send event ${event.eventId}")
|
||||
val encryptWork = createEncryptEventWork(params.sessionId, event, true)
|
||||
// Note that event will be replaced by the result of the previous work
|
||||
val sendWork = createSendEventWork(params.sessionId, event, false)
|
||||
timelineSendEventWorkCommon.postSequentialWorks(event.roomId!!, encryptWork, sendWork)
|
||||
} else {
|
||||
localEchoRepository.updateSendState(event.eventId ?: "", SendState.SENDING)
|
||||
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule send event ${event.eventId}")
|
||||
val sendWork = createSendEventWork(params.sessionId, event, true)
|
||||
timelineSendEventWorkCommon.postWork(event.roomId!!, sendWork)
|
||||
}
|
||||
|
|
|
@ -39,13 +39,16 @@ internal class RoomEventSender @Inject constructor(
|
|||
) {
|
||||
fun sendEvent(event: Event): Cancelable {
|
||||
// Encrypted room handling
|
||||
return if (cryptoService.isRoomEncrypted(event.roomId ?: "")) {
|
||||
Timber.v("Send event in encrypted room")
|
||||
return if (cryptoService.isRoomEncrypted(event.roomId ?: "")
|
||||
&& !event.isEncrypted() // In case of resend where it's already encrypted so skip to send
|
||||
) {
|
||||
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule encrypt and send event ${event.eventId}")
|
||||
val encryptWork = createEncryptEventWork(event, true)
|
||||
// Note that event will be replaced by the result of the previous work
|
||||
val sendWork = createSendEventWork(event, false)
|
||||
timelineSendEventWorkCommon.postSequentialWorks(event.roomId ?: "", encryptWork, sendWork)
|
||||
} else {
|
||||
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule send event ${event.eventId}")
|
||||
val sendWork = createSendEventWork(event, true)
|
||||
timelineSendEventWorkCommon.postWork(event.roomId ?: "", sendWork)
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.matrix.android.sdk.internal.worker.getSessionComponent
|
|||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
private const val MAX_NUMBER_OF_RETRY_BEFORE_FAILING = 3
|
||||
// private const val MAX_NUMBER_OF_RETRY_BEFORE_FAILING = 3
|
||||
|
||||
/**
|
||||
* Possible previous worker: [EncryptEventWorker] or first worker
|
||||
|
@ -56,12 +56,12 @@ internal class SendEventWorker(context: Context,
|
|||
@Inject lateinit var localEchoRepository: LocalEchoRepository
|
||||
@Inject lateinit var roomAPI: RoomAPI
|
||||
@Inject lateinit var eventBus: EventBus
|
||||
@Inject lateinit var cancelSendTracker: CancelSendTracker
|
||||
|
||||
override suspend fun doWork(): Result {
|
||||
val params = WorkerParamsFactory.fromData<Params>(inputData)
|
||||
?: return Result.success()
|
||||
.also { Timber.e("Unable to parse work parameters") }
|
||||
|
||||
.also { Timber.e("## SendEvent: Unable to parse work parameters") }
|
||||
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
|
||||
sessionComponent.inject(this)
|
||||
|
||||
|
@ -75,22 +75,32 @@ internal class SendEventWorker(context: Context,
|
|||
.also { Timber.e("Work cancelled due to bad input data") }
|
||||
}
|
||||
|
||||
if (cancelSendTracker.isCancelRequestedFor(params.eventId, params.roomId)) {
|
||||
return Result.success()
|
||||
.also {
|
||||
cancelSendTracker.markCancelled(params.eventId, params.roomId)
|
||||
Timber.e("## SendEvent: Event sending has been cancelled ${params.eventId}")
|
||||
}
|
||||
}
|
||||
|
||||
if (params.lastFailureMessage != null) {
|
||||
localEchoRepository.updateSendState(event.eventId, SendState.UNDELIVERED)
|
||||
// Transmit the error
|
||||
return Result.success(inputData)
|
||||
.also { Timber.e("Work cancelled due to input error from parent") }
|
||||
}
|
||||
|
||||
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Send event ${params.eventId}")
|
||||
return try {
|
||||
sendEvent(event.eventId, event.roomId, event.type, event.content)
|
||||
Result.success()
|
||||
} catch (exception: Throwable) {
|
||||
// It does start from 0, we want it to stop if it fails the third time
|
||||
val currentAttemptCount = runAttemptCount + 1
|
||||
if (currentAttemptCount >= MAX_NUMBER_OF_RETRY_BEFORE_FAILING || !exception.shouldBeRetried()) {
|
||||
localEchoRepository.updateSendState(event.eventId, SendState.UNDELIVERED)
|
||||
if (/*currentAttemptCount >= MAX_NUMBER_OF_RETRY_BEFORE_FAILING ||**/ !exception.shouldBeRetried()) {
|
||||
Timber.e("## SendEvent: [${System.currentTimeMillis()}] Send event Failed cannot retry ${params.eventId} > ${exception.localizedMessage}")
|
||||
localEchoRepository.updateSendState(params.eventId, SendState.UNDELIVERED)
|
||||
return Result.success()
|
||||
} else {
|
||||
Timber.e("## SendEvent: [${System.currentTimeMillis()}] Send event Failed schedule retry ${params.eventId} > ${exception.localizedMessage}")
|
||||
Result.retry()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -115,6 +115,7 @@ internal class DefaultTimeline(
|
|||
if (!results.isLoaded || !results.isValid) {
|
||||
return@OrderedRealmCollectionChangeListener
|
||||
}
|
||||
Timber.v("## SendEvent: [${System.currentTimeMillis()}] DB update for room $roomId")
|
||||
handleUpdates(results, changeSet)
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ internal class TimelineSendEventWorkCommon @Inject constructor(
|
|||
}
|
||||
}
|
||||
|
||||
fun postWork(roomId: String, workRequest: OneTimeWorkRequest, policy: ExistingWorkPolicy = ExistingWorkPolicy.APPEND): Cancelable {
|
||||
fun postWork(roomId: String, workRequest: OneTimeWorkRequest, policy: ExistingWorkPolicy = ExistingWorkPolicy.APPEND_OR_REPLACE): Cancelable {
|
||||
workManagerProvider.workManager
|
||||
.beginUniqueWork(buildWorkName(roomId), policy, workRequest)
|
||||
.enqueue()
|
||||
|
|
|
@ -26,8 +26,9 @@ import com.bumptech.glide.load.model.MultiModelLoaderFactory
|
|||
import com.bumptech.glide.signature.ObjectKey
|
||||
import im.vector.app.core.di.ActiveSessionHolder
|
||||
import im.vector.app.features.media.ImageContentRenderer
|
||||
import org.matrix.android.sdk.api.Matrix
|
||||
import okhttp3.OkHttpClient
|
||||
import org.matrix.android.sdk.api.MatrixCallback
|
||||
import org.matrix.android.sdk.api.session.file.FileService
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import java.io.IOException
|
||||
|
|
|
@ -34,6 +34,7 @@ import im.vector.app.core.glide.GlideApp
|
|||
import im.vector.app.core.glide.GlideRequest
|
||||
import im.vector.app.core.glide.GlideRequests
|
||||
import im.vector.app.core.utils.getColorFromUserId
|
||||
import org.matrix.android.sdk.api.extensions.tryThis
|
||||
import org.matrix.android.sdk.api.session.content.ContentUrlResolver
|
||||
import org.matrix.android.sdk.api.util.MatrixItem
|
||||
import javax.inject.Inject
|
||||
|
|
|
@ -61,7 +61,7 @@ import org.matrix.android.sdk.api.query.QueryStringValue
|
|||
import org.matrix.android.sdk.api.session.Session
|
||||
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
|
||||
import org.matrix.android.sdk.api.session.events.model.EventType
|
||||
import org.matrix.android.sdk.api.session.events.model.isImageMessage
|
||||
import org.matrix.android.sdk.api.session.events.model.isAttachmentMessage
|
||||
import org.matrix.android.sdk.api.session.events.model.isTextMessage
|
||||
import org.matrix.android.sdk.api.session.events.model.toContent
|
||||
import org.matrix.android.sdk.api.session.events.model.toModel
|
||||
|
|
|
@ -35,6 +35,7 @@ import im.vector.app.features.settings.VectorPreferences
|
|||
import org.matrix.android.sdk.api.session.Session
|
||||
import org.matrix.android.sdk.api.session.crypto.keysbackup.KeysBackupState
|
||||
import org.matrix.android.sdk.api.session.events.model.EventType
|
||||
import org.matrix.android.sdk.api.session.events.model.isAttachmentMessage
|
||||
import org.matrix.android.sdk.api.session.events.model.isTextMessage
|
||||
import org.matrix.android.sdk.api.session.events.model.toModel
|
||||
import org.matrix.android.sdk.api.session.room.model.message.MessageContent
|
||||
|
|
|
@ -28,7 +28,6 @@ import im.vector.app.R
|
|||
import im.vector.app.core.glide.GlideApp
|
||||
import im.vector.app.features.home.room.detail.timeline.helper.ContentUploadStateTrackerBinder
|
||||
import im.vector.app.features.media.ImageContentRenderer
|
||||
import im.vector.matrix.android.api.session.room.send.SendState
|
||||
|
||||
@EpoxyModelClass(layout = R.layout.item_timeline_event_base)
|
||||
abstract class MessageImageVideoItem : AbsMessageItem<MessageImageVideoItem.Holder>() {
|
||||
|
|
|
@ -39,6 +39,7 @@ import im.vector.app.core.utils.isLocalFile
|
|||
import org.matrix.android.sdk.api.session.content.ContentUrlResolver
|
||||
import org.matrix.android.sdk.internal.crypto.attachments.ElementToDecrypt
|
||||
import kotlinx.android.parcel.Parcelize
|
||||
import org.matrix.android.sdk.api.extensions.tryThis
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import javax.inject.Inject
|
||||
|
|
Loading…
Reference in New Issue