initial PoC work - 160 feeds in 1 minute

This commit is contained in:
Martin Rotter 2023-01-04 10:27:01 +01:00
parent 0e73a3c061
commit f60c6a2bc2
8 changed files with 103 additions and 78 deletions

View File

@ -16,20 +16,17 @@
#include <QDebug>
#include <QJSEngine>
#include <QMutexLocker>
#include <QString>
#include <QThread>
#include <QtConcurrentMap>
FeedDownloader::FeedDownloader()
: QObject(), m_isCacheSynchronizationRunning(false), m_stopCacheSynchronization(false), m_mutex(new QMutex()),
m_feedsUpdated(0), m_feedsOriginalCount(0) {
: QObject(), m_isCacheSynchronizationRunning(false), m_stopCacheSynchronization(false), m_feedsUpdated(0),
m_feedsOriginalCount(0) {
qRegisterMetaType<FeedDownloadResults>("FeedDownloadResults");
}
FeedDownloader::~FeedDownloader() {
m_mutex->tryLock();
m_mutex->unlock();
delete m_mutex;
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Destroying FeedDownloader instance.";
}
@ -62,14 +59,12 @@ void FeedDownloader::synchronizeAccountCaches(const QList<CacheForServiceRoot*>&
}
void FeedDownloader::updateFeeds(const QList<Feed*>& feeds) {
QMutexLocker locker(m_mutex);
m_erroredAccounts.clear();
m_results.clear();
m_feeds = feeds;
m_feedsOriginalCount = m_feeds.size();
m_feedsUpdated = 0;
m_feeds.clear();
const QDateTime update_time = QDateTime::currentDateTimeUtc();
m_feedsOriginalCount = feeds.size();
m_feedsUpdated = 0;
if (feeds.isEmpty()) {
qDebugNN << LOGSEC_FEEDDOWNLOADER << "No feeds to update in worker thread, aborting update.";
@ -104,24 +99,25 @@ void FeedDownloader::updateFeeds(const QList<Feed*>& feeds) {
synchronizeAccountCaches(caches.values(), false);
QHash<ServiceRoot*, ApplicationException> errored_roots;
auto roots = feeds_per_root.uniqueKeys();
bool is_main_thread = QThread::currentThread() == qApp->thread();
QSqlDatabase database = is_main_thread ? qApp->database()->driver()->connection(metaObject()->className())
: qApp->database()->driver()->connection(QSL("feed_upd"));
for (auto* rt : roots) {
auto fds = feeds_per_root.values(rt);
// Obtain lists of local IDs.
if (rt->wantsBaggedIdsOfExistingMessages()) {
// Tagged messages for the account.
tagged_messages.insert(rt, DatabaseQueries::bagsOfMessages(database, rt->labelsNode()->labels()));
// Tags per account.
auto per_acc_tags = DatabaseQueries::bagsOfMessages(database, rt->labelsNode()->labels());
tagged_messages.insert(rt, per_acc_tags);
QHash<QString, QHash<ServiceRoot::BagOfMessages, QStringList>> per_acc_states;
// This account has activated intelligent downloading of messages.
// Prepare bags.
auto fds = feeds_per_root.values(rt);
for (Feed* fd : fds) {
QHash<ServiceRoot::BagOfMessages, QStringList> per_feed_states;
@ -131,42 +127,65 @@ void FeedDownloader::updateFeeds(const QList<Feed*>& feeds) {
DatabaseQueries::bagOfMessages(database, ServiceRoot::BagOfMessages::Unread, fd));
per_feed_states.insert(ServiceRoot::BagOfMessages::Starred,
DatabaseQueries::bagOfMessages(database, ServiceRoot::BagOfMessages::Starred, fd));
per_acc_states.insert(fd->customId(), per_feed_states);
FeedUpdate fu;
fu.account = rt;
fu.feed = fd;
fu.stated_messages = per_feed_states;
fu.tagged_messages = per_acc_tags;
m_feeds.append(fu);
}
stated_messages.insert(rt, per_acc_states);
}
else {
for (Feed* fd : fds) {
FeedUpdate fu;
fu.account = rt;
fu.feed = fd;
m_feeds.append(fu);
}
}
try {
rt->aboutToBeginFeedFetching(feeds_per_root.values(rt), stated_messages.value(rt), tagged_messages.value(rt));
rt->aboutToBeginFeedFetching(fds, stated_messages.value(rt), tagged_messages.value(rt));
}
catch (const ApplicationException& ex) {
// Common error showed, all feeds from the root are errored now!
errored_roots.insert(rt, ex);
m_erroredAccounts.insert(rt, ex);
}
}
while (!m_feeds.isEmpty()) {
auto n_f = m_feeds.takeFirst();
auto n_r = n_f->getParentServiceRoot();
std::function<void(const FeedUpdate&)> func = [=](const FeedUpdate& fd) -> void {
updateThreadedFeed(fd);
};
if (errored_roots.contains(n_r)) {
// This feed is errored because its account errored when preparing feed update.
ApplicationException root_ex = errored_roots.value(n_r);
skipFeedUpdateWithError(n_r, n_f, root_ex);
}
else {
updateOneFeed(n_r, n_f, stated_messages.value(n_r).value(n_f->customId()), tagged_messages.value(n_r));
}
n_f->setLastUpdated(QDateTime::currentDateTimeUtc());
}
QtConcurrent::blockingMap(m_feeds, func);
}
finalizeUpdate();
}
void FeedDownloader::updateThreadedFeed(const FeedUpdate& fd) {
if (m_erroredAccounts.contains(fd.account)) {
// This feed is errored because its account errored when preparing feed update.
ApplicationException root_ex = m_erroredAccounts.value(fd.account);
skipFeedUpdateWithError(fd.account, fd.feed, root_ex);
}
else {
updateOneFeed(fd.account, fd.feed, fd.stated_messages, fd.tagged_messages);
}
fd.feed->setLastUpdated(QDateTime::currentDateTimeUtc());
}
void FeedDownloader::skipFeedUpdateWithError(ServiceRoot* acc, Feed* feed, const ApplicationException& ex) {
const FeedFetchException* fetch_ex = dynamic_cast<const FeedFetchException*>(&ex);
@ -178,8 +197,6 @@ void FeedDownloader::skipFeedUpdateWithError(ServiceRoot* acc, Feed* feed, const
}
acc->itemChanged({feed});
emit updateProgress(feed, ++m_feedsUpdated, m_feedsOriginalCount);
}
void FeedDownloader::stopRunningUpdate() {
@ -192,17 +209,19 @@ void FeedDownloader::updateOneFeed(ServiceRoot* acc,
Feed* feed,
const QHash<ServiceRoot::BagOfMessages, QStringList>& stated_messages,
const QHash<QString, QStringList>& tagged_messages) {
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Downloading new messages for feed ID '" << feed->customId() << "' URL: '"
<< feed->source() << "' title: '" << feed->title() << "' in thread: '" << QThread::currentThreadId() << "'.";
qlonglong thread_id = qlonglong(QThread::currentThreadId());
int acc_id = feed->getParentServiceRoot()->accountId();
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Downloading new messages for feed ID '" << feed->customId() << "' URL: '"
<< feed->source() << "' title: '" << feed->title() << "' in thread: '" << thread_id << "'.";
int acc_id = acc->accountId();
QElapsedTimer tmr;
tmr.start();
try {
bool is_main_thread = QThread::currentThread() == qApp->thread();
QSqlDatabase database = is_main_thread ? qApp->database()->driver()->connection(metaObject()->className())
: qApp->database()->driver()->connection(QSL("feed_upd"));
: qApp->database()->driver()->connection(QSL("feed_upd_%1").arg(thread_id));
QList<Message> msgs = feed->getParentServiceRoot()->obtainNewMessages(feed, stated_messages, tagged_messages);
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Downloaded " << msgs.size() << " messages for feed ID '" << feed->customId()

View File

@ -13,7 +13,6 @@
#include "services/abstract/feed.h"
class MessageFilter;
class QMutex;
// Represents results of batch feed updates.
class FeedDownloadResults {
@ -30,6 +29,13 @@ class FeedDownloadResults {
QList<QPair<Feed*, int>> m_updatedFeeds;
};
struct FeedUpdate {
Feed* feed = nullptr;
ServiceRoot* account = nullptr;
QHash<ServiceRoot::BagOfMessages, QStringList> stated_messages;
QHash<QString, QStringList> tagged_messages;
};
// This class offers means to "update" feeds and "special" categories.
// NOTE: This class is used within separate thread.
class FeedDownloader : public QObject {
@ -62,11 +68,13 @@ class FeedDownloader : public QObject {
void finalizeUpdate();
void removeDuplicateMessages(QList<Message>& messages);
void updateThreadedFeed(const FeedUpdate& fd);
private:
bool m_isCacheSynchronizationRunning;
bool m_stopCacheSynchronization;
QList<Feed*> m_feeds = {};
QMutex* m_mutex;
QHash<ServiceRoot*, ApplicationException> m_erroredAccounts;
QList<FeedUpdate> m_feeds = {};
FeedDownloadResults m_results;
int m_feedsUpdated;
int m_feedsOriginalCount;

View File

@ -197,8 +197,10 @@ void Feed::appendMessageFilter(MessageFilter* filter) {
void Feed::updateCounts(bool including_total_count) {
bool is_main_thread = QThread::currentThread() == qApp->thread();
qlonglong thread_id = qlonglong(QThread::currentThreadId());
QSqlDatabase database = is_main_thread ? qApp->database()->driver()->connection(metaObject()->className())
: qApp->database()->driver()->connection(QSL("feed_upd"));
: qApp->database()->driver()->connection(QSL("feed_upd_%1").arg(thread_id));
int account_id = getParentServiceRoot()->accountId();
if (including_total_count) {

View File

@ -26,9 +26,9 @@ QList<Message> ImportantNode::undeletedMessages() const {
void ImportantNode::updateCounts(bool including_total_count) {
bool is_main_thread = QThread::currentThread() == qApp->thread();
QSqlDatabase database = is_main_thread ?
qApp->database()->driver()->connection(metaObject()->className()) :
qApp->database()->driver()->connection(QSL("feed_upd"));
qlonglong thread_id = qlonglong(QThread::currentThreadId());
QSqlDatabase database = is_main_thread ? qApp->database()->driver()->connection(metaObject()->className())
: qApp->database()->driver()->connection(QSL("feed_upd_%1").arg(thread_id));
int account_id = getParentServiceRoot()->accountId();
if (including_total_count) {

View File

@ -2,10 +2,10 @@
#include "services/abstract/label.h"
#include "gui/dialogs/formaddeditlabel.h"
#include "miscellaneous/application.h"
#include "database/databasefactory.h"
#include "database/databasequeries.h"
#include "gui/dialogs/formaddeditlabel.h"
#include "miscellaneous/application.h"
#include "services/abstract/cacheforserviceroot.h"
#include "services/abstract/labelsnode.h"
#include "services/abstract/serviceroot.h"
@ -77,9 +77,9 @@ bool Label::deleteViaGui() {
void Label::updateCounts(bool including_total_count) {
bool is_main_thread = QThread::currentThread() == qApp->thread();
QSqlDatabase database = is_main_thread ?
qApp->database()->driver()->connection(metaObject()->className()) :
qApp->database()->driver()->connection(QSL("feed_upd"));
qlonglong thread_id = qlonglong(QThread::currentThreadId());
QSqlDatabase database = is_main_thread ? qApp->database()->driver()->connection(metaObject()->className())
: qApp->database()->driver()->connection(QSL("feed_upd_%1").arg(thread_id));
int account_id = getParentServiceRoot()->accountId();
if (including_total_count) {
@ -111,27 +111,25 @@ QIcon Label::generateIcon(const QColor& color) {
void Label::assignToMessage(const Message& msg) {
bool is_main_thread = QThread::currentThread() == qApp->thread();
QSqlDatabase database = is_main_thread ?
qApp->database()->driver()->connection(metaObject()->className()) :
qApp->database()->driver()->connection(QSL("feed_upd"));
QSqlDatabase database = is_main_thread ? qApp->database()->driver()->connection(metaObject()->className())
: qApp->database()->driver()->connection(QSL("feed_upd"));
if (getParentServiceRoot()->onBeforeLabelMessageAssignmentChanged({ this }, { msg }, true)) {
if (getParentServiceRoot()->onBeforeLabelMessageAssignmentChanged({this}, {msg}, true)) {
DatabaseQueries::assignLabelToMessage(database, this, msg);
getParentServiceRoot()->onAfterLabelMessageAssignmentChanged({ this }, { msg }, true);
getParentServiceRoot()->onAfterLabelMessageAssignmentChanged({this}, {msg}, true);
}
}
void Label::deassignFromMessage(const Message& msg) {
bool is_main_thread = QThread::currentThread() == qApp->thread();
QSqlDatabase database = is_main_thread ?
qApp->database()->driver()->connection(metaObject()->className()) :
qApp->database()->driver()->connection(QSL("feed_upd"));
QSqlDatabase database = is_main_thread ? qApp->database()->driver()->connection(metaObject()->className())
: qApp->database()->driver()->connection(QSL("feed_upd"));
if (getParentServiceRoot()->onBeforeLabelMessageAssignmentChanged({ this }, { msg }, false)) {
if (getParentServiceRoot()->onBeforeLabelMessageAssignmentChanged({this}, {msg}, false)) {
DatabaseQueries::deassignLabelFromMessage(database, this, msg);
getParentServiceRoot()->onAfterLabelMessageAssignmentChanged({ this }, { msg }, false);
getParentServiceRoot()->onAfterLabelMessageAssignmentChanged({this}, {msg}, false);
}
}

View File

@ -11,8 +11,7 @@
#include <QThread>
RecycleBin::RecycleBin(RootItem* parent_item) : RootItem(parent_item), m_totalCount(0),
m_unreadCount(0) {
RecycleBin::RecycleBin(RootItem* parent_item) : RootItem(parent_item), m_totalCount(0), m_unreadCount(0) {
setKind(RootItem::Kind::Bin);
setId(ID_RECYCLE_BIN);
setIcon(qApp->icons()->fromTheme(QSL("user-trash")));
@ -34,9 +33,9 @@ int RecycleBin::countOfAllMessages() const {
void RecycleBin::updateCounts(bool update_total_count) {
bool is_main_thread = QThread::currentThread() == qApp->thread();
QSqlDatabase database = is_main_thread ?
qApp->database()->driver()->connection(metaObject()->className()) :
qApp->database()->driver()->connection(QSL("feed_upd"));
qlonglong thread_id = qlonglong(QThread::currentThreadId());
QSqlDatabase database = is_main_thread ? qApp->database()->driver()->connection(metaObject()->className())
: qApp->database()->driver()->connection(QSL("feed_upd_%1").arg(thread_id));
m_unreadCount = DatabaseQueries::getMessageCountsForBin(database, getParentServiceRoot()->accountId(), false);
@ -47,12 +46,9 @@ void RecycleBin::updateCounts(bool update_total_count) {
QList<QAction*> RecycleBin::contextMenuFeedsList() {
if (m_contextMenu.isEmpty()) {
QAction* restore_action = new QAction(qApp->icons()->fromTheme(QSL("view-refresh")),
tr("Restore recycle bin"),
this);
QAction* empty_action = new QAction(qApp->icons()->fromTheme(QSL("edit-clear")),
tr("Empty recycle bin"),
this);
QAction* restore_action =
new QAction(qApp->icons()->fromTheme(QSL("view-refresh")), tr("Restore recycle bin"), this);
QAction* empty_action = new QAction(qApp->icons()->fromTheme(QSL("edit-clear")), tr("Empty recycle bin"), this);
connect(restore_action, &QAction::triggered, this, &RecycleBin::restore);
connect(empty_action, &QAction::triggered, this, &RecycleBin::empty);
@ -99,7 +95,8 @@ bool RecycleBin::cleanMessages(bool clear_only_read) {
updateCounts(true);
parent_root->itemChanged(QList<RootItem*>() << this);
parent_root->requestReloadMessageList(true);
return true;;
return true;
;
}
else {
return false;

View File

@ -959,8 +959,9 @@ QPair<int, int> ServiceRoot::updateMessages(QList<Message>& messages, Feed* feed
qDebugNN << LOGSEC_CORE << "Updating messages in DB. Main thread:" << QUOTE_W_SPACE_DOT(is_main_thread);
bool ok = false;
qlonglong thread_id = qlonglong(QThread::currentThreadId());
QSqlDatabase database = is_main_thread ? qApp->database()->driver()->connection(metaObject()->className())
: qApp->database()->driver()->connection(QSL("feed_upd"));
: qApp->database()->driver()->connection(QSL("feed_upd_%1").arg(thread_id));
updated_messages = DatabaseQueries::updateMessages(database, messages, feed, force_update, &ok);

View File

@ -26,9 +26,9 @@ void UnreadNode::updateCounts(bool including_total_count) {
Q_UNUSED(including_total_count)
bool is_main_thread = QThread::currentThread() == qApp->thread();
QSqlDatabase database = is_main_thread ?
qApp->database()->driver()->connection(metaObject()->className()) :
qApp->database()->driver()->connection(QSL("feed_upd"));
qlonglong thread_id = qlonglong(QThread::currentThreadId());
QSqlDatabase database = is_main_thread ? qApp->database()->driver()->connection(metaObject()->className())
: qApp->database()->driver()->connection(QSL("feed_upd_%1").arg(thread_id));
int account_id = getParentServiceRoot()->accountId();
m_totalCount = m_unreadCount = DatabaseQueries::getUnreadMessageCounts(database, account_id);