diff --git a/app/src/main/java/org/unifiedpush/distributor/nextpush/AppCompanion.kt b/app/src/main/java/org/unifiedpush/distributor/nextpush/AppCompanion.kt index 5f7ad4e..5918bdd 100644 --- a/app/src/main/java/org/unifiedpush/distributor/nextpush/AppCompanion.kt +++ b/app/src/main/java/org/unifiedpush/distributor/nextpush/AppCompanion.kt @@ -26,6 +26,9 @@ object AppCompanion { /** Have we received the start event ? To check the reverse proxy timeout is high enough */ val started = AtomicBoolean(false) + /** Count messages since restart */ + val messageCounter = AtomicInteger(0) + /** Have we received the ping event ? To check the reverse proxy timeout is high enough */ val pinged = AtomicBoolean(false) val bufferedResponseChecked = AtomicBoolean(false) diff --git a/app/src/main/java/org/unifiedpush/distributor/nextpush/api/SSEListener.kt b/app/src/main/java/org/unifiedpush/distributor/nextpush/api/SSEListener.kt index f710ddb..b8dbd4a 100644 --- a/app/src/main/java/org/unifiedpush/distributor/nextpush/api/SSEListener.kt +++ b/app/src/main/java/org/unifiedpush/distributor/nextpush/api/SSEListener.kt @@ -55,6 +55,7 @@ class SSEListener(val context: Context) : EventSourceListener() { when (type) { "start" -> { AppCompanion.started.set(true) + AppCompanion.messageCounter.set(0) StartingTimer.stop() AppCompanion.bufferedResponseChecked.set(true) NoStartNotification(context).delete() @@ -80,6 +81,7 @@ class SSEListener(val context: Context) : EventSourceListener() { } "message" -> { + AppCompanion.messageCounter.incrementAndGet() val message = Gson().fromJson(data, SSEResponse::class.java) sendMessage( context, diff --git a/app/src/main/java/org/unifiedpush/distributor/nextpush/services/FailureCounter.kt b/app/src/main/java/org/unifiedpush/distributor/nextpush/services/FailureCounter.kt index e3c4332..fca8bcc 100644 --- a/app/src/main/java/org/unifiedpush/distributor/nextpush/services/FailureCounter.kt +++ b/app/src/main/java/org/unifiedpush/distributor/nextpush/services/FailureCounter.kt @@ -95,12 +95,14 @@ object FailureCounter { /** * Set the counter of failed events to 1. * - * Used when there is no Internet access. + * Used when there is no Internet access, or we want to restart the sync request to ack messages. + * + * The eventSource isn't closed, else it would call _onFailure_ and try to restart, which will + * be redundant */ fun setFailOnce() { Log.d(TAG, "setFailOnce") Atomics.nFails.set(1) - Atomics.eventSource.getAndSet(null)?.cancel() } fun clearFails() { diff --git a/app/src/main/java/org/unifiedpush/distributor/nextpush/services/RestartWorker.kt b/app/src/main/java/org/unifiedpush/distributor/nextpush/services/RestartWorker.kt index 65ca986..233c483 100644 --- a/app/src/main/java/org/unifiedpush/distributor/nextpush/services/RestartWorker.kt +++ b/app/src/main/java/org/unifiedpush/distributor/nextpush/services/RestartWorker.kt @@ -11,9 +11,6 @@ import org.unifiedpush.distributor.nextpush.AppCompanion import org.unifiedpush.distributor.nextpush.account.AccountFactory.getAccount import org.unifiedpush.distributor.nextpush.utils.TAG -private const val UNIQUE_PERIODIC_WORK_TAG = "nextpush::RestartWorker::unique_periodic" -private const val UNIQUE_ONETIME_WORK_TAG = "nextpush::RestartWorker::unique_onetime" - class RestartWorker(ctx: Context, params: WorkerParameters) : Worker(ctx, params) { /** @@ -35,8 +32,21 @@ class RestartWorker(ctx: Context, params: WorkerParameters) : Worker(ctx, params restartDate.time = it.time restartDate.add(Calendar.SECOND, AppCompanion.keepalive.get()) Log.d(TAG, "restartDate: ${restartDate.time}") - if (currentDate.after(restartDate)) { - Log.d(TAG, "We should have received an event before ${restartDate.time}. Restarting") + + /** + * We restart the service when: + * - we didn't receive the last ping + * - we have received more than [MAX_MSG] messages, so the received messages can be ack to the server + * and deleted from the server + */ + val timeoutExpire = currentDate.after(restartDate) + val msgCount = AppCompanion.messageCounter.get() + if (timeoutExpire || msgCount > MAX_MSG) { + if (timeoutExpire) { + Log.d(TAG, "We should have received an event before ${restartDate.time}. Restarting") + } else { + Log.d(TAG, "Restarting to acknowledge $msgCount messages.") + } /** * If there is at least one failure, we do not update [FailureCounter]'s counter, * it will be done by the next requests. @@ -61,6 +71,15 @@ class RestartWorker(ctx: Context, params: WorkerParameters) : Worker(ctx, params companion object { private val lock = Object() + private const val UNIQUE_PERIODIC_WORK_TAG = "nextpush::RestartWorker::unique_periodic" + private const val UNIQUE_ONETIME_WORK_TAG = "nextpush::RestartWorker::unique_onetime" + + /** + * Number of messages before restarting the service to acknowledge to the server received messages. + * + * 256 messages means a _maximum_ of 1MB of ack messages (if all of them are 4kB) + */ + private const val MAX_MSG = 256 fun startPeriodic(context: Context) { getAccount(context) ?: return