Make MainThreadOperationQueue MainActor.

This commit is contained in:
Brent Simmons 2024-03-24 12:05:39 -07:00
parent e867487031
commit ae4dcc0b27
16 changed files with 165 additions and 144 deletions

View File

@ -42,7 +42,7 @@ final class CloudKitAccountDelegate: AccountDelegate {
private let accountZone: CloudKitAccountZone private let accountZone: CloudKitAccountZone
private let articlesZone: CloudKitArticlesZone private let articlesZone: CloudKitArticlesZone
private let mainThreadOperationQueue = MainThreadOperationQueue() @MainActor private let mainThreadOperationQueue = MainThreadOperationQueue()
private lazy var refresher: LocalAccountRefresher = { private lazy var refresher: LocalAccountRefresher = {
let refresher = LocalAccountRefresher() let refresher = LocalAccountRefresher()

View File

@ -27,7 +27,7 @@ class CloudKitReceiveStatusOperation: MainThreadOperation {
self.articlesZone = articlesZone self.articlesZone = articlesZone
} }
func run() { @MainActor func run() {
guard let articlesZone = articlesZone else { guard let articlesZone = articlesZone else {
self.operationDelegate?.operationDidComplete(self) self.operationDelegate?.operationDidComplete(self)
return return

View File

@ -31,7 +31,7 @@ class CloudKitRemoteNotificationOperation: MainThreadOperation {
self.userInfo = userInfo self.userInfo = userInfo
} }
func run() { @MainActor func run() {
guard let accountZone = accountZone, let articlesZone = articlesZone else { guard let accountZone = accountZone, let articlesZone = articlesZone else {
self.operationDelegate?.operationDidComplete(self) self.operationDelegate?.operationDidComplete(self)
return return

View File

@ -41,20 +41,22 @@ class CloudKitSendStatusOperation: MainThreadOperation {
self.database = database self.database = database
} }
func run() { @MainActor func run() {
os_log(.debug, log: log, "Sending article statuses...") os_log(.debug, log: log, "Sending article statuses...")
if showProgress { if showProgress {
database.selectPendingCount() { result in database.selectPendingCount() { result in
switch result { MainActor.assumeIsolated {
case .success(let count): switch result {
let ticks = count / self.blockSize case .success(let count):
self.refreshProgress?.addToNumberOfTasksAndRemaining(ticks) let ticks = count / self.blockSize
self.selectForProcessing() self.refreshProgress?.addToNumberOfTasksAndRemaining(ticks)
case .failure(let databaseError): self.selectForProcessing()
os_log(.error, log: self.log, "Send status count pending error: %@.", databaseError.localizedDescription) case .failure(let databaseError):
self.operationDelegate?.cancelOperation(self) os_log(.error, log: self.log, "Send status count pending error: %@.", databaseError.localizedDescription)
self.operationDelegate?.cancelOperation(self)
}
} }
} }
@ -72,33 +74,36 @@ private extension CloudKitSendStatusOperation {
func selectForProcessing() { func selectForProcessing() {
database.selectForProcessing(limit: blockSize) { result in database.selectForProcessing(limit: blockSize) { result in
switch result {
case .success(let syncStatuses): MainActor.assumeIsolated {
switch result {
func stopProcessing() { case .success(let syncStatuses):
if self.showProgress {
self.refreshProgress?.completeTask() @MainActor func stopProcessing() {
if self.showProgress {
self.refreshProgress?.completeTask()
}
os_log(.debug, log: self.log, "Done sending article statuses.")
self.operationDelegate?.operationDidComplete(self)
} }
os_log(.debug, log: self.log, "Done sending article statuses.")
self.operationDelegate?.operationDidComplete(self) guard syncStatuses.count > 0 else {
}
guard syncStatuses.count > 0 else {
stopProcessing()
return
}
self.processStatuses(syncStatuses) { stop in
if stop {
stopProcessing() stopProcessing()
} else { return
self.selectForProcessing()
} }
self.processStatuses(syncStatuses) { stop in
if stop {
stopProcessing()
} else {
self.selectForProcessing()
}
}
case .failure(let databaseError):
os_log(.error, log: self.log, "Send status error: %@.", databaseError.localizedDescription)
self.operationDelegate?.cancelOperation(self)
} }
case .failure(let databaseError):
os_log(.error, log: self.log, "Send status error: %@.", databaseError.localizedDescription)
self.operationDelegate?.cancelOperation(self)
} }
} }
} }

View File

@ -65,8 +65,8 @@ final class FeedlyAccountDelegate: AccountDelegate {
private let database: SyncDatabase private let database: SyncDatabase
private weak var currentSyncAllOperation: MainThreadOperation? private weak var currentSyncAllOperation: MainThreadOperation?
private let operationQueue = MainThreadOperationQueue() @MainActor private let operationQueue = MainThreadOperationQueue()
init(dataFolder: String, transport: Transport?, api: FeedlyAPICaller.API, secretsProvider: SecretsProvider) { init(dataFolder: String, transport: Transport?, api: FeedlyAPICaller.API, secretsProvider: SecretsProvider) {
// Many operations have their own operation queues, such as the sync all operation. // Many operations have their own operation queues, such as the sync all operation.
// Making this a serial queue at this higher level of abstraction means we can ensure, // Making this a serial queue at this higher level of abstraction means we can ensure,
@ -550,8 +550,10 @@ final class FeedlyAccountDelegate: AccountDelegate {
/// Suspend all network activity /// Suspend all network activity
func suspendNetwork() { func suspendNetwork() {
caller.suspend() MainActor.assumeIsolated {
operationQueue.cancelAllOperations() caller.suspend()
operationQueue.cancelAllOperations()
}
} }
/// Suspend the SQLLite databases /// Suspend the SQLLite databases
@ -572,7 +574,7 @@ final class FeedlyAccountDelegate: AccountDelegate {
extension FeedlyAccountDelegate: FeedlyAPICallerDelegate { extension FeedlyAccountDelegate: FeedlyAPICallerDelegate {
func reauthorizeFeedlyAPICaller(_ caller: FeedlyAPICaller, completionHandler: @escaping (Bool) -> ()) { @MainActor func reauthorizeFeedlyAPICaller(_ caller: FeedlyAPICaller, completionHandler: @escaping (Bool) -> ()) {
guard let account = initializedAccount else { guard let account = initializedAccount else {
completionHandler(false) completionHandler(false)
return return
@ -600,8 +602,6 @@ extension FeedlyAccountDelegate: FeedlyAPICallerDelegate {
completionHandler(refreshAccessTokenDelegate.didReauthorize && !operation.isCanceled) completionHandler(refreshAccessTokenDelegate.didReauthorize && !operation.isCanceled)
} }
Task { @MainActor in MainThreadOperationQueue.shared.add(refreshAccessToken)
MainThreadOperationQueue.shared.add(refreshAccessToken)
}
} }
} }

View File

@ -51,7 +51,7 @@ public enum OAuthAccountAuthorizationOperationError: LocalizedError {
self.oauthClient = Account.oauthAuthorizationClient(for: accountType, secretsProvider: secretsProvider) self.oauthClient = Account.oauthAuthorizationClient(for: accountType, secretsProvider: secretsProvider)
} }
public func run() { @MainActor public func run() {
assert(presentationAnchor != nil, "\(self) outlived presentation anchor.") assert(presentationAnchor != nil, "\(self) outlived presentation anchor.")
let request = Account.oauthAuthorizationCodeGrantRequest(for: accountType, secretsProvider: secretsProvider) let request = Account.oauthAuthorizationCodeGrantRequest(for: accountType, secretsProvider: secretsProvider)
@ -101,28 +101,30 @@ public enum OAuthAccountAuthorizationOperationError: LocalizedError {
} }
private func didEndAuthentication(url: URL?, error: Error?) { private func didEndAuthentication(url: URL?, error: Error?) {
guard !isCanceled else { MainActor.assumeIsolated {
didFinish() guard !isCanceled else {
return didFinish()
} return
}
do {
guard let url = url else { do {
if let error = error { guard let url = url else {
throw error if let error = error {
} throw error
throw URLError(.badURL) }
throw URLError(.badURL)
}
let response = try OAuthAuthorizationResponse(url: url, client: oauthClient)
Account.requestOAuthAccessToken(with: response, client: oauthClient, accountType: accountType, secretsProvider: secretsProvider, completion: didEndRequestingAccessToken(_:))
} catch is ASWebAuthenticationSessionError {
didFinish() // Primarily, cancellation.
} catch {
didFinish(error)
} }
let response = try OAuthAuthorizationResponse(url: url, client: oauthClient)
Account.requestOAuthAccessToken(with: response, client: oauthClient, accountType: accountType, secretsProvider: secretsProvider, completion: didEndRequestingAccessToken(_:))
} catch is ASWebAuthenticationSessionError {
didFinish() // Primarily, cancellation.
} catch {
didFinish(error)
} }
} }
@ -174,12 +176,12 @@ public enum OAuthAccountAuthorizationOperationError: LocalizedError {
// MARK: Managing Operation State // MARK: Managing Operation State
private func didFinish() { @MainActor private func didFinish() {
assert(Thread.isMainThread) assert(Thread.isMainThread)
operationDelegate?.operationDidComplete(self) operationDelegate?.operationDidComplete(self)
} }
private func didFinish(_ error: Error) { @MainActor private func didFinish(_ error: Error) {
assert(Thread.isMainThread) assert(Thread.isMainThread)
delegate?.oauthAccountAuthorizationOperation(self, didFailWith: error) delegate?.oauthAccountAuthorizationOperation(self, didFailWith: error)
didFinish() didFinish()

View File

@ -12,7 +12,7 @@ import RSWeb
import Secrets import Secrets
import Core import Core
class FeedlyAddExistingFeedOperation: FeedlyOperation, FeedlyOperationDelegate, FeedlyCheckpointOperationDelegate { @MainActor final class FeedlyAddExistingFeedOperation: FeedlyOperation, FeedlyOperationDelegate, FeedlyCheckpointOperationDelegate {
private let operationQueue = MainThreadOperationQueue() private let operationQueue = MainThreadOperationQueue()
var addCompletionHandler: ((Result<Void, Error>) -> ())? var addCompletionHandler: ((Result<Void, Error>) -> ())?

View File

@ -23,13 +23,15 @@ final class FeedlyFetchIdsForMissingArticlesOperation: FeedlyOperation, FeedlyEn
override func run() { override func run() {
account.fetchArticleIDsForStatusesWithoutArticlesNewerThanCutoffDate { result in account.fetchArticleIDsForStatusesWithoutArticlesNewerThanCutoffDate { result in
switch result { MainActor.assumeIsolated {
case .success(let articleIds): switch result {
self.entryIds.formUnion(articleIds) case .success(let articleIds):
self.didFinish() self.entryIds.formUnion(articleIds)
self.didFinish()
case .failure(let error):
self.didFinish(with: error) case .failure(let error):
self.didFinish(with: error)
}
} }
} }
} }

View File

@ -78,14 +78,16 @@ final class FeedlyIngestStarredArticleIdsOperation: FeedlyOperation {
} }
database.selectPendingStarredStatusArticleIDs { result in database.selectPendingStarredStatusArticleIDs { result in
switch result { MainActor.assumeIsolated {
case .success(let pendingArticleIds): switch result {
self.remoteEntryIds.subtract(pendingArticleIds) case .success(let pendingArticleIds):
self.remoteEntryIds.subtract(pendingArticleIds)
self.updateStarredStatuses()
self.updateStarredStatuses()
case .failure(let error):
self.didFinish(with: error) case .failure(let error):
self.didFinish(with: error)
}
} }
} }
} }
@ -97,12 +99,14 @@ final class FeedlyIngestStarredArticleIdsOperation: FeedlyOperation {
} }
account.fetchStarredArticleIDs { result in account.fetchStarredArticleIDs { result in
switch result { MainActor.assumeIsolated {
case .success(let localStarredArticleIDs): switch result {
self.processStarredArticleIDs(localStarredArticleIDs) case .success(let localStarredArticleIDs):
self.processStarredArticleIDs(localStarredArticleIDs)
case .failure(let error):
self.didFinish(with: error) case .failure(let error):
self.didFinish(with: error)
}
} }
} }
} }

View File

@ -52,19 +52,22 @@ class FeedlyIngestStreamArticleIdsOperation: FeedlyOperation {
switch result { switch result {
case .success(let streamIds): case .success(let streamIds):
account.createStatusesIfNeeded(articleIDs: Set(streamIds.ids)) { databaseError in account.createStatusesIfNeeded(articleIDs: Set(streamIds.ids)) { databaseError in
if let error = databaseError { MainActor.assumeIsolated {
self.didFinish(with: error) if let error = databaseError {
return self.didFinish(with: error)
return
}
guard let continuation = streamIds.continuation else {
os_log(.debug, log: self.log, "Reached end of stream for %@", self.resource.id)
self.didFinish()
return
}
self.getStreamIds(continuation)
} }
guard let continuation = streamIds.continuation else {
os_log(.debug, log: self.log, "Reached end of stream for %@", self.resource.id)
self.didFinish()
return
}
self.getStreamIds(continuation)
} }
case .failure(let error): case .failure(let error):
didFinish(with: error) didFinish(with: error)

View File

@ -79,14 +79,16 @@ final class FeedlyIngestUnreadArticleIdsOperation: FeedlyOperation {
} }
database.selectPendingReadStatusArticleIDs { result in database.selectPendingReadStatusArticleIDs { result in
switch result { MainActor.assumeIsolated {
case .success(let pendingArticleIds): switch result {
self.remoteEntryIds.subtract(pendingArticleIds) case .success(let pendingArticleIds):
self.remoteEntryIds.subtract(pendingArticleIds)
self.updateUnreadStatuses()
self.updateUnreadStatuses()
case .failure(let error):
self.didFinish(with: error) case .failure(let error):
self.didFinish(with: error)
}
} }
} }
} }
@ -98,12 +100,14 @@ final class FeedlyIngestUnreadArticleIdsOperation: FeedlyOperation {
} }
account.fetchUnreadArticleIDs { result in account.fetchUnreadArticleIDs { result in
switch result { MainActor.assumeIsolated {
case .success(let localUnreadArticleIDs): switch result {
self.processUnreadArticleIDs(localUnreadArticleIDs) case .success(let localUnreadArticleIDs):
self.processUnreadArticleIDs(localUnreadArticleIDs)
case .failure(let error):
self.didFinish(with: error) case .failure(let error):
self.didFinish(with: error)
}
} }
} }
} }

View File

@ -18,7 +18,7 @@ protocol FeedlyOperationDelegate: AnyObject {
/// ///
/// Normally we dont do inheritance but in this case /// Normally we dont do inheritance but in this case
/// its the best option. /// its the best option.
class FeedlyOperation: MainThreadOperation { @MainActor class FeedlyOperation: MainThreadOperation {
weak var delegate: FeedlyOperationDelegate? weak var delegate: FeedlyOperationDelegate?
var downloadProgress: DownloadProgress? { var downloadProgress: DownloadProgress? {

View File

@ -29,16 +29,18 @@ final class FeedlySendArticleStatusesOperation: FeedlyOperation {
os_log(.debug, log: log, "Sending article statuses...") os_log(.debug, log: log, "Sending article statuses...")
database.selectForProcessing { result in database.selectForProcessing { result in
if self.isCanceled { MainActor.assumeIsolated {
self.didFinish() if self.isCanceled {
return self.didFinish()
} return
}
switch result {
case .success(let syncStatuses): switch result {
self.processStatuses(syncStatuses) case .success(let syncStatuses):
case .failure: self.processStatuses(syncStatuses)
self.didFinish() case .failure:
self.didFinish()
}
} }
} }
} }

View File

@ -28,13 +28,15 @@ final class FeedlyUpdateAccountFeedsWithItemsOperation: FeedlyOperation {
let feedIDsAndItems = organisedItemsProvider.parsedItemsKeyedByFeedId let feedIDsAndItems = organisedItemsProvider.parsedItemsKeyedByFeedId
account.update(feedIDsAndItems: feedIDsAndItems, defaultRead: true) { databaseError in account.update(feedIDsAndItems: feedIDsAndItems, defaultRead: true) { databaseError in
if let error = databaseError { MainActor.assumeIsolated {
self.didFinish(with: error) if let error = databaseError {
return self.didFinish(with: error)
return
}
os_log(.debug, log: self.log, "Updated %i feeds for \"%@\"", feedIDsAndItems.count, self.organisedItemsProvider.parsedItemsByFeedProviderName)
self.didFinish()
} }
os_log(.debug, log: self.log, "Updated %i feeds for \"%@\"", feedIDsAndItems.count, self.organisedItemsProvider.parsedItemsByFeedProviderName)
self.didFinish()
} }
} }
} }

View File

@ -75,15 +75,15 @@ public protocol MainThreadOperation: AnyObject {
public extension MainThreadOperation { public extension MainThreadOperation {
func cancel() { @MainActor func cancel() {
operationDelegate?.cancelOperation(self) operationDelegate?.cancelOperation(self)
} }
func addDependency(_ parentOperation: MainThreadOperation) { @MainActor func addDependency(_ parentOperation: MainThreadOperation) {
operationDelegate?.make(self, dependOn: parentOperation) operationDelegate?.make(self, dependOn: parentOperation)
} }
func informOperationDelegateOfCompletion() { @MainActor func informOperationDelegateOfCompletion() {
guard !isCanceled else { guard !isCanceled else {
return return
} }

View File

@ -9,9 +9,10 @@
import Foundation import Foundation
public protocol MainThreadOperationDelegate: AnyObject { public protocol MainThreadOperationDelegate: AnyObject {
func operationDidComplete(_ operation: MainThreadOperation)
func cancelOperation(_ operation: MainThreadOperation) @MainActor func operationDidComplete(_ operation: MainThreadOperation)
func make(_ childOperation: MainThreadOperation, dependOn parentOperation: MainThreadOperation) @MainActor func cancelOperation(_ operation: MainThreadOperation)
@MainActor func make(_ childOperation: MainThreadOperation, dependOn parentOperation: MainThreadOperation)
} }
/// Manage a queue of MainThreadOperation tasks. /// Manage a queue of MainThreadOperation tasks.
@ -23,7 +24,7 @@ public protocol MainThreadOperationDelegate: AnyObject {
/// Use this only on the main thread. /// Use this only on the main thread.
/// The operation can be suspended and resumed. /// The operation can be suspended and resumed.
/// It is *not* suspended on creation. /// It is *not* suspended on creation.
public final class MainThreadOperationQueue { @MainActor public final class MainThreadOperationQueue {
/// Use the shared queue when you dont need to create a separate queue. /// Use the shared queue when you dont need to create a separate queue.
@MainActor public static let shared: MainThreadOperationQueue = { @MainActor public static let shared: MainThreadOperationQueue = {
@ -46,10 +47,6 @@ public final class MainThreadOperationQueue {
// Silence compiler complaint about init not being public. // Silence compiler complaint about init not being public.
} }
deinit {
cancelAllOperations()
}
/// Add an operation to the queue. /// Add an operation to the queue.
@MainActor public func add(_ operation: MainThreadOperation) { @MainActor public func add(_ operation: MainThreadOperation) {
precondition(Thread.isMainThread) precondition(Thread.isMainThread)
@ -132,7 +129,7 @@ public final class MainThreadOperationQueue {
extension MainThreadOperationQueue: MainThreadOperationDelegate { extension MainThreadOperationQueue: MainThreadOperationDelegate {
public func operationDidComplete(_ operation: MainThreadOperation) { @MainActor public func operationDidComplete(_ operation: MainThreadOperation) {
precondition(Thread.isMainThread) precondition(Thread.isMainThread)
operationDidFinish(operation) operationDidFinish(operation)
} }