Follows the continuation until the starred articles stream has been consumed.

This commit is contained in:
Kiel Gillard 2019-10-16 22:12:37 +11:00
parent 16be64c440
commit 05cb3773de
6 changed files with 151 additions and 90 deletions

View File

@ -89,12 +89,7 @@ final class FeedlyAPICaller {
} }
} }
func getStream(for collection: FeedlyCollection, newerThan: Date? = nil, unreadOnly: Bool? = nil, completionHandler: @escaping (Result<FeedlyStream, Error>) -> ()) { func getStream(for resource: FeedlyResourceId, continuation: String? = nil, newerThan: Date?, unreadOnly: Bool?, completionHandler: @escaping (Result<FeedlyStream, Error>) -> ()) {
let id = FeedlyCategoryResourceId(id: collection.id)
getStream(for: id, newerThan: newerThan, unreadOnly: unreadOnly, completionHandler: completionHandler)
}
func getStream(for resource: FeedlyResourceId, newerThan: Date?, unreadOnly: Bool?, completionHandler: @escaping (Result<FeedlyStream, Error>) -> ()) {
guard let accessToken = credentials?.secret else { guard let accessToken = credentials?.secret else {
return DispatchQueue.main.async { return DispatchQueue.main.async {
completionHandler(.failure(CredentialsError.incompleteCredentials)) completionHandler(.failure(CredentialsError.incompleteCredentials))
@ -103,7 +98,7 @@ final class FeedlyAPICaller {
var components = baseUrlComponents var components = baseUrlComponents
components.path = "/v3/streams/contents" components.path = "/v3/streams/contents"
// If you change these, check AccountFeedlySyncTest.set(testFiles:with:).
var queryItems = [URLQueryItem]() var queryItems = [URLQueryItem]()
if let date = newerThan { if let date = newerThan {
@ -118,6 +113,11 @@ final class FeedlyAPICaller {
queryItems.append(queryItem) queryItems.append(queryItem)
} }
if let value = continuation, !value.isEmpty {
let queryItem = URLQueryItem(name: "continuation", value: value)
queryItems.append(queryItem)
}
queryItems.append(contentsOf: [ queryItems.append(contentsOf: [
URLQueryItem(name: "count", value: "1000"), URLQueryItem(name: "count", value: "1000"),
URLQueryItem(name: "streamId", value: resource.id), URLQueryItem(name: "streamId", value: resource.id),

View File

@ -11,11 +11,28 @@ import Foundation
/// An operation with a queue of its own. /// An operation with a queue of its own.
final class FeedlyCompoundOperation: FeedlyOperation { final class FeedlyCompoundOperation: FeedlyOperation {
private let operationQueue = OperationQueue() private let operationQueue = OperationQueue()
private let operations: [Operation] private var finishOperation: BlockOperation?
init(operations: [Operation]) { init(operations: [Operation]) {
assert(!operations.isEmpty) assert(!operations.isEmpty)
self.operations = operations operationQueue.isSuspended = true
finishOperation = nil
super.init()
let finish = BlockOperation {
self.didFinish()
}
finishOperation = finish
for operation in operations {
finish.addDependency(operation)
}
var initialOperations = operations
initialOperations.append(finish)
operationQueue.addOperations(initialOperations, waitUntilFinished: false)
} }
convenience init(operationsBlock: () -> ([Operation])) { convenience init(operationsBlock: () -> ([Operation])) {
@ -24,18 +41,17 @@ final class FeedlyCompoundOperation: FeedlyOperation {
} }
override func main() { override func main() {
let finishOperation = BlockOperation { [weak self] in guard !isCancelled else {
self?.didFinish() didFinish()
return
}
operationQueue.isSuspended = false
} }
for operation in operations { func addAnotherOperation(_ operation: Operation) {
finishOperation.addDependency(operation) guard !isCancelled else { return }
} finishOperation?.addDependency(operation)
operationQueue.addOperation(operation)
var operationsWithFinish = operations
operationsWithFinish.append(finishOperation)
operationQueue.addOperations(operationsWithFinish, waitUntilFinished: false)
} }
override func cancel() { override func cancel() {

View File

@ -19,11 +19,13 @@ class FeedlyOperation: Operation {
weak var delegate: FeedlyOperationDelegate? weak var delegate: FeedlyOperationDelegate?
func didFinish() { func didFinish() {
assert(Thread.isMainThread)
self.isExecutingOperation = false self.isExecutingOperation = false
self.isFinishedOperation = true self.isFinishedOperation = true
} }
func didFinish(_ error: Error) { func didFinish(_ error: Error) {
assert(Thread.isMainThread)
delegate?.feedlyOperation(self, didFailWith: error) delegate?.feedlyOperation(self, didFailWith: error)
didFinish() didFinish()
} }

View File

@ -15,6 +15,10 @@ protocol FeedlyEntryProviding: class {
var parsedEntries: Set<ParsedItem> { get } var parsedEntries: Set<ParsedItem> { get }
} }
protocol FeedlyGetStreamOperationDelegate: class {
func feedlyGetStreamOperation(_ operation: FeedlyGetStreamOperation, didGet stream: FeedlyStream)
}
/// Single responsibility is to get the stream content of a Collection from Feedly. /// Single responsibility is to get the stream content of a Collection from Feedly.
final class FeedlyGetStreamOperation: FeedlyOperation, FeedlyEntryProviding { final class FeedlyGetStreamOperation: FeedlyOperation, FeedlyEntryProviding {
@ -30,7 +34,8 @@ final class FeedlyGetStreamOperation: FeedlyOperation, FeedlyEntryProviding {
var entries: [FeedlyEntry] { var entries: [FeedlyEntry] {
guard let entries = storedStream?.items else { guard let entries = storedStream?.items else {
assertionFailure("Has a prior operation finished too early? Is the operation included in \(self.dependencies)?") assert(isFinished, "This should only be called when the operation finishes without error.")
assertionFailure("Has this operation been addeded as a dependency on the caller?")
return [] return []
} }
return entries return entries
@ -55,16 +60,19 @@ final class FeedlyGetStreamOperation: FeedlyOperation, FeedlyEntryProviding {
private var storedParsedEntries: Set<ParsedItem>? private var storedParsedEntries: Set<ParsedItem>?
let account: Account let account: Account
let caller: FeedlyAPICaller let caller: FeedlyAPICaller
let unreadOnly: Bool? let unreadOnly: Bool?
let newerThan: Date? let newerThan: Date?
let continuation: String?
init(account: Account, resource: FeedlyResourceId, caller: FeedlyAPICaller, newerThan: Date?, unreadOnly: Bool? = nil) { weak var streamDelegate: FeedlyGetStreamOperationDelegate?
init(account: Account, resource: FeedlyResourceId, caller: FeedlyAPICaller, continuation: String? = nil, newerThan: Date?, unreadOnly: Bool? = nil) {
self.account = account self.account = account
self.resourceProvider = ResourceProvider(resource: resource) self.resourceProvider = ResourceProvider(resource: resource)
self.caller = caller self.caller = caller
self.continuation = continuation
self.unreadOnly = unreadOnly self.unreadOnly = unreadOnly
self.newerThan = newerThan self.newerThan = newerThan
} }
@ -79,11 +87,15 @@ final class FeedlyGetStreamOperation: FeedlyOperation, FeedlyEntryProviding {
return return
} }
caller.getStream(for: resourceProvider.resource, newerThan: newerThan, unreadOnly: unreadOnly) { result in caller.getStream(for: resourceProvider.resource, continuation: continuation, newerThan: newerThan, unreadOnly: unreadOnly) { result in
switch result { switch result {
case .success(let stream): case .success(let stream):
self.storedStream = stream self.storedStream = stream
self.streamDelegate?.feedlyGetStreamOperation(self, didGet: stream)
self.didFinish() self.didFinish()
case .failure(let error): case .failure(let error):
self.didFinish(error) self.didFinish(error)
} }

View File

@ -8,18 +8,86 @@
import Foundation import Foundation
import os.log import os.log
import RSParser
final class FeedlySyncStarredArticlesOperation: FeedlyOperation { final class FeedlySyncStarredArticlesOperation: FeedlyOperation, FeedlyOperationDelegate, FeedlyGetStreamOperationDelegate {
private let account: Account private let account: Account
private let operationQueue: OperationQueue private let operationQueue: OperationQueue
private let caller: FeedlyAPICaller private let caller: FeedlyAPICaller
private let log: OSLog private let log: OSLog
init(account: Account, caller: FeedlyAPICaller, log: OSLog) { private let setStatuses: FeedlySetStarredArticlesOperation
/// Buffers every starred/saved entry from every page.
private class StarredEntryProvider: FeedlyEntryProviding {
var resource: FeedlyResourceId
private(set) var parsedEntries = Set<ParsedItem>()
private(set) var entries = [FeedlyEntry]()
init(resource: FeedlyResourceId) {
self.resource = resource
}
func addEntries(from provider: FeedlyEntryProviding) {
entries.append(contentsOf: provider.entries)
parsedEntries.formUnion(provider.parsedEntries)
}
}
private let entryProvider: StarredEntryProvider
init(account: Account, credentials: Credentials, caller: FeedlyAPICaller, log: OSLog) {
self.account = account self.account = account
self.caller = caller self.caller = caller
self.operationQueue = OperationQueue() self.operationQueue = OperationQueue()
self.operationQueue.isSuspended = true
self.log = log self.log = log
let saved = FeedlyTagResourceId.saved(for: credentials.username)
let provider = StarredEntryProvider(resource: saved)
self.entryProvider = provider
self.setStatuses = FeedlySetStarredArticlesOperation(account: account,
allStarredEntriesProvider: provider,
log: log)
super.init()
let getFirstPage = FeedlyGetStreamOperation(account: account,
resource: saved,
caller: caller,
newerThan: nil)
let organiseByFeed = FeedlyOrganiseParsedItemsByFeedOperation(account: account,
entryProvider: provider,
log: log)
let updateAccount = FeedlyUpdateAccountFeedsWithItemsOperation(account: account,
organisedItemsProvider: organiseByFeed,
log: log)
getFirstPage.delegate = self
getFirstPage.streamDelegate = self
setStatuses.addDependency(getFirstPage)
setStatuses.delegate = self
organiseByFeed.addDependency(setStatuses)
organiseByFeed.delegate = self
updateAccount.addDependency(organiseByFeed)
updateAccount.delegate = self
let finishOperation = BlockOperation { [weak self] in
DispatchQueue.main.async {
self?.didFinish()
}
}
finishOperation.addDependency(updateAccount)
let operations = [getFirstPage, setStatuses, organiseByFeed, updateAccount, finishOperation]
operationQueue.addOperations(operations, waitUntilFinished: false)
} }
override func cancel() { override func cancel() {
@ -33,73 +101,31 @@ final class FeedlySyncStarredArticlesOperation: FeedlyOperation {
return return
} }
guard let user = caller.credentials?.username else { operationQueue.isSuspended = false
didFinish(FeedlyAccountDelegateError.notLoggedIn) }
func feedlyGetStreamOperation(_ operation: FeedlyGetStreamOperation, didGet stream: FeedlyStream) {
entryProvider.addEntries(from: operation)
os_log(.debug, log: log, "Collecting %i items from %@", stream.items.count, stream.id)
guard let continuation = stream.continuation else {
return return
} }
class Delegate: FeedlyOperationDelegate { let nextPageOperation = FeedlyGetStreamOperation(account: operation.account,
var error: Error? resource: operation.resource,
weak var compoundOperation: FeedlyCompoundOperation? caller: operation.caller,
continuation: continuation,
newerThan: operation.newerThan)
nextPageOperation.delegate = self
nextPageOperation.streamDelegate = self
setStatuses.addDependency(nextPageOperation)
operationQueue.addOperation(nextPageOperation)
}
func feedlyOperation(_ operation: FeedlyOperation, didFailWith error: Error) { func feedlyOperation(_ operation: FeedlyOperation, didFailWith error: Error) {
compoundOperation?.cancel() operationQueue.cancelAllOperations()
self.error = error didFinish(error)
} }
}
let delegate = Delegate()
let syncSaved = FeedlyCompoundOperation {
let saved = FeedlyTagResourceId.saved(for: user)
os_log(.debug, log: log, "Getting starred articles from \"%@\".", saved.id)
let getSavedStream = FeedlyGetStreamOperation(account: account,
resource: saved,
caller: caller,
newerThan: nil)
getSavedStream.delegate = delegate
// set statuses
let setStatuses = FeedlySetStarredArticlesOperation(account: account,
allStarredEntriesProvider: getSavedStream,
log: log)
setStatuses.delegate = delegate
setStatuses.addDependency(getSavedStream)
// ingest articles
let organiseByFeed = FeedlyOrganiseParsedItemsByFeedOperation(account: account,
entryProvider: getSavedStream,
log: log)
organiseByFeed.delegate = delegate
organiseByFeed.addDependency(setStatuses)
let updateAccount = FeedlyUpdateAccountFeedsWithItemsOperation(account: account,
organisedItemsProvider: organiseByFeed,
log: log)
updateAccount.delegate = delegate
updateAccount.addDependency(organiseByFeed)
return [getSavedStream, setStatuses, organiseByFeed, updateAccount]
}
delegate.compoundOperation = syncSaved
let finalOperation = BlockOperation { [weak self] in
guard let self = self else {
return
}
if let error = delegate.error {
self.didFinish(error)
} else {
self.didFinish()
}
os_log(.debug, log: self.log, "Done syncing starred articles.")
}
finalOperation.addDependency(syncSaved)
operationQueue.addOperations([syncSaved, finalOperation], waitUntilFinished: false)
}
} }

View File

@ -49,6 +49,11 @@ final class FeedlySyncStrategy {
return return
} }
guard let credentials = caller.credentials else {
completionHandler(.failure(FeedlyAccountDelegateError.notLoggedIn))
return
}
let sendArticleStatuses = FeedlySendArticleStatusesOperation(database: database, caller: caller, log: log) let sendArticleStatuses = FeedlySendArticleStatusesOperation(database: database, caller: caller, log: log)
sendArticleStatuses.delegate = self sendArticleStatuses.delegate = self
@ -85,7 +90,7 @@ final class FeedlySyncStrategy {
getCollectionStreams.queueDelegate = self getCollectionStreams.queueDelegate = self
getCollectionStreams.addDependency(getCollections) getCollectionStreams.addDependency(getCollections)
let syncStarred = FeedlySyncStarredArticlesOperation(account: account, caller: caller, log: log) let syncStarred = FeedlySyncStarredArticlesOperation(account: account, credentials: credentials, caller: caller, log: log)
syncStarred.addDependency(getCollections) syncStarred.addDependency(getCollections)
syncStarred.addDependency(mirrorCollectionsAsFolders) syncStarred.addDependency(mirrorCollectionsAsFolders)
syncStarred.addDependency(createFeedsOperation) syncStarred.addDependency(createFeedsOperation)