Fix concurrency warning.

This commit is contained in:
Brent Simmons 2024-05-03 10:27:27 -07:00
parent 325f8061de
commit 5c31993b90
1 changed files with 62 additions and 74 deletions

View File

@ -24,7 +24,6 @@ final class CloudKitArticlesZoneDelegate: CloudKitZoneDelegate {
weak var account: Account? weak var account: Account?
var database: SyncDatabase var database: SyncDatabase
weak var articlesZone: CloudKitArticlesZone? weak var articlesZone: CloudKitArticlesZone?
var compressionQueue = DispatchQueue(label: "Articles Zone Delegate Compression Queue")
init(account: Account, database: SyncDatabase, articlesZone: CloudKitArticlesZone) { init(account: Account, database: SyncDatabase, articlesZone: CloudKitArticlesZone) {
self.account = account self.account = account
@ -42,13 +41,13 @@ final class CloudKitArticlesZoneDelegate: CloudKitZoneDelegate {
await self.delete(recordKeys: deleted, pendingStarredStatusArticleIDs: pendingStarredStatusArticleIDs) await self.delete(recordKeys: deleted, pendingStarredStatusArticleIDs: pendingStarredStatusArticleIDs)
self.update(records: changed, try await self.update(records: changed,
pendingReadStatusArticleIDs: pendingReadStatusArticleIDs, pendingReadStatusArticleIDs: pendingReadStatusArticleIDs,
pendingStarredStatusArticleIDs: pendingStarredStatusArticleIDs, pendingStarredStatusArticleIDs: pendingStarredStatusArticleIDs)
completion: completion) completion(.success(()))
} catch { } catch {
os_log(.error, log: self.log, "Error occurred getting pending status records: %@", error.localizedDescription) os_log(.error, log: self.log, "Error in CloudKitArticlesZoneDelegate.cloudKitDidModify: %@", error.localizedDescription)
completion(.failure(CloudKitZoneError.unknown)) completion(.failure(CloudKitZoneError.unknown))
} }
} }
@ -71,7 +70,7 @@ private extension CloudKitArticlesZoneDelegate {
try? await account?.delete(articleIDs: deletableArticleIDs) try? await account?.delete(articleIDs: deletableArticleIDs)
} }
@MainActor func update(records: [CKRecord], pendingReadStatusArticleIDs: Set<String>, pendingStarredStatusArticleIDs: Set<String>, completion: @escaping (Result<Void, Error>) -> Void) { @MainActor private func update(records: [CKRecord], pendingReadStatusArticleIDs: Set<String>, pendingStarredStatusArticleIDs: Set<String>) async throws {
let receivedUnreadArticleIDs = Set(records.filter({ $0[CloudKitArticlesZone.CloudKitArticleStatus.Fields.read] == "0" }).map({ stripPrefix($0.externalID) })) let receivedUnreadArticleIDs = Set(records.filter({ $0[CloudKitArticlesZone.CloudKitArticleStatus.Fields.read] == "0" }).map({ stripPrefix($0.externalID) }))
let receivedReadArticleIDs = Set(records.filter({ $0[CloudKitArticlesZone.CloudKitArticleStatus.Fields.read] == "1" }).map({ stripPrefix($0.externalID) })) let receivedReadArticleIDs = Set(records.filter({ $0[CloudKitArticlesZone.CloudKitArticleStatus.Fields.read] == "1" }).map({ stripPrefix($0.externalID) }))
@ -82,87 +81,76 @@ private extension CloudKitArticlesZoneDelegate {
let updateableReadArticleIDs = receivedReadArticleIDs.subtracting(pendingReadStatusArticleIDs) let updateableReadArticleIDs = receivedReadArticleIDs.subtracting(pendingReadStatusArticleIDs)
let updateableUnstarredArticleIDs = receivedUnstarredArticleIDs.subtracting(pendingStarredStatusArticleIDs) let updateableUnstarredArticleIDs = receivedUnstarredArticleIDs.subtracting(pendingStarredStatusArticleIDs)
let updateableStarredArticleIDs = receivedStarredArticleIDs.subtracting(pendingStarredStatusArticleIDs) let updateableStarredArticleIDs = receivedStarredArticleIDs.subtracting(pendingStarredStatusArticleIDs)
Task { @MainActor in
var errorOccurred = false var errorOccurred = false
do {
try await account?.markAsUnread(updateableUnreadArticleIDs)
} catch {
errorOccurred = true
os_log(.error, log: self.log, "Error occurred while storing unread statuses: %@", error.localizedDescription)
}
do {
try await account?.markAsRead(updateableReadArticleIDs)
} catch {
errorOccurred = true
os_log(.error, log: self.log, "Error occurred while storing read statuses: %@", error.localizedDescription)
}
do {
try await account?.markAsUnstarred(updateableUnstarredArticleIDs)
} catch {
errorOccurred = true
os_log(.error, log: self.log, "Error occurred while storing unstarred statuses: %@", error.localizedDescription)
}
do {
try await account?.markAsStarred(updateableStarredArticleIDs)
} catch {
errorOccurred = true
os_log(.error, log: self.log, "Error occurred while storing starred statuses: %@", error.localizedDescription)
}
let parsedItems = await Self.makeParsedItems(records)
let feedIDsAndItems = Dictionary(grouping: parsedItems, by: { item in item.feedURL } ).mapValues { Set($0) }
for (feedID, parsedItems) in feedIDsAndItems {
do { do {
try await account?.markAsUnread(updateableUnreadArticleIDs) let articleChanges = try await self.account?.update(feedID: feedID, with: parsedItems, deleteOlder: false)
} catch { guard let deletes = articleChanges?.deletedArticles, !deletes.isEmpty else {
errorOccurred = true continue
os_log(.error, log: self.log, "Error occurred while storing unread statuses: %@", error.localizedDescription)
}
do {
try await account?.markAsRead(updateableReadArticleIDs)
} catch {
errorOccurred = true
os_log(.error, log: self.log, "Error occurred while storing read statuses: %@", error.localizedDescription)
}
do {
try await account?.markAsUnstarred(updateableUnstarredArticleIDs)
} catch {
errorOccurred = true
os_log(.error, log: self.log, "Error occurred while storing unstarred statuses: %@", error.localizedDescription)
}
do {
try await account?.markAsStarred(updateableStarredArticleIDs)
} catch {
errorOccurred = true
os_log(.error, log: self.log, "Error occurred while storing starred statuses: %@", error.localizedDescription)
}
let group = DispatchGroup()
group.enter()
compressionQueue.async {
let parsedItems = records.compactMap { Self.makeParsedItem($0) }
let feedIDsAndItems = Dictionary(grouping: parsedItems, by: { item in item.feedURL } ).mapValues { Set($0) }
Task { @MainActor in
for (feedID, parsedItems) in feedIDsAndItems {
group.enter()
do {
let articleChanges = try await self.account?.update(feedID: feedID, with: parsedItems, deleteOlder: false)
guard let deletes = articleChanges?.deletedArticles, !deletes.isEmpty else {
group.leave()
return
}
let syncStatuses = deletes.map { SyncStatus(articleID: $0.articleID, key: .deleted, flag: true) }
try? await self.database.insertStatuses(syncStatuses)
group.leave()
} catch {
errorOccurred = true
os_log(.error, log: self.log, "Error occurred while storing articles: %@", error.localizedDescription)
group.leave()
}
}
group.leave()
} }
}
group.notify(queue: DispatchQueue.main) { let syncStatuses = deletes.map { SyncStatus(articleID: $0.articleID, key: .deleted, flag: true) }
if errorOccurred { try? await self.database.insertStatuses(syncStatuses)
completion(.failure(CloudKitZoneError.unknown))
} else { } catch {
completion(.success(())) errorOccurred = true
} os_log(.error, log: self.log, "Error occurred while storing articles: %@", error.localizedDescription)
} }
} }
if errorOccurred {
throw CloudKitZoneError.unknown
}
} }
func stripPrefix(_ externalID: String) -> String { func stripPrefix(_ externalID: String) -> String {
return String(externalID[externalID.index(externalID.startIndex, offsetBy: 2)..<externalID.endIndex]) return String(externalID[externalID.index(externalID.startIndex, offsetBy: 2)..<externalID.endIndex])
} }
private static func makeParsedItems(_ articleRecords: [CKRecord]) async -> Set<ParsedItem> {
let task = Task.detached { () -> Set<ParsedItem> in
let parsedItems = articleRecords.compactMap { makeParsedItem($0) }
return Set(parsedItems)
}
return await task.value
}
static func makeParsedItem(_ articleRecord: CKRecord) -> ParsedItem? { static func makeParsedItem(_ articleRecord: CKRecord) -> ParsedItem? {
guard articleRecord.recordType == CloudKitArticlesZone.CloudKitArticle.recordType else { guard articleRecord.recordType == CloudKitArticlesZone.CloudKitArticle.recordType else {
return nil return nil