Prevent newly added sync status records from being deleted and process them incrementally
This commit is contained in:
parent
e681fd6402
commit
578d22f3c2
|
@ -16,6 +16,7 @@ import SyncDatabase
|
|||
class CloudKitSendStatusOperation: MainThreadOperation {
|
||||
|
||||
private var log = OSLog(subsystem: Bundle.main.bundleIdentifier!, category: "CloudKit")
|
||||
private let blockSize = 300
|
||||
|
||||
// MainThreadOperation
|
||||
public var isCanceled = false
|
||||
|
@ -41,50 +42,60 @@ class CloudKitSendStatusOperation: MainThreadOperation {
|
|||
func run() {
|
||||
os_log(.debug, log: log, "Sending article statuses...")
|
||||
|
||||
database.selectForProcessing { result in
|
||||
if showProgress {
|
||||
|
||||
database.selectPendingCount() { result in
|
||||
switch result {
|
||||
case .success(let count):
|
||||
let ticks = count / self.blockSize
|
||||
self.refreshProgress?.addToNumberOfTasksAndRemaining(ticks)
|
||||
self.selectForProcessing()
|
||||
case .failure(let databaseError):
|
||||
os_log(.error, log: self.log, "Send status count pending error: %@.", databaseError.localizedDescription)
|
||||
self.operationDelegate?.cancelOperation(self)
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
selectForProcessing()
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private extension CloudKitSendStatusOperation {
|
||||
|
||||
func selectForProcessing() {
|
||||
database.selectForProcessing(limit: blockSize) { result in
|
||||
switch result {
|
||||
case .success(let syncStatuses):
|
||||
|
||||
func processStatuses(_ syncStatuses: [SyncStatus]) {
|
||||
guard syncStatuses.count > 0 else {
|
||||
if self.showProgress {
|
||||
self.refreshProgress?.completeTask()
|
||||
}
|
||||
os_log(.debug, log: self.log, "Done sending article statuses.")
|
||||
self.operationDelegate?.operationDidComplete(self)
|
||||
return
|
||||
}
|
||||
|
||||
let group = DispatchGroup()
|
||||
let syncStatusChunks = syncStatuses.chunked(into: 300)
|
||||
|
||||
if self.showProgress {
|
||||
self.refreshProgress?.addToNumberOfTasksAndRemaining(syncStatusChunks.count)
|
||||
self.processStatuses(syncStatuses) {
|
||||
self.selectForProcessing()
|
||||
}
|
||||
|
||||
for syncStatusChunk in syncStatusChunks {
|
||||
group.enter()
|
||||
self.sendArticleStatusChunk(syncStatuses: syncStatusChunk) {
|
||||
group.leave()
|
||||
}
|
||||
}
|
||||
|
||||
group.notify(queue: DispatchQueue.global(qos: .background)) {
|
||||
os_log(.debug, log: self.log, "Done sending article statuses.")
|
||||
DispatchQueue.main.async {
|
||||
self.operationDelegate?.operationDidComplete(self)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
switch result {
|
||||
case .success(let syncStatuses):
|
||||
processStatuses(syncStatuses)
|
||||
case .failure(let databaseError):
|
||||
|
||||
os_log(.error, log: self.log, "Send status error: %@.", databaseError.localizedDescription)
|
||||
self.operationDelegate?.cancelOperation(self)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func sendArticleStatusChunk(syncStatuses: [SyncStatus], completion: @escaping () -> Void) {
|
||||
func processStatuses(_ syncStatuses: [SyncStatus], completion: @escaping () -> Void) {
|
||||
guard let account = account, let articlesZone = articlesZone else {
|
||||
completion()
|
||||
return
|
||||
|
@ -107,7 +118,8 @@ class CloudKitSendStatusOperation: MainThreadOperation {
|
|||
switch result {
|
||||
case .success:
|
||||
self.database.deleteSelectedForProcessing(syncStatuses.map({ $0.articleID })) { _ in
|
||||
if self.showProgress {
|
||||
// Don't clear the last one since we might have had additional ticks added
|
||||
if self.showProgress && self.refreshProgress?.numberRemaining ?? 0 > 1 {
|
||||
self.refreshProgress?.completeTask()
|
||||
}
|
||||
os_log(.debug, log: self.log, "Done sending article status block...")
|
||||
|
|
|
@ -36,8 +36,8 @@ public struct SyncDatabase {
|
|||
try syncStatusTable.insertStatuses(statuses)
|
||||
}
|
||||
|
||||
public func selectForProcessing(completion: @escaping SyncStatusesCompletionBlock) {
|
||||
return syncStatusTable.selectForProcessing(completion)
|
||||
public func selectForProcessing(limit: Int? = nil, completion: @escaping SyncStatusesCompletionBlock) {
|
||||
return syncStatusTable.selectForProcessing(limit: limit, completion: completion)
|
||||
}
|
||||
|
||||
public func selectPendingCount(completion: @escaping DatabaseIntCompletionBlock) {
|
||||
|
|
|
@ -20,7 +20,7 @@ struct SyncStatusTable: DatabaseTable {
|
|||
self.queue = queue
|
||||
}
|
||||
|
||||
func selectForProcessing(_ completion: @escaping SyncStatusesCompletionBlock) {
|
||||
func selectForProcessing(limit: Int?, completion: @escaping SyncStatusesCompletionBlock) {
|
||||
queue.runInTransaction { databaseResult in
|
||||
var statuses = Set<SyncStatus>()
|
||||
var error: DatabaseError?
|
||||
|
@ -29,7 +29,10 @@ struct SyncStatusTable: DatabaseTable {
|
|||
let updateSQL = "update syncStatus set selected = true"
|
||||
database.executeUpdate(updateSQL, withArgumentsIn: nil)
|
||||
|
||||
let selectSQL = "select * from syncStatus where selected == true"
|
||||
var selectSQL = "select * from syncStatus where selected == true"
|
||||
if let limit = limit {
|
||||
selectSQL = "\(selectSQL) limit \(limit)"
|
||||
}
|
||||
if let resultSet = database.executeQuery(selectSQL, withArgumentsIn: nil) {
|
||||
statuses = resultSet.mapToSet(self.statusWithRow)
|
||||
}
|
||||
|
@ -135,7 +138,7 @@ struct SyncStatusTable: DatabaseTable {
|
|||
func makeDatabaseCall(_ database: FMDatabase) {
|
||||
let parameters = articleIDs.map { $0 as AnyObject }
|
||||
let placeholders = NSString.rs_SQLValueList(withPlaceholders: UInt(articleIDs.count))!
|
||||
let deleteSQL = "delete from syncStatus where articleID in \(placeholders)"
|
||||
let deleteSQL = "delete from syncStatus where selected = true and articleID in \(placeholders)"
|
||||
database.executeUpdate(deleteSQL, withArgumentsIn: parameters)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue