Simple rate limiting of RS sending

This commit is contained in:
Valere 2021-12-06 11:42:19 +01:00
parent 401e8e6712
commit 56dac76ca8
1 changed files with 140 additions and 80 deletions

View File

@ -25,6 +25,9 @@ import im.vector.app.features.settings.VectorPreferences
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
@ -52,10 +55,43 @@ class AutoRageShaker @Inject constructor(
private val uisiDetectors = mutableMapOf<String, UISIDetector>() private val uisiDetectors = mutableMapOf<String, UISIDetector>()
private var currentActiveSessionId: String? = null private var currentActiveSessionId: String? = null
// Simple in memory cache of already sent report
private data class ReportInfo(
val roomId: String,
val sessionId: String
)
private val alreadyReportedUisi = mutableListOf<ReportInfo>()
private val e2eDetectedFlow = MutableSharedFlow<E2EMessageDetected>(replay = 0)
private val matchingRSRequestFlow = MutableSharedFlow<Event>(replay = 0)
fun initialize() { fun initialize() {
observeActiveSession() observeActiveSession()
// It's a singleton... // It's a singleton...
vectorPreferences.subscribeToChanges(this) vectorPreferences.subscribeToChanges(this)
// Simple rate limit, notice that order is not
// necessarily preserved
e2eDetectedFlow
.onEach {
sendRageShake(it)
delay(2_000)
}
.catch { cause ->
Timber.w(cause, "Failed to RS")
}
.launchIn(coroutineScope)
matchingRSRequestFlow
.onEach {
sendMatchingRageShake(it)
delay(2_000)
}
.catch { cause ->
Timber.w(cause, "Failed to send matching rageshake")
}
.launchIn(coroutineScope)
} }
override fun onSharedPreferenceChanged(sharedPreferences: SharedPreferences?, key: String?) { override fun onSharedPreferenceChanged(sharedPreferences: SharedPreferences?, key: String?) {
@ -83,7 +119,22 @@ class AutoRageShaker @Inject constructor(
fun decryptionErrorDetected(target: E2EMessageDetected) { fun decryptionErrorDetected(target: E2EMessageDetected) {
if (target.source == UISIEventSource.INITIAL_SYNC) return if (target.source == UISIEventSource.INITIAL_SYNC) return
if (activeSessionHolder.getSafeActiveSession()?.sessionId != currentActiveSessionId) return if (activeSessionHolder.getSafeActiveSession()?.sessionId != currentActiveSessionId) return
val shouldSendRS = synchronized(alreadyReportedUisi) {
val reportInfo = ReportInfo(target.roomId, target.sessionId)
val alreadySent = alreadyReportedUisi.contains(reportInfo)
if (!alreadySent) {
alreadyReportedUisi.add(reportInfo)
}
!alreadySent
}
if (shouldSendRS) {
coroutineScope.launch { coroutineScope.launch {
e2eDetectedFlow.emit(target)
}
}
}
private fun sendRageShake(target: E2EMessageDetected) {
bugReporter.sendBugReport( bugReporter.sendBugReport(
context = context, context = context,
reportType = ReportType.AUTO_UISI, reportType = ReportType.AUTO_UISI,
@ -105,9 +156,15 @@ class AutoRageShaker @Inject constructor(
}), }),
listener = object : BugReporter.IMXBugReportListener { listener = object : BugReporter.IMXBugReportListener {
override fun onUploadCancelled() { override fun onUploadCancelled() {
synchronized(alreadyReportedUisi) {
alreadyReportedUisi.remove(ReportInfo(target.roomId, target.sessionId))
}
} }
override fun onUploadFailed(reason: String?) { override fun onUploadFailed(reason: String?) {
synchronized(alreadyReportedUisi) {
alreadyReportedUisi.remove(ReportInfo(target.roomId, target.sessionId))
}
} }
override fun onProgress(progress: Int) { override fun onProgress(progress: Int) {
@ -140,13 +197,17 @@ class AutoRageShaker @Inject constructor(
} }
}) })
} }
}
fun remoteAutoUISIRequest(event: Event) { fun remoteAutoUISIRequest(event: Event) {
if (event.type != AUTO_RS_REQUEST) return if (event.type != AUTO_RS_REQUEST) return
if (activeSessionHolder.getSafeActiveSession()?.sessionId != currentActiveSessionId) return if (activeSessionHolder.getSafeActiveSession()?.sessionId != currentActiveSessionId) return
coroutineScope.launch { coroutineScope.launch {
matchingRSRequestFlow.emit(event)
}
}
private fun sendMatchingRageShake(event: Event) {
val eventId = event.content?.get("event_id") val eventId = event.content?.get("event_id")
val roomId = event.content?.get("room_id") val roomId = event.content?.get("room_id")
val sessionId = event.content?.get("session_id") val sessionId = event.content?.get("session_id")
@ -179,7 +240,6 @@ class AutoRageShaker @Inject constructor(
listener = null listener = null
) )
} }
}
private val detector = UISIDetector().apply { private val detector = UISIDetector().apply {
callback = object : UISIDetector.UISIDetectorCallback { callback = object : UISIDetector.UISIDetectorCallback {