Rework how feed subscriptions are managed.

This commit is contained in:
Maurice Parker 2020-04-05 10:49:15 -05:00
parent 0b87acec1e
commit f289735b50
4 changed files with 170 additions and 134 deletions

View File

@ -166,6 +166,10 @@ public final class Account: DisplayNameProvider, UnreadCountProvider, Container,
}
return _externalIDToWebFeedDictionary
}
var flattenedWebFeedURLs: Set<String> {
return Set(flattenedWebFeeds().map({ $0.url }))
}
var username: String? {
get {

View File

@ -178,7 +178,9 @@ final class CloudKitAccountDelegate: AccountDelegate {
let normalizedItems = OPMLNormalizer.normalize(opmlItems)
var webFeedURLs = Set<String>()
// Combine all existing web feed URLs with all the new ones
var webFeedURLs = account.flattenedWebFeedURLs
for opmlItem in normalizedItems {
if let webFeedURL = opmlItem.feedSpecifier?.feedURL {
webFeedURLs.insert(webFeedURL)
@ -193,37 +195,18 @@ final class CloudKitAccountDelegate: AccountDelegate {
}
}
refreshProgress.addToNumberOfTasksAndRemaining(webFeedURLs.count + 1)
var errorOccurred = false
// os_log(.error, log: self.log, "Error while subscribing to the feed: %@", error.localizedDescription)
// You have to single thread these or CloudKit gets overwhelmed and freaks out
func takeOneAndPassItOn(_ webFeedURLs: [String], completion: @escaping () -> Void) {
var remainingWebFeedURLS = webFeedURLs
if let webFeedURL = remainingWebFeedURLS.popLast() {
publicZone.createSubscription(webFeedURL) { result in
self.refreshProgress.completeTask()
if case .failure(let error) = result {
os_log(.error, log: self.log, "Error while subscribing to the feed: %@", error.localizedDescription)
errorOccurred = true
}
takeOneAndPassItOn(remainingWebFeedURLS, completion: completion)
}
} else {
completion()
}
}
takeOneAndPassItOn(Array(webFeedURLs)) {
if errorOccurred {
self.refreshProgress.completeTask()
completion(.failure(CloudKitZoneError.unknown))
} else {
refreshProgress.addToNumberOfTasksAndRemaining(2)
publicZone.manageSubscriptions(webFeedURLs) { result in
self.refreshProgress.completeTask()
switch result {
case .success:
self.accountZone.importOPML(rootExternalID: rootExternalID, items: normalizedItems) { _ in
self.refreshProgress.completeTask()
completion(.success(()))
self.refreshAll(for: account, completion: completion)
}
case .failure(let error):
completion(.failure(error))
}
}
@ -269,13 +252,13 @@ final class CloudKitAccountDelegate: AccountDelegate {
feed.externalID = externalID
container.addWebFeed(feed)
self.publicZone.createSubscription(feed.url) { result in
self.publicZone.manageSubscriptions(account.flattenedWebFeedURLs) { result in
self.refreshProgress.completeTask()
if case .failure(let error) = result {
os_log(.error, log: self.log, "An error occurred while creating the subscription: %@", error.localizedDescription)
}
}
InitialFeedDownloader.download(url) { parsedFeed in
self.refreshProgress.completeTask()
@ -328,7 +311,7 @@ final class CloudKitAccountDelegate: AccountDelegate {
case .success(let deleted):
container.removeWebFeed(feed)
if deleted {
self.publicZone.removeSubscription(feed, completion: completion)
self.publicZone.manageSubscriptions(account.flattenedWebFeedURLs, completion: completion)
} else {
completion(.success(()))
}
@ -490,7 +473,7 @@ final class CloudKitAccountDelegate: AccountDelegate {
}
func accountDidInitialize(_ account: Account) {
accountZone.delegate = CloudKitAcountZoneDelegate(account: account, publicZone: publicZone, refreshProgress: refreshProgress)
accountZone.delegate = CloudKitAcountZoneDelegate(account: account, refreshProgress: refreshProgress)
articlesZone.delegate = CloudKitArticlesZoneDelegate(account: account, database: database, articlesZone: articlesZone)
// Check to see if this is a new account and initialize anything we need

View File

@ -59,106 +59,47 @@ final class CloudKitPublicZone: CloudKitZone {
completion()
}
/// Create a CloudKit subscription for the webfeed and any other supporting records that we need
func createSubscription(_ webFeedURL: String, completion: @escaping (Result<Void, Error>) -> Void) {
let webFeedURLMD5String = webFeedURL.md5String
/// Create any new subscriptions and delete any old ones
func manageSubscriptions(_ webFeedURLs: Set<String>, completion: @escaping (Result<Void, Error>) -> Void) {
func createSubscription(_ webFeedRecordRef: CKRecord.Reference) {
let predicate = NSPredicate(format: "webFeed = %@", webFeedRecordRef)
let subscription = CKQuerySubscription(recordType: CloudKitWebFeed.recordType, predicate: predicate, options: [.firesOnRecordUpdate])
let info = CKSubscription.NotificationInfo()
info.shouldSendContentAvailable = true
info.desiredKeys = [CloudKitWebFeed.Fields.httpLastModified, CloudKitWebFeed.Fields.httpEtag]
subscription.notificationInfo = info
self.save(subscription) { result in
switch result {
case .success(let subscription):
let userSubscriptionRecord = CKRecord(recordType: CloudKitUserSubscription.recordType, recordID: self.generateRecordID())
userSubscriptionRecord[CloudKitUserSubscription.Fields.userRecordID] = self.container?.userRecordID
userSubscriptionRecord[CloudKitUserSubscription.Fields.webFeed] = webFeedRecordRef
userSubscriptionRecord[CloudKitUserSubscription.Fields.subscriptionID] = subscription.subscriptionID
var webFeedRecords = [CKRecord]()
for webFeedURL in webFeedURLs {
let webFeedRecordID = CKRecord.ID(recordName: webFeedURL.md5String, zoneID: Self.zoneID)
let webFeedRecord = CKRecord(recordType: CloudKitWebFeed.recordType, recordID: webFeedRecordID)
webFeedRecord[CloudKitWebFeed.Fields.url] = webFeedURL
webFeedRecord[CloudKitWebFeed.Fields.httpLastModified] = ""
webFeedRecord[CloudKitWebFeed.Fields.httpEtag] = ""
webFeedRecords.append(webFeedRecord)
}
self.save(userSubscriptionRecord, completion: completion)
self.saveIfNew(webFeedRecords) { _ in
var subscriptions = [CKSubscription]()
let webFeedURLChunks = Array(webFeedURLs).chunked(into: 20)
for webFeedURLChunk in webFeedURLChunks {
let predicate = NSPredicate(format: "url in %@", webFeedURLChunk)
let subscription = CKQuerySubscription(recordType: CloudKitWebFeed.recordType, predicate: predicate, options: [.firesOnRecordUpdate])
let info = CKSubscription.NotificationInfo()
info.shouldSendContentAvailable = true
info.desiredKeys = [CloudKitWebFeed.Fields.httpLastModified, CloudKitWebFeed.Fields.httpEtag]
subscription.notificationInfo = info
subscriptions.append(subscription)
}
self.fetchAllUserSubscriptions() { result in
switch result {
case .success(let subscriptionsToDelete):
let subscriptionToDeleteIDs = subscriptionsToDelete.map({ $0.subscriptionID })
self.modify(subscriptionsToSave: subscriptions, subscriptionIDsToDelete: subscriptionToDeleteIDs, completion: completion)
case .failure(let error):
completion(.failure(error))
}
}
}
fetch(externalID: webFeedURLMD5String) { result in
switch result {
case .success(let record):
let webFeedRecordRef = CKRecord.Reference(record: record, action: .none)
createSubscription(webFeedRecordRef)
case .failure:
let webFeedRecordID = CKRecord.ID(recordName: webFeedURLMD5String, zoneID: Self.zoneID)
let webFeedRecordRef = CKRecord.Reference(recordID: webFeedRecordID, action: .none)
let webFeedRecord = CKRecord(recordType: CloudKitWebFeed.recordType, recordID: webFeedRecordID)
webFeedRecord[CloudKitWebFeed.Fields.url] = webFeedURL
webFeedRecord[CloudKitWebFeed.Fields.httpLastModified] = ""
webFeedRecord[CloudKitWebFeed.Fields.httpEtag] = ""
let webFeedCheckRecord = CKRecord(recordType: CloudKitWebFeedCheck.recordType, recordID: self.generateRecordID())
webFeedRecord[CloudKitWebFeedCheck.Fields.webFeed] = webFeedRecordRef
webFeedRecord[CloudKitWebFeedCheck.Fields.lastCheck] = Date.distantPast
self.save([webFeedRecord, webFeedCheckRecord]) { result in
switch result {
case .success:
createSubscription(webFeedRecordRef)
case .failure(let error):
completion(.failure(error))
}
}
}
}
}
/// Remove the subscription for the given feed along with its supporting record
func removeSubscription(_ webFeed: WebFeed, completion: @escaping (Result<Void, Error>) -> Void) {
guard let userRecordID = self.container?.userRecordID else {
completion(.failure(CloudKitZoneError.invalidParameter))
return
}
let webFeedRecordID = CKRecord.ID(recordName: webFeed.url.md5String, zoneID: Self.zoneID)
let webFeedRecordRef = CKRecord.Reference(recordID: webFeedRecordID, action: .none)
let predicate = NSPredicate(format: "userRecordID = %@ AND webFeed = %@", userRecordID, webFeedRecordRef)
let ckQuery = CKQuery(recordType: CloudKitUserSubscription.recordType, predicate: predicate)
query(ckQuery) { result in
switch result {
case .success(let records):
if records.count > 0, let subscriptionID = records[0][CloudKitUserSubscription.Fields.subscriptionID] as? String {
self.delete(subscriptionID: subscriptionID) { result in
switch result {
case .success:
self.delete(recordID: records[0].recordID, completion: completion)
case .failure(let error):
completion(.failure(error))
}
}
} else {
os_log(.error, log: self.log, "Remove subscription error. The subscription wasn't found.")
completion(.success(()))
}
case .failure(let error):
completion(.failure(error))
}
}
}
}

View File

@ -200,6 +200,76 @@ extension CloudKitZone {
modify(recordsToSave: records, recordIDsToDelete: [], completion: completion)
}
/// Saves or modifies the records as long as they are unchanged relative to the local version
func saveIfNew(_ records: [CKRecord], completion: @escaping (Result<Void, Error>) -> Void) {
let op = CKModifyRecordsOperation(recordsToSave: records, recordIDsToDelete: [CKRecord.ID]())
op.savePolicy = .ifServerRecordUnchanged
op.isAtomic = false
op.modifyRecordsCompletionBlock = { [weak self] (_, _, error) in
guard let self = self else { return }
switch CloudKitZoneResult.resolve(error) {
case .success:
DispatchQueue.main.async {
completion(.success(()))
}
case .zoneNotFound:
self.createZoneRecord() { result in
switch result {
case .success:
self.saveIfNew(records, completion: completion)
case .failure(let error):
DispatchQueue.main.async {
completion(.failure(error))
}
}
}
case .userDeletedZone:
DispatchQueue.main.async {
completion(.failure(CloudKitZoneError.userDeletedZone))
}
case .retry(let timeToWait):
self.retryIfPossible(after: timeToWait) {
self.saveIfNew(records, completion: completion)
}
case .limitExceeded:
let chunkedRecords = records.chunked(into: 300)
let group = DispatchGroup()
var errorOccurred = false
for chunk in chunkedRecords {
group.enter()
self.saveIfNew(chunk) { result in
if case .failure(let error) = result {
os_log(.error, log: self.log, "%@ zone modify records error: %@", Self.zoneID.zoneName, error.localizedDescription)
errorOccurred = true
}
group.leave()
}
}
group.notify(queue: DispatchQueue.main) {
if errorOccurred {
completion(.failure(CloudKitZoneError.unknown))
} else {
completion(.success(()))
}
}
default:
DispatchQueue.main.async {
completion(.failure(CloudKitError(error!)))
}
}
}
database?.add(op)
}
/// Save the CKSubscription
func save(_ subscription: CKSubscription, completion: @escaping (Result<CKSubscription, Error>) -> Void) {
database?.save(subscription) { savedSubscription, error in
@ -266,18 +336,37 @@ extension CloudKitZone {
}
}
}
/// Bulk add (or modify I suppose) and delete of subscriptions
func modify(subscriptionsToSave: [CKSubscription], subscriptionIDsToDelete: [CKSubscription.ID], completion: @escaping (Result<Void, Error>) -> Void) {
let op = CKModifySubscriptionsOperation(subscriptionsToSave: subscriptionsToSave, subscriptionIDsToDelete: subscriptionIDsToDelete)
op.modifySubscriptionsCompletionBlock = { [weak self] (_, _, error) in
guard let self = self else { return }
switch CloudKitZoneResult.resolve(error) {
case .success:
DispatchQueue.main.async {
completion(.success(()))
}
case .retry(let timeToWait):
self.retryIfPossible(after: timeToWait) {
self.modify(subscriptionsToSave: subscriptionsToSave, subscriptionIDsToDelete: subscriptionIDsToDelete, completion: completion)
}
default:
DispatchQueue.main.async {
completion(.failure(CloudKitError(error!)))
}
}
}
database?.add(op)
}
/// Modify and delete the supplied CKRecords and CKRecord.IDs
func modify(recordsToSave: [CKRecord], recordIDsToDelete: [CKRecord.ID], completion: @escaping (Result<Void, Error>) -> Void) {
let op = CKModifyRecordsOperation(recordsToSave: recordsToSave, recordIDsToDelete: recordIDsToDelete)
// We use .changedKeys savePolicy to do unlocked changes here cause my app is contentious and off-line first
// Apple suggests using .ifServerRecordUnchanged save policy
// For more, see Advanced CloudKit(https://developer.apple.com/videos/play/wwdc2014/231/)
op.savePolicy = .changedKeys
// To avoid CKError.partialFailure, make the operation atomic (if one record fails to get modified, they all fail)
// If you want to handle partial failures, set .isAtomic to false and implement CKOperationResultType .fail(reason: .partialFailure) where appropriate
op.isAtomic = true
op.modifyRecordsCompletionBlock = { [weak self] (_, _, error) in
@ -309,7 +398,6 @@ extension CloudKitZone {
self.modify(recordsToSave: recordsToSave, recordIDsToDelete: recordIDsToDelete, completion: completion)
}
case .limitExceeded:
let chunkedRecords = recordsToSave.chunked(into: 300)
let group = DispatchGroup()
@ -343,7 +431,27 @@ extension CloudKitZone {
database?.add(op)
}
/// Fetch all the subscriptions that a user has in the current database in all zones
func fetchAllUserSubscriptions(completion: @escaping (Result<[CKSubscription], Error>) -> Void) {
database?.fetchAllSubscriptions() { subscriptions, error in
switch CloudKitZoneResult.resolve(error) {
case .success:
DispatchQueue.main.async {
completion(.success((subscriptions!)))
}
case .retry(let timeToWait):
self.retryIfPossible(after: timeToWait) {
self.fetchAllUserSubscriptions(completion: completion)
}
default:
DispatchQueue.main.async {
completion(.failure(CloudKitError(error!)))
}
}
}
}
/// Fetch all the changes in the CKZone since the last time we checked
func fetchChangesInZone(completion: @escaping (Result<Void, Error>) -> Void) {