Experimental rewrite of feed update mechanism, message insertions to DB are now also done in separate worker thread, logic is simpler, easily maintainable.

This commit is contained in:
Martin Rotter 2020-06-03 09:36:01 +02:00
parent 01c29b99c0
commit 2c6afa1bb7
12 changed files with 76 additions and 137 deletions

View File

@ -7,18 +7,15 @@
#include "services/abstract/feed.h"
#include <QDebug>
#include <QMessageBox>
#include <QMessageLogger>
#include <QMutexLocker>
#include <QRegularExpression>
#include <QString>
#include <QThread>
#include <QThreadPool>
#include <QUrl>
FeedDownloader::FeedDownloader(QObject* parent)
: QObject(parent), m_mutex(new QMutex()), m_threadPool(new QThreadPool(this)),
m_feedsUpdated(0), m_feedsUpdating(0), m_feedsOriginalCount(0) {
FeedDownloader::FeedDownloader()
: QObject(), m_mutex(new QMutex()), m_feedsUpdated(0), m_feedsOriginalCount(0) {
qRegisterMetaType<FeedDownloadResults>("FeedDownloadResults");
m_threadPool->setMaxThreadCount(2);
}
FeedDownloader::~FeedDownloader() {
@ -29,11 +26,11 @@ FeedDownloader::~FeedDownloader() {
}
bool FeedDownloader::isUpdateRunning() const {
return !m_feeds.isEmpty() || m_feedsUpdating > 0;
return !m_feeds.isEmpty();
}
void FeedDownloader::updateAvailableFeeds() {
foreach (const Feed* feed, m_feeds) {
for (const Feed* feed : m_feeds) {
auto* cache = dynamic_cast<CacheForServiceRoot*>(feed->getParentServiceRoot());
if (cache != nullptr) {
@ -43,22 +40,7 @@ void FeedDownloader::updateAvailableFeeds() {
}
while (!m_feeds.isEmpty()) {
connect(m_feeds.first(),
&Feed::messagesObtained,
this,
&FeedDownloader::oneFeedUpdateFinished,
Qt::ConnectionType(Qt::UniqueConnection | Qt::AutoConnection));
if (m_threadPool->tryStart(m_feeds.first())) {
m_feeds.removeFirst();
m_feedsUpdating++;
}
else {
qCritical("User wanted to update some feeds but all working threads are occupied.");
// We want to start update of some feeds but all working threads are occupied.
break;
}
updateOneFeed(m_feeds.takeFirst());
}
}
@ -67,45 +49,64 @@ void FeedDownloader::updateFeeds(const QList<Feed*>& feeds) {
if (feeds.isEmpty()) {
qDebug("No feeds to update in worker thread, aborting update.");
finalizeUpdate();
}
else {
qDebug().nospace() << "Starting feed updates from worker in thread: \'" << QThread::currentThreadId() << "\'.";
m_feeds = feeds;
m_feedsOriginalCount = m_feeds.size();
m_results.clear();
m_feedsUpdated = m_feedsUpdating = 0;
m_feedsUpdated = 0;
// Job starts now.
emit updateStarted();
updateAvailableFeeds();
}
finalizeUpdate();
}
void FeedDownloader::stopRunningUpdate() {
m_threadPool->clear();
m_feeds.clear();
m_feedsOriginalCount = m_feedsUpdated = 0;
}
void FeedDownloader::oneFeedUpdateFinished(const QList<Message>& messages, bool error_during_obtaining) {
QMutexLocker locker(m_mutex);
void FeedDownloader::updateOneFeed(Feed* feed) {
qDebug().nospace() << "Downloading new messages for feed ID "
<< feed->customId() << " URL: " << feed->url() << " title: " << feed->title() << " in thread: \'"
<< QThread::currentThreadId() << "\'.";
bool error_during_obtaining = false;
QList<Message> msgs = feed->obtainNewMessages(&error_during_obtaining);
qDebug().nospace() << "Downloaded " << msgs.size() << " messages for feed ID "
<< feed->customId() << " URL: " << feed->url() << " title: " << feed->title() << " in thread: \'"
<< QThread::currentThreadId() << "\'.";
// Now, do some general operations on messages (tweak encoding etc.).
for (auto& msg : msgs) {
// Also, make sure that HTML encoding, encoding of special characters, etc., is fixed.
msg.m_contents = QUrl::fromPercentEncoding(msg.m_contents.toUtf8());
msg.m_author = msg.m_author.toUtf8();
// Sanitize title. Remove newlines etc.
msg.m_title = QUrl::fromPercentEncoding(msg.m_title.toUtf8())
// Replace all continuous white space.
.replace(QRegularExpression(QSL("[\\s]{2,}")), QSL(" "))
// Remove all newlines and leading white space.
.remove(QRegularExpression(QSL("([\\n\\r])|(^\\s)")));
}
m_feedsUpdated++;
m_feedsUpdating--;
Feed* feed = qobject_cast<Feed*>(sender());
disconnect(feed, &Feed::messagesObtained, this, &FeedDownloader::oneFeedUpdateFinished);
// Now, we check if there are any feeds we would like to update too.
updateAvailableFeeds();
// Now make sure, that messages are actually stored to SQL in a locked state.
qDebug().nospace() << "Saving messages of feed ID "
<< feed->customId() << " URL: " << feed->url() << " title: " << feed->title() << " in thread: \'"
<< QThread::currentThreadId() << "\'.";
int updated_messages = feed->updateMessages(messages, error_during_obtaining);
int updated_messages = feed->updateMessages(msgs, error_during_obtaining);
qDebug("%d messages for feed %s stored in DB.", updated_messages, qPrintable(feed->customId()));
@ -115,10 +116,6 @@ void FeedDownloader::oneFeedUpdateFinished(const QList<Message>& messages, bool
qDebug("Made progress in feed updates, total feeds count %d/%d (id of feed is %d).", m_feedsUpdated, m_feedsOriginalCount, feed->id());
emit updateProgress(feed, m_feedsUpdated, m_feedsOriginalCount);
if (m_feeds.isEmpty() && m_feedsUpdating <= 0) {
finalizeUpdate();
}
}
void FeedDownloader::finalizeUpdate() {

View File

@ -10,7 +10,9 @@
#include "core/message.h"
class Feed;
class QThreadPool;
class QMutex;
// Represents results of batch feed updates.
@ -37,41 +39,22 @@ class FeedDownloader : public QObject {
public:
// Constructors and destructors.
explicit FeedDownloader(QObject* parent = 0);
explicit FeedDownloader();
virtual ~FeedDownloader();
bool isUpdateRunning() const;
public slots:
// Performs update of all feeds from the "feeds" parameter.
// New messages are downloaded for each feed and they
// are stored persistently in the database.
// Appropriate signals are emitted.
void updateFeeds(const QList<Feed*>& feeds);
// Stops running update.
void stopRunningUpdate();
private slots:
void oneFeedUpdateFinished(const QList<Message>& messages, bool error_during_obtaining);
signals:
// Emitted if feed updates started.
void updateStarted();
// Emitted if all items from update queue are
// processed.
void updateFinished(FeedDownloadResults updated_feeds);
// Emitted if any item is processed.
// "Current" number indicates count of processed feeds
// and "total" number indicates total number of feeds
// which were in the initial queue.
void updateProgress(const Feed* feed, int current, int total);
private:
void updateOneFeed(Feed* feed);
void updateAvailableFeeds();
void finalizeUpdate();
@ -80,7 +63,6 @@ class FeedDownloader : public QObject {
QThreadPool* m_threadPool;
FeedDownloadResults m_results;
int m_feedsUpdated;
int m_feedsUpdating;
int m_feedsOriginalCount;
};

View File

@ -452,13 +452,6 @@ void Application::onAboutToQuit() {
system()->removeTrolltechJunkRegistryKeys();
#endif
qApp->feedReader()->quit();
database()->saveDatabase();
if (mainForm() != nullptr) {
mainForm()->saveSize();
}
if (locked_safely) {
// Application obtained permission to close in a safe way.
qDebug("Close lock was obtained safely.");
@ -472,6 +465,13 @@ void Application::onAboutToQuit() {
qDebug("Close lock timed-out.");
}
qApp->feedReader()->quit();
database()->saveDatabase();
if (mainForm() != nullptr) {
mainForm()->saveSize();
}
// Now, we can check if application should just quit or restart itself.
if (m_shouldRestart) {
finish();

View File

@ -61,25 +61,33 @@ void FeedReader::updateFeeds(const QList<Feed*>& feeds) {
if (!qApp->feedUpdateLock()->tryLock()) {
qApp->showGuiMessage(tr("Cannot update all items"),
tr("You cannot update all items because another critical operation is ongoing."),
QSystemTrayIcon::Warning, qApp->mainFormWidget(), true);
QSystemTrayIcon::MessageIcon::Warning, qApp->mainFormWidget(), true);
return;
}
if (m_feedDownloader == nullptr) {
qDebug("Creating FeedDownloader singleton.");
m_feedDownloaderThread = new QThread();
m_feedDownloader = new FeedDownloader();
// Downloader setup.
qRegisterMetaType<QList<Feed*>>("QList<Feed*>");
m_feedDownloader->moveToThread(m_feedDownloaderThread);
connect(m_feedDownloaderThread, &QThread::finished, m_feedDownloaderThread, &QThread::deleteLater);
connect(m_feedDownloaderThread, &QThread::finished, m_feedDownloader, &FeedDownloader::deleteLater);
connect(m_feedDownloader, &FeedDownloader::updateFinished, this, &FeedReader::feedUpdatesFinished);
connect(m_feedDownloader, &FeedDownloader::updateProgress, this, &FeedReader::feedUpdatesProgress);
connect(m_feedDownloader, &FeedDownloader::updateStarted, this, &FeedReader::feedUpdatesStarted);
connect(m_feedDownloader, &FeedDownloader::updateFinished, qApp->feedUpdateLock(), &Mutex::unlock);
m_feedDownloaderThread->start();
}
QMetaObject::invokeMethod(m_feedDownloader, "updateFeeds", Q_ARG(QList<Feed*>, feeds));
QMetaObject::invokeMethod(m_feedDownloader, "updateFeeds",
Qt::ConnectionType::QueuedConnection, Q_ARG(QList<Feed*>, feeds));
}
void FeedReader::updateAutoUpdateStatus() {
@ -123,8 +131,6 @@ void FeedReader::updateAllFeeds() {
void FeedReader::stopRunningFeedUpdate() {
if (m_feedDownloader != nullptr) {
m_feedDownloader->stopRunningUpdate();
//QMetaObject::invokeMethod(m_feedDownloader, "stopRunningUpdate");
}
}
@ -173,6 +179,7 @@ void FeedReader::executeNextAutoUpdate() {
// should be updated in this pass.
QList<Feed*> feeds_for_update = m_feedsModel->feedsForScheduledUpdate(m_globalAutoUpdateEnabled &&
m_globalAutoUpdateRemainingInterval == 0);
qApp->feedUpdateLock()->unlock();
if (!feeds_for_update.isEmpty()) {
@ -224,12 +231,9 @@ void FeedReader::quit() {
connect(m_feedDownloader, &FeedDownloader::updateFinished, &loop, &QEventLoop::quit);
loop.exec();
}
}
// Close workers.
if (m_feedDownloader != nullptr) {
qDebug("Feed downloader exists. Deleting it from memory.");
m_feedDownloader->deleteLater();
// Both thread and downloader are auto-deleted when worker thread exits.
m_feedDownloaderThread->quit();
}
if (qApp->settings()->value(GROUP(Messages), SETTING(Messages::ClearReadOnExit)).toBool()) {

View File

@ -11,12 +11,19 @@
#include <QFutureWatcher>
class FeedsModel;
class MessagesModel;
class MessagesProxyModel;
class FeedsProxyModel;
class ServiceEntryPoint;
class QTimer;
class QThread;
class RSSGUARD_DLLSPEC FeedReader : public QObject {
Q_OBJECT
@ -70,7 +77,6 @@ class RSSGUARD_DLLSPEC FeedReader : public QObject {
private:
QList<ServiceEntryPoint*> m_feedServices;
FeedsModel* m_feedsModel;
FeedsProxyModel* m_feedsProxyModel;
MessagesModel* m_messagesModel;
@ -82,6 +88,7 @@ class RSSGUARD_DLLSPEC FeedReader : public QObject {
bool m_globalAutoUpdateOnlyUnfocused{};
int m_globalAutoUpdateInitialInterval{};
int m_globalAutoUpdateRemainingInterval{};
QThread* m_feedDownloaderThread;
FeedDownloader* m_feedDownloader;
};

View File

@ -19,7 +19,6 @@ Feed::Feed(RootItem* parent)
: RootItem(parent), m_url(QString()), m_status(Normal), m_autoUpdateType(DefaultAutoUpdate),
m_autoUpdateInitialInterval(DEFAULT_AUTO_UPDATE_INTERVAL), m_autoUpdateRemainingInterval(DEFAULT_AUTO_UPDATE_INTERVAL) {
setKind(RootItemKind::Feed);
setAutoDelete(false);
}
Feed::Feed(const QSqlRecord& record) : Feed(nullptr) {
@ -43,7 +42,6 @@ Feed::Feed(const QSqlRecord& record) : Feed(nullptr) {
Feed::Feed(const Feed& other) : RootItem(other) {
setKind(RootItemKind::Feed);
setAutoDelete(false);
setCountOfAllMessages(other.countOfAllMessages());
setCountOfUnreadMessages(other.countOfUnreadMessages());
@ -161,37 +159,6 @@ void Feed::updateCounts(bool including_total_count) {
setCountOfUnreadMessages(DatabaseQueries::getMessageCountsForFeed(database, customId(), account_id, false));
}
void Feed::run() {
qDebug().nospace() << "Downloading new messages for feed ID "
<< customId() << " URL: " << url() << " title: " << title() << " in thread: \'"
<< QThread::currentThreadId() << "\'.";
bool error_during_obtaining = false;
QList<Message> msgs = obtainNewMessages(&error_during_obtaining);
qDebug().nospace() << "Downloaded " << msgs.size() << " messages for feed ID "
<< customId() << " URL: " << url() << " title: " << title() << " in thread: \'"
<< QThread::currentThreadId() << "\'.";
// Now, do some general operations on messages (tweak encoding etc.).
for (auto& msg : msgs) {
// Also, make sure that HTML encoding, encoding of special characters, etc., is fixed.
msg.m_contents = QUrl::fromPercentEncoding(msg.m_contents.toUtf8());
msg.m_author = msg.m_author.toUtf8();
// Sanitize title. Remove newlines etc.
msg.m_title = QUrl::fromPercentEncoding(msg.m_title.toUtf8())
// Replace all continuous white space.
.replace(QRegularExpression(QSL("[\\s]{2,}")), QSL(" "))
// Remove all newlines and leading white space.
.remove(QRegularExpression(QSL("([\\n\\r])|(^\\s)")));
}
emit messagesObtained(msgs, error_during_obtaining);
}
bool Feed::cleanMessages(bool clean_read_only) {
return getParentServiceRoot()->cleanFeeds(QList<Feed*>() << this, clean_read_only);
}

View File

@ -7,11 +7,10 @@
#include "core/message.h"
#include <QRunnable>
#include <QVariant>
// Base class for "feed" nodes.
class Feed : public RootItem, public QRunnable {
class Feed : public RootItem {
Q_OBJECT
public:
@ -68,12 +67,11 @@ class Feed : public RootItem, public QRunnable {
QString url() const;
void setUrl(const QString& url);
// Runs update in thread (thread pooled).
void run();
bool markAsReadUnread(ReadStatus status);
bool cleanMessages(bool clean_read_only);
virtual QList<Message> obtainNewMessages(bool* error_during_obtaining) = 0;
public slots:
void updateCounts(bool including_total_count);
int updateMessages(const QList<Message>& messages, bool error_during_obtaining);
@ -82,14 +80,6 @@ class Feed : public RootItem, public QRunnable {
QString getAutoUpdateStatusDescription() const;
QString getStatusDescription() const;
signals:
void messagesObtained(QList<Message> messages, bool error_during_obtaining);
private:
// Performs synchronous obtaining of new messages for this feed.
virtual QList<Message> obtainNewMessages(bool* error_during_obtaining) = 0;
private:
QString m_url;
Status m_status;

View File

@ -14,8 +14,6 @@ class GmailFeed : public Feed {
explicit GmailFeed(const QSqlRecord& record);
GmailServiceRoot* serviceRoot() const;
private:
QList<Message> obtainNewMessages(bool* error_during_obtaining);
};

View File

@ -13,8 +13,6 @@ class InoreaderFeed : public Feed {
explicit InoreaderFeed(const QSqlRecord& record);
InoreaderServiceRoot* serviceRoot() const;
private:
QList<Message> obtainNewMessages(bool* error_during_obtaining);
};

View File

@ -24,8 +24,6 @@ class OwnCloudFeed : public Feed {
bool removeItself();
OwnCloudServiceRoot* serviceRoot() const;
private:
QList<Message> obtainNewMessages(bool* error_during_obtaining);
};

View File

@ -67,6 +67,8 @@ class StandardFeed : public Feed {
QNetworkReply::NetworkError networkError() const;
QList<Message> obtainNewMessages(bool* error_during_obtaining);
// Tries to guess feed hidden under given URL
// and uses given credentials.
// Returns pointer to guessed feed (if at least partially
@ -82,9 +84,6 @@ class StandardFeed : public Feed {
public slots:
void fetchMetadataForItself();
private:
QList<Message> obtainNewMessages(bool* error_during_obtaining);
private:
bool m_passwordProtected{};
QString m_username;

View File

@ -27,7 +27,6 @@ class TtRssFeed : public Feed {
bool editItself(TtRssFeed* new_feed_data);
bool removeItself();
private:
QList<Message> obtainNewMessages(bool* error_during_obtaining);
};