From 91aa4159080901104dc5364dfab7a49ad6cb6049 Mon Sep 17 00:00:00 2001 From: Bart De Vries Date: Wed, 15 Dec 2021 13:54:51 +0100 Subject: [PATCH] Keep fetching episode actions until we get the current timestamp Solves #23 --- src/sync/syncjob.cpp | 174 ++++++++++++++++++++++++------------------- src/sync/syncjob.h | 5 ++ 2 files changed, 104 insertions(+), 75 deletions(-) diff --git a/src/sync/syncjob.cpp b/src/sync/syncjob.cpp index c1b00a51..e5ced21e 100644 --- a/src/sync/syncjob.cpp +++ b/src/sync/syncjob.cpp @@ -109,7 +109,7 @@ void SyncJob::doSync() void SyncJob::doRegularSync() { - setTotalAmount(KJob::Unit::Items, 7); + setTotalAmount(KJob::Unit::Items, 8); setProcessedAmount(KJob::Unit::Items, 0); Q_EMIT infoMessage(this, getProgressMessage(Started)); @@ -385,17 +385,22 @@ void SyncJob::fetchModifiedSubscriptions() return; } Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); - qCDebug(kastsSync) << "Done updating subscriptions and fetching updates"; + // We're ready to sync the episode states now - QTimer::singleShot(0, this, &SyncJob::syncEpisodeStates); + // increase the progress counter now already since fetchRemoteEpisodeActions + // can be executed multiple times + setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); + m_remoteEpisodeActionHash.clear(); + QTimer::singleShot(0, this, &SyncJob::fetchRemoteEpisodeActions); }); fetchFeedsJob->start(); } -void SyncJob::syncEpisodeStates() +void SyncJob::fetchRemoteEpisodeActions() { qCDebug(kastsSync) << "Start syncing episode states"; + Q_EMIT infoMessage(this, getProgressMessage(EpisodeDownload)); qulonglong episodeTimestamp = 0; @@ -409,9 +414,6 @@ void SyncJob::syncEpisodeStates() qCDebug(kastsSync) << "Previous gpodder episode timestamp" << episodeTimestamp; } - setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); - Q_EMIT infoMessage(this, getProgressMessage(EpisodeDownload)); - if (!m_gpodder) { setError(SyncJobError::InternalDataError); Q_EMIT infoMessage(this, getProgressMessage(Error)); @@ -436,96 +438,114 @@ void SyncJob::syncEpisodeStates() } qulonglong newEpisodeTimestamp = epRequest->timestamp(); - QList remoteEpisodeActionList; - QHash> remoteEpisodeActionHash, localEpisodeActionHash; + qulonglong currentTimestamp = static_cast(QDateTime::currentSecsSinceEpoch()); + qCDebug(kastsSync) << newEpisodeTimestamp; for (const EpisodeAction &action : epRequest->episodeActions()) { - addToHashIfNewer(remoteEpisodeActionHash, action); + addToHashIfNewer(m_remoteEpisodeActionHash, action); qCDebug(kastsSync) << action.podcast << action.url << action.id << action.device << action.action << action.started << action.position << action.total << action.timestamp; } - // store these actions in member variable to be able to delete these exact same changes from DB when processed - m_localEpisodeActions = getLocalEpisodeActions(); + updateDBTimestamp(newEpisodeTimestamp, episodeTimestampLabel); - for (const EpisodeAction &action : m_localEpisodeActions) { - addToHashIfNewer(localEpisodeActionHash, action); + // Check returned timestamp against current timestamp. If they aren't + // close enough (let's take 10 seconds), that means that there are still + // more episode actions to be fetched from the server. + if (newEpisodeTimestamp > (currentTimestamp - 10) || epRequest->episodeActions().isEmpty()) { + QTimer::singleShot(0, this, &SyncJob::syncEpisodeStates); + } else { + qCDebug(kastsSync) << "Fetching another batch of episode actions" << newEpisodeTimestamp << currentTimestamp; + QTimer::singleShot(0, this, &SyncJob::fetchRemoteEpisodeActions); } + }); +} - qCDebug(kastsSync) << "local hash"; - debugEpisodeActionHash(localEpisodeActionHash); +void SyncJob::syncEpisodeStates() +{ + // store the local actions in member variable to be able to delete these exact same changes from DB when processed - qCDebug(kastsSync) << "remote hash"; - debugEpisodeActionHash(remoteEpisodeActionHash); + m_localEpisodeActions = getLocalEpisodeActions(); - // now remove conflicts between local and remote episode actions - // based on the timestamp - removeEpisodeActionConflicts(localEpisodeActionHash, remoteEpisodeActionHash); + QHash> localEpisodeActionHash; + for (const EpisodeAction &action : m_localEpisodeActions) { + addToHashIfNewer(localEpisodeActionHash, action); + } - qCDebug(kastsSync) << "local hash"; - debugEpisodeActionHash(localEpisodeActionHash); + qCDebug(kastsSync) << "local hash"; + debugEpisodeActionHash(localEpisodeActionHash); - qCDebug(kastsSync) << "remote hash"; - debugEpisodeActionHash(remoteEpisodeActionHash); + qCDebug(kastsSync) << "remote hash"; + debugEpisodeActionHash(m_remoteEpisodeActionHash); - // Now we update the feeds that need updating (don't update feeds that have - // already been updated after the subscriptions were updated). - for (const QString &url : getFeedsFromHash(remoteEpisodeActionHash)) { - if (!m_feedsToBeUpdatedSubs.contains(url) && !m_feedsToBeUpdatedEps.contains(url)) { - m_feedsToBeUpdatedEps += url; + // now remove conflicts between local and remote episode actions + // based on the timestamp + removeEpisodeActionConflicts(localEpisodeActionHash, m_remoteEpisodeActionHash); + + qCDebug(kastsSync) << "local hash"; + debugEpisodeActionHash(localEpisodeActionHash); + + qCDebug(kastsSync) << "remote hash"; + debugEpisodeActionHash(m_remoteEpisodeActionHash); + + // Now we update the feeds that need updating (don't update feeds that have + // already been updated after the subscriptions were updated). + for (const QString &url : getFeedsFromHash(m_remoteEpisodeActionHash)) { + if (!m_feedsToBeUpdatedSubs.contains(url) && !m_feedsToBeUpdatedEps.contains(url)) { + m_feedsToBeUpdatedEps += url; + } + } + qCDebug(kastsSync) << "Feeds to be updated:" << m_feedsToBeUpdatedEps; + m_feedUpdateTotal = m_feedsToBeUpdatedEps.count(); + m_feedUpdateProgress = 0; + + setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); + Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); + + FetchFeedsJob *fetchFeedsJob = new FetchFeedsJob(m_feedsToBeUpdatedEps, this); + connect(this, &SyncJob::aborting, fetchFeedsJob, &FetchFeedsJob::abort); + connect(fetchFeedsJob, &FetchFeedsJob::processedAmountChanged, this, [this, fetchFeedsJob](KJob *job, KJob::Unit unit, qulonglong amount) { + qCDebug(kastsSync) << "FetchFeedsJob::processedAmountChanged:" << amount; + Q_UNUSED(job); + Q_ASSERT(unit == KJob::Unit::Items); + m_feedUpdateProgress = amount; + if (!fetchFeedsJob->aborted() && !fetchFeedsJob->error()) { + Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); + } + }); + connect(fetchFeedsJob, &FetchFeedsJob::result, this, [=]() { + qCDebug(kastsSync) << "Feed update finished"; + if (fetchFeedsJob->error() || fetchFeedsJob->aborted()) { + if (fetchFeedsJob->aborted()) { + Q_EMIT infoMessage(this, getProgressMessage(Aborted)); + } else if (fetchFeedsJob->error()) { + // FetchFeedsJob takes care of its own error reporting + Q_EMIT infoMessage(this, getProgressMessage(Error)); } + emitResult(); + return; } - qCDebug(kastsSync) << "Feeds to be updated:" << m_feedsToBeUpdatedEps; - m_feedUpdateTotal = m_feedsToBeUpdatedEps.count(); - m_feedUpdateProgress = 0; - - setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); - FetchFeedsJob *fetchFeedsJob = new FetchFeedsJob(m_feedsToBeUpdatedEps, this); - connect(this, &SyncJob::aborting, fetchFeedsJob, &FetchFeedsJob::abort); - connect(fetchFeedsJob, &FetchFeedsJob::processedAmountChanged, this, [this, fetchFeedsJob](KJob *job, KJob::Unit unit, qulonglong amount) { - qCDebug(kastsSync) << "FetchFeedsJob::processedAmountChanged:" << amount; - Q_UNUSED(job); - Q_ASSERT(unit == KJob::Unit::Items); - m_feedUpdateProgress = amount; - if (!fetchFeedsJob->aborted() && !fetchFeedsJob->error()) { - Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); - } + // Apply the remote changes locally + setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); + Q_EMIT infoMessage(this, getProgressMessage(ApplyEpisodeActions)); + + Sync::instance().applyEpisodeActionsLocally(m_remoteEpisodeActionHash); + + // Upload the local changes to the server + QVector localEpisodeActionList = createListFromHash(localEpisodeActionHash); + + setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); + Q_EMIT infoMessage(this, getProgressMessage(EpisodeUpload)); + // Now upload the episode actions to the server + QTimer::singleShot(0, this, [this, localEpisodeActionList]() { + uploadEpisodeActions(localEpisodeActionList); }); - connect(fetchFeedsJob, &FetchFeedsJob::result, this, [=]() { - qCDebug(kastsSync) << "Feed update finished"; - if (fetchFeedsJob->error() || fetchFeedsJob->aborted()) { - if (fetchFeedsJob->aborted()) { - Q_EMIT infoMessage(this, getProgressMessage(Aborted)); - } else if (fetchFeedsJob->error()) { - // FetchFeedsJob takes care of its own error reporting - Q_EMIT infoMessage(this, getProgressMessage(Error)); - } - emitResult(); - return; - } - Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch)); - - // Apply the remote changes locally - Sync::instance().applyEpisodeActionsLocally(remoteEpisodeActionHash); - - updateDBTimestamp(newEpisodeTimestamp, episodeTimestampLabel); - - // Upload the local changes to the server - QVector localEpisodeActionList = createListFromHash(localEpisodeActionHash); - - setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1); - Q_EMIT infoMessage(this, getProgressMessage(EpisodeUpload)); - // Now upload the episode actions to the server - QTimer::singleShot(0, this, [this, localEpisodeActionList]() { - uploadEpisodeActions(localEpisodeActionList); - }); - }); - fetchFeedsJob->start(); }); + fetchFeedsJob->start(); } void SyncJob::uploadEpisodeActions(const QVector &episodeActions) @@ -582,6 +602,7 @@ void SyncJob::uploadEpisodeActionsPartial(const QVector &episodeA } else { // All episodeActions have been uploaded + qCDebug(kastsSync) << "New uploadEpisodeTimestamp from server" << upEpRequest->timestamp(); updateDBTimestamp(upEpRequest->timestamp(), (m_syncStatus == SyncStatus::UploadOnlySync) ? uploadEpisodeTimestampLabel : episodeTimestampLabel); removeAppliedEpisodeActionsFromDB(); @@ -856,6 +877,9 @@ QString SyncJob::getProgressMessage(SyncJobStatus status) const case EpisodeDownload: return i18nc("Step in Subscription and Episode Syncing Progress", "(Step %1 of %2) Requesting Remote Episode Updates", processed, total); break; + case ApplyEpisodeActions: + return i18nc("Step in Subscription and Episode Syncing Progress", "(Step %1 of %2) Applying Remote Episode Changes", processed, total); + break; case EpisodeUpload: return i18nc("Step in Subscription and Episode Syncing Progress", "(Step %1 of %2) Uploading Local Episode Updates", processed, total); break; diff --git a/src/sync/syncjob.h b/src/sync/syncjob.h index 45b3d458..19f6aa8b 100644 --- a/src/sync/syncjob.h +++ b/src/sync/syncjob.h @@ -6,7 +6,9 @@ #pragma once +#include #include +#include #include @@ -33,6 +35,7 @@ public: SubscriptionDownload, SubscriptionUpload, EpisodeDownload, + ApplyEpisodeActions, EpisodeUpload, SubscriptionFetch, Finished, @@ -61,6 +64,7 @@ private: void syncSubscriptions(); void uploadSubscriptions(const QStringList &localAddFeedUrlList, const QStringList &localRemoveFeedUrlList); void fetchModifiedSubscriptions(); + void fetchRemoteEpisodeActions(); void syncEpisodeStates(); void uploadEpisodeActions(const QVector &episodeActions); void uploadEpisodeActionsPartial(const QVector &episodeActionList, const int startIndex); @@ -93,6 +97,7 @@ private: int m_feedUpdateTotal = 0; QPair m_localSubscriptionChanges; QVector m_localEpisodeActions; + QHash> m_remoteEpisodeActionHash; // needed for UI notifications QString getProgressMessage(SyncJobStatus status) const;