Improve local account synchronization and OPML import global speed
This commit is contained in:
parent
a7416db1b7
commit
cfcc5c48c7
@ -10,7 +10,11 @@ import com.readrops.db.Database
|
||||
import com.readrops.db.entities.Feed
|
||||
import com.readrops.db.entities.Item
|
||||
import com.readrops.db.entities.account.Account
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.awaitAll
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.withContext
|
||||
import okhttp3.Headers
|
||||
import org.jsoup.Jsoup
|
||||
@ -20,7 +24,8 @@ import org.koin.core.component.get
|
||||
class LocalRSSRepository(
|
||||
private val dataSource: LocalRSSDataSource,
|
||||
database: Database,
|
||||
account: Account
|
||||
account: Account,
|
||||
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
|
||||
) : BaseRepository(database, account), KoinComponent {
|
||||
|
||||
override suspend fun login(account: Account) { /* useless here */ }
|
||||
@ -28,7 +33,7 @@ class LocalRSSRepository(
|
||||
override suspend fun synchronize(
|
||||
selectedFeeds: List<Feed>,
|
||||
onUpdate: suspend (Feed) -> Unit
|
||||
): Pair<SyncResult, ErrorResult> {
|
||||
): Pair<SyncResult, ErrorResult> = withContext(dispatcher) {
|
||||
val errors = hashMapOf<Feed, Exception>()
|
||||
val newItems = mutableListOf<Item>()
|
||||
|
||||
@ -36,27 +41,35 @@ class LocalRSSRepository(
|
||||
database.feedDao().selectFeeds(account.id)
|
||||
}
|
||||
|
||||
for (feed in feeds) {
|
||||
onUpdate(feed)
|
||||
feeds.chunked(MAX_PARALLEL_REQUESTS)
|
||||
.map { mapFeeds ->
|
||||
mapFeeds.map { feed ->
|
||||
async {
|
||||
ensureActive()
|
||||
|
||||
val headers = Headers.Builder()
|
||||
if (feed.etag != null) {
|
||||
headers[ApiUtils.IF_NONE_MATCH_HEADER] = feed.etag!!
|
||||
}
|
||||
if (feed.lastModified != null) {
|
||||
headers[ApiUtils.IF_MODIFIED_HEADER] = feed.lastModified!!
|
||||
val headers = Headers.Builder()
|
||||
if (feed.etag != null) {
|
||||
headers[ApiUtils.IF_NONE_MATCH_HEADER] = feed.etag!!
|
||||
}
|
||||
if (feed.lastModified != null) {
|
||||
headers[ApiUtils.IF_MODIFIED_HEADER] = feed.lastModified!!
|
||||
}
|
||||
|
||||
try {
|
||||
val pair = dataSource.queryRSSResource(feed.url!!, headers.build())
|
||||
|
||||
pair?.let { newItems.addAll(insertNewItems(it.second, feed)) }
|
||||
} catch (e: Exception) {
|
||||
errors[feed] = e
|
||||
}
|
||||
|
||||
onUpdate(feed)
|
||||
}
|
||||
}
|
||||
.awaitAll()
|
||||
}
|
||||
|
||||
try {
|
||||
val pair = dataSource.queryRSSResource(feed.url!!, headers.build())
|
||||
|
||||
pair?.let { newItems.addAll(insertNewItems(it.second, feed)) }
|
||||
} catch (e: Exception) {
|
||||
errors[feed] = e
|
||||
}
|
||||
}
|
||||
|
||||
return Pair(SyncResult(items = newItems), errors)
|
||||
SyncResult(items = newItems) to errors
|
||||
}
|
||||
|
||||
override suspend fun synchronize(): SyncResult =
|
||||
@ -66,21 +79,29 @@ class LocalRSSRepository(
|
||||
override suspend fun insertNewFeeds(
|
||||
newFeeds: List<Feed>,
|
||||
onUpdate: (Feed) -> Unit
|
||||
): ErrorResult = withContext(Dispatchers.IO) {
|
||||
): ErrorResult = withContext(dispatcher) {
|
||||
val errors = hashMapOf<Feed, Exception>()
|
||||
|
||||
for (newFeed in newFeeds) {
|
||||
onUpdate(newFeed)
|
||||
newFeeds.chunked(MAX_PARALLEL_REQUESTS)
|
||||
.map { newFeedsMap ->
|
||||
newFeedsMap.map { newFeed ->
|
||||
async {
|
||||
ensureActive()
|
||||
|
||||
try {
|
||||
val result = dataSource.queryRSSResource(newFeed.url!!, null)!!
|
||||
insertFeed(result.first.also { it.folderId = newFeed.folderId })
|
||||
} catch (e: Exception) {
|
||||
errors[newFeed] = e
|
||||
try {
|
||||
val result = dataSource.queryRSSResource(newFeed.url!!, null)!!
|
||||
insertFeed(result.first.also { it.folderId = newFeed.folderId })
|
||||
} catch (e: Exception) {
|
||||
errors[newFeed] = e
|
||||
}
|
||||
|
||||
onUpdate(newFeed)
|
||||
}
|
||||
}
|
||||
.awaitAll()
|
||||
}
|
||||
}
|
||||
|
||||
return@withContext errors
|
||||
errors
|
||||
}
|
||||
|
||||
private suspend fun insertNewItems(items: List<Item>, feed: Feed): List<Item> {
|
||||
@ -129,10 +150,14 @@ class LocalRSSRepository(
|
||||
feedUrl?.let { color = FeedColors.getFeedColor(it) }
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
Log.e("LocalRSSRepository", "insertFeed: ${e.message}")
|
||||
Log.e("LocalRSSRepository", "getFaviconLink: ${e.message}")
|
||||
}
|
||||
|
||||
id = database.feedDao().insert(this).toInt()
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val MAX_PARALLEL_REQUESTS = 30
|
||||
}
|
||||
}
|
@ -213,6 +213,7 @@ abstract class BaseRepository(
|
||||
onUpdate: (Feed) -> Unit
|
||||
): ErrorResult {
|
||||
val errors = hashMapOf<Feed, Exception>()
|
||||
val feedsToInsert = arrayListOf<Feed>()
|
||||
|
||||
for ((folder, feeds) in foldersAndFeeds) {
|
||||
if (folder != null) {
|
||||
@ -220,21 +221,18 @@ abstract class BaseRepository(
|
||||
|
||||
val dbFolder = database.folderDao().selectFolderByName(folder.name!!, account.id)
|
||||
|
||||
if (dbFolder != null) {
|
||||
folder.id = dbFolder.id
|
||||
} else {
|
||||
folder.id = database.folderDao().insert(folder).toInt()
|
||||
}
|
||||
folder.id = dbFolder?.id ?: database.folderDao().insert(folder).toInt()
|
||||
}
|
||||
|
||||
feeds.forEach { it.folderId = folder?.id }
|
||||
|
||||
errors += insertNewFeeds(
|
||||
newFeeds = feeds,
|
||||
onUpdate = onUpdate
|
||||
)
|
||||
feedsToInsert += feeds
|
||||
}
|
||||
|
||||
errors += insertNewFeeds(
|
||||
newFeeds = feedsToInsert,
|
||||
onUpdate = onUpdate
|
||||
)
|
||||
|
||||
return errors
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user