Restart service with worker if 256+ messages have been received

To acknowledge them, so the server can delete them
This commit is contained in:
sim 2024-11-29 15:29:49 +00:00
parent fa2d699b1e
commit 2e1e309ebb
4 changed files with 33 additions and 7 deletions

View File

@ -26,6 +26,9 @@ object AppCompanion {
/** Have we received the start event ? To check the reverse proxy timeout is high enough */ /** Have we received the start event ? To check the reverse proxy timeout is high enough */
val started = AtomicBoolean(false) val started = AtomicBoolean(false)
/** Count messages since restart */
val messageCounter = AtomicInteger(0)
/** Have we received the ping event ? To check the reverse proxy timeout is high enough */ /** Have we received the ping event ? To check the reverse proxy timeout is high enough */
val pinged = AtomicBoolean(false) val pinged = AtomicBoolean(false)
val bufferedResponseChecked = AtomicBoolean(false) val bufferedResponseChecked = AtomicBoolean(false)

View File

@ -55,6 +55,7 @@ class SSEListener(val context: Context) : EventSourceListener() {
when (type) { when (type) {
"start" -> { "start" -> {
AppCompanion.started.set(true) AppCompanion.started.set(true)
AppCompanion.messageCounter.set(0)
StartingTimer.stop() StartingTimer.stop()
AppCompanion.bufferedResponseChecked.set(true) AppCompanion.bufferedResponseChecked.set(true)
NoStartNotification(context).delete() NoStartNotification(context).delete()
@ -80,6 +81,7 @@ class SSEListener(val context: Context) : EventSourceListener() {
} }
"message" -> { "message" -> {
AppCompanion.messageCounter.incrementAndGet()
val message = Gson().fromJson(data, SSEResponse::class.java) val message = Gson().fromJson(data, SSEResponse::class.java)
sendMessage( sendMessage(
context, context,

View File

@ -95,12 +95,14 @@ object FailureCounter {
/** /**
* Set the counter of failed events to 1. * Set the counter of failed events to 1.
* *
* Used when there is no Internet access. * Used when there is no Internet access, or we want to restart the sync request to ack messages.
*
* The eventSource isn't closed, else it would call _onFailure_ and try to restart, which will
* be redundant
*/ */
fun setFailOnce() { fun setFailOnce() {
Log.d(TAG, "setFailOnce") Log.d(TAG, "setFailOnce")
Atomics.nFails.set(1) Atomics.nFails.set(1)
Atomics.eventSource.getAndSet(null)?.cancel()
} }
fun clearFails() { fun clearFails() {

View File

@ -11,9 +11,6 @@ import org.unifiedpush.distributor.nextpush.AppCompanion
import org.unifiedpush.distributor.nextpush.account.AccountFactory.getAccount import org.unifiedpush.distributor.nextpush.account.AccountFactory.getAccount
import org.unifiedpush.distributor.nextpush.utils.TAG import org.unifiedpush.distributor.nextpush.utils.TAG
private const val UNIQUE_PERIODIC_WORK_TAG = "nextpush::RestartWorker::unique_periodic"
private const val UNIQUE_ONETIME_WORK_TAG = "nextpush::RestartWorker::unique_onetime"
class RestartWorker(ctx: Context, params: WorkerParameters) : Worker(ctx, params) { class RestartWorker(ctx: Context, params: WorkerParameters) : Worker(ctx, params) {
/** /**
@ -35,8 +32,21 @@ class RestartWorker(ctx: Context, params: WorkerParameters) : Worker(ctx, params
restartDate.time = it.time restartDate.time = it.time
restartDate.add(Calendar.SECOND, AppCompanion.keepalive.get()) restartDate.add(Calendar.SECOND, AppCompanion.keepalive.get())
Log.d(TAG, "restartDate: ${restartDate.time}") Log.d(TAG, "restartDate: ${restartDate.time}")
if (currentDate.after(restartDate)) {
/**
* We restart the service when:
* - we didn't receive the last ping
* - we have received more than [MAX_MSG] messages, so the received messages can be ack to the server
* and deleted from the server
*/
val timeoutExpire = currentDate.after(restartDate)
val msgCount = AppCompanion.messageCounter.get()
if (timeoutExpire || msgCount > MAX_MSG) {
if (timeoutExpire) {
Log.d(TAG, "We should have received an event before ${restartDate.time}. Restarting") Log.d(TAG, "We should have received an event before ${restartDate.time}. Restarting")
} else {
Log.d(TAG, "Restarting to acknowledge $msgCount messages.")
}
/** /**
* If there is at least one failure, we do not update [FailureCounter]'s counter, * If there is at least one failure, we do not update [FailureCounter]'s counter,
* it will be done by the next requests. * it will be done by the next requests.
@ -61,6 +71,15 @@ class RestartWorker(ctx: Context, params: WorkerParameters) : Worker(ctx, params
companion object { companion object {
private val lock = Object() private val lock = Object()
private const val UNIQUE_PERIODIC_WORK_TAG = "nextpush::RestartWorker::unique_periodic"
private const val UNIQUE_ONETIME_WORK_TAG = "nextpush::RestartWorker::unique_onetime"
/**
* Number of messages before restarting the service to acknowledge to the server received messages.
*
* 256 messages means a _maximum_ of 1MB of ack messages (if all of them are 4kB)
*/
private const val MAX_MSG = 256
fun startPeriodic(context: Context) { fun startPeriodic(context: Context) {
getAccount(context) ?: return getAccount(context) ?: return