Keep fetching episode actions until we get the current timestamp

Solves #23
This commit is contained in:
Bart De Vries 2021-12-15 13:54:51 +01:00
parent b3dac269b7
commit 91aa415908
2 changed files with 104 additions and 75 deletions

View File

@ -109,7 +109,7 @@ void SyncJob::doSync()
void SyncJob::doRegularSync()
{
setTotalAmount(KJob::Unit::Items, 7);
setTotalAmount(KJob::Unit::Items, 8);
setProcessedAmount(KJob::Unit::Items, 0);
Q_EMIT infoMessage(this, getProgressMessage(Started));
@ -385,17 +385,22 @@ void SyncJob::fetchModifiedSubscriptions()
return;
}
Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
qCDebug(kastsSync) << "Done updating subscriptions and fetching updates";
// We're ready to sync the episode states now
QTimer::singleShot(0, this, &SyncJob::syncEpisodeStates);
// increase the progress counter now already since fetchRemoteEpisodeActions
// can be executed multiple times
setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
m_remoteEpisodeActionHash.clear();
QTimer::singleShot(0, this, &SyncJob::fetchRemoteEpisodeActions);
});
fetchFeedsJob->start();
}
void SyncJob::syncEpisodeStates()
void SyncJob::fetchRemoteEpisodeActions()
{
qCDebug(kastsSync) << "Start syncing episode states";
Q_EMIT infoMessage(this, getProgressMessage(EpisodeDownload));
qulonglong episodeTimestamp = 0;
@ -409,9 +414,6 @@ void SyncJob::syncEpisodeStates()
qCDebug(kastsSync) << "Previous gpodder episode timestamp" << episodeTimestamp;
}
setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
Q_EMIT infoMessage(this, getProgressMessage(EpisodeDownload));
if (!m_gpodder) {
setError(SyncJobError::InternalDataError);
Q_EMIT infoMessage(this, getProgressMessage(Error));
@ -436,96 +438,114 @@ void SyncJob::syncEpisodeStates()
}
qulonglong newEpisodeTimestamp = epRequest->timestamp();
QList<EpisodeAction> remoteEpisodeActionList;
QHash<QString, QHash<QString, EpisodeAction>> remoteEpisodeActionHash, localEpisodeActionHash;
qulonglong currentTimestamp = static_cast<qulonglong>(QDateTime::currentSecsSinceEpoch());
qCDebug(kastsSync) << newEpisodeTimestamp;
for (const EpisodeAction &action : epRequest->episodeActions()) {
addToHashIfNewer(remoteEpisodeActionHash, action);
addToHashIfNewer(m_remoteEpisodeActionHash, action);
qCDebug(kastsSync) << action.podcast << action.url << action.id << action.device << action.action << action.started << action.position
<< action.total << action.timestamp;
}
// store these actions in member variable to be able to delete these exact same changes from DB when processed
m_localEpisodeActions = getLocalEpisodeActions();
updateDBTimestamp(newEpisodeTimestamp, episodeTimestampLabel);
for (const EpisodeAction &action : m_localEpisodeActions) {
addToHashIfNewer(localEpisodeActionHash, action);
// Check returned timestamp against current timestamp. If they aren't
// close enough (let's take 10 seconds), that means that there are still
// more episode actions to be fetched from the server.
if (newEpisodeTimestamp > (currentTimestamp - 10) || epRequest->episodeActions().isEmpty()) {
QTimer::singleShot(0, this, &SyncJob::syncEpisodeStates);
} else {
qCDebug(kastsSync) << "Fetching another batch of episode actions" << newEpisodeTimestamp << currentTimestamp;
QTimer::singleShot(0, this, &SyncJob::fetchRemoteEpisodeActions);
}
});
}
qCDebug(kastsSync) << "local hash";
debugEpisodeActionHash(localEpisodeActionHash);
void SyncJob::syncEpisodeStates()
{
// store the local actions in member variable to be able to delete these exact same changes from DB when processed
qCDebug(kastsSync) << "remote hash";
debugEpisodeActionHash(remoteEpisodeActionHash);
m_localEpisodeActions = getLocalEpisodeActions();
// now remove conflicts between local and remote episode actions
// based on the timestamp
removeEpisodeActionConflicts(localEpisodeActionHash, remoteEpisodeActionHash);
QHash<QString, QHash<QString, EpisodeAction>> localEpisodeActionHash;
for (const EpisodeAction &action : m_localEpisodeActions) {
addToHashIfNewer(localEpisodeActionHash, action);
}
qCDebug(kastsSync) << "local hash";
debugEpisodeActionHash(localEpisodeActionHash);
qCDebug(kastsSync) << "local hash";
debugEpisodeActionHash(localEpisodeActionHash);
qCDebug(kastsSync) << "remote hash";
debugEpisodeActionHash(remoteEpisodeActionHash);
qCDebug(kastsSync) << "remote hash";
debugEpisodeActionHash(m_remoteEpisodeActionHash);
// Now we update the feeds that need updating (don't update feeds that have
// already been updated after the subscriptions were updated).
for (const QString &url : getFeedsFromHash(remoteEpisodeActionHash)) {
if (!m_feedsToBeUpdatedSubs.contains(url) && !m_feedsToBeUpdatedEps.contains(url)) {
m_feedsToBeUpdatedEps += url;
// now remove conflicts between local and remote episode actions
// based on the timestamp
removeEpisodeActionConflicts(localEpisodeActionHash, m_remoteEpisodeActionHash);
qCDebug(kastsSync) << "local hash";
debugEpisodeActionHash(localEpisodeActionHash);
qCDebug(kastsSync) << "remote hash";
debugEpisodeActionHash(m_remoteEpisodeActionHash);
// Now we update the feeds that need updating (don't update feeds that have
// already been updated after the subscriptions were updated).
for (const QString &url : getFeedsFromHash(m_remoteEpisodeActionHash)) {
if (!m_feedsToBeUpdatedSubs.contains(url) && !m_feedsToBeUpdatedEps.contains(url)) {
m_feedsToBeUpdatedEps += url;
}
}
qCDebug(kastsSync) << "Feeds to be updated:" << m_feedsToBeUpdatedEps;
m_feedUpdateTotal = m_feedsToBeUpdatedEps.count();
m_feedUpdateProgress = 0;
setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
FetchFeedsJob *fetchFeedsJob = new FetchFeedsJob(m_feedsToBeUpdatedEps, this);
connect(this, &SyncJob::aborting, fetchFeedsJob, &FetchFeedsJob::abort);
connect(fetchFeedsJob, &FetchFeedsJob::processedAmountChanged, this, [this, fetchFeedsJob](KJob *job, KJob::Unit unit, qulonglong amount) {
qCDebug(kastsSync) << "FetchFeedsJob::processedAmountChanged:" << amount;
Q_UNUSED(job);
Q_ASSERT(unit == KJob::Unit::Items);
m_feedUpdateProgress = amount;
if (!fetchFeedsJob->aborted() && !fetchFeedsJob->error()) {
Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
}
});
connect(fetchFeedsJob, &FetchFeedsJob::result, this, [=]() {
qCDebug(kastsSync) << "Feed update finished";
if (fetchFeedsJob->error() || fetchFeedsJob->aborted()) {
if (fetchFeedsJob->aborted()) {
Q_EMIT infoMessage(this, getProgressMessage(Aborted));
} else if (fetchFeedsJob->error()) {
// FetchFeedsJob takes care of its own error reporting
Q_EMIT infoMessage(this, getProgressMessage(Error));
}
emitResult();
return;
}
qCDebug(kastsSync) << "Feeds to be updated:" << m_feedsToBeUpdatedEps;
m_feedUpdateTotal = m_feedsToBeUpdatedEps.count();
m_feedUpdateProgress = 0;
setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
FetchFeedsJob *fetchFeedsJob = new FetchFeedsJob(m_feedsToBeUpdatedEps, this);
connect(this, &SyncJob::aborting, fetchFeedsJob, &FetchFeedsJob::abort);
connect(fetchFeedsJob, &FetchFeedsJob::processedAmountChanged, this, [this, fetchFeedsJob](KJob *job, KJob::Unit unit, qulonglong amount) {
qCDebug(kastsSync) << "FetchFeedsJob::processedAmountChanged:" << amount;
Q_UNUSED(job);
Q_ASSERT(unit == KJob::Unit::Items);
m_feedUpdateProgress = amount;
if (!fetchFeedsJob->aborted() && !fetchFeedsJob->error()) {
Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
}
// Apply the remote changes locally
setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
Q_EMIT infoMessage(this, getProgressMessage(ApplyEpisodeActions));
Sync::instance().applyEpisodeActionsLocally(m_remoteEpisodeActionHash);
// Upload the local changes to the server
QVector<EpisodeAction> localEpisodeActionList = createListFromHash(localEpisodeActionHash);
setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
Q_EMIT infoMessage(this, getProgressMessage(EpisodeUpload));
// Now upload the episode actions to the server
QTimer::singleShot(0, this, [this, localEpisodeActionList]() {
uploadEpisodeActions(localEpisodeActionList);
});
connect(fetchFeedsJob, &FetchFeedsJob::result, this, [=]() {
qCDebug(kastsSync) << "Feed update finished";
if (fetchFeedsJob->error() || fetchFeedsJob->aborted()) {
if (fetchFeedsJob->aborted()) {
Q_EMIT infoMessage(this, getProgressMessage(Aborted));
} else if (fetchFeedsJob->error()) {
// FetchFeedsJob takes care of its own error reporting
Q_EMIT infoMessage(this, getProgressMessage(Error));
}
emitResult();
return;
}
Q_EMIT infoMessage(this, getProgressMessage(SubscriptionFetch));
// Apply the remote changes locally
Sync::instance().applyEpisodeActionsLocally(remoteEpisodeActionHash);
updateDBTimestamp(newEpisodeTimestamp, episodeTimestampLabel);
// Upload the local changes to the server
QVector<EpisodeAction> localEpisodeActionList = createListFromHash(localEpisodeActionHash);
setProcessedAmount(KJob::Unit::Items, processedAmount(KJob::Unit::Items) + 1);
Q_EMIT infoMessage(this, getProgressMessage(EpisodeUpload));
// Now upload the episode actions to the server
QTimer::singleShot(0, this, [this, localEpisodeActionList]() {
uploadEpisodeActions(localEpisodeActionList);
});
});
fetchFeedsJob->start();
});
fetchFeedsJob->start();
}
void SyncJob::uploadEpisodeActions(const QVector<EpisodeAction> &episodeActions)
@ -582,6 +602,7 @@ void SyncJob::uploadEpisodeActionsPartial(const QVector<EpisodeAction> &episodeA
} else {
// All episodeActions have been uploaded
qCDebug(kastsSync) << "New uploadEpisodeTimestamp from server" << upEpRequest->timestamp();
updateDBTimestamp(upEpRequest->timestamp(), (m_syncStatus == SyncStatus::UploadOnlySync) ? uploadEpisodeTimestampLabel : episodeTimestampLabel);
removeAppliedEpisodeActionsFromDB();
@ -856,6 +877,9 @@ QString SyncJob::getProgressMessage(SyncJobStatus status) const
case EpisodeDownload:
return i18nc("Step in Subscription and Episode Syncing Progress", "(Step %1 of %2) Requesting Remote Episode Updates", processed, total);
break;
case ApplyEpisodeActions:
return i18nc("Step in Subscription and Episode Syncing Progress", "(Step %1 of %2) Applying Remote Episode Changes", processed, total);
break;
case EpisodeUpload:
return i18nc("Step in Subscription and Episode Syncing Progress", "(Step %1 of %2) Uploading Local Episode Updates", processed, total);
break;

View File

@ -6,7 +6,9 @@
#pragma once
#include <QHash>
#include <QObject>
#include <QStringList>
#include <KJob>
@ -33,6 +35,7 @@ public:
SubscriptionDownload,
SubscriptionUpload,
EpisodeDownload,
ApplyEpisodeActions,
EpisodeUpload,
SubscriptionFetch,
Finished,
@ -61,6 +64,7 @@ private:
void syncSubscriptions();
void uploadSubscriptions(const QStringList &localAddFeedUrlList, const QStringList &localRemoveFeedUrlList);
void fetchModifiedSubscriptions();
void fetchRemoteEpisodeActions();
void syncEpisodeStates();
void uploadEpisodeActions(const QVector<SyncUtils::EpisodeAction> &episodeActions);
void uploadEpisodeActionsPartial(const QVector<SyncUtils::EpisodeAction> &episodeActionList, const int startIndex);
@ -93,6 +97,7 @@ private:
int m_feedUpdateTotal = 0;
QPair<QStringList, QStringList> m_localSubscriptionChanges;
QVector<SyncUtils::EpisodeAction> m_localEpisodeActions;
QHash<QString, QHash<QString, SyncUtils::EpisodeAction>> m_remoteEpisodeActionHash;
// needed for UI notifications
QString getProgressMessage(SyncJobStatus status) const;