Use MainThreadOperation to prevent article status updates from interleaving

This commit is contained in:
Maurice Parker 2020-05-02 10:02:58 -05:00
parent 6bdcd941ae
commit 85f7adaa0a
6 changed files with 305 additions and 149 deletions

View File

@ -89,6 +89,9 @@
51E4DB302426353D0091EB5B /* CloudKitAccountZone.swift in Sources */ = {isa = PBXBuildFile; fileRef = 51E4DB2F2426353D0091EB5B /* CloudKitAccountZone.swift */; };
51E59599228C77BC00FCC42B /* FeedbinUnreadEntry.swift in Sources */ = {isa = PBXBuildFile; fileRef = 51E59598228C77BC00FCC42B /* FeedbinUnreadEntry.swift */; };
51E5959B228C781500FCC42B /* FeedbinStarredEntry.swift in Sources */ = {isa = PBXBuildFile; fileRef = 51E5959A228C781500FCC42B /* FeedbinStarredEntry.swift */; };
51F6C58F245DB2E8001E41CA /* CloudKitSendStatusOperation.swift in Sources */ = {isa = PBXBuildFile; fileRef = 51F6C58E245DB2E8001E41CA /* CloudKitSendStatusOperation.swift */; };
51F6C591245DB302001E41CA /* CloudKitReceiveStatusOperation.swift in Sources */ = {isa = PBXBuildFile; fileRef = 51F6C590245DB302001E41CA /* CloudKitReceiveStatusOperation.swift */; };
51F6C593245DBA8E001E41CA /* CloudKitRemoteNotificationOperation.swift in Sources */ = {isa = PBXBuildFile; fileRef = 51F6C592245DBA8E001E41CA /* CloudKitRemoteNotificationOperation.swift */; };
552032F8229D5D5A009559E0 /* ReaderAPIEntry.swift in Sources */ = {isa = PBXBuildFile; fileRef = 552032ED229D5D5A009559E0 /* ReaderAPIEntry.swift */; };
552032F9229D5D5A009559E0 /* ReaderAPISubscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 552032EE229D5D5A009559E0 /* ReaderAPISubscription.swift */; };
552032FB229D5D5A009559E0 /* ReaderAPITag.swift in Sources */ = {isa = PBXBuildFile; fileRef = 552032F0229D5D5A009559E0 /* ReaderAPITag.swift */; };
@ -340,6 +343,9 @@
51E4DB2F2426353D0091EB5B /* CloudKitAccountZone.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CloudKitAccountZone.swift; sourceTree = "<group>"; };
51E59598228C77BC00FCC42B /* FeedbinUnreadEntry.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FeedbinUnreadEntry.swift; sourceTree = "<group>"; };
51E5959A228C781500FCC42B /* FeedbinStarredEntry.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FeedbinStarredEntry.swift; sourceTree = "<group>"; };
51F6C58E245DB2E8001E41CA /* CloudKitSendStatusOperation.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CloudKitSendStatusOperation.swift; sourceTree = "<group>"; };
51F6C590245DB302001E41CA /* CloudKitReceiveStatusOperation.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CloudKitReceiveStatusOperation.swift; sourceTree = "<group>"; };
51F6C592245DBA8E001E41CA /* CloudKitRemoteNotificationOperation.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CloudKitRemoteNotificationOperation.swift; sourceTree = "<group>"; };
552032ED229D5D5A009559E0 /* ReaderAPIEntry.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReaderAPIEntry.swift; sourceTree = "<group>"; };
552032EE229D5D5A009559E0 /* ReaderAPISubscription.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReaderAPISubscription.swift; sourceTree = "<group>"; };
552032F0229D5D5A009559E0 /* ReaderAPITag.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReaderAPITag.swift; sourceTree = "<group>"; };
@ -555,10 +561,13 @@
5103A9D82422546800410853 /* CloudKitAccountDelegate.swift */,
51E4DB2F2426353D0091EB5B /* CloudKitAccountZone.swift */,
512DD4CC2431098700C17B1F /* CloudKitAccountZoneDelegate.swift */,
5139A6372459822D004D960C /* CloudKitArticleStatusUpdate.swift */,
519E84A72434C5EF00D238B0 /* CloudKitArticlesZone.swift */,
519E84AB2435019100D238B0 /* CloudKitArticlesZoneDelegate.swift */,
5150FFFD243823B800C1A442 /* CloudKitError.swift */,
5139A6372459822D004D960C /* CloudKitArticleStatusUpdate.swift */,
51F6C590245DB302001E41CA /* CloudKitReceiveStatusOperation.swift */,
51F6C592245DBA8E001E41CA /* CloudKitRemoteNotificationOperation.swift */,
51F6C58E245DB2E8001E41CA /* CloudKitSendStatusOperation.swift */,
51E4DB2D242633ED0091EB5B /* CloudKitZone.swift */,
51C034DE242D65D20014DC71 /* CloudKitZoneResult.swift */,
);
@ -1137,6 +1146,7 @@
9EC688EE232C58E800A8D0A2 /* OAuthAuthorizationCodeGranting.swift in Sources */,
9EEAE071235D019B00E3FEE4 /* FeedlyGetStreamContentsService.swift in Sources */,
9E7299D9235062A200DAEFB7 /* FeedlyResourceProviding.swift in Sources */,
51F6C591245DB302001E41CA /* CloudKitReceiveStatusOperation.swift in Sources */,
9E672394236F7CA0000BE141 /* FeedlyRefreshAccessTokenOperation.swift in Sources */,
514BF5202391B0DB00902FE8 /* SingleArticleFetcher.swift in Sources */,
9EC688EC232C583300A8D0A2 /* FeedlyAccountDelegate+OAuth.swift in Sources */,
@ -1176,6 +1186,7 @@
519E84A62433D49000D238B0 /* OPMLNormalizer.swift in Sources */,
9EEEF71F23545CB4009E9D80 /* FeedlySendArticleStatusesOperation.swift in Sources */,
9EBD49C223C67784005AD5CD /* FeedlyEntryIdentifierProviding.swift in Sources */,
51F6C593245DBA8E001E41CA /* CloudKitRemoteNotificationOperation.swift in Sources */,
846E77541F6F00E300A165E2 /* AccountManager.swift in Sources */,
51E490362288C37100C791F0 /* FeedbinDate.swift in Sources */,
9EEAE06E235D002D00E3FEE4 /* FeedlyGetCollectionsService.swift in Sources */,
@ -1280,6 +1291,7 @@
179DBD4ECC1C9712DF51DB8C /* NewsBlurFolderChange.swift in Sources */,
179DBCB4B11C88EBE852A015 /* NewsBlurFeedChange.swift in Sources */,
179DBE829FDF48E102F73244 /* NewsBlurAccountDelegate+Internal.swift in Sources */,
51F6C58F245DB2E8001E41CA /* CloudKitSendStatusOperation.swift in Sources */,
179DB3A93E3205EF29C2AF62 /* NewsBlurAPICaller+Internal.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;

View File

@ -18,10 +18,16 @@ import ArticlesDatabase
import RSWeb
import Secrets
public enum CloudKitAccountDelegateError: String, Error {
case invalidParameter = "An invalid parameter was used."
enum CloudKitAccountDelegateError: LocalizedError {
case invalidParameter
case unknown
var errorDescription: String? {
return NSLocalizedString("An unexpected CloudKit error occurred.", comment: "An unexpected CloudKit error occurred.")
}
}
final class CloudKitAccountDelegate: AccountDelegate {
private var log = OSLog(subsystem: Bundle.main.bundleIdentifier!, category: "CloudKit")
@ -33,10 +39,11 @@ final class CloudKitAccountDelegate: AccountDelegate {
return CKContainer(identifier: "iCloud.\(orgID).NetNewsWire")
}()
private lazy var zones: [CloudKitZone] = [accountZone, articlesZone]
private let accountZone: CloudKitAccountZone
private let articlesZone: CloudKitArticlesZone
private let mainThreadOperationQueue = MainThreadOperationQueue()
private lazy var refresher: LocalAccountRefresher = {
let refresher = LocalAccountRefresher()
refresher.delegate = self
@ -63,21 +70,11 @@ final class CloudKitAccountDelegate: AccountDelegate {
}
func receiveRemoteNotification(for account: Account, userInfo: [AnyHashable : Any], completion: @escaping () -> Void) {
os_log(.debug, log: log, "Processing remote notification...")
let group = DispatchGroup()
zones.forEach { zone in
group.enter()
zone.receiveRemoteNotification(userInfo: userInfo) {
group.leave()
}
}
group.notify(queue: DispatchQueue.main) {
os_log(.debug, log: self.log, "Done processing remote notification...")
let op = CloudKitRemoteNotificationOperation(accountZone: accountZone, articlesZone: articlesZone, userInfo: userInfo)
op.completionBlock = { mainThreadOperaion in
completion()
}
mainThreadOperationQueue.add(op)
}
func refreshAll(for account: Account, completion: @escaping (Result<Void, Error>) -> Void) {
@ -101,17 +98,15 @@ final class CloudKitAccountDelegate: AccountDelegate {
}
func refreshArticleStatus(for account: Account, completion: @escaping ((Result<Void, Error>) -> Void)) {
os_log(.debug, log: log, "Refreshing article statuses...")
articlesZone.refreshArticles() { result in
os_log(.debug, log: self.log, "Done refreshing article statuses.")
switch result {
case .success:
let op = CloudKitReceiveStatusOperation(articlesZone: articlesZone)
op.completionBlock = { mainThreadOperaion in
if mainThreadOperaion.isCanceled {
completion(.failure(CloudKitAccountDelegateError.unknown))
} else {
completion(.success(()))
case .failure(let error):
completion(.failure(error))
}
}
mainThreadOperationQueue.add(op)
}
func importOPML(for account:Account, opmlFile: URL, completion: @escaping (Result<Void, Error>) -> Void) {
@ -420,17 +415,15 @@ final class CloudKitAccountDelegate: AccountDelegate {
os_log(.error, log: self.log, "Error adding account container: %@", error.localizedDescription)
}
}
zones.forEach { zone in
zone.subscribeToZoneChanges()
}
accountZone.subscribeToZoneChanges()
articlesZone.subscribeToZoneChanges()
}
}
func accountWillBeDeleted(_ account: Account) {
zones.forEach { zone in
zone.resetChangeToken()
}
accountZone.resetChangeToken()
articlesZone.resetChangeToken()
}
static func validateCredentials(transport: Transport, credentials: Credentials, endpoint: URL? = nil, completion: (Result<Credentials?, Error>) -> Void) {
@ -770,94 +763,19 @@ private extension CloudKitAccountDelegate {
}
func sendArticleStatus(for account: Account, showProgress: Bool, completion: @escaping ((Result<Void, Error>) -> Void)) {
os_log(.debug, log: log, "Sending article statuses...")
database.selectForProcessing { result in
func processStatuses(_ syncStatuses: [SyncStatus]) {
guard syncStatuses.count > 0 else {
os_log(.debug, log: self.log, "Done sending article statuses.")
completion(.success(()))
return
}
let group = DispatchGroup()
let syncStatusChunks = syncStatuses.chunked(into: 300)
if showProgress {
self.refreshProgress.addToNumberOfTasksAndRemaining(syncStatusChunks.count)
}
for syncStatusChunk in syncStatusChunks {
group.enter()
self.sendArticleStatusChunk(for: account, syncStatuses: syncStatusChunk, showProgress: showProgress) {
group.leave()
}
}
group.notify(queue: DispatchQueue.main) {
os_log(.debug, log: self.log, "Done sending article statuses.")
completion(.success(()))
}
}
switch result {
case .success(let syncStatuses):
processStatuses(syncStatuses)
case .failure(let databaseError):
completion(.failure(databaseError))
let op = CloudKitSendStatusOperation(account: account,
articlesZone: articlesZone,
refreshProgress: refreshProgress,
showProgress: showProgress,
database: database)
op.completionBlock = { mainThreadOperaion in
if mainThreadOperaion.isCanceled {
completion(.failure(CloudKitAccountDelegateError.unknown))
} else {
completion(.success(()))
}
}
}
func sendArticleStatusChunk(for account: Account, syncStatuses: [SyncStatus], showProgress: Bool, completion: @escaping () -> Void) {
let articleIDs = syncStatuses.map({ $0.articleID })
account.fetchArticlesAsync(.articleIDs(Set(articleIDs))) { result in
func processWithArticles(_ articles: Set<Article>) {
let syncStatusesDict = Dictionary(grouping: syncStatuses, by: { $0.articleID })
let articlesDict = articles.reduce(into: [String: Article]()) { result, article in
result[article.articleID] = article
}
let statusUpdates = syncStatusesDict.map { (key, value) in
return CloudKitArticleStatusUpdate(articleID: key, statuses: value, article: articlesDict[key])
}
self.articlesZone.modifyArticles(statusUpdates) { result in
switch result {
case .success:
self.database.deleteSelectedForProcessing(syncStatuses.map({ $0.articleID })) { _ in
if showProgress {
self.refreshProgress.completeTask()
}
os_log(.debug, log: self.log, "Done sending article status block...")
completion()
}
case .failure(let error):
self.database.resetSelectedForProcessing(syncStatuses.map({ $0.articleID })) { _ in
self.processAccountError(account, error)
os_log(.error, log: self.log, "Send article status modify articles error: %@.", error.localizedDescription)
completion()
}
}
}
}
switch result {
case .success(let articles):
processWithArticles(articles)
case .failure(let databaseError):
self.database.resetSelectedForProcessing(syncStatuses.map({ $0.articleID })) { _ in
os_log(.error, log: self.log, "Send article status fetch articles error: %@.", databaseError.localizedDescription)
completion()
}
}
}
mainThreadOperationQueue.add(op)
}
}
@ -875,3 +793,4 @@ extension CloudKitAccountDelegate: LocalAccountRefresherDelegate {
}
}

View File

@ -0,0 +1,50 @@
//
// CloudKitReceiveStatusOperation.swift
// Account
//
// Created by Maurice Parker on 5/2/20.
// Copyright © 2020 Ranchero Software, LLC. All rights reserved.
//
import Foundation
import os.log
import RSCore
class CloudKitReceiveStatusOperation: MainThreadOperation {
private var log = OSLog(subsystem: Bundle.main.bundleIdentifier!, category: "CloudKit")
// MainThreadOperation
public var isCanceled = false
public var id: Int?
public weak var operationDelegate: MainThreadOperationDelegate?
public var name: String? = "CloudKitReceiveStatusOperation"
public var completionBlock: MainThreadOperation.MainThreadOperationCompletionBlock?
private weak var articlesZone: CloudKitArticlesZone?
init(articlesZone: CloudKitArticlesZone) {
self.articlesZone = articlesZone
}
func run() {
guard let articlesZone = articlesZone else {
self.operationDelegate?.operationDidComplete(self)
return
}
os_log(.debug, log: log, "Refreshing article statuses...")
articlesZone.refreshArticles() { result in
os_log(.debug, log: self.log, "Done refreshing article statuses.")
switch result {
case .success:
self.operationDelegate?.operationDidComplete(self)
case .failure(let error):
os_log(.error, log: self.log, "Receive status error: %@.", error.localizedDescription)
self.operationDelegate?.cancelOperation(self)
}
}
}
}

View File

@ -0,0 +1,52 @@
//
// CloudKitRemoteNotificationOperation.swift
// Account
//
// Created by Maurice Parker on 5/2/20.
// Copyright © 2020 Ranchero Software, LLC. All rights reserved.
//
import Foundation
import os.log
import RSCore
class CloudKitRemoteNotificationOperation: MainThreadOperation {
private var log = OSLog(subsystem: Bundle.main.bundleIdentifier!, category: "CloudKit")
// MainThreadOperation
public var isCanceled = false
public var id: Int?
public weak var operationDelegate: MainThreadOperationDelegate?
public var name: String? = "CloudKitRemoteNotificationOperation"
public var completionBlock: MainThreadOperation.MainThreadOperationCompletionBlock?
private weak var accountZone: CloudKitAccountZone?
private weak var articlesZone: CloudKitArticlesZone?
private var userInfo: [AnyHashable : Any]
init(accountZone: CloudKitAccountZone, articlesZone: CloudKitArticlesZone, userInfo: [AnyHashable : Any]) {
self.accountZone = accountZone
self.articlesZone = articlesZone
self.userInfo = userInfo
}
func run() {
guard let accountZone = accountZone, let articlesZone = articlesZone else {
self.operationDelegate?.operationDidComplete(self)
return
}
os_log(.debug, log: log, "Processing remote notification...")
accountZone.receiveRemoteNotification(userInfo: userInfo) {
articlesZone.receiveRemoteNotification(userInfo: self.userInfo) {
os_log(.debug, log: self.log, "Done processing remote notification.")
self.operationDelegate?.operationDidComplete(self)
}
}
}
}

View File

@ -0,0 +1,147 @@
//
// CloudKitSendStatusOperation.swift
// Account
//
// Created by Maurice Parker on 5/2/20.
// Copyright © 2020 Ranchero Software, LLC. All rights reserved.
//
import Foundation
import Articles
import os.log
import RSCore
import RSWeb
import SyncDatabase
class CloudKitSendStatusOperation: MainThreadOperation {
private var log = OSLog(subsystem: Bundle.main.bundleIdentifier!, category: "CloudKit")
// MainThreadOperation
public var isCanceled = false
public var id: Int?
public weak var operationDelegate: MainThreadOperationDelegate?
public var name: String? = "CloudKitSendStatusOperation"
public var completionBlock: MainThreadOperation.MainThreadOperationCompletionBlock?
private weak var account: Account?
private weak var articlesZone: CloudKitArticlesZone?
private weak var refreshProgress: DownloadProgress?
private var showProgress: Bool
private var database: SyncDatabase
init(account: Account, articlesZone: CloudKitArticlesZone, refreshProgress: DownloadProgress, showProgress: Bool, database: SyncDatabase) {
self.account = account
self.articlesZone = articlesZone
self.refreshProgress = refreshProgress
self.showProgress = showProgress
self.database = database
}
func run() {
os_log(.debug, log: log, "Sending article statuses...")
database.selectForProcessing { result in
func processStatuses(_ syncStatuses: [SyncStatus]) {
guard syncStatuses.count > 0 else {
os_log(.debug, log: self.log, "Done sending article statuses.")
self.operationDelegate?.operationDidComplete(self)
return
}
let group = DispatchGroup()
let syncStatusChunks = syncStatuses.chunked(into: 300)
if self.showProgress {
self.refreshProgress?.addToNumberOfTasksAndRemaining(syncStatusChunks.count)
}
for syncStatusChunk in syncStatusChunks {
group.enter()
self.sendArticleStatusChunk(syncStatuses: syncStatusChunk) {
group.leave()
}
}
group.notify(queue: DispatchQueue.main) {
os_log(.debug, log: self.log, "Done sending article statuses.")
self.operationDelegate?.operationDidComplete(self)
}
}
switch result {
case .success(let syncStatuses):
processStatuses(syncStatuses)
case .failure(let databaseError):
os_log(.error, log: self.log, "Send status error: %@.", databaseError.localizedDescription)
self.operationDelegate?.cancelOperation(self)
}
}
}
func sendArticleStatusChunk(syncStatuses: [SyncStatus], completion: @escaping () -> Void) {
guard let account = account, let articlesZone = articlesZone else {
completion()
return
}
let articleIDs = syncStatuses.map({ $0.articleID })
account.fetchArticlesAsync(.articleIDs(Set(articleIDs))) { result in
func processWithArticles(_ articles: Set<Article>) {
let syncStatusesDict = Dictionary(grouping: syncStatuses, by: { $0.articleID })
let articlesDict = articles.reduce(into: [String: Article]()) { result, article in
result[article.articleID] = article
}
let statusUpdates = syncStatusesDict.map { (key, value) in
return CloudKitArticleStatusUpdate(articleID: key, statuses: value, article: articlesDict[key])
}
articlesZone.modifyArticles(statusUpdates) { result in
switch result {
case .success:
self.database.deleteSelectedForProcessing(syncStatuses.map({ $0.articleID })) { _ in
if self.showProgress {
self.refreshProgress?.completeTask()
}
os_log(.debug, log: self.log, "Done sending article status block...")
completion()
}
case .failure(let error):
self.database.resetSelectedForProcessing(syncStatuses.map({ $0.articleID })) { _ in
self.processAccountError(account, error)
os_log(.error, log: self.log, "Send article status modify articles error: %@.", error.localizedDescription)
completion()
}
}
}
}
switch result {
case .success(let articles):
processWithArticles(articles)
case .failure(let databaseError):
self.database.resetSelectedForProcessing(syncStatuses.map({ $0.articleID })) { _ in
os_log(.error, log: self.log, "Send article status fetch articles error: %@.", databaseError.localizedDescription)
completion()
}
}
}
}
func processAccountError(_ account: Account, _ error: Error) {
if case CloudKitZoneError.userDeletedZone = error {
account.removeFeeds(account.topLevelWebFeeds)
for folder in account.folders ?? Set<Folder>() {
account.removeFolder(folder)
}
}
}
}

View File

@ -24,7 +24,6 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
private var syncBackgroundUpdateTask = UIBackgroundTaskIdentifier.invalid
var syncTimer: ArticleStatusSyncTimer?
private let remoteNotificationOperationQueue = MainThreadOperationQueue()
var shuttingDown = false {
didSet {
@ -112,9 +111,14 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
}
func application(_ application: UIApplication, didReceiveRemoteNotification userInfo: [AnyHashable : Any], fetchCompletionHandler completion: @escaping (UIBackgroundFetchResult) -> Void) {
let op = RemoteNotificationOperation(userInfo: userInfo, completion: completion)
remoteNotificationOperationQueue.add(op)
func application(_ application: UIApplication, didReceiveRemoteNotification userInfo: [AnyHashable : Any], fetchCompletionHandler completionHandler: @escaping (UIBackgroundFetchResult) -> Void) {
DispatchQueue.main.async {
self.resumeDatabaseProcessingIfNecessary()
AccountManager.shared.receiveRemoteNotification(userInfo: userInfo) {
self.suspendApplication()
completionHandler(.newData)
}
}
}
func applicationWillTerminate(_ application: UIApplication) {
@ -391,31 +395,3 @@ private extension AppDelegate {
}
}
class RemoteNotificationOperation: MainThreadOperation {
// MainThreadOperation
public var isCanceled = false
public var id: Int?
public weak var operationDelegate: MainThreadOperationDelegate?
public var name: String? = "RemoteNotificationOperation"
public var completionBlock: MainThreadOperation.MainThreadOperationCompletionBlock?
private var userInfo: [AnyHashable : Any]
private var completion: (UIBackgroundFetchResult) -> Void
init(userInfo: [AnyHashable : Any], completion: @escaping (UIBackgroundFetchResult) -> Void) {
self.userInfo = userInfo
self.completion = completion
}
func run() {
appDelegate.resumeDatabaseProcessingIfNecessary()
AccountManager.shared.receiveRemoteNotification(userInfo: userInfo) {
appDelegate.suspendApplication()
self.completion(.newData)
self.operationDelegate?.operationDidComplete(self)
}
}
}