Compare commits

...

2 Commits

Author SHA1 Message Date
junkfood
2a6aade598
fix(sync): refactor SyncWorker for robust background sync 2024-11-26 00:43:22 +08:00
junkfood
54cebbc9ca
feat(sync): sync feeds concurrently 2024-11-24 21:51:17 +08:00
6 changed files with 56 additions and 54 deletions

View File

@ -8,13 +8,17 @@ import androidx.work.ListenableWorker
import androidx.work.WorkManager
import com.rometools.rome.feed.synd.SyndFeed
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import me.ash.reader.domain.model.account.Account
import me.ash.reader.domain.model.article.ArticleWithFeed
import me.ash.reader.domain.model.feed.Feed
@ -101,21 +105,18 @@ abstract class AbstractRssRepository(
val preTime = System.currentTimeMillis()
val preDate = Date(preTime)
val accountId = context.currentAccountId
feedDao.queryAll(accountId)
.chunked(16)
.forEach {
it.map { feed -> async { syncFeed(feed, preDate) } }
.awaitAll()
.forEach {
if (it.feed.isNotification) {
notificationHelper.notify(it.apply {
articles = articleDao.insertListIfNotExist(it.articles)
})
} else {
articleDao.insertListIfNotExist(it.articles)
}
val semaphore = Semaphore(16)
feedDao.queryAll(accountId).mapIndexed { _, feed ->
async(Dispatchers.IO) {
semaphore.withPermit {
val feedWithArticle = syncFeed(feed, preDate)
val newArticles = articleDao.insertListIfNotExist(feedWithArticle.articles)
if (feedWithArticle.feed.isNotification) {
notificationHelper.notify(feedWithArticle.copy(articles = newArticles))
}
}
}
}.awaitAll()
Log.i("RlOG", "onCompletion: ${System.currentTimeMillis() - preTime}")
accountDao.queryById(accountId)?.let { account ->
@ -200,39 +201,29 @@ abstract class AbstractRssRepository(
}
fun cancelSync() {
workManager.cancelAllWork()
SyncWorker.cancelPeriodicWork(workManager)
SyncWorker.cancelOneTimeWork(workManager)
}
fun doSyncOneTime() {
workManager.cancelAllWork()
SyncWorker.enqueueOneTimeWork(workManager)
}
suspend fun doSync(isOnStart: Boolean = false) {
workManager.cancelAllWork()
suspend fun initSync() {
accountDao.queryById(context.currentAccountId)?.let {
if (isOnStart) {
if (it.syncOnStart.value) {
SyncWorker.enqueueOneTimeWork(workManager)
}
if (it.syncInterval.value != SyncIntervalPreference.Manually.value) {
SyncWorker.enqueuePeriodicWork(
workManager = workManager,
syncInterval = it.syncInterval,
syncOnlyWhenCharging = it.syncOnlyWhenCharging,
syncOnlyOnWiFi = it.syncOnlyOnWiFi,
)
}
val syncOnStart = it.syncOnStart.value
if (syncOnStart) {
doSyncOneTime()
}
if (it.syncInterval.value != SyncIntervalPreference.Manually.value) {
SyncWorker.enqueuePeriodicWork(
workManager = workManager,
syncInterval = it.syncInterval,
syncOnlyWhenCharging = it.syncOnlyWhenCharging,
syncOnlyOnWiFi = it.syncOnlyOnWiFi,
)
} else {
SyncWorker.enqueueOneTimeWork(workManager)
if (it.syncInterval.value != SyncIntervalPreference.Manually.value) {
SyncWorker.enqueuePeriodicWork(
workManager = workManager,
syncInterval = it.syncInterval,
syncOnlyWhenCharging = it.syncOnlyWhenCharging,
syncOnlyOnWiFi = it.syncOnlyOnWiFi,
)
}
SyncWorker.cancelPeriodicWork(workManager)
}
}
}

View File

@ -33,15 +33,25 @@ class SyncWorker @AssistedInject constructor(
companion object {
private const val IS_SYNCING = "isSyncing"
const val WORK_NAME = "ReadYou"
lateinit var uuid: UUID
private const val WORK_NAME_PERIODIC = "ReadYou"
private const val WORK_NAME_ONETIME = "SYNC_ONETIME"
const val WORK_TAG = "SYNC_TAG"
fun cancelOneTimeWork(workManager: WorkManager) {
workManager.cancelUniqueWork(WORK_NAME_ONETIME)
}
fun cancelPeriodicWork(workManager: WorkManager) {
workManager.cancelUniqueWork(WORK_NAME_PERIODIC)
}
fun enqueueOneTimeWork(
workManager: WorkManager,
) {
workManager.enqueue(OneTimeWorkRequestBuilder<SyncWorker>()
.addTag(WORK_NAME)
.build()
workManager.enqueueUniqueWork(
WORK_NAME_ONETIME,
ExistingWorkPolicy.KEEP,
OneTimeWorkRequestBuilder<SyncWorker>().addTag(WORK_TAG).build()
)
}
@ -52,15 +62,16 @@ class SyncWorker @AssistedInject constructor(
syncOnlyOnWiFi: SyncOnlyOnWiFiPreference,
) {
workManager.enqueueUniquePeriodicWork(
WORK_NAME,
WORK_NAME_PERIODIC,
ExistingPeriodicWorkPolicy.UPDATE,
PeriodicWorkRequestBuilder<SyncWorker>(syncInterval.value, TimeUnit.MINUTES)
.setConstraints(Constraints.Builder()
.setRequiresCharging(syncOnlyWhenCharging.value)
.setRequiredNetworkType(if (syncOnlyOnWiFi.value) NetworkType.UNMETERED else NetworkType.CONNECTED)
.build()
.setConstraints(
Constraints.Builder()
.setRequiresCharging(syncOnlyWhenCharging.value)
.setRequiredNetworkType(if (syncOnlyOnWiFi.value) NetworkType.UNMETERED else NetworkType.CONNECTED)
.build()
)
.addTag(WORK_NAME)
.addTag(WORK_TAG)
.setInitialDelay(syncInterval.value, TimeUnit.MINUTES)
.build()
)

View File

@ -133,7 +133,7 @@ class AndroidApp : Application(), Configuration.Provider {
}
private suspend fun workerInit() {
rssService.get().doSync(isOnStart = true)
rssService.get().initSync()
}
private suspend fun checkUpdate() {

View File

@ -46,11 +46,11 @@ class HomeViewModel @Inject constructor(
private val _filterUiState = MutableStateFlow(FilterState())
val filterUiState = _filterUiState.asStateFlow()
val syncWorkLiveData = workManager.getWorkInfosByTagLiveData(SyncWorker.WORK_NAME)
val syncWorkLiveData = workManager.getWorkInfosByTagLiveData(SyncWorker.WORK_TAG)
fun sync() {
applicationScope.launch(ioDispatcher) {
rssService.get().doSync()
rssService.get().doSyncOneTime()
}
}

View File

@ -59,7 +59,7 @@ class SubscribeViewModel @Inject constructor(
fun importFromInputStream(inputStream: InputStream) {
applicationScope.launch {
opmlService.saveToDatabase(inputStream)
rssService.get().doSync()
rssService.get().doSyncOneTime()
}
}

View File

@ -34,7 +34,7 @@ class FlowViewModel @Inject constructor(
fun sync() {
applicationScope.launch(ioDispatcher) {
rssService.get().doSync()
rssService.get().doSyncOneTime()
}
}